【问题标题】:Single Stream and Multiple Subscribers单流和多订阅者
【发布时间】:2018-01-09 00:40:43
【问题描述】:

我正在使用 Java9 反应流和 RxJava2 进行测试。我对任何一个都没有偏好,但我正在寻找一些关于这是否可能的指导。

  1. 我正在创建预先配置的订阅者数量,如下所示:

    for(int i = 0; i<MAX_SUBSCRIBERS; i++) {  
         System.out.println("Creating subscriber: " + i);  
         publisher.subscribe(new MySubscriber<>(i + "-subscriber"));   
    }
    
  2. 我正在从目录中读取文件列表,以便同时上传到某些第 3 方系统。

    Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"));
    paths
    .filter((Files::isRegularFile))
    .forEach(pathName -> publisher.submit(pathName.toString()));
    

我收到以下输出:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    0-subscriber: /my/dir/with/files/test1.txt received in onNext
    1-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

理想情况下,我们应该看到以下行为。每个订阅者都应该在一个唯一的文件上执行工作。

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

这可能吗?任何提示都会很棒!

【问题讨论】:

    标签: rx-java reactive-programming rx-java2 java-9 reactive-streams


    【解决方案1】:

    Java 9 Flow API 由 4 个接口和 SubmissionPublisher 类组成,该类将每个提交的值分派给它的所有 Subscribers。目前没有 JDK 工具支持您的数据流。

    相比之下,RxJava 是一个丰富的 fluent 库,包含数百个运算符,您可以在其中执行并行处理而无需重复:

        ParallelFlowable<Path> pf = 
                Flowable.<Path, Stream<Path>>using(
                    () -> Files.list(Paths.get("/my/dir/with/files")),
                    files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
                    AutoCloseable::close
                )
                .parallel(2)
                .runOn(Schedulers.computation())
                .filter(Files::isRegularFile);
    
    pf.subscribe(new Subscriber[] {
        new MySubscriber<>("0-subscriber"),
        new MySubscriber<>("1-subscriber"),
    });
    

    【讨论】:

      【解决方案2】:

      这是一条评论,但太长了。不过,这不是一个真正的答案,因为我不是反应流专家。这倒是一些值得深思的东西。 ?

      我的理解是每个订阅者都会看到所有已发布的元素,并且订阅者应该彼此独立(我想说这不包括明确的协调)。如果文件之间存在实质性差异(比如说一个是 PDF,另一个是 TXT),那么订阅者可能会决定只对他们构建的类型进行操作,否则,每个人都应该处理每个元素。

      您似乎正在尝试将工作负载分散到多个订阅者,我假设这些订阅者在不同的线程中运行。这绝对是现有并发构造处理得非常好的事情。例如,看看ExecutorService

      也就是说,如果您正在构建一个更大的流管道,我认为没有理由反对将分发文件处理跨线程部分封装在单个订阅者中。它甚至可能是发布者本身,一旦完成,就会发布处理每个文件的结果。

      最后一个警告:对于这个特定的用例,也许 RxJava 有一些东西。我很想阅读其他答案。

      【讨论】:

        【解决方案3】:

        发布者可以有两种:多播和单播。多播发布者向每个订阅者提供全套消息,而单播发布者将每条消息路由到单个订阅者。 SubmissionPublisher 被编程为多播,在其文档中说。

        您可以在我的库 DF4J 中找到单播发布者的实现。寻找接口org.df4j.protocol.Flow.Publisher的实现,它扩展了org.reactivestreams.Publisher

        【讨论】:

          猜你喜欢
          • 2015-02-08
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-11-07
          相关资源
          最近更新 更多