【问题标题】:Use cases for using multiple queries for Spark Structured streaming使用多个查询进行 Spark 结构化流式传输的用例
【发布时间】:2021-02-19 13:47:16
【问题描述】:

我需要从多个 Kafka 主题[基于 Avro] 进行流式传输,并将它们放入 Greenplum 中,并对有效负载进行少量修改。

Kaka 主题在配置文件中定义为一个列表,每个 Kafka 主题将有一个目标表。

我正在寻找单个 Spark 结构化应用程序和配置文件中的更新以收听新主题或停止。听主题。

我正在寻求帮助,因为我对使用单个查询与多个查询感到困惑:

val query1 = df.writeStream.start()
val query2 = df.writeStream.start()

spark.streams.awaitAnyTermination()

df.writeStream.start().awaitAnyTermination()

在哪些用例中应该使用多个查询而不是单个查询

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-streaming spark-structured-streaming


    【解决方案1】:

    显然,您可以使用正则表达式模式来使用来自不同 kafka 主题的数据。

    假设您有“topic-ingestion1”、“topic-ingestion2”之类的主题名称 - 然后您可以创建一个正则表达式模式来使用以“*ingestion”结尾的所有主题的数据。

    一旦以您的正则表达式模式的格式创建新主题 - spark 将自动开始从新创建的主题流式传输数据。

    参考: [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]

    您可以使用此参数来指定您的缓存超时。 “spark.kafka.consumer.cache.timeout”。

    来自火花文档:

    spark.kafka.consumer.cache.timeout - 最短时间 消费者在有资格被驱逐之前可能会在池中闲置 由驱逐者。

    假设您有多个接收器从 kafka 读取数据并将其写入两个不同的位置,例如 hdfs 和 hbase - 那么您可以将应用程序逻辑分支到两个 writeStreams。

    如果接收器 (Greenplum) 支持批处理操作模式 - 那么您可以查看 spark 结构化流中的 forEachBatch() 函数。这将允许我们为这两个操作重用相同的 batchDF。

    参考: [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]

    【讨论】:

    • 感谢您的关注。我只有一个目标/接收器,即 Greenplum,但每个主题有效负载将在 GP 中具有相应的表。所以我应该对每个主题或单个主题有不同的查询。将运行多个查询比单个查询更有优势。现在,我正在使用单个查询,在 foreachBatch 内部,我正在执行 select(TOPIC).distinct.forEach 以获取主题虎钳数据,然后从主题中获取表名以写入 jdbc。这是一个好方法吗?
    • 每次调用 writeStreams.start() - 都会触发一个新的独立流式查询(消耗更多资源)。因此,foreachBatch() Sink - 如果您想处理一次数据并将输出写入多个接收器,这是理想的方式
    • 嗨@JDev,这澄清了你的问题吗?!
    猜你喜欢
    • 2020-08-11
    • 1970-01-01
    • 2020-07-17
    • 2018-05-27
    • 1970-01-01
    • 2021-02-06
    • 2018-08-11
    • 2019-12-10
    • 1970-01-01
    相关资源
    最近更新 更多