【问题标题】:Adding a Pool of Threads in a RxJava Flow在 RxJava 流中添加线程池
【发布时间】:2015-02-07 05:27:43
【问题描述】:

我想知道是否可以使用 RxJava 库在以下用例中添加一些并发性:

  • 使用自定义Observable(类似于ResultSetObservable.create(resultSet))从现有ResultSet 中按顺序获取String
  • 为这些值中的每一个调用 Web 服务(例如,使用 InvokeWebServiceFunc1<String, Pair<String, Integer>>() 实例)以检索与 String 相关的一些统计信息(请注意,Pair 中的 String 是相同的作为传入的输入)
  • 以 CSV 格式打印所有内容(使用ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream))。

这就是我所拥有的:

ResultSetObservable.create(resultSet)
    .map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
    .subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));

它运行良好,但由于 Web 服务可能需要一些时间来处理某些 String 输入,我想通过为映射提供类似线程池的行为(共 10 个)来添加一些并发性例如线程),但我需要 ExportAsCSVAction0 在同一个线程中调用(实际上当前线程是完美的)。

你能帮帮我吗?我不知道在这里使用toBlocking().forEach() 模式是否是正确的解决方案,我不明白在哪里使用Schedulers.from(fixedThreadPool)(在observeOn()subscribeOn() 中)。

感谢您的帮助!

【问题讨论】:

    标签: java multithreading asynchronous concurrency rx-java


    【解决方案1】:

    我自己找到的!

    package radium.rx;
    
    import java.util.List;
    import java.util.Arrays;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import rx.Observable;
    import rx.schedulers.Schedulers;
    
    public class TryRx {
    
        public static Random RANDOM = new Random();
    
        public static void main(String[] arguments) throws Throwable {
            final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
            final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
    
            Iterable<Integer> outputs = Observable.<Integer>from(inputs)
                    .flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(threadPoolExecutor)))
                    .toBlocking()
                .toIterable();
    
            for (Integer output : outputs) {
                System.out.println(output);
            }
    
            threadPoolExecutor.shutdown();
        }
    
        public static void sleepQuietly(int duration, TimeUnit unit) {
            try {
                Thread.sleep(unit.toMillis(duration));
            } catch (InterruptedException e) {
    
            }
        }
    
        public static Observable<Integer> deferHeavyWeightStuff(final int input) {
            return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
        }
    
        public static int randomInt(int min, int max) {
            return RANDOM.nextInt((max - min) + 1) + min;
        }
    
        public static int doHeavyWeightStuff(int input) {
            sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS);
            int output = (int) Math.pow(input, 2);
            return output;
        }
    
    }
    

    干杯!

    【讨论】:

    • 非常感谢。但是,输出的顺序似乎与输入的顺序不同。关于如何修复它的任何线索。
    • 这是并行化时所期望的。
    • 查看随机休眠时间函数
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-26
    • 2011-02-06
    • 1970-01-01
    • 1970-01-01
    • 2015-02-19
    • 1970-01-01
    相关资源
    最近更新 更多