【问题标题】:How to send custom header from spring-cloud-stream (aws kinesis binder) to "legacy" spring integration consumer如何将自定义标头从 spring-cloud-stream(aws kinesis binder)发送到“旧版”spring 集成使用者
【发布时间】:2021-08-14 03:40:12
【问题描述】:

我有两个应用程序 - 第一个使用带有 AWS Kinesis Binder 的 spring-cloud-stream/function 生成消息,第二个是基于 spring 集成构建以使用消息的应用程序。两者之间的通信不是问题——我可以从“流”发送消息并在“集成”中轻松处理。

当我想发送自定义标头时,就会出现问题。标头以“新”格式(开头有 0xff 等)作为嵌入式标头到达消费者 - 请参阅 spring-cloud-stream 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable。

但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入标头形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(请参阅#toMessage)。由于嵌入的标头中包含附加信息(0xff 等),它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

我需要通过线路发送标头(标头用于在另一侧进行路由),因此在生产者上“关闭”标头不是一个选项。我看不到使用“旧”嵌入式标题的方法。

我想在生产者端使用 spring-cloud-stream/function - 这太棒了。我希望我可以重做消费者,但是......

我可以编写自己的嵌入式标头映射器来理解新格式(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。

鉴于 spring-cloud-stream 和 spring-integration 之间的密切关系,我一定是做错了什么。 Spring Integration 是否有一个可以理解新嵌入表单的 OutboundMessageMapper?

或者有没有办法强制 Spring Cloud Stream 使用不同的嵌入策略?

我可以在生产者端使用 Spring Integration。 (悲伤的脸)。

有什么想法吗?提前致谢。

【问题讨论】:

    标签: spring-integration spring-cloud-stream spring-integration-aws


    【解决方案1】:

    了解新格式

    这不是“新”格式,它是 Spring Cloud Stream 创建的一种格式,最初是为 Kafka 设计的,它只在 0.11 中添加了 header 支持。

    我可以编写自己的嵌入式标头映射器来理解新格式(使用EmbeddedHeaderUtils),并将其连接到KinesisMessageDrivenChannelAdapter

    我建议您这样做,并考虑将contributing it to the core Spring Integration ProjectEmbeddedJsonHeadersMessageMapper 放在一起,以便它可以与所有不支持本机标头的技术一起使用。

    【讨论】:

      猜你喜欢
      • 2019-04-19
      • 1970-01-01
      • 2019-10-03
      • 1970-01-01
      • 1970-01-01
      • 2019-08-22
      • 2020-02-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多