【发布时间】:2019-03-08 12:30:33
【问题描述】:
我是弹簧集成的新手。我正在尝试构建一个 spring rest 服务,它将获取带有一些 json 消息的任何 HTTP 请求并发布到 kafka 主题。
我的 json 消息将通过 RequestBody 发布,其中将在消息头中包含主题名称。
我可以将消息从我的控制器发布到 kafka 频道,但是我很难从我的 json 消息头中获取主题名称。
谁能建议一种从我的消息头中获取主题名称的方法(通常一个 HTTP 请求包含带有主题名称的 json 消息)并使用该主题来发布消息。
我的 json :
{"resourceType": "MessageHeader",
"topicName": "testToptic",
"messagePayload":{
"location": "chennai",
"messageDetail": {
"department-id": 123,
"department-name": "SSS",
"pincode": 600009
}
}
}}
这里是我的 bean 和处理程序
@Bean
public IntegrationFlow hanldeGenericKafka() {
return IntegrationFlows.from(sendToKafkaChannel)
.handle(
kafkaGenericMessageHandler(producerFactory),
e -> e.id("kafkaProducer2"))
.get();
}
public KafkaProducerMessageHandlerTemplateSpec<String, String> kafkaGenericMessageHandler(
ProducerFactory<String, String> producer) {
return Kafka
.outboundChannelAdapter(producer)
.sync(true)
.headerMapper(kafkaDefaultHeaderMapper())
.messageKey(m -> m.getHeaders()
.get("topicname"))
.configureKafkaTemplate(t -> t.id("kafkaTemplate"));
}
【问题讨论】:
标签: json apache-kafka http-headers spring-integration kafka-producer-api