【问题标题】:Spark streaming with broadcast joins带有广播连接的 Spark 流式传输
【发布时间】:2016-02-19 16:03:06
【问题描述】:

我有一个 spark 流用例,我计划在每个执行器上保持广播和缓存数据集。流式传输中的每个微批次都将从 RDD 中创建一个数据帧并加入该批次。下面给出的我的测试代码将为每个批次执行广播操作。有没有办法只播放一次?

val testDF = sqlContext.read.format("com.databricks.spark.csv")
                .schema(schema).load("file:///shared/data/test-data.txt") 

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(broadcast(testDF), "Age")
    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
    }

对于每个批次,都会读取此文件并执行广播。

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

对广播数据集有任何建议吗?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-streaming broadcast


    【解决方案1】:

    看起来现在广播的表没有被重用。见:SPARK-3863

    foreachRDD循环外执行广播:

    val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv")
     .schema(schema).load(...))
    
    lines.foreachRDD((rdd, timestamp) => { 
      val recordDF = ???
      val resultDF = recordDF.join(testDF, "Age")
      resultDF.write.format("com.databricks.spark.csv").save(...)
    }
    

    【讨论】:

    • 我也尝试过这种方法,但它没有按预期播放。可能是因为 foreachRDD 是在驱动程序的上下文中执行的。顺便说一句,我们必须在 join 语句中使用 testDF.value。我认为这是一个错字。
    • 谢谢!附带说明一下, sc.broadcast(someDataFrame) 会在广播之前将数据带到驱动程序,还是会从每个执行程序进行比特流式广播?我总是在 SQL 中使用广播提示。想知道有什么区别。
    • 这行不通。我的意思是你可以广播DataFrame(毕竟是Serializable),但你不能在DDS上嵌套操作。您可以简单地收集、转换为比Array[Row] 更有用的东西并广播本地数据结构。然后只需使用 UDF。
    猜你喜欢
    • 2018-01-12
    • 2015-10-08
    • 2018-11-12
    • 2017-09-15
    • 1970-01-01
    • 2014-07-19
    • 1970-01-01
    • 2012-03-23
    • 2019-06-27
    相关资源
    最近更新 更多