【问题标题】:How do I serialize a LabeledPoint RDD in PySpark?如何在 PySpark 中序列化 LabeledPoint RDD?
【发布时间】:2015-11-12 15:49:11
【问题描述】:

使用 PySpark 的 saveAsHadoopFile() 时出现错误,使用 saveAsSequenceFile() 时出现相同的错误。我需要保存 (key,val) 的 RDD,其中键是字符串,val 是 LabeledPoint RDD (label, SparseVector)。错误如下所示。谷歌搜索几个来源似乎我应该能够在 IPython 笔记本中做到这一点。我需要序列化这个大型 RDD,以便我可以在 Java 中处理它,因为 Spark 的一些 MLLib 功能还不能用于 python。根据这个post,这应该是可行的。

看着这个page我明白了:

_picklable_classes = [
    'LinkedList',
    'SparseVector',
    'DenseVector',
    'DenseMatrix',
    'Rating',
    'LabeledPoint',
]

所以我真的不知道为什么会出现这个错误。

代码: 标签DataRDD.saveAsSequenceFile('/tmp/pysequencefile/')

错误:

Py4JJavaError:调用 z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 527.0 中的任务 0 失败 1 次,最近一次失败:阶段 527.0 中丢失任务 0.0(TID 1454,本地主机):net.razorvine.pickle.PickleException:构造 ClassDict 的预期零参数(用于 numpy.dtype) 在 net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)

编辑:我发现了这个:

public class More ...ClassDictConstructor implements IObjectConstructor     {
12
13  String module;
14  String name;
15
16  public More ...ClassDictConstructor(String module, String name) {
17      this.module = module;
18      this.name = name;
19  }
20
21  public Object More ...construct(Object[] args) {
22      if (args.length > 0)
23          throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
24      return new ClassDict(module, name);
25  }
26}

我没有直接使用上面的construct()方法..所以我不知道为什么我尝试的saveAs..方法在它不需要时传递参数。

编辑 2:按照 zero323 的建议(谢谢)处理了一个小故障。当我尝试 zero323 写的内容时出现错误(见下文)。但是,当我派生一个更简单的 RDD 时,它可以工作并将这个更简单的 RDD 保存到 .parquet 文件的目录中(将其分解为几个 .parquet 文件)。更简单的RDD如下:

simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")

尝试保存labeledDataRDD时出错:

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
    831         raise TypeError("Can not infer schema for type: %s" % type(row))
    832 
--> 833     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    834     return StructType(fields)
    835 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
    808             return _infer_schema(obj)
    809         except TypeError:
--> 810             raise TypeError("not supported type: %s" % type(obj))
    811 
    812 

TypeError: not supported type: <type 'numpy.unicode_'>

【问题讨论】:

  • python 还没有提供哪些功能?
  • 看看here。我想他们会在 Spark 1.6 中添加这个。我正在使用最新的 Spark 1.5.1。

标签: python apache-spark pyspark apache-spark-mllib


【解决方案1】:

问题的根源不是酸洗本身。如果是,您将看不到net.razorvine.pickle.PickleException。如果您查看saveAsSequenceFile 文档,您会发现它需要两个步骤:

  1. Pyrolite 用于将腌制的 Python RDD 转换为 Java 对象的 RDD。
  2. 此 Java RDD 的键和值被转换为 Writables 并写出。

您的程序在第一步失败,但即使没有,我也不确定预期的 Java 对象是什么以及如何读回它。

我不会使用序列文件,而是将数据写入 Parquet 文件:

from pyspark.mllib.regression import LabeledPoint

rdd = sc.parallelize([
   ("foo", LabeledPoint(1.0, [1.0, 2.0, 3.0])),
   ("bar", LabeledPoint(2.0, [4.0, 5.0, 6.0]))])

sqlContext.createDataFrame(rdd, ("k", "v")).write.parquet("a_parquet_file")

读回并转换:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, LabeledPoint)] = sqlContext.read.parquet("a_parquet_file")
  .select($"k", $"v.label", $"v.features")
  .map{case Row(k: String, label: Double, features: Vector) =>
    (k, LabeledPoint(label, features))}

rdd.sortBy(_._1, false).take(2)

// Array[(String, org.apache.spark.mllib.regression.LabeledPoint)] = 
//  Array((foo,(1.0,[1.0,2.0,3.0])), (bar,(2.0,[4.0,5.0,6.0])))

或者如果您更喜欢类似 Java 的方法:

def rowToKeyLabeledPointPair(r: Row): Tuple2[String, LabeledPoint] = {
  // Vector -> org.apache.spark.mllib.linalg.Vector
  Tuple2(r.getString(0), LabeledPoint(r.getDouble(1), r.getAs[Vector](2)))
}

sqlContext.read.parquet("a_parquet_file")
  .select($"k", $"v.label", $"v.features")
  .map(rowToKeyLabeledPointPair)

编辑

一般来说,NumPy 类型不支持作为 Spark SQL 中的独立值。如果你在 RDD 中有 Numpy 类型,你必须先将它们转换为标准的 Python 类型:

tmp = rdd.map(lambda kv: (str(kv[0]), kv[1]))
sqlContext.createDataFrame(tmp, ("k", "v")).write.parquet("a_parquet_file")

【讨论】:

  • 您提供了我在 Java 中难以复制的 Scala 代码...我正在尝试从 parquet 文件中读取 java 端的数据帧。 ` DataFrame df = sqlContext.read().parquet("labeledData_parquet_file"); JavaRDD rows = df.toJavaRDD().map(这里发生了什么?);`。 Vector 是一个 mllib 向量。问题是dataFrame包含一个sql Row而不是一个mllib Vector,我需要一个(双标签,SparseVector)的JavaRDD。
  • Row 提供了大量类型化的 getter 方法。您可以将getDoublegetString 用于键和标签,将getAs 用于向量。
猜你喜欢
  • 2021-06-14
  • 2015-09-04
  • 2018-10-20
  • 2015-10-16
  • 2020-12-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-01
相关资源
最近更新 更多