【问题标题】:Connecting Multiple Source Streams to The Same Set of Branches将多个源流连接到同一组分支
【发布时间】:2018-11-24 03:48:48
【问题描述】:

这是question的概括。

假设我有多个应用相同谓词集的源流。我想设置分支流,以便满足谓词的记录,无论哪个源流,都由同一分支流处理。如下图所示,每个分支流就像一个通用处理器,用于转换传入的记录。

以下代码块无法正常工作,因为它为每个源流创建了一组不同的分支流。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");

Predicate<String, String>[] branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
    int idx = i;
    branchPredicates[i] = ((key, value) ->
        key.hashCode() % forkCount == idx);
}

Kstream<String, String>[] forkStreams = Arrays.asList(source1, source2)
    .map(srcStream -> srcStream.branch(branchPredicates)
    .flatMap(x -> Arrays.stream())
    .collect(Collectors.toList());

抱歉,我主要是 scala 开发人员 :)

在上面的例子中,forkStreams.length == branchPredicates.length x 2 并且通常与源流的数量成正比。 Kafka 流中是否有一个技巧可以让我在谓词和 fork 流之间保持一对一的关系?

2018 年 11 月 27 日更新 我可以取得一些进展:

  • 使用一个源流从所有源主题中读取
  • 将源流连接到多个分支
  • 将消息平均分配到各个分支。

但是,正如以下代码块所示,ALL 叉流存在于同一线程中。我想要实现的是将每个 fork 流放到不同的线程中,以提高 CPU 利用率

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
    .parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String>[] predicates = new Predicate[totalPerdicates];
IntStream
    .range(0, totalPerdicates)
    .forEach(i -> {
        int idx = i;
        predicates[i] = (key, value) ->
            key.hashCode() % totalPerdicates == idx;
    });

forkStreams = Arrays.asList(sourceStreams.branch(predicates));

// Hack- Dump the number of messages processed every 10 seconds
forkStreams
    .forEach(fork -> {
        KStream<Windowed<String>, Long> tbl =
        fork.transformValues(new SourceTopicValueTransformerSupplier())
            .selectKey((key, value) -> "foobar")
            .groupByKey()
            .windowedBy(TimeWindows.of(2000L))
            .count()
            .toStream();

        tbl
            .foreach((key, count) -> {
                String fromTo = String.format("%d-%d",
                                              key.window().start(),
                                              key.window().end());
                System.out.printf("(Thread %d, Index %d) %s - %s: %d\n",
                                  Thread.currentThread().getId(),
                                  forkStreams.indexOf(fork),
                                  fromTo, key.key(), count);
            });

这是输出的sn-p

<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>

对于如何将每个分叉流放在不同线程中的任何建议表示赞赏。

【问题讨论】:

  • 你为什么不同时阅读这两个主题:StreamsBuilder.stream("x", "y");
  • 一个线程读取所有主题的量太大。我试图避免将 CPU 与源流挂钩。
  • 主题通过分区进行扩展——Kafka Streams 也可以按分区进行扩展。因此,如果两个输入主题都有 10 个分区,则最多可以运行 10 个线程,每个线程将处理 2 个分区。这对你有用吗?
  • 我知道按分区缩放的概念。这是我需要超过 1 个 CPU 来处理的单个分区中的记录。我想看到的是源流只是继续读取记录并将它们交给通用处理器(分支)。
  • 对于这种情况,您需要保持拓扑不变,并将to() 语句添加到拓扑的两个并行运行(未连接)部分。多次写入同一个主题是可以的——当然,这种情况下不会有任何排序保证。 Kafka Streams 仅使用多个线程处理两个不同主题的分区,如果它们在拓扑图中未连接。

标签: java java-8 apache-kafka apache-kafka-streams


【解决方案1】:

2018 年 11 月 27 日的更新已经回答了这个问题。话虽如此,该解决方案对我不起作用,因为我希望每个 fork 流都作为单独的线程运行。调用stream.branch() 在同一线程空间内创建多个子流。因此,一个分区内的所有记录都在同一个线程空间中处理。

为了实现子分区处理,我最终使用了kafka客户端API,结合java线程和并发队列。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-04-20
    • 2011-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-07
    相关资源
    最近更新 更多