【问题标题】:Max offsets in each Micro Batch每个微批次中的最大偏移量
【发布时间】:2021-02-22 08:53:28
【问题描述】:

我在默认触发器中执行流式传输。我的目标是限制每次执行中的读取量,以避免大量的微批处理。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是写在 2 个数据库中的。测试了两种方法。

官方文档说 ma​​xOffsetsPerTrigger 限制了每个触发间隔处理的偏移量,但这对我不起作用。我是不是误解了这个参数的意思?

  val read = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", "1")
  .load()

另外,我阅读了this 的答案,但我不知道在哪里以及如何正确设置 ma​​x.poll.records。我尝试了 readStream 的选项,但没有成功。代码如下:

  val read = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("max.poll.records", "1")
  .load()

主要功能:

override def execute(spark: SparkSession, args: Array[String]): Unit = {
    val basePath: String = args(0)
    val kafkaServers: String = args(1)
    val kafkaTopic: String = args(2)
    val checkpoint: String = args(3)

    val read = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "1")
      .load()

    val transformed = read
      .transform(applySchema)
      .transform(conversions)
      .transform(dropDuplicates)
      .transform(partitioning)

    val sink = new FileSystemSink(basePath)

    val query = transformed
      .writeStream
      .outputMode(OutputMode.Append)
      .foreachBatch(sink.writeOnS3 _)
      .option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
      .start()

    query.awaitTermination()
  }

除了上面的问题,限制偏移量的写法是什么?

Spark 版本:2.4.5。

【问题讨论】:

  • maxOffsetsPerTrigger 的配置对我来说很好,你对它有正确的理解。 max.poll.records 适用于普通的 KafkaConsumers,不适用于 Spark 结构化流。你能展示完整的代码来重现你的问题吗?
  • 当然可以。我将编辑问题。
  • 我认为没有什么不同:/
  • maxOffsetsPerTrigger 上设置的值是每个分区吗?

标签: apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

我再次测试,ma​​xOffsetsPerTrigger 工作得很好。我误解了触发器的结果,现在它是有道理的。参数表示读取的总偏移量,而不是每个分区的偏移量。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-08-06
    • 2019-12-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-09
    • 2015-10-05
    • 1970-01-01
    相关资源
    最近更新 更多