【发布时间】:2017-12-12 13:19:32
【问题描述】:
在写入主题时,在 kafka 流端,我使用来自 kafka 的 Serdes.String() 序列化程序序列化了窗口键 [test_id@timestamp1/timestamp2]。从另一个应用程序检索相同的密钥时,我在反序列化时遇到以下错误
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
at [Source: [B@37e7c0b2; line: 1, column: 1]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at TestAlert$3.extract(TestAlert.java:483)
at TestAlert$3.extract(TestAlert.java:1)
at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
at org.apache.ignite.stream.kafka.KafkaStreamer.access$100(KafkaStreamer.java:47)
at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
以下是我为序列化 Windowed Key 编写的代码。这里testWinAlerts是开窗后的聚合结果,以<Windowd<String>>为键
testWinAlerts.toStream((k,v)->k.toString()).filter((k,v)->{
return (v!=null);}).to(Serdes.String(),aggrMessageSerde,"Some-Topic");
下面是反序列化器中的代码,用于将字节[]再次转换为字符串格式的键。其中 msg.key()[特定于 Ignite] 在从主题消费后以字节格式提供密钥。
String windowKey = objectMapper.readValue(msg.key(), String.class);
在进一步的测试中,我还尝试在将其写入 kafka 主题之前从窗口字符串中删除 "@", "/", "[", "]" 字符,然后它起作用了。但在实际实现中,我不想增加从字符串中删除这些字符的额外开销。那么如何消除这个错误呢?
【问题讨论】:
标签: java json serialization apache-kafka apache-kafka-streams