【发布时间】:2019-04-20 18:02:50
【问题描述】:
我想在单个 Kafka 流应用程序中同时使用处理器 API 和 DSL。此外,如何在单个应用程序中构建和运行多个拓扑(比如 1 使用处理器 API 和其他使用 DSL。)
【问题讨论】:
标签: kafka-consumer-api apache-kafka-streams
我想在单个 Kafka 流应用程序中同时使用处理器 API 和 DSL。此外,如何在单个应用程序中构建和运行多个拓扑(比如 1 使用处理器 API 和其他使用 DSL。)
【问题讨论】:
标签: kafka-consumer-api apache-kafka-streams
您可以轻松混合 DSL 和处理器 API。
我如何理解您想使用这两种方法构建处理图,对于 DSL,您可以调用 StreamsBuilder::stream,对于处理器 API,您调用 StreamsBuilder::build() 以获取 Topology,然后应用函数来添加处理器等。
源代码会是这样的:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
EDIT1:
您可以使用 DSL 构建两个拓扑,并行运行并监听不同的主题。它可以像@Matthias J. Sax 提到的KStream::transform(...)、KStream::transformValues(...) 和KStream::process(...) 那样完成。代码将是这样的:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);
【讨论】:
transform()、transformValues()和process()应该更简单:docs.confluent.io/current/streams/developer-guide/…