【问题标题】:Kafka Spring Integration: Headers not coming for kafka consumerKafka Spring 集成:kafka 消费者的标头不会出现
【发布时间】:2015-11-13 07:15:39
【问题描述】:

我正在使用 Kafka Spring Integration 使用 kafka 发布和使用消息。我看到 Payload 已正确地从生产者传递到消费者,但标头信息在某处被覆盖。

@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
        ExecutionException {
    try {
            System.out.println("Headers :" + message.getHeaders().toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

我得到以下标题:

Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}

我正在像这样在生产者端构造消息:

Message<FeelDBMessage> message = MessageBuilder
                .withPayload(samplePayloadObj)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                .setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();

        // publish the message
        publisher.publishMessage(message);

以下是生产者的标头信息:

 headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}

知道为什么标头会被不同的值覆盖吗?

【问题讨论】:

    标签: spring spring-integration apache-kafka


    【解决方案1】:

    只是因为默认情况下框架使用 不可变 GenericMessage

    对现有消息(例如MessageBuilder.withPayload)的任何操作都会产生一个新的GenericMessage 实例。

    另一方面,Kafka 不支持任何 headers 抽象,如 JMS 或 AMQP。这就是为什么KafkaProducerMessageHandler 在向 Kafka 发布消息时会这样做:

    this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
    

    如您所见,它根本不发送headers。因此,另一方(消费者)仅将主题中的message 处理为payload,而将一些系统选项处理为headers,例如topicpartitionmessageKey

    简而言之:我们不通过 Kafka 传输标头,因为它不支持它们。

    【讨论】:

    • 感谢 Artem 的回复。但是在 kafka-spring-integration 文档中说:可以分别通过 kafka_topic 和 kafka_partitionId 标头自定义发布消息的目标主题和分区。还有一个代码 sn-p,它在发送消息时设置标题: channel.send( MessageBuilder.withPayload(payload) .setHeader(KafkaHeaders.MESSAGE_KEY, "key") .setHeader(KafkaHeaders.TOPIC, "test") .build() );
    • 它还说:“重要。KafkaHeaders 接口包含用于与标头交互的常量。messageKey 和主题默认标头现在需要 kafka_ 前缀” KafkaHeaders 类?
    • 供内部 Spring Integration 使用。例如发送动态主题解析的部分。请不要混合使用 Spring Integration Kafka Support 和 Apache Kafka 本身。您可以在 File 模块中找到类似的内容。
    • 知道了!再次感谢。但是你能建议任何替代方式(如果可能的话)将一些数据从生产者传递到消费者而不修改有效负载吗?
    • 是的...不。没有这样的方法。您始终可以使用 payload 作为旧的 Message&lt;?&gt; 构建新消息,并分别为生产者和消费者提供一些智能自定义 Kafka (De)Serializer
    猜你喜欢
    • 2016-11-11
    • 2023-03-20
    • 1970-01-01
    • 2019-10-06
    • 1970-01-01
    • 2016-08-19
    • 1970-01-01
    • 2019-05-09
    • 2019-04-07
    相关资源
    最近更新 更多