【问题标题】:spark pyspark mllib model - when prediction rdd is generated using map, it throws exception on collect()spark pyspark mllib 模型 - 当使用 map 生成预测 rdd 时,它会在 collect() 上引发异常
【发布时间】:2015-11-20 04:14:14
【问题描述】:

我正在使用 spark 1.2.0(无法升级,因为我无法控制它)。我正在使用 mllib 构建模型

points = labels.zip(tfidf).map(lambda t: LabeledPoint(t[0], t[1] ))
train_data, test_data = points.randomSplit([0.6, 0.4], 17)

iterations = 3
model = LogisticRegressionWithSGD.train(train_data, iterations)

labelsAndPreds = test_data.map(lambda p: (p.label, model.predict(p.features)) )
print("labels = "+str(labelsAndPreds.collect()))

当我运行此代码时,我在 collect() 上收到 NullPointerException。事实上,对预测数据结果的任何操作都会引发此异常。

15/08/26 04:02:43 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 26, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:43 INFO BlockManagerInfo: Added broadcast_15_piece0 in memory on dojo3s10118.rtp1.hadoop.fmr.com:41145 (size: 9.6 KB, free: 529.8 MB)
15/08/26 04:02:43 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on dojo3s10118.rtp1.hadoop.fmr.com:41145 (size: 68.0 B, free: 529.8 MB)
15/08/26 04:02:43 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 26, dojo3s10118.rtp1.hadoop.fmr.com): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

15/08/26 04:02:43 INFO TaskSetManager: Starting task 0.1 in stage 17.0 (TID 27, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.1 in stage 17.0 (TID 27) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 1]
15/08/26 04:02:44 INFO TaskSetManager: Starting task 0.2 in stage 17.0 (TID 28, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.2 in stage 17.0 (TID 28) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 2]
15/08/26 04:02:44 INFO TaskSetManager: Starting task 0.3 in stage 17.0 (TID 29, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.3 in stage 17.0 (TID 29) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 3]
15/08/26 04:02:44 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job
15/08/26 04:02:44 INFO YarnClientClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool
15/08/26 04:02:44 INFO YarnClientClusterScheduler: Cancelling stage 17
15/08/26 04:02:44 INFO DAGScheduler: Job 8 failed: collect at /home/a560975/spark-exp/./ml-py-exp-2.py:102, took 0.209401 s
Traceback (most recent call last):
  File "/home/a560975/spark-exp/./ml-py-exp-2.py", line 102, in <module>
    print("labels = "+str(labelsAndPreds.collect()))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o118.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 29, dojo3s10118.rtp1.hadoop.fmr.com
): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)

如果不是做test_data.map(lambda p: (p.label, model.predict(p.features)) ) 我执行以下操作

for lp in test_data.collect():
    print("predicted = "+str(model.predict(lp.features)))

那么预测不会抛出任何异常,但这不是并行的。 当我尝试通过 map 函数进行模型预测时,为什么会出现异常?我该如何克服它?

我已经尝试sc.broadcast(model) 广播模型,但我仍然看到同样的问题。请帮忙。

【问题讨论】:

标签: apache-spark pyspark rdd apache-spark-mllib


【解决方案1】:

如果你使用 Python ,原因是“在 Python 中,predict 目前不能在 RDD 转换或动作中使用。而是直接在 RDD 上调用 predict。”。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-17
    • 2015-10-19
    • 1970-01-01
    • 1970-01-01
    • 2020-08-13
    • 2016-12-05
    相关资源
    最近更新 更多