【问题标题】:hadoop bz2 library in Spark job fails when running on multiple cores在多核上运行时 Spark 作业中的 hadoop bz2 库失败
【发布时间】:2015-02-01 15:47:44
【问题描述】:

我目前在使用 Spark 和读取 bz2 文件时遇到问题。我正在使用 Spark 1.2.0(为 hadoop 2.4 预构建,但文件当前在本地只读)。为了测试,有大约 1500 个文件,每个文件大约 50KB 大小。

以下脚本 count_loglines.py 说明了问题:

 from pyspark import SparkConf, SparkContext
 spark_conf = SparkConf().setAppName("SparkTest")
 sc = SparkContext(conf=spark_conf)

 overall_log_lines = sc.textFile('/files/bzipped/*.log.bz2')
 line_count = overall_log_lines.count()
 print line_count

在一个内核上本地运行脚本,它按预期工作。

spark/bin/spark-submit --master local[1] count_log_lines.py

使用 2 个内核运行脚本

spark/bin/spark-submit --master local[2] count_log_lines.py

以 hadoop bzip2 库的错误消息结尾,例如

 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in stage 0.0 failed 1 times, most recent failure: Lost task 60.0 in stage 0.0 (TID 60, localhost): java.io.IOException: unexpected end of stream
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.bsGetBit(CBZip2InputStream.java:626)

当我事先解压缩文件并读取未压缩的日志文件而不是 bzip 压缩的文件时,即 sc.textFile('/files/unzipped/*.log') 脚本按预期工作,也在多个内核上。

我的问题:这里有什么问题?为什么 Spark 作业在多个内核上运行时无法正确读取 bz2 文件?

感谢您的帮助!

【问题讨论】:

  • 我也有同样的问题。有人有答案吗?
  • 你用的是什么版本的hadoop?
  • 您可能遇到了 Hadoop 中的错误:issues.apache.org/jira/browse/HADOOP-10614
  • 嗨,您是否得到了同样的答案。请让我们知道您的 cmets

标签: apache-spark bzip2


【解决方案1】:

我不确定文本文件是否支持 bz2 文件。

您可能会查看 pyspark newAPIHadoopFile 或 hadoopfile API。如果拆分后的 bz2 文件包含文本(例如日志),您可以使用:

stdout = sc.newAPIHadoopFile(path="/HDFSpath/to/folder/containing/bz2/", inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat", keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text", keyConverter=None, valueConverter=None, conf=None, batchSize=5)

来源:http://spark.apache.org/docs/1.2.0/api/python/pyspark.html

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

从 HDFS、本地文件系统(在所有节点上可用)或任何 Hadoop 支持的文件系统 URI 中读取带有任意键和值类的“旧”Hadoop InputFormat。机制与 sc.sequenceFile 相同。

Hadoop 配置可以作为 Python 字典传入。这将被转换为 Java 中的配置。

参数: path – Hadoop 文件的路径 inputFormatClass – Hadoop InputFormat 的完全限定类名(例如“org.apache.hadoop.mapred.TextInputFormat”) keyClass – 关键可写类的完全限定类名(例如“org.apache.hadoop.io.Text”) valueClass – 值可写类的完全限定类名(例如“org.apache.hadoop.io.LongWritable”) keyConverter –(默认无) valueConverter –(默认无) conf – Hadoop 配置,作为 dict 传入(默认无) batchSize – 表示为单个 Java 对象的 Python 对象的数量。 (默认0,自动选择batchSize)

newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

从 HDFS、本地文件系统(在所有节点上可用)或任何 Hadoop 支持的文件系统 URI 中读取具有任意键和值类的“新 API”Hadoop InputFormat。机制与 sc.sequenceFile 相同。

Hadoop 配置可以作为 Python 字典传入。这将被转换为 Java 中的配置

参数: path – Hadoop 文件的路径 inputFormatClass – Hadoop InputFormat 的完全限定类名(例如“org.apache.hadoop.mapreduce.lib.input.TextInputFormat”) keyClass – 关键可写类的完全限定类名(例如“org.apache.hadoop.io.Text”) valueClass – 值可写类的完全限定类名(例如“org.apache.hadoop.io.LongWritable”) keyConverter –(默认无) valueConverter –(默认无) conf – Hadoop 配置,作为 dict 传入(默认无) batchSize – 表示为单个 Java 对象的 Python 对象的数量。 (默认0,自动选择batchSize)

Rgs,

K

【讨论】:

    猜你喜欢
    • 2013-06-05
    • 1970-01-01
    • 1970-01-01
    • 2018-04-18
    • 1970-01-01
    • 2015-05-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多