【问题标题】:Multiple operations/aggregations on the same Dataframe/Dataset in Spark Structured StreamingSpark结构化流中同一数据帧/数据集上的多个操作/聚合
【发布时间】:2019-08-09 16:48:03
【问题描述】:

我使用 Spark 2.3.2。

我正在从 Kafka 接收数据。我必须对同一数据进行多次聚合。然后所有聚合结果将进入同一个数据库(列或表可能会更改)。例如:

val kafkaSource = spark.readStream.option("kafka") ...
val agg1 = kafkaSource.groupBy().agg ...
val agg2 = kafkaSource.groupBy().mapgroupswithstate() ...
val agg3 = kafkaSource.groupBy().mapgroupswithstate() ...

但是当我尝试为每个聚合结果调用 writeStream 时:

aggr1.writeStream().foreach().start()
aggr2.writeStream().foreach().start()
aggr3.writeStream().foreach().start()

Spark 在每个 writeStream 中独立接收数据。这种方式有效吗?

我可以用一个 writeStream 进行多个聚合吗?如果可以的话,这种方式效率高吗?

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-structured-streaming


    【解决方案1】:

    每个“writestream”操作都会产生一个新的流式查询。每个流式查询都将从源中读取并执行整个查询计划。与 DStream 不同,没有可用的缓存/持久选项。

    在 spark 2.4 中,引入了一个新的 API “forEachBatch”,以更有效的方式解决这类场景。

    【讨论】:

      【解决方案2】:

      缓存可用于避免多次读取:

      kafkaSource.writeStream.foreachBatch((df, id) => {
        df.persist()
        val agg1 = df.groupBy().agg ...
        val agg2 = df.groupBy().mapgroupswithstate() ...
        val agg3 = df.groupBy().mapgroupswithstate() ...
        df.unpersist()
      }).start()
      

      【讨论】:

      • 我使用 Spark 2.3.2。而且它不支持这个解决方案。
      猜你喜欢
      • 2016-04-29
      • 1970-01-01
      • 2018-11-08
      • 2018-12-16
      • 2021-10-23
      • 2023-01-12
      • 2017-05-12
      相关资源
      最近更新 更多