【问题标题】:Kafka Streams : Multiple topologies in a single applicationKafka Streams:单个应用程序中的多个拓扑
【发布时间】:2019-04-20 18:02:50
【问题描述】:

我想在单个 Kafka 流应用程序中同时使用处理器 API 和 DSL。此外,如何在单个应用程序中构建和运行多个拓扑(比如 1 使用处理器 API 和其他使用 DSL。)

【问题讨论】:

    标签: kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    您可以轻松混合 DSL 和处理器 API。 我如何理解您想使用这两种方法构建处理图,对于 DSL,您可以调用 StreamsBuilder::stream,对于处理器 API,您调用 StreamsBuilder::build() 以获取 Topology,然后应用函数来添加处理器等。

    源代码会是这样的:

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input1").to("output1");
    Topology topology = builder.build();
    topology.addSource("inputNode","input2");
    topology.addProcessor("processor1", InputProcessor::new, "inputNode");
    topology.addSink("sink1", "output2", "processor1");
    
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    

    EDIT1:

    您可以使用 DSL 构建两个拓扑,并行运行并监听不同的主题。它可以像@Matthias J. Sax 提到的KStream::transform(...)KStream::transformValues(...)KStream::process(...) 那样完成。代码将是这样的:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
    KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);
    

    【讨论】:

    • 使用transform()transformValues()process()应该更简单:docs.confluent.io/current/streams/developer-guide/…
    • 那么,我们可以让 2 个拓扑并行运行并监听不同的主题吗?
    • @user1768610,是的,你可以。我已经用示例代码更新了我的答案。
    • @wardziniak:谢谢
    猜你喜欢
    • 1970-01-01
    • 2017-06-09
    • 1970-01-01
    • 2020-04-05
    • 2019-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-28
    相关资源
    最近更新 更多