【发布时间】:2021-10-23 23:46:14
【问题描述】:
我有一个带有 Actuator 和 Kafka 活页夹的 Spring Cloud Stream 项目。我正在探索bindings/ 执行器,并试图阻止生产者作为练习。我通过 curl 发出以下 POST 请求:
curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'
实际结果:
查询返回 204。生产者的状态(从 GET /actuator/bindings/producer-out-0 看到)现在是 stopped。但是,生产者仍在生产消息,这可以从主题上的日志记录和消费者活动中看出。
预期结果: 我希望生产者停止生产消息。 (我也尝试过使用 PAUSED 状态,它也返回 204,但是错误日志表明无法暂停此生产者。)
我是否误解了这个执行器的工作原理?当生产者停止时,是否期望 S.C.S.会继续投票给那个制片人吗?我知道的唯一文档是here,但据我所知,它并没有回答我的问题。
背景:
我正在使用 spring-boot-starter-parent 2.5.3 并将 starter-web 和 starter-actuator 列为依赖项。我不认为我缺少任何东西。
这是生产者/消费者对。如您所见,我正在使用可轮询的供应商。
@Configuration
@Profile("numbers")
public class NumberHandlers {
private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);
@Bean
public Supplier<Integer> producer() {
// Needed an effectively-final mutable integer. Side-bar comments welcome. :P
var counter = new AtomicInteger();
return () -> {
var n = counter.getAndIncrement();
LOGGER.info("Producing number: " + n);
return n;
};
}
@Bean
public Consumer<Integer> consumer() {
return it -> LOGGER.info("Consuming number: " + it);
}
}
当我传入numbers 个人资料时,这些都是活动的。我的配置如下。
application.yml:
server:
port: 8081
spring:
cloud:
stream:
kafka:
binder:
brokers: ${env.kafka.bootstrapservers:localhost}
management:
endpoints:
web:
exposure:
include: 'bindings'
... 和 application-numbers.yml:
spring:
cloud:
stream:
poller:
fixedDelay: 5000
bindings:
producer-out-0:
destination: numbers-raw
producer:
partitionCount: 3
consumer-in-0:
destination: numbers-raw
kafka:
bindings:
producer-out-0:
producer:
topic.properties:
# These look weird because they're done as an exercise.
retention.bytes: 10000
retention.ms: 172800000
function:
definition: producer;consumer
我正在使用主机网络上的 docker-compose kafka 和 zookeeper 在 localhost 环境中进行测试。
谢谢!
【问题讨论】:
标签: spring-kafka spring-cloud-stream spring-boot-actuator