【问题标题】:Avoiding multiple streaming queries避免多个流式查询
【发布时间】:2018-11-15 09:31:01
【问题描述】:

我有一个结构化的流式查询,它下沉到 Kafka。此查询具有复杂的聚合逻辑。

我想将此查询的输出 DF 下沉到多个 Kafka 主题,每个主题都分区在不同的“键”列上。我不想为每个不同的 Kafka 主题设置多个 Kafka 接收器,因为这意味着运行多个流式查询 - 每个 Kafka 主题一个,特别是因为我的聚合逻辑很复杂。

问题:

  1. 有没有办法将结构化流式查询的结果输出到多个 Kafka 主题,每个主题都有不同的键列,但不必执行多个流式查询?

  2. 如果不是,那么级联多个查询是否有效,这样第一个查询进行复杂的聚合并将输出写入 Kafka,然后其他查询只读取第一个查询的输出并将其主题写入Kafka 从而避免再次进行复杂的聚合?

提前感谢您的帮助。

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    所以答案有点盯着我的眼睛。它也被记录在案。下方链接。

    一个查询可以写入多个 Kafka 主题。如果您要编写的数据框有一个名为“主题”的列(以及“键”和“值”列),它将一行的内容写入该行中的主题。这会自动起作用。所以你唯一需要弄清楚的是如何生成该列的值。

    这已记录在案 - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

    【讨论】:

    • 您的解决方案是否适合您?我需要在 Spark 结构化流上执行多级聚合。你能建议一种方法吗?
    【解决方案2】:

    我也在寻找这个问题的解决方案,就我而言,它不一定是 kafka sink。我想在 sink1 中写入数据帧的一些记录,而在 sink2 中写入一些其他记录(取决于某些条件,在 2 个流式查询中不读取相同的数据两次)。 目前,按照当前的实现似乎不可能(DataSource.scala 中的 createSink() 方法提供对单个接收器的支持)。

    然而,在 Spark 2.4.0 中,有一个新的 api 即将到来:foreachBatch(),它将处理一个数据帧微批处理,该微批处理可用于缓存数据帧、写入不同的接收器或在重新取消缓存之前多次处理。 像这样的:

    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF.cache()
      batchDF.write.format(...).save(...)  // location 1
      batchDF.write.format(...).save(...)  // location 2
      batchDF.uncache()
    }
    

    现在这个功能在 databricks 运行时可用: https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch

    编辑 18 年 11 月 15 日: 它现在在 Spark 2.4.0 (https://issues.apache.org/jira/browse/SPARK-24565) 中可用

    【讨论】:

    • 那么,如果我的来源是 KAFKA,我会自动获得一个缓存,并且所有接收器都会看到多个来源的相同消息?
    • 所有 sinks 将在同一个 microbatch 中看到相同的消息,并且只会从 Source(不管是 kafka 还是其他东西)读取数据一次。
    【解决方案3】:

    没有办法在开箱即用的结构化流中进行一次读取和多次写入。唯一的方法是实现将写入多个主题的自定义接收器。

    每当您调用 dataset.writeStream().start() 时,spark 都会启动一个新流,该流从源 (readStream()) 读取并写入接收器 (writeStream())。

    即使您尝试级联,spark 也会创建两个单独的流,每个流具有一个源和一个接收器。也就是说,它会读取、处理和写入两次数据:

    Dataset df = <aggregation>; 
    StreamingQuery sq1 = df.writeStream()...start(); 
    StreamingQuery sq2 = df.writeStream()...start();
    

    有一种方法可以在 spark 流式传输中缓存读取数据,但此选项尚不适用于结构化流式传输。

    【讨论】:

    • Yuriy,尽管您说的开箱即用的多个写入在流式传输结构中不可用是正确的,但对于 Kafka,这是开箱即用的。请在下面查看我的答案。
    猜你喜欢
    • 1970-01-01
    • 2015-02-16
    • 1970-01-01
    • 2020-09-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-23
    相关资源
    最近更新 更多