【发布时间】:2021-08-14 03:40:12
【问题描述】:
我有两个应用程序 - 第一个使用带有 AWS Kinesis Binder 的 spring-cloud-stream/function 生成消息,第二个是基于 spring 集成构建以使用消息的应用程序。两者之间的通信不是问题——我可以从“流”发送消息并在“集成”中轻松处理。
当我想发送自定义标头时,就会出现问题。标头以“新”格式(开头有 0xff 等)作为嵌入式标头到达消费者 - 请参阅 spring-cloud-stream 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable。
但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入标头形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(请参阅#toMessage)。由于嵌入的标头中包含附加信息(0xff 等),它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')。
我需要通过线路发送标头(标头用于在另一侧进行路由),因此在生产者上“关闭”标头不是一个选项。我看不到使用“旧”嵌入式标题的方法。
我想在生产者端使用 spring-cloud-stream/function - 这太棒了。我希望我可以重做消费者,但是......
我可以编写自己的嵌入式标头映射器来理解新格式(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。
鉴于 spring-cloud-stream 和 spring-integration 之间的密切关系,我一定是做错了什么。 Spring Integration 是否有一个可以理解新嵌入表单的 OutboundMessageMapper?
或者有没有办法强制 Spring Cloud Stream 使用不同的嵌入策略?
我可以在生产者端使用 Spring Integration。 (悲伤的脸)。
有什么想法吗?提前致谢。
【问题讨论】:
标签: spring-integration spring-cloud-stream spring-integration-aws