【问题标题】:Spring Cloud Stream with Kafka Binder: /bindings Actuator API does not stop producerSpring Cloud Stream with Kafka Binder:/bindings Actuator API 不会停止生产者
【发布时间】:2021-10-23 23:46:14
【问题描述】:

我有一个带有 Actuator 和 Kafka 活页夹的 Spring Cloud Stream 项目。我正在探索bindings/ 执行器,并试图阻止生产者作为练习。我通过 curl 发出以下 POST 请求:

curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'

实际结果: 查询返回 204。生产者的状态(从 GET /actuator/bindings/producer-out-0 看到)现在是 stopped。但是,生产者仍在生产消息,这可以从主题上的日志记录和消费者活动中看出。

预期结果: 我希望生产者停止生产消息。 (我也尝试过使用 PAUSED 状态,它也返回 204,但是错误日志表明无法暂停此生产者。)

我是否误解了这个执行器的工作原理?当生产者停止时,是否期望 S.C.S.会继续投票给那个制片人吗?我知道的唯一文档是here,但据我所知,它并没有回答我的问题。

背景

我正在使用 spring-boot-starter-parent 2.5.3 并将 starter-web 和 starter-actuator 列为依赖项。我不认为我缺少任何东西。

这是生产者/消费者对。如您所见,我正在使用可轮询的供应商。

@Configuration
@Profile("numbers")
public class NumberHandlers {
  private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);

  @Bean
  public Supplier<Integer> producer() {
    // Needed an effectively-final mutable integer. Side-bar comments welcome. :P
    var counter = new AtomicInteger();
    return () -> {
      var n = counter.getAndIncrement();
      LOGGER.info("Producing number: " + n);
      return n;
    };
  }

  @Bean
  public Consumer<Integer> consumer() {
    return it -> LOGGER.info("Consuming number: " + it);
  }
}

当我传入numbers 个人资料时,这些都是活动的。我的配置如下。

application.yml:

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${env.kafka.bootstrapservers:localhost}
management:
  endpoints:
    web:
      exposure:
        include: 'bindings'

... 和 application-numbers.yml:

spring:
  cloud:
    stream:
      poller:
        fixedDelay: 5000
      bindings:
        producer-out-0:
          destination: numbers-raw
          producer:
            partitionCount: 3
        consumer-in-0:
          destination: numbers-raw
      kafka:
        bindings:
          producer-out-0:
            producer:
              topic.properties:
                # These look weird because they're done as an exercise.
                retention.bytes: 10000
                retention.ms: 172800000
    function:
      definition: producer;consumer

我正在使用主机网络上的 docker-compose kafka 和 zookeeper 在 localhost 环境中进行测试。

谢谢!

【问题讨论】:

    标签: spring-kafka spring-cloud-stream spring-boot-actuator


    【解决方案1】:

    目前不支持生产者绑定的生命周期控制,仅支持消费者绑定。

    【讨论】:

    猜你喜欢
    • 2021-02-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-16
    • 1970-01-01
    • 1970-01-01
    • 2018-03-09
    相关资源
    最近更新 更多