【问题标题】:Reading data from S3 using pyspark throws java.lang.NumberFormatException: For input string: "100M"使用 pyspark 从 S3 读取数据抛出 java.lang.NumberFormatException: For input string: "100M"
【发布时间】:2020-05-27 02:14:07
【问题描述】:

我正在使用以下代码从 S3 读取一些 json 数据:

df = spark_sql_context.read.json("s3a://test_bucket/test.json")
df.show()

以上代码抛出如下异常:

py4j.protocol.Py4JJavaError: An error occurred while calling o64.json.
: java.lang.NumberFormatException: For input string: "100M"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1538)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

我已经阅读了有关此主题的其他几篇 SO 帖子(例如 this onethis),并且已经完成了他们提到的所有操作,但似乎没有任何东西可以解决我的问题。

我正在使用spark-2.4.4-bin-without-hadoophadoop-3.1.2。至于 jar 文件,我有:

  • aws-java-sdk-bundle-1.11.199.jar
  • hadoop-aws-3.0.0.jar
  • hadoop-common-3.0.0.jar

另外,使用以下spark-submit 命令运行代码:

/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit 
--conf spark.app.name=read_json --master yarn --deploy-mode client --num-executors 2 
--executor-cores 2 --executor-memory 2G --driver-cores 2 --driver-memory 1G 
--jars /home/my_project/jars/aws-java-sdk-bundle-1.11.199.jar,
/home/my_project/jars/hadoop-aws-3.0.0.jar,/home/my_project/jars/hadoop-common-3.0.0.jar 
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.rpc.askTimeout=600s" /home/my_project/read_json.py

我可能在这里遗漏了什么?

【问题讨论】:

  • 这个错误似乎不言自明。 100M 是字符串,不是数字。
  • @Andrew 这是一个已知问题:issues.apache.org/jira/browse/HADOOP-13680 并且应该在我使用的 hadoop 版本中得到修复,这就是我首先问这个问题的原因

标签: apache-spark hadoop amazon-s3 pyspark


【解决方案1】:

从堆栈跟踪中,当它尝试读取配置选项之一时会引发错误,因此问题在于现在需要数字格式的默认配置选项之一。

在我将以下配置参数添加到spark-submit 命令后,错误得到解决:

--conf fs.s3a.multipart.size=104857600

Tuning S3A Uploads

【讨论】:

  • 在解析 .xml 文件(更准确地说是 core-default.xml)时会发生这种情况,与配置无关。无论如何我尝试了您的解决方案,但并没有解决问题。
  • @ahajib core-default.xml 是一个提供默认值的hadoop配置文件。其中一个参数包含“100M”,在您正在加载的 S3AFileSystem 类的版本中,它应该是数字。将其设为数字​​(使用 --conf 参数覆盖)或加载与您的 EMR 集群匹配的版本。
  • 通过spark-defaults.conf(或命令行上的--conf)设置Hadoop 属性通常需要spark.hadoop. 前缀,否则prop 不会推送到Hadoop 库(通常被忽略火花本身)。你应该试试--conf spark.hadoop.fs.s3a.multipart.size=104857600
【解决方案2】:

我将发布我最终为解决此问题所做的工作,以供任何可能看到相同异常的人使用:

我在 hadoop-env.sh 中添加了 hadoop-awsHADOOP_OPTIONAL_TOOLS。我还删除了 spark 中 s3a 的所有配置,除了访问/秘密,一切正常。更改前我的代码:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.`type", "BLOCK") \
       .set("spark.speculation", "false")\
       .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")\
       .set("com.amazonaws.services.s3.enableV4", "true")

# Some other configs

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.access.key", s3_key
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.secret.key", s3_secret
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.multipart.size", "104857600"
)

之后:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.`type", "BLOCK") \
       .set("spark.speculation", "false")

# Some other configs

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.access.key", s3_key
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.secret.key", s3_secret
)

这可能意味着这是一个类路径问题。 hadoop-aws 没有被添加到类路径中,因此在幕后它默认为 S3AFileSystem.java 的其他一些实现。 Hadoop和spark在这个领域是一个巨大的痛苦,因为有很多不同的地方和方式来加载东西,而java也特别关注顺序,因为如果它没有按照正确的顺序发生,它就会随随便便一起去。最后加载。希望这可以帮助其他面临同样问题的人。

【讨论】:

  • Hadoop.mapred 属性已被弃用,顺便说一句,您可以将它们放在 xml 文件中而不是代码中
  • @cricket_007 感谢您指出这一点。我也会确保解决这个问题。
  • SparkConf().set("spark.hadoop.fs.s3a.multipart.size", "104857600") 应该可以解决问题,因为 spark.hadoop. 前缀意味着 “自动将该属性推送到 Hadoop conf(并覆盖在 XML 文件中静态设置的通用值)”
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-06-16
  • 2016-05-26
  • 2022-01-11
  • 2021-03-21
  • 2018-08-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多