您可以以固定间隔的微批处理或连续运行 Spark 结构化流式处理应用程序。以下是一些可用于调整流应用程序的选项。
Kafka 配置:
Kafka中的分区数:
您可以增加 Kafka 中的分区数量。因此,更多的消费者可以同时读取数据。根据输入速率和引导服务器的数量将此设置为适当的数字。
Spark 流式传输配置:
驱动和执行器内存配置:
计算每批数据的大小(#records * 每条消息的大小)并相应地设置内存。
执行者数量:
在kafka topic中设置executor的数量为partition的数量。这增加了并行性。同时读取数据的任务数。
限制偏移量:
每个触发间隔处理的最大偏移数的速率限制。指定的总偏移量将按比例分配到不同卷的主题分区中。
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topicName")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", "1000000")
.load()
使用检查点从故障中恢复:
如果出现故障或故意关闭,您可以恢复之前查询的进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。
finalDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
触发器:
流式查询的触发器设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询来执行。