【发布时间】:2021-02-18 08:57:36
【问题描述】:
所以我根据一些建议进行了更新。但是流应用程序会在一段时间后终止。没有表演。 ide 显示的以下代码中没有错误。最后,我将数据发送到主题,因为键等于字符串,值作为 json 对象。还是不行。
我猜它是一条线或其他东西,但不确定我是否正确。请。还附上了下面的错误截图。
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
JSONObject jsnObj = new JSONObject();
......(word count manipulationover part over here)
KTable<Windowed<String>, Long> Ttable = TgroupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
Ttable
.toStream()
.selectKey((key, word) -> key.key())
.map((key, value) -> {
JSONParser par = new JSONParser();
StringWriter sw = new StringWriter();
KeyValue<String, JsonNode> kv = null;
try {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree("{ \"word\": \"" + key + "\" \",\" count: \"" + value + "\" }");
KeyValue.pair(key.concat("s"), jsonNode);
kv = KeyValue.pair(key.concat("s"), jsonNode);
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return kv;
}
)
.to("badliar", Produced.with(Serdes.String(), jsonSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
【问题讨论】:
标签: java json apache-kafka apache-kafka-streams