【问题标题】:Split events based on criteria and handle in order根据条件拆分事件并按顺序处理
【发布时间】:2016-09-26 13:15:22
【问题描述】:

有以下问题:给定具有 partitionId 属性的事件列表(例如 0-10),我希望根据 paritionId 拆分传入事件,以便按顺序处理具有相同 partitionId 的事件'收到。 如果分布或多或少均匀,这将导致并行处理 10 个事件(每个分区)。

除了创建 10 个单线程调度器并将事件发送到正确的调度器之外,还有没有办法使用 Project Reactor 完成上述工作?

谢谢。

【问题讨论】:

    标签: project-reactor


    【解决方案1】:

    下面的代码

    • 将源流分成多个分区,
    • 创建 ParallelFlux,每个分区一个“轨道”,
    • 将“rails”调度到单独的线程中,
    • 收集结果

    每个分区都有专用线程保证其值按原始顺序处理。

    @Test
    public void partitioning() throws InterruptedException {
        final int N = 10;
        Flux<Integer> source = Flux.range(1, 10000).share();
        // partition source into publishers
        Publisher<Integer>[] publishers = new Publisher[N];
        for (int i = 0; i < N; i++) {
            final int idx = i;
            publishers[idx] = source.filter(v -> v % N == idx);
        }
        // create ParallelFlux each 'rail' containing single partition
        ParallelFlux.from(publishers)
                // schedule partitions into different threads
                .runOn(Schedulers.newParallel("proc", N))
                // process each partition in its own thread, i.e. in order
                .map(it -> {
                    String threadName = Thread.currentThread().getName();
                    Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
                    return it;
                })
                // collect results on single 'rail'
                .sequential()
                // and on single thread called 'subscriber-1'
                .publishOn(Schedulers.newSingle("subscriber"))
                .subscribe(it -> {
                    String threadName = Thread.currentThread().getName();
                    Assert.assertEquals("subscriber-1", threadName);
                });
        Thread.sleep(1000);
    }
    

    【讨论】:

    • 如果您添加一些对代码和使用的概念的解释,您的答案会更有用。比如为什么在发布者上叫share,为什么叫sequentialpublishOn
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-16
    • 1970-01-01
    • 2020-03-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多