【发布时间】:2015-02-07 05:27:43
【问题描述】:
我想知道是否可以使用 RxJava 库在以下用例中添加一些并发性:
- 使用自定义
Observable(类似于ResultSetObservable.create(resultSet))从现有ResultSet中按顺序获取String列 - 为这些值中的每一个调用 Web 服务(例如,使用
InvokeWebServiceFunc1<String, Pair<String, Integer>>()实例)以检索与String相关的一些统计信息(请注意,Pair中的String是相同的作为传入的输入) - 以 CSV 格式打印所有内容(使用
ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream))。
这就是我所拥有的:
ResultSetObservable.create(resultSet)
.map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
.subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));
它运行良好,但由于 Web 服务可能需要一些时间来处理某些 String 输入,我想通过为映射提供类似线程池的行为(共 10 个)来添加一些并发性例如线程),但我需要 ExportAsCSVAction0 在同一个线程中调用(实际上当前线程是完美的)。
你能帮帮我吗?我不知道在这里使用toBlocking().forEach() 模式是否是正确的解决方案,我不明白在哪里使用Schedulers.from(fixedThreadPool)(在observeOn() 或subscribeOn() 中)。
感谢您的帮助!
【问题讨论】:
标签: java multithreading asynchronous concurrency rx-java