【Question title】: Joining two streams from the same Spark streaming dataset
【Posted】: 2018-08-02 04:17:46
【Question】:

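Is it allowed in Spark Structured Streaming (2.3) to join two streams that are derived from the same input streaming Dataset?

For example, the sample query below joins two such streams. With it, I get an IllegalStateException in the Azure Event Hubs Spark client.

Will this work?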

import org.apache.spark.sql.functions.expr

val eventhubs = spark.readStream ...
eventhubs.createOrReplaceTempView("Input")

spark.sql("SELECT temperature, time, device,  category FROM Input").createOrReplaceTempView("devices1")

spark.sql("SELECT temperature, time, device,  category FROM Input").createOrReplaceTempView("devices2")

val d1 = spark.sql("SELECT * FROM devices1 WHERE device=0")
val d2 = spark.sql("SELECT * FROM devices2 WHERE device=1")


val output = d1.join(
                        d2,
                        expr("""
                          devices1.category = devices2.category AND
                          devices1.time >= devices2.time AND
                          devices1.time <= devices2.time + interval 1 seconds
                          """),
                        joinType = "inner"      
                       )

display(output)

【Discussion】:

    Tags: apache-spark apache-spark-sql spark-streaming


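    【Solution 1】:

    As far as I know, joining a streaming Dataset with itself (a stream-stream self-join) is allowed in Spark Structured Streaming, but only in the append output mode.

    Here is an example: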

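    import org.apache.spark.sql.DataFrame
    
    // SparkBaseSpec is the answerer's own test base class; it is assumed to provide a `spark` SparkSession.
    class ExampleTest extends SparkBaseSpec {
    
      import spark.implicits._
    
      // Seed the source directory with ids 1..4 before the stream starts.
      private val data: DataFrame = spark.range(1, 5).toDF
    
      // Default save mode: this fails if /tmp/streaming/ already exists.
      data.write.parquet("/tmp/streaming/")
    
      // A single streaming source, used on both sides of the join.
      val readStr = spark.readStream.schema(data.schema).parquet("/tmp/streaming/")
    
      // Stream-stream self-join: both sides are derived from the same readStr.
      val df = readStr
        .select($"id".as("id1"))
        .where("id1<50")
        .join(readStr.select($"id".as("id2")).where("id2<50"), $"id1" === $"id2")
    
      df.printSchema()
    
      // Stream-stream joins are only supported in the append output mode.
      val stream = df.writeStream
        .option("checkpointLocation", "/tmp/spark-streaming-checkpoint")
        .format("console")
        .outputMode("append")
        .start()
    
      // Append ids 20..24 to the source directory; they show up as Batch 1 below.
      spark.range(20, 25).toDF.write.mode("append").parquet("/tmp/streaming/")
    
      // Let the query run for up to 30 seconds.
      stream.awaitTermination(30000)
    
    }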
    
    root
     |-- id1: long (nullable = false)
     |-- id2: long (nullable = false)
    
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +---+---+
    |id1|id2|
    +---+---+
    |  1|  1|
    |  3|  3|
    |  2|  2|
    |  4|  4|
    +---+---+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +---+---+
    |id1|id2|
    +---+---+
    | 22| 22|
    | 21| 21|
    | 23| 23|
    | 20| 20|
    | 24| 24|
    +---+---+
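    The claim that stream-stream joins are only accepted in append mode can be checked without parquet files or Event Hubs. The following is a minimal sketch, not part of the original answer: it assumes a Spark version that ships the built-in `rate` source (used here only as a stand-in for a real stream), and the names `JoinOutputModes`, `selfJoin` and `isRejected` are hypothetical. It self-joins one streaming source and reports whether `start()` rejects the query with an `AnalysisException` for a given output mode.

```scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object, for illustration only.
object JoinOutputModes {
  // Self-join a single streaming source; the built-in "rate" source
  // (columns: timestamp, value) stands in for a real stream such as Event Hubs.
  def selfJoin(spark: SparkSession): DataFrame = {
    val source = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
    val left = source.select(col("value").as("id1"))
    val right = source.select(col("value").as("id2"))
    left.join(right, col("id1") === col("id2"))
  }

  // Returns true if start() rejects the stream-stream join for the given output mode.
  // Spark's unsupported-operation check runs when the query is started.
  def isRejected(spark: SparkSession, mode: String): Boolean =
    try {
      val query = selfJoin(spark).writeStream
        .format("console")
        .outputMode(mode)
        .start()
      query.stop() // accepted: stop it right away
      false
    } catch {
      case _: AnalysisException => true
    }
}
```

    With a local session, `isRejected(spark, "append")` is expected to return false and `isRejected(spark, "update")` to return true.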
    

    By the way, you don't need to create two temporary views; one is enough.
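    To illustrate the single-view suggestion, here is a hedged sketch of the question's query rewritten against one temporary view that is aliased twice (`d1`, `d2`) in a single SQL statement. The `rate` source and the derived `device`, `category`, `temperature` and `time` columns are hypothetical stand-ins for the Event Hubs data, and `SingleViewSelfJoin`/`build` are illustrative names, not an existing API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object, for illustration only.
object SingleViewSelfJoin {
  def build(spark: SparkSession): DataFrame = {
    // Stand-in for the Event Hubs stream: derive the question's columns
    // from the built-in rate source (columns: timestamp, value).
    spark.readStream.format("rate").option("rowsPerSecond", "10").load()
      .selectExpr(
        "CAST(value % 2 AS INT) AS device",
        "CAST(value % 3 AS INT) AS category",
        "CAST(value AS DOUBLE) AS temperature",
        "timestamp AS time")
      .createOrReplaceTempView("Input")

    // One view is enough: alias it twice inside a single query,
    // keeping the question's time-range join condition.
    spark.sql("""
      SELECT d1.category,
             d1.time AS time1, d2.time AS time2,
             d1.temperature AS temp1, d2.temperature AS temp2
      FROM Input d1
      JOIN Input d2
        ON d1.category = d2.category
       AND d1.time >= d2.time
       AND d1.time <= d2.time + INTERVAL 1 SECOND
      WHERE d1.device = 0 AND d2.device = 1
    """)
  }
}
```

    The result would be started with `writeStream.outputMode("append")`. Without a watermark, Spark keeps the state of an inner stream-stream join indefinitely; calling `withWatermark("time", ...)` on the input before creating the view lets the time-range condition bound that state.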

    Hope this helps!

    【Discussion】:
