【Posted】: 2016-12-21 09:56:32
【Question】:
I am using the following code to standardize the columns of a PySpark DataFrame:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
cols = ["a", "b", "c"]
df = spark.createDataFrame([(1, 0, 3), (2, 3, 2), (1, 3, 1), (3, 0, 3)], cols)
Pipeline(stages=[
    VectorAssembler(inputCols=cols, outputCol='features'),
    StandardScaler(withMean=True, inputCol='features', outputCol='scaledFeatures')
]).fit(df).transform(df).select(cols + ['scaledFeatures']).head()
This gives the expected result:
Row(a=1, b=0, c=3, scaledFeatures=DenseVector([-0.7833, -0.866, 0.7833]))
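For reference, the numbers in that row can be reproduced by hand. The following is a minimal sketch in plain Python (no Spark), assuming that StandardScaler with withMean=True and the default withStd=True subtracts each column's mean and divides by its sample (n - 1) standard deviation. The helper name standardize is only illustrative.

```python
from statistics import mean, stdev  # stdev is the sample (n - 1) standard deviation

rows = [(1, 0, 3), (2, 3, 2), (1, 3, 1), (3, 0, 3)]

# Per-column statistics, computed over the whole dataset.
columns = list(zip(*rows))
means = [mean(col) for col in columns]
stds = [stdev(col) for col in columns]


def standardize(row):
    """Center each value on its column mean, then divide by the column's sample std."""
    return [(x - m) / s for x, m, s in zip(row, means, stds)]


first = [round(v, 4) for v in standardize(rows[0])]
print(first)  # [-0.7833, -0.866, 0.7833]
```

The first row matches the scaledFeatures vector shown above, which suggests the toy pipeline is behaving as intended.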
However, when I run the same pipeline on a (much larger) dataset loaded from a parquet file, I get the following exception:
16/12/21 09:47:50 WARN TaskSetManager: Lost task 0.0 in stage 60.0 (TID 6370, 10.231.153.67): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$2: (vector) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply2_2$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Do not support vector type class org.apache.spark.mllib.linalg.SparseVector
at org.apache.spark.mllib.feature.StandardScalerModel.transform(StandardScaler.scala:160)
at org.apache.spark.ml.feature.StandardScalerModel$$anonfun$2.apply(StandardScaler.scala:167)
at org.apache.spark.ml.feature.StandardScalerModel$$anonfun$2.apply(StandardScaler.scala:167)
... 13 more
I noticed that in this case VectorAssembler has converted my columns to mllib.linalg.SparseVector rather than the DenseVector produced in the first case.
Is there any way to work around this?
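One possible explanation for the difference between the two runs: as far as I can tell from the Spark source, VectorAssembler returns the "compressed" form of each assembled vector, which is whichever of the dense and sparse representations takes less storage. The sketch below models that choice in plain Python. The rule 1.5 * (nnz + 1) < size is my reading of Spark's implementation, not something stated in this question, and prefers_sparse is a hypothetical helper name.

```python
def prefers_sparse(values):
    """Rough model of the storage rule: sparse wins when 1.5 * (nnz + 1) < size."""
    size = len(values)
    nnz = sum(1 for v in values if v != 0)  # number of non-zero entries
    return 1.5 * (nnz + 1) < size


print(prefers_sparse([1, 0, 3]))           # False: a row like the toy data stays dense
print(prefers_sparse([0, 0, 0, 0, 5, 0]))  # True: a mostly-zero row becomes sparse
```

Under that assumption, rows with many zeros in the larger dataset would come out of VectorAssembler as sparse vectors. With withMean=True the scaler then rejects them, presumably because centering would make every entry non-zero.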
【Comments】:
-
Which version of Spark are you using?
-
Spark 2.0.1. I'm pretty sure this answer, stackoverflow.com/questions/35844330/…, is the key. I'm currently trying to convert the SparseVector to a DenseVector, but that isn't straightforward either.
-
Isn't "b = DenseVector(a.toArray())" a straightforward solution?
-
I'm still new to Spark and am working out how to apply that conversion to a column in a DataFrame. Is a udf the best option? For example: asDense = udf(lambda s: DenseVector(s.toArray()), VectorUDT()); df = df.withColumn('features', asDense(df.features))
-
It could also be added as a stage in the pipeline, but I'm not sure how to add an arbitrary transformation...
Tags: apache-spark pyspark spark-dataframe apache-spark-mllib pyspark-sql