【问题标题】:Pyspark StreamingPyspark 流媒体
【发布时间】:2018-08-17 09:25:03
【问题描述】:

我为 pyspark 上的流式虹膜分类编写了这段代码,但我收到了这个错误“'RDD' object has no attribute '_jdf'”。我已将 RDD 更改为数据框,但它告诉“RDD 不是可迭代的”。请帮我解决!!! 非常感谢。

这是我的代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.ml import PipelineModel, Pipeline
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
sc = SparkContext.getOrCreate(conf = conf)
ssc = StreamingContext(sc,1)



lines = ssc.socketTextStream("localhost", 8889)

#Load ML model
sameModel = PipelineModel.load("g:/Demo/DecisionTree_Model1")

#Predict the type of iris from features
result = line.foreachRDD(lambda rdd: sameModel.transform(rdd))

ssc.start()
ssc.awaitTermination()

错误:“RDD”对象没有属性“_jdf”

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-18f3db416f1c> in <module>()
  1 ssc.start()
----> 2 ssc.awaitTermination()

E:\Spark\spark\python\pyspark\streaming\context.py in awaitTermination(self, 
timeout)
204         """
205         if timeout is None:
--> 206             self._jssc.awaitTermination()
207         else:
208             self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in 
__call__(self, *args)
1255         answer = self.gateway_client.send_command(command)
1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
1258 
1259         for temp_arg in temp_args:

E:\Spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
 61     def deco(*a, **kw):
 62         try:
---> 63             return f(*a, **kw)
 64         except py4j.protocol.Py4JJavaError as e:
 65             s = e.java_exception.toString()

E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
326                 raise Py4JJavaError(
327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
329             else:
330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o35.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "E:\Spark\spark\python\pyspark\streaming\util.py", line 65, in call
r = self.func(t, *rdds)
File "E:\Spark\spark\python\pyspark\streaming\dstream.py", line 159, in 
<lambda>
func = lambda t, rdd: old_func(rdd)
File "<ipython-input-5-64e27204db5a>", line 1, in <lambda>
result = lines.foreachRDD(lambda rdd: sameModel.transform(rdd))
File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
return self._transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\pipeline.py", line 262, in _transform
dataset = t.transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
return self._transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\wrapper.py", line 305, in _transform
return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
AttributeError: 'RDD' object has no attribute '_jdf'

at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: pyspark load


    【解决方案1】:

    下面的代码展示了如何加载预训练模型。使用套接字源启动 Spark 流并在其上使用转换。然后下沉到控制台。

    spark = SparkSession \
    .builder \
    .appName("transform ml") \
    .getOrCreate()
    
    model = PipelineModel.load("./model")
    
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    
    random = Random()
    words = lines.select(f.lit(random.randint(1, 10000))
                         .alias("id"),
                         lines.value.alias("text")
                         )
    
    prediction = model.transform(words)
    
    query = prediction \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()
    
    query.awaitTermination()
    

    【讨论】:

      猜你喜欢
      • 2018-07-28
      • 2017-04-15
      • 2014-04-12
      • 1970-01-01
      • 2012-06-16
      • 2010-12-28
      • 2015-03-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多