【问题标题】:Kafka DSL - Manual Commit with MessageKafka DSL - 使用消息手动提交
【发布时间】:2016-09-04 07:16:34
【问题描述】:

我们正在尝试使用以下代码使用 Spring DSL Kafka 实现手动提交。我们找不到任何对它的引用。

我们可以将消费者属性添加为“Auto.commit”作为“False”,但我们希望在成功处理消息后提交消息。有人可以帮忙吗?

@Bean
        IntegrationFlow consumer()  {
    KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
                    .inboundChannelAdapter(new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
                    .consumerProperties(
                            props -> props.put("zookeeper.session.timeout.ms", "500").put("zookeeper.sync.time.ms", "250").
                            put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").
                            put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").
                            put("auto.offset.reset", "smallest").
                            put("auto.commit.interval.ms", "100")
                            )
                    .addConsumer(this.kafkaConfig.getConsumerGroup(),
                            metadata -> metadata.consumerTimeout(100)
                                    .topicStreamMap(m -> m.put(this.kafkaConfig.getTopicRead(), 1)).maxMessages(1));

            Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
            return IntegrationFlows.from(messageSourceSpec, endpointConfigurer)
                    .<Map<String, List<byte[]>>> handle((payload, headers) -> {
                        payload.entrySet().forEach(e -> processMessage((ConcurrentHashMap<Integer, List<byte[]>>) e.getValue()));
                        return null;
                    }).get();
        }

版本:

<dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>

【问题讨论】:

  • headers 中有一个 Acknowledge 对象可以让你提交。

标签: spring spring-boot spring-integration apache-kafka kafka-consumer-api


【解决方案1】:

请参阅相关的 GH 问题 https://github.com/spring-projects/spring-kafka/issues/69

解决方案听起来像:

仔细观察,我会说您必须手动提交偏移量:

ConsumerConnector
...
/**
   *  Commit the offsets of all broker partitions connected by this connector.
   */
  public void commitOffsets();
  public void commitOffsets(boolean retryOnFailure);

这是 KafkaHighLevelConsumerMessageSource 所基于的 Kafka Consumer 桥。

您可以通过KafkaConsumerContext bean 访问它。不幸的是,它注册了一些生成的名称,例如 org.springframework.integration.kafka.support.KafkaConsumerContext#0 ,但如果您只有一个这样的 Kafka 消费者 (KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec),您可以按类型获取该 bean(或只是 @Autowired 它)。

之后你可以这样:

kafkaConsumerContext.getConsumerConfiguration(this.kafkaConfig.getConsumerGroup())
                                    .getConsumerConnector()
                                    .commitOffsets();

【讨论】:

    猜你喜欢
    • 2021-03-21
    • 2017-10-07
    • 1970-01-01
    • 1970-01-01
    • 2017-09-10
    • 1970-01-01
    • 2016-04-26
    • 1970-01-01
    • 2018-11-23
    相关资源
    最近更新 更多