【发布时间】:2017-01-20 08:56:19
【问题描述】:
TL;DR - 我有一个看起来像 PySpark 应用程序中的字符串 DStream。我想将它作为 DStream[String] 发送到 Scala 库。不过,Py4j 不会转换字符串。
我正在开发一个 PySpark 应用程序,该应用程序使用 Spark Streaming 从 Kafka 中提取数据。我的消息是字符串,我想在 Scala 代码中调用一个方法,并传递一个 DStream[String] 实例。但是,我无法在 Scala 代码中接收正确的 JVM 字符串。在我看来,Python 字符串没有被转换成 Java 字符串,而是被序列化了。
我的问题是:如何从 DStream 对象中获取 Java 字符串?
这是我想出的最简单的 Python 代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
我在 PySpark 中运行此代码,并将路径传递给我的 JAR:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
在 Scala 方面,我有:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
现在,假设我将一些数据发送到 Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Scala 代码中的println 语句打印如下内容:
[B@758aa4d9
我希望得到foo bar。
现在,如果我将 Scala 代码中的简单 println 语句替换为以下内容:
rdd.foreach(v => println(v.getClass.getCanonicalName))
我明白了:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
这表明字符串实际上是作为字节数组传递的。
如果我只是尝试将此字节数组转换为字符串(我知道我什至没有指定编码):
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
我得到了一些看起来的东西(特殊字符可能会被去掉):
�]qXfoo barqa.
这表明 Python 字符串已被序列化(腌制?)。我怎样才能检索到正确的 Java 字符串?
【问题讨论】:
标签: apache-spark pyspark rdd