【发布时间】:2015-11-12 15:49:11
【问题描述】:
使用 PySpark 的 saveAsHadoopFile() 时出现错误,使用 saveAsSequenceFile() 时出现相同的错误。我需要保存 (key,val) 的 RDD,其中键是字符串,val 是 LabeledPoint RDD (label, SparseVector)。错误如下所示。谷歌搜索几个来源似乎我应该能够在 IPython 笔记本中做到这一点。我需要序列化这个大型 RDD,以便我可以在 Java 中处理它,因为 Spark 的一些 MLLib 功能还不能用于 python。根据这个post,这应该是可行的。
看着这个page我明白了:
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
所以我真的不知道为什么会出现这个错误。
代码: 标签DataRDD.saveAsSequenceFile('/tmp/pysequencefile/')
错误:
Py4JJavaError:调用 z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 527.0 中的任务 0 失败 1 次,最近一次失败:阶段 527.0 中丢失任务 0.0(TID 1454,本地主机):net.razorvine.pickle.PickleException:构造 ClassDict 的预期零参数(用于 numpy.dtype) 在 net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
编辑:我发现了这个:
public class More ...ClassDictConstructor implements IObjectConstructor {
12
13 String module;
14 String name;
15
16 public More ...ClassDictConstructor(String module, String name) {
17 this.module = module;
18 this.name = name;
19 }
20
21 public Object More ...construct(Object[] args) {
22 if (args.length > 0)
23 throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
24 return new ClassDict(module, name);
25 }
26}
我没有直接使用上面的construct()方法..所以我不知道为什么我尝试的saveAs..方法在它不需要时传递参数。
编辑 2:按照 zero323 的建议(谢谢)处理了一个小故障。当我尝试 zero323 写的内容时出现错误(见下文)。但是,当我派生一个更简单的 RDD 时,它可以工作并将这个更简单的 RDD 保存到 .parquet 文件的目录中(将其分解为几个 .parquet 文件)。更简单的RDD如下:
simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")
尝试保存labeledDataRDD时出错:
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
831 raise TypeError("Can not infer schema for type: %s" % type(row))
832
--> 833 fields = [StructField(k, _infer_type(v), True) for k, v in items]
834 return StructType(fields)
835
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
808 return _infer_schema(obj)
809 except TypeError:
--> 810 raise TypeError("not supported type: %s" % type(obj))
811
812
TypeError: not supported type: <type 'numpy.unicode_'>
【问题讨论】:
-
python 还没有提供哪些功能?
-
看看here。我想他们会在 Spark 1.6 中添加这个。我正在使用最新的 Spark 1.5.1。
标签: python apache-spark pyspark apache-spark-mllib