【问题标题】:How to set optimal config values - trigger time, maxOffsetsPerTrigger - for Spark Structured Streaming while reading messages from Kafka?如何在从 Kafka 读取消息时为 Spark Structured Streaming 设置最佳配置值 - 触发时间、maxOffsetsPerTrigger?
【发布时间】:2019-10-30 18:56:16
【问题描述】:

我有一个结构化流应用程序从 Kafka 读取消息。每天的消息总数约为 180 亿条,每分钟的峰值消息数 = 12,500,000。 最大消息大小为 2 KB。

如何确保我的结构化流媒体应用能够处理如此大量和如此高速的数据?基本上,我只想知道如何设置最佳触发时间、maxOffsetsPerTrigger 或任何其他使工作顺利进行并能够处理故障和重新启动的配置。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming spark-structured-streaming


    【解决方案1】:

    您可以以固定间隔的微批处理或连续运行 Spark 结构化流式处理应用程序。以下是一些可用于调整流应用程序的选项。

    Kafka 配置:

    Kafka中的分区数:

    您可以增加 Kafka 中的分区数量。因此,更多的消费者可以同时读取数据。根据输入速率和引导服务器的数量将此设置为适当的数字。

    Spark 流式传输配置:

    驱动和执行器内存配置:

    计算每批数据的大小(#records * 每条消息的大小)并相应地设置内存。

    执行者数量:

    在kafka topic中设置executor的数量为partition的数量。这增加了并行性。同时读取数据的任务数。

    限制偏移量:

    每个触发间隔处理的最大偏移数的速率限制。指定的总偏移量将按比例分配到不同卷的主题分区中。

      val df = spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("subscribe", "topicName")
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", "1000000")
        .load()
    

    使用检查点从故障中恢复:

    如果出现故障或故意关闭,您可以恢复之前查询的进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。

    finalDF
      .writeStream
      .outputMode("complete")
      .option("checkpointLocation", "path/to/HDFS/dir")
      .format("memory")
      .start()
    

    触发器:

    流式查询的触发器设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询来执行。

    【讨论】:

    • 嗨@SantoshK,感谢您的帮助。我可以知道我是否应该更喜欢设置适当的触发间隔(比如 2 分钟),或者设置 maxOffsetsPerTrigger。以及如何找出最佳值?假设我每秒有 1,00,000 条消息流入 Kafka,我必须处理这些消息,我如何确定最佳调整值?
    猜你喜欢
    • 1970-01-01
    • 2017-01-26
    • 2020-12-30
    • 2019-04-27
    • 2021-12-05
    • 2021-05-05
    • 2023-03-08
    • 2017-12-11
    • 2021-01-05
    相关资源
    最近更新 更多