【问题标题】:Error While Deserializing object from Kafka Streams从 Kafka Streams 反序列化对象时出错
【发布时间】:2017-12-12 13:19:32
【问题描述】:

在写入主题时,在 kafka 流端,我使用来自 kafka 的 Serdes.String() 序列化程序序列化了窗口键 [test_id@timestamp1/timestamp2]。从另一个应用程序检索相同的密钥时,我在反序列化时遇到以下错误

com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
 at [Source: [B@37e7c0b2; line: 1, column: 1]
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
        at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
        at TestAlert$3.extract(TestAlert.java:483)
        at TestAlert$3.extract(TestAlert.java:1)
        at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
        at org.apache.ignite.stream.kafka.KafkaStreamer.access$100(KafkaStreamer.java:47)
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:156)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

以下是我为序列化 Windowed Key 编写的代码。这里testWinAlerts是开窗后的聚合结果,以<Windowd<String>>为键

testWinAlerts.toStream((k,v)->k.toString()).filter((k,v)->{
                return (v!=null);}).to(Serdes.String(),aggrMessageSerde,"Some-Topic");

下面是反序列化器中的代码,用于将字节[]再次转换为字符串格式的键。其中 msg.key()[特定于 Ignite] 在从主题消费后以字节格式提供密钥。

String windowKey = objectMapper.readValue(msg.key(), String.class); 

在进一步的测试中,我还尝试在将其写入 kafka 主题之前从窗口字符串中删除 "@", "/", "[", "]" 字符,然后它起作用了。但在实际实现中,我不想增加从字符串中删除这些字符的额外开销。那么如何消除这个错误呢?

【问题讨论】:

    标签: java json serialization apache-kafka apache-kafka-streams


    【解决方案1】:

    您正在使用StringSerde 将输入序列化为字符串,但随后您尝试使用Jackson 对其进行反序列化,Jackson 期望将 JSON 字符串 作为其输入.常规字符串可以是任何一系列字符。但是 JSON 字符串看起来像 "string"——根据定义,它以 " 开头和结尾。因此,您不能使用 Jackson 反序列化任何字符串,在其序列化状态下,它必须以 " 开头和结尾。为什么不直接使用StringSerde 来反序列化密钥?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-08
      • 2019-01-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多