【发布时间】:2018-01-09 00:40:43
【问题描述】:
我正在使用 Java9 反应流和 RxJava2 进行测试。我对任何一个都没有偏好,但我正在寻找一些关于这是否可能的指导。
-
我正在创建预先配置的订阅者数量,如下所示:
for(int i = 0; i<MAX_SUBSCRIBERS; i++) { System.out.println("Creating subscriber: " + i); publisher.subscribe(new MySubscriber<>(i + "-subscriber")); } -
我正在从目录中读取文件列表,以便同时上传到某些第 3 方系统。
Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files")); paths .filter((Files::isRegularFile)) .forEach(pathName -> publisher.submit(pathName.toString()));
我收到以下输出:
0-subscriber: /my/dir/with/files/test0.txt received in onNext
0-subscriber: /my/dir/with/files/test1.txt received in onNext
1-subscriber: /my/dir/with/files/test0.txt received in onNext
1-subscriber: /my/dir/with/files/test1.txt received in onNext
理想情况下,我们应该看到以下行为。每个订阅者都应该在一个唯一的文件上执行工作。
0-subscriber: /my/dir/with/files/test0.txt received in onNext
1-subscriber: /my/dir/with/files/test1.txt received in onNext
这可能吗?任何提示都会很棒!
【问题讨论】:
标签: rx-java reactive-programming rx-java2 java-9 reactive-streams