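【Posted】: 2021-09-11 02:31:18
【Question】:
I am working on a project that has two services. Each one reads messages, transforms them, and then writes them to another Kafka topic. The Kafka configuration for the two services is different. This is my application.yml: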
spring:
  kafka:
    bootstrap-servers: localhost:9092
    sourcetopic1: topic1
    destinationtopic1: topic2
    sourcetopic2: topic3
    destinationtopic2: topic4
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: TestCollector
      client-id: TestCollector01
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
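As a quick cross-check of the custom topic keys above, here is a minimal plain-Java sketch that looks up `${...}` placeholders against the four keys defined under `spring.kafka`. The class `TopicRoutes` and its methods are hypothetical and only model a key lookup; this is not how Spring resolves placeholders. It is included because the `Service2KafkaConfig` class in this question references `${spring.kafka.sourcetopic3}`, while the YAML defines `sourcetopic2` (with the value `topic3`).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper, not part of the question's code. */
final class TopicRoutes {

    // Keys and values copied from the spring.kafka section of application.yml.
    static Map<String, String> configuredKeys() {
        Map<String, String> keys = new LinkedHashMap<>();
        keys.put("spring.kafka.sourcetopic1", "topic1");
        keys.put("spring.kafka.destinationtopic1", "topic2");
        keys.put("spring.kafka.sourcetopic2", "topic3");
        keys.put("spring.kafka.destinationtopic2", "topic4");
        return keys;
    }

    // Strips the "${" and "}" wrapper and looks the key up; empty if the key is not defined.
    static Optional<String> resolve(String placeholder) {
        String key = placeholder.replace("${", "").replace("}", "");
        return Optional.ofNullable(configuredKeys().get(key));
    }

    public static void main(String[] args) {
        System.out.println(resolve("${spring.kafka.sourcetopic1}")); // prints Optional[topic1]
        System.out.println(resolve("${spring.kafka.sourcetopic2}")); // prints Optional[topic3]
        System.out.println(resolve("${spring.kafka.sourcetopic3}")); // prints Optional.empty
    }
}
```

If the posted code matches the real project, the placeholder in the second config class would probably need to be `${spring.kafka.sourcetopic2}` to pick up `topic3`.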
These are the configuration classes for my two services:
- Service1KafkaConfig
public class KafkaConfig {

    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(
            @Value("${spring.kafka.sourcetopic1}") String topic,
            KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
    }
}
- Service2KafkaConfig
public class Service2KafkaConfig {

    @Bean
    public ReceiverOptions<String, String> service2KafkaReceiverOptions(
            @Value("${spring.kafka.sourcetopic3}") String topic,
            KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
    }
}
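Both classes declare a bean of type `ReceiverOptions<String, String>`, and the `service2KafkaConsumerTemplate` method names its parameter `kafkaReceiverOptions`, which is also the name of the bean declared in `KafkaConfig`. Assuming Spring falls back to the parameter name when several candidates of the same type exist (and parameter names are available at runtime), the second template may end up with the `topic1` options. The plain-Java model below only illustrates that tie-break; `InjectionModel` is hypothetical and is not Spring's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical model of by-type injection with a by-name tie-break. */
final class InjectionModel {

    // Bean name -> subscribed topic, standing in for the ReceiverOptions<String, String> beans.
    private final Map<String, String> receiverOptionsBeans = new LinkedHashMap<>();

    void register(String beanName, String topic) {
        receiverOptionsBeans.put(beanName, topic);
    }

    // Every registered bean has the same type here, so with more than one
    // candidate the parameter name is the only thing left to decide with.
    String resolve(String parameterName) {
        if (receiverOptionsBeans.size() == 1) {
            return receiverOptionsBeans.values().iterator().next();
        }
        String byName = receiverOptionsBeans.get(parameterName);
        if (byName == null) {
            throw new IllegalStateException(
                    "no unique candidate for parameter '" + parameterName + "'");
        }
        return byName;
    }

    // Registers the two beans under the method names used in the question.
    static InjectionModel withBothConfigs() {
        InjectionModel model = new InjectionModel();
        model.register("kafkaReceiverOptions", "topic1");         // from KafkaConfig
        model.register("service2KafkaReceiverOptions", "topic3"); // intended topic for service 2
        return model;
    }

    public static void main(String[] args) {
        InjectionModel model = withBothConfigs();
        // Parameter name as written in service2KafkaConsumerTemplate(...):
        System.out.println(model.resolve("kafkaReceiverOptions"));         // prints topic1
        // Parameter name that would match the second bean:
        System.out.println(model.resolve("service2KafkaReceiverOptions")); // prints topic3
    }
}
```

In the real configuration, the corresponding change would be to rename that parameter to `service2KafkaReceiverOptions` or annotate it with `@Qualifier("service2KafkaReceiverOptions")`. This is a suggestion based on the posted code, not a confirmed fix.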
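I autowire these beans in their respective services:
Service1: I have not included the ProcessRecord method for Service1 because I don't think it is needed for this question. Let me know if it is.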
@Slf4j
@Service
public class Service1 implements CommandLineRunner {

    @Autowired
    public ReactiveKafkaConsumerTemplate<String, String> service1KafkaConsumerTemplate;

    public Flux<String> consume1() {
        return service1KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s -> ProcessRecord(s))
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric1[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume1().subscribe();
    }
}
Service2:
@Slf4j
@Service
public class Service2 implements CommandLineRunner {

    @Autowired
    @Qualifier("service2KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate;

    public Flux<String> consume2() {
        return service2KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume2().subscribe();
    }
}
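When I run the application, I can see only one consumer start, and it subscribes to topic1. Is it possible to run multiple Kafka consumers in the same project? If so, what do I need to do to get them both running?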
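The question says the two services need different Kafka configuration, yet both config classes start from the same `kafkaProperties.buildConsumerProperties()` map, so both consumers would share the same `client.id`. One possible ingredient, sketched here in plain Java under that assumption, is to copy the base map per service and override only what differs. The class `ConsumerProps` and the client ids `TestCollector-service1` and `TestCollector-service2` are hypothetical; the property keys are the standard Kafka consumer settings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the question's code. */
final class ConsumerProps {

    // Base settings mirroring the consumer section of application.yml in the question.
    static Map<String, Object> base() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "TestCollector");
        props.put("client.id", "TestCollector01");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Returns a copy with a per-service client id; the base map is left untouched.
    static Map<String, Object> forService(Map<String, Object> base, String clientId) {
        Map<String, Object> copy = new HashMap<>(base);
        copy.put("client.id", clientId);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> base = base();
        Map<String, Object> service1 = forService(base, "TestCollector-service1");
        Map<String, Object> service2 = forService(base, "TestCollector-service2");
        System.out.println(service1.get("client.id")); // prints TestCollector-service1
        System.out.println(service2.get("client.id")); // prints TestCollector-service2
        System.out.println(base.get("client.id"));     // prints TestCollector01
    }
}
```

Each resulting map could then be passed to `ReceiverOptions.create(...)` in the matching config class, the same call the question already uses. Distinct client ids may also make the two consumers easier to tell apart in logs and metrics.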
【Comments】:
- Yes; it should work fine. You need to show the code where you actually subscribe (.receive()...).
- I have added the code where I subscribe.
Tags: apache-kafka kafka-consumer-api spring-kafka project-reactor reactive-kafka