【问题标题】:Reconstructing nested Observables重构嵌套的 Observables
【发布时间】:2015-12-10 23:11:18
【问题描述】:

这是一个初学者的问题。我正在将多个进程的输出流回客户端,并且我想将每个流存储到一个单独的文件中。

所以我想要的是(大写字母表示嵌套流结束)

S: -a-a-a-b-b-a-A-c-b-B-c-c-...-d--C...D-e--...-z-E-z--z--...

R:  a-a-a-----a-A (complete)
          b-b-------b-B (complete)
                  c-----c-c--------C (complete)
                                d------D (complete)
                                         e--------E (complete)

                     .
                     .
                     . 
       (end many more nested streams coming)
                     .

所以我想要一个像 Observables 的动态工厂这样的东西。类似于using(),但据我了解using() 创建的Observable 与原始Observable 一样长,而我想在每次嵌套流完成时完成并关闭文件。

重要 - 我不想在内存中缓冲,因为这些是非常长的流(长时间运行的进程的输出)。所以我想避免buffer()groupBy()

【问题讨论】:

    标签: system.reactive reactive-programming rx-java


    【解决方案1】:

    如果您可以从一个值确定它是否是特定字母的最终值,则可以在内部组中使用 groupBytakeUntil(您可能需要在 lambda 中进行一些尝试捕获):

    source
    .groupBy(v -> v.type)
    .flatMap(g -> 
         Observable.using(
             () -> createOutput(g.getKey()), 
             f -> g.takeUntil(v -> v.isEnd).doOnNext(v -> f.write(v))),
             f -> f.close()
         )
    )
    .subscribe(...)
    

    TakeUntil 确保 groupBy 只保留子流,只要我们期望它的值(如果源正确排序,groupBy 将不会重新创建组)。

    如果你真的想避免groupBy,你必须手动跟踪每个打开的文件并在适当的时候关闭它们:

    Observable.using(
        HashMap::new,
        map -> source
           .doOnNext(v -> {
               Output o = map.get(v.type);
               if (o == null) {
                   o = new Output(v.type);
                   map.put(v.type, o);
               }
               o.write(v);
               if (v.isEnd) {
                   o.close();
                   map.remove(v.type);
               }
           }),
        map -> map.values().forEach(e -> e.close())
    )
    .subscribe(...);
    

    【讨论】:

    • 感谢您的快速回复。我正在看第二部分(因为我真的不想使用groupBy())它看起来像工作,但这似乎不是一个被动的解决方案。它只是一个侦听器,根据其私有 hashmap 调度数据。所以这是一种标准的命令方式。有没有办法以纯反应的方式解决问题?
    • 选项 #1 更具反应性,如果您可以保留反应性文件库,您也可以完全异步保存。否则,如果可以更有效地解决您的问题,请不要害怕混合这两个概念。
    • 这不是恐惧。只是我想学习如何编写适当的反应式代码。就像我在编写 java 代码时不尝试内联汇编程序一样(不确定这是否可能),我不想将非反应性技术注入反应性代码中。选项#1 毫无疑问是反应式的,但它会在内存中累积数据。正如原始问题中已经解释的那样,这是我无法做到的。
    • 它保留对组的引用,但不保留项目。采用反应式和功能性通常会导致更多的内存使用和数据保留,这与您的需要相反。
    • 这是真的,函数会导致更多的内存使用,但反应性,旨在简化事件驱动系统的创建,这些系统应该更轻巧,适合处理非常长的事件流。这就是为什么 Rx 默认是单线程的。所以我的用例似乎非常适合反应式。请帮助我理解“它保留了对组的引用,但不保留项目。”。据我了解您的选项 #1 代码,如果我有 100 个未完成的 1GB 流,我有 100 GB 的内存。对吗?
    猜你喜欢
    • 2022-01-24
    • 2019-08-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-27
    • 1970-01-01
    • 2017-03-19
    • 2017-01-12
    相关资源
    最近更新 更多