【发布时间】:2021-03-22 21:50:32
【问题描述】:
在我的应用程序中,streamlistner 从多个主题消费了 kafka 消息,但我想为一个主题添加功能性消费者 bean,这可能是一些主题由 streamlistner 消费,而一些主题与消费者 bean 在同一个应用程序中? 当我尝试创建的主题仅由 streamlistner 消费而不是为消费者 bean 创建?
application.yml
spring:
cloud:
function:
definition: printString
stream:
kafka:
binder:
brokers: localhost:9092
zkNodes: localhost:2181
autoCreateTopics: true
autoAddPartitions: true
bindings:
printString-in-0:
destination: function-input-topic
group: abc
consumer:
maxAttempts: 1
partitioned: true
concurrency: 2
xyz-input:
group: abc
destination: xyz-input-input
consumer:
maxAttempts: 1
partitioned: true
concurrency: 2
主题是为 xyz 输入主题创建的,但不是为函数输入主题创建的
消费豆
import java.util.function.Consumer;
@Component
public class xyz {
@Bean
Consumer<String> printString() {
return System.out::print;
}
}
kafkaConfig 接口
public interface KafkaConfig {
@Input("xyz-input")
SubscribableChannel inbound();
}
【问题讨论】:
标签: apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka