【问题标题】:Spring Cloud Streams not setting the kafka key in the message?Spring Cloud Streams 没有在消息中设置 kafka 键?
【发布时间】:2017-03-25 00:14:54
【问题描述】:

故事是这样的。我有一个 kafka 代理和一个特定对象(我将其 jsonify 以通过我的主题发送),它有一个我想用作键的 ID。

目前我正在使用“partitionKeyExtractorClass”配置来设置提取ID并将其作为键返回的类。

看起来像这样:

def extractKey(Message<?> message) {
    log.info('Extracting key from message')
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
    log.info("Got = ${id}")

    return id
}

我的实际问题是,当我浏览有关该主题的消息时,保存我的消息的 ConsumerRecord 说密钥为空...

这是一个错误吗?难道我做错了什么?这方面的文档并没有比这更进一步。

【问题讨论】:

    标签: spring spring-boot apache-kafka spring-cloud spring-cloud-stream


    【解决方案1】:

    你看,你把partitionkey混在一起了。

    目前KafkaMessageChannelBinder 不提供用于确定keyMessage 的选项。

    只有您可以强大使用的现有功能是KafkaHeaders.MESSAGE_KEY

        Object messageKey = this.messageKeyExpression != null
                ? this.messageKeyExpression.getValue(this.evaluationContext, message)
                : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);
    

    因此,在output 消息之前,您应该计算密钥并将其放入该标头中。

    【讨论】:

    • 这不是一个可接受的解决方案。我有同样的问题。我使用原始模式输出。我不希望我的消息中有任何标题,但需要设置密钥。我不想切换到 apache camel。
    • 您的问题不清楚。这是当前 Kafka Binder 实现的一个限制,mesageKey 只是一个遗漏。不过,请随意填写问题。如果您非常关心的话,您可以配置 Kafka Binder 现在不映射该标头。而且我完全不明白 Apache Camel 是如何替代 Spring Cloud Stream 的……
    • 问题是我正在与 Spring Cloud Streams 之外的其他生产者和消费者集成流。消息密钥对于保证 kafka 主题的顺序很重要。拥有 Spring Cloud Streams 控制标头意味着所有其他生产者和消费者必须处理 Spring Cloud Streams 特定的标头。我需要 kafka 消息正是我想要的,而不是 Spring Cloud Streams 想要的。我需要一个简单的键和字符串消息。原始模式给了我这个,但没有维持秩序的关键。
    • 和 Apache Camel 完全一样的能力;读取消息流,以某种方式处理它们,并从处理后的数据中路由/生成新消息。
    猜你喜欢
    • 2021-04-17
    • 1970-01-01
    • 2019-06-24
    • 1970-01-01
    • 1970-01-01
    • 2019-08-27
    • 2021-03-15
    • 2022-11-19
    • 2021-06-12
    相关资源
    最近更新 更多