【问题标题】:Manual Acknowledgement (Checkpointing) of Messages: Spring Cloud Stream Kenesis Binder消息的手动确认(检查点):Spring Cloud Stream Kenesis Binder
【发布时间】:2021-06-03 17:44:23
【问题描述】:

我们正在尝试将使用来自 Kafka 的消息的 Spring Cloud Stream 应用程序移植到 AWS Kenesis。我们需要手动确认来处理某些超时条件。

对于 Kafka,我们使用属性 autocommitoffset 为 false 并使用 ACKNOWLEDGMENT 标头来处理手动确认。

我浏览了 Spring Cloud Stream 的文档 经历了以下: https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/ https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

但找不到任何解决方案。任何指针都会非常有帮助。

【问题讨论】:

    标签: spring-cloud-stream spring-cloud-dataflow


    【解决方案1】:

    经过一番搜索,找到了解决方案:

    在 Kenesis 中,shard 相当于 partitioncheckpoint 相当于 offset

    在应用程序 Yml 中:

    spring:
      cloud:
        stream:
          kinesis:
            bindings:
              consumer-in-0:
                consumer:
                  checkpointMode: manual
    

    检查点示例代码

     @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                System.out.println("message received : "+ message.getPayload());
                System.out.println("message headers : "+ message.getHeaders());
                Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
                checkPointer.checkpoint();
               
            };
        }
    

    推荐 Kenesis Consumer Binder Properties

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-06-20
    • 2020-07-26
    • 1970-01-01
    • 2016-12-27
    • 2023-03-29
    • 2022-11-19
    • 2018-03-09
    • 2019-04-19
    相关资源
    最近更新 更多