【问题标题】:Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties当我们使用 auto.offset.reset=latest kafka 属性时是否需要 FlinkKafkaConsumer setStartFromLatest() 方法
【发布时间】:2021-08-05 10:35:02
【问题描述】:

在我的 flink 应用程序中,我有 kafka 数据源。我正在使用 kafka 属性auto.offset.reset=latest。我想知道是否需要使用FlinkKafkaConsumer.setStartFromLatest()。它们相似吗?我可以使用其中任何一个吗?以下是来自 flink 代码的文档。但是不清楚这个方法和kafka属性有什么关系。

/**
 * Specifies the consumer to start reading from the latest offset for all partitions. This lets
 * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 *
 * <p>This method does not affect where partitions are read from when the consumer is restored
 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
 * only the offsets in the restored state will be used.
 *
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
    this.startupMode = StartupMode.LATEST;
    this.startupOffsetsTimestamp = null;
    this.specificStartupOffsets = null;
    return this;
}

【问题讨论】:

    标签: apache-kafka apache-flink flink-streaming


    【解决方案1】:

    如果调用setStartFromLatest(),则无需将auto.offset.reset=latest 放入属性映射中。

    在内部,Flink 使用 Kafka 消费者客户端的 assign 方法来管理 Flinks 任务的分区分配。它使用startupMode 的值来初始化Kafka 消费者。 startupMode通过setStartFrom...方法设置,默认为GROUP_OFFSETS

    如果使用FlinkKafkaConsumer,则必须将consumer group id 放入属性中。另一种选择是使用KafkaSouce.builder()(sample code),它提供了设置这些东西的功能。

    【讨论】:

      猜你喜欢
      • 2022-11-14
      • 1970-01-01
      • 2011-08-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-13
      • 2012-03-14
      相关资源
      最近更新 更多