【问题标题】:How to get my topic name from message header and publish如何从消息头中获取我的主题名称并发布
【发布时间】:2019-03-08 12:30:33
【问题描述】:

我是弹簧集成的新手。我正在尝试构建一个 spring rest 服务,它将获取带有一些 json 消息的任何 HTTP 请求并发布到 kafka 主题。

我的 json 消息将通过 RequestBody 发布,其中将在消息头中包含主题名称。

我可以将消息从我的控制器发布到 kafka 频道,但是我很难从我的 json 消息头中获取主题名称。

谁能建议一种从我的消息头中获取主题名称的方法(通常一个 HTTP 请求包含带有主题名称的 json 消息)并使用该主题来发布消息。

我的 json :

{"resourceType": "MessageHeader",
"topicName": "testToptic",
"messagePayload":{
    "location": "chennai",
    "messageDetail": {
        "department-id": 123,
        "department-name": "SSS",
        "pincode": 600009
    }
}
}}

这里是我的 bean 和处理程序

@Bean
public IntegrationFlow hanldeGenericKafka() {
    return IntegrationFlows.from(sendToKafkaChannel)

            .handle(
                    kafkaGenericMessageHandler(producerFactory),
                    e -> e.id("kafkaProducer2"))
            .get();
}

public KafkaProducerMessageHandlerTemplateSpec<String, String> kafkaGenericMessageHandler(
        ProducerFactory<String, String> producer) {

    return Kafka
            .outboundChannelAdapter(producer)
            .sync(true)
            .headerMapper(kafkaDefaultHeaderMapper())
            .messageKey(m -> m.getHeaders()
                    .get("topicname"))
            .configureKafkaTemplate(t -> t.id("kafkaTemplate"));
}

【问题讨论】:

    标签: json apache-kafka http-headers spring-integration kafka-producer-api


    【解决方案1】:

    您可以使用带有内置 JsonPath SpEL function 的表达式从 JSON 有效负载中提取字段值。

    使用适配器的.topicExpression() 中的表达式。

    【讨论】:

      猜你喜欢
      • 2019-06-20
      • 1970-01-01
      • 2020-03-05
      • 2023-04-04
      • 2015-08-01
      • 2020-07-08
      • 1970-01-01
      • 1970-01-01
      • 2019-02-10
      相关资源
      最近更新 更多