【问题标题】:Is RxJava a good fit for branching workflows?RxJava 是否适合分支工作流?
【发布时间】:2018-07-18 10:40:23
【问题描述】:

我正在使用 RxJava 处理我们从队列中提取的一些通知。

RxJava 似乎在一个简单的工作流程上工作得很好,现在随着新需求的出现,流程变得越来越复杂,分支越来越多(请参见下图作为参考) 我试图用一个小单元测试来举例说明流程:

@Test
public void test() {
    Observable.range(1, 100)
        .groupBy(n -> n % 3)
        .toMap(GroupedObservable::getKey)
        .flatMap(m1 -> {
            Observable<Integer> ones1 = m1.get(0);
            Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);
            Observable<Integer> threes = m1.get(2).map(n -> n + 100);
            Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)
                .map(n -> n * 3)
                .groupBy(n -> n % 2)
                .toMap(GroupedObservable::getKey)
                .flatMap(m2 -> {
                    Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);
                    Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);
                    return Observable.merge(ones2, twos2);
                });
                return Observable.merge(onesAndTwos, threes).map(n -> n +1);
        })
        .subscribe(System.out::println);
}

虽然使用 RxJava 在技术上仍然可以实现,但我现在想知道这是否是一个不错的选择,因为要正式化分支,我必须在主 flatMap 内进行 2 级嵌套,这看起来不太整洁。

这是描述上述工作流程的正确方式吗?还是 RxJava 不适合分支工作流?

感谢您的帮助!

【问题讨论】:

  • 请注意,没有什么会限制您在单个表达式中生成这样的 Rx 图。您可以轻松地将流程中内部点的可观察值存储在变量或字段中,然后单独使用它们。这可以让您将此流程拆分为更清晰的模块,而不是将其显示为一个整体,这至少有点令人生畏。
  • 确实如此。但我也想知道在 RxJava 中是否有更好的方法来处理分支,也许是使用本地操作符。最后的要点是,在groupBy 之后,我需要同时处理所有组,这样我就可以一起操作它们。这就是我使用toMap 的原因,但这也是创建嵌套的原因。也许有更好的方法?

标签: java functional-programming rx-java reactive-programming rx-java2


【解决方案1】:

分组 observable 是 AFAIK 的正确方法。就个人而言,如果您的图片中“按类型拆分”和“合并所有内容”之间的任何内容都是异步的,那么在 RX 中执行此操作肯定有很多优势,例如重试逻辑、缓冲、错误处理、背压等。如果是常规的非异步代码我猜这是个人喜好。您可以使用 RX 来完成,但您也可以使用常规同步代码在“按类型拆分”和“合并所有内容”之间进行任何操作。

无论您选择哪种方式,拆分代码以使其更具可读性始终是一个好主意,这样您就可以像阅读您附加的图片一样简单地“阅读流程”。

【讨论】:

  • 谢谢,我同意你的观点,但我仍在努力了解 RxJava 是否适合分支流程。在上面的示例中,我使用toMap 能够在下一阶段同时对所有组进行推理,因此我可以将它们合并在一起。我刚刚了解到toMap 正在阻塞,因为它需要解析所有键才能发出地图,这根本不是很好。 groupBy 是要走的路,但是很难同时与这些新组一起工作,因此您可以与他们一起做其他事情。我仍然看不到像我的 RxJava 示例中那样对分支流进行编码的干净方法。
  • 我不确定你想要达到什么目的。您基本上可以选择三种方式来处理这些项目:阻塞toMap 并获取所有内容,使用groupBy 进行非阻塞流以使其流式传输,或者在某处之间放置一个缓冲区/窗口,以便您可以批量获取所有内容(并且可能正如你提到的那样“一次性”做一些事情)。
【解决方案2】:

只是另一种可能对您有用的方法的想法:您可以多播源并单独处理分支,而不是分组/toMap。

例子:

@Test
public void multicastingShare() {
    final Observable<Integer> sharedSource = Observable.range(1, 10)
            .doOnSubscribe(dummy -> System.out.println("subscribed"))
            .share();
    // split by some criteria
    final Observable<String> oddItems = sharedSource
            .filter(n -> n % 2 == 1)
            .map(odd -> "odd: " + odd)
            .doOnNext(System.out::println);
    final Observable<String> evenItems = sharedSource
            .filter(n -> n % 2 == 0)
            .map(even -> "even: " + even)
            .doOnNext(System.out::println);

    // recombine the individual streams at some point
    Observable.concat(oddItems, evenItems)
            .subscribe(result -> System.out.println("result: " + result));
}

video 可能会有所帮助(至少前 15 分钟)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-26
    • 1970-01-01
    • 1970-01-01
    • 2020-12-05
    相关资源
    最近更新 更多