【发布时间】:2021-02-22 08:53:28
【问题描述】:
我在默认触发器中执行流式传输。我的目标是限制每次执行中的读取量,以避免大量的微批处理。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是写在 2 个数据库中的。测试了两种方法。
官方文档说 maxOffsetsPerTrigger 限制了每个触发间隔处理的偏移量,但这对我不起作用。我是不是误解了这个参数的意思?
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
另外,我阅读了this 的答案,但我不知道在哪里以及如何正确设置 max.poll.records。我尝试了 readStream 的选项,但没有成功。代码如下:
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("max.poll.records", "1")
.load()
主要功能:
override def execute(spark: SparkSession, args: Array[String]): Unit = {
val basePath: String = args(0)
val kafkaServers: String = args(1)
val kafkaTopic: String = args(2)
val checkpoint: String = args(3)
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
val transformed = read
.transform(applySchema)
.transform(conversions)
.transform(dropDuplicates)
.transform(partitioning)
val sink = new FileSystemSink(basePath)
val query = transformed
.writeStream
.outputMode(OutputMode.Append)
.foreachBatch(sink.writeOnS3 _)
.option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
.start()
query.awaitTermination()
}
除了上面的问题,限制偏移量的写法是什么?
Spark 版本:2.4.5。
【问题讨论】:
-
maxOffsetsPerTrigger的配置对我来说很好,你对它有正确的理解。max.poll.records适用于普通的 KafkaConsumers,不适用于 Spark 结构化流。你能展示完整的代码来重现你的问题吗? -
当然可以。我将编辑问题。
-
我认为没有什么不同:/
-
maxOffsetsPerTrigger 上设置的值是每个分区吗?
标签: apache-spark apache-kafka spark-structured-streaming