【问题标题】:Transforming PySpark RDD with Scala使用 Scala 转换 PySpark RDD
【发布时间】:2017-01-20 08:56:19
【问题描述】:

TL;DR - 我有一个看起来像 PySpark 应用程序中的字符串 DStream。我想将它作为 DStream[String] 发送到 Scala 库。不过,Py4j 不会转换字符串。

我正在开发一个 PySpark 应用程序,该应用程序使用 Spark Streaming 从 Kafka 中提取数据。我的消息是字符串,我想在 Scala 代码中调用一个方法,并传递一个 DStream[String] 实例。但是,我无法在 Scala 代码中接收正确的 JVM 字符串。在我看来,Python 字符串没有被转换成 Java 字符串,而是被序列化了。

我的问题是:如何从 DStream 对象中获取 Java 字符串?


这是我想出的最简单的 Python 代码:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

我在 PySpark 中运行此代码,并将路径传递给我的 JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

在 Scala 方面,我有:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

现在,假设我将一些数据发送到 Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

Scala 代码中的println 语句打印如下内容:

[B@758aa4d9

我希望得到foo bar

现在,如果我将 Scala 代码中的简单 println 语句替换为以下内容:

rdd.foreach(v => println(v.getClass.getCanonicalName))

我明白了:

java.lang.ClassCastException: [B cannot be cast to java.lang.String

这表明字符串实际上是作为字节数组传递的。

如果我只是尝试将此字节数组转换为字符串(我知道我什至没有指定编码):

      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }

我得到了一些看起来的东西(特殊字符可能会被去掉):

�]qXfoo barqa.

这表明 Python 字符串已被序列化(腌制?)。我怎样才能检索到正确的 Java 字符串?

【问题讨论】:

    标签: apache-spark pyspark rdd


    【解决方案1】:

    长话短说,没有支持的方式来做这样的事情。不要在生产中尝试这个。您已收到警告。

    一般来说,Spark 不会将 Py4j 用于驱动程序上的一些基本 RPC 调用,并且不会在任何其他机器上启动 Py4j 网关。当需要时(主要是 MLlib 和 SQL 的某些部分),Spark 使用Pyrolite 序列化在 JVM 和 Python 之间传递的对象。

    API 的这一部分是私有的 (Scala) 或内部的 (Python),因此不适合一般用途。虽然理论上您可以按批次访问它:

    package dummy
    
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.streaming.api.java.JavaDStream
    import org.apache.spark.sql.DataFrame
    
    object PythonRDDHelper {
      def go(rdd: JavaRDD[Any]) = {
        rdd.rdd.collect {
          case s: String => s
        }.take(5).foreach(println)
      }
    }
    

    完整的流:

    object PythonDStreamHelper {
      def go(stream: JavaDStream[Any]) = {
        stream.dstream.transform(_.collect {
          case s: String => s
        }).print
      }
    }
    

    或将单个批次公开为DataFrames(可能是最不邪恶的选择):

    object PythonDataFrameHelper {
      def go(df: DataFrame) = {
        df.show
      }
    }
    

    并按如下方式使用这些包装器:

    from pyspark.streaming import StreamingContext
    from pyspark.mllib.common import _to_java_object_rdd
    from pyspark.rdd import RDD
    
    ssc = StreamingContext(spark.sparkContext, 10)
    spark.catalog.listTables()
    
    q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 
    
    # Reserialize RDD as Java RDD<Object> and pass 
    # to Scala sink (only for output)
    q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
        _to_java_object_rdd(rdd)
    ))
    
    # Reserialize and convert to JavaDStream<Object>
    # This is the only option which allows further transformations
    # on DStream
    ssc._jvm.dummy.PythonDStreamHelper.go(
        q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
            _to_java_object_rdd(rdd), ssc.sparkContext
        ))._jdstream
    )
    
    # Convert to DataFrame and pass to Scala sink.
    # Arguably there are relatively few moving parts here. 
    q.foreachRDD(lambda rdd: 
        ssc._jvm.dummy.PythonDataFrameHelper.go(
            rdd.map(lambda x: (x, )).toDF()._jdf
        )
    )
    
    ssc.start()
    ssc.awaitTerminationOrTimeout(30)
    ssc.stop()
    

    这不受支持,未经测试,因此除了使用 Spark API 进行实验外,其他任何东西都没有用。

    【讨论】:

    • 我很高兴能帮上忙。我可能在这里夸大了一点。如果您的目标是构建独立于语言的扩展,那么您真的无法避免修补内部结构,但开发人员在这里做出了有意识的决定,并且不适合胆小的人。
    • 嗨@zero323 我在这里做同样的过程,但是在这个过程中遇到了很大的问题,我创建了一个对象来与一个kerberized kafka 通信我的python 应用程序。但是当我创建对象时,spark的jvm在对象中找不到我的函数。如果我创建一个类,它会找到该类。但由于错误无法发送 rdd 对象:pyKafka([org.apache.spark.api.java.JavaRDD, class java.lang.String]) does not exist 我正在按照步骤操作。可以穿什么?
    猜你喜欢
    • 1970-01-01
    • 2023-03-13
    • 2016-08-21
    • 1970-01-01
    • 1970-01-01
    • 2016-01-03
    • 2016-05-29
    • 2016-03-16
    • 1970-01-01
    相关资源
    最近更新 更多