【发布时间】:2018-08-17 09:25:03
【问题描述】:
我为 pyspark 上的流式鸢尾花(iris)分类编写了这段代码,但运行时收到了这个错误:“'RDD' object has no attribute '_jdf'”。我尝试把 RDD 转换为 DataFrame,但又提示“RDD 不是可迭代的”。请帮我解决!非常感谢。
这是我的代码:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.ml import PipelineModel, Pipeline
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Configure a local Spark application for the streaming job.
# NOTE(review): SparkConf is not among the imports shown with this script;
# unless it is imported elsewhere, this line raises NameError.
conf = SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
# Reuse a running SparkContext if there is one, otherwise create it from conf.
sc = SparkContext.getOrCreate(conf = conf)
# Streaming context on top of sc; the second argument is the batch interval.
ssc = StreamingContext(sc,1)
# DStream of text lines received from a TCP socket on localhost:8889.
lines = ssc.socketTextStream("localhost", 8889)
# Load the previously saved ML pipeline model from disk.
sameModel = PipelineModel.load("g:/Demo/DecisionTree_Model1")
# Predict the type of iris from features, once per streaming batch.
# NOTE(review): `line` is never defined in this script; the traceback quoted
# in the post shows `lines.foreachRDD(...)`, so this looks like a typo.
# NOTE(review): the callback hands each batch's RDD directly to transform().
# The reported traceback fails inside transform() on `dataset._jdf`, which an
# RDD does not have; presumably each RDD has to be converted to a DataFrame
# with the columns the model expects first -- confirm against the model schema.
result = line.foreachRDD(lambda rdd: sameModel.transform(rdd))
# Start receiving data, then block until the streaming job is stopped.
ssc.start()
ssc.awaitTermination()
错误:“RDD”对象没有属性“_jdf”
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-18f3db416f1c> in <module>()
1 ssc.start()
----> 2 ssc.awaitTermination()
E:\Spark\spark\python\pyspark\streaming\context.py in awaitTermination(self,
timeout)
204 """
205 if timeout is None:
--> 206 self._jssc.awaitTermination()
207 else:
208 self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in
__call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
E:\Spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in
get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o35.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "E:\Spark\spark\python\pyspark\streaming\util.py", line 65, in call
r = self.func(t, *rdds)
File "E:\Spark\spark\python\pyspark\streaming\dstream.py", line 159, in
<lambda>
func = lambda t, rdd: old_func(rdd)
File "<ipython-input-5-64e27204db5a>", line 1, in <lambda>
result = lines.foreachRDD(lambda rdd: sameModel.transform(rdd))
File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
return self._transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\pipeline.py", line 262, in _transform
dataset = t.transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
return self._transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\wrapper.py", line 305, in _transform
return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
AttributeError: 'RDD' object has no attribute '_jdf'
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
【问题讨论】: