【发布时间】:2020-01-15 07:12:05
【问题描述】:
我正在创建一个流处理应用程序。它应该创建一个 kafka 流连接。当消息到达时,我需要做以下事情:
- 检查消息类型
- 通过调用特定的TypeProcessing Service 处理消息
-
结束于根据消息类型决定的特定主题
public java.util.function.Consumer<KStream<String, String>> process() { String topic; return input -> input.map((key, value) -> { //check type and ask object from factory try { JSONObject msg = Util.getObjectMapper().readValue(value, JSONObject.class); String type = msg.get("type").toString(); if(type.equalsIgnoreCase("test")){ //processing started msgTypeHandlerFactory .createInstance(type) .enrichAndRelay(msg); System.out.println("IN"); } else{ input.to("notStream"); System.out.println("OUT"); } } catch (JsonProcessingException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return KeyValue.pair(key, value); }) .to("output_topic"); }上述代码的问题是我正在使用 map 函数,它使我能够使用 .to() 函数来发送流。 我想要的是检查每条消息的类型,然后进行处理,并相应地发送到另一个流。 为此,我应该使用不给我 .to() 函数的 foreach 函数,所以我必须创建另一个 Kafka Producer 来完成这项工作。
要求:
- 在处理下一个 msg 之前,每个 msg 都应该在流函数的帮助下处理和发送,而不是使用另一个 kafka 生产者
- 如果满足要求,那么我应该能够将消息发送到将根据类型动态决定的主题。
【问题讨论】:
标签: apache-kafka apache-kafka-streams spring-kafka