【问题标题】:How to subscribe to merged infinite flowables?如何订阅合并的无限流?
【发布时间】:2019-10-18 14:58:34
【问题描述】:

我有几个无限的 Flowables(从 BlockingQueues 获取数据)。我将它们合并并订阅我的自定义订阅者。我不明白为什么我只从单个输入 Flowable 中获取消息。

这是我的代码:

<T> void example() {
    List<BlockingQueue<T>> queues = createQueues();

    List<Flowable<T>> allFlowables = queues.stream()
            .map(this::createFlowable)
            .collect(Collectors.toList());

    FlowableScan.merge(allFlowables)
            .subscribe(new DefaultSubscriber<T>() {

                @Override
                protected void onStart() {
                    System.out.println("Start!");
                    request(1);
                }

                @Override
                public void onNext(T message) {
                    System.out.println(message);
                    request(1);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("Done!");
                }
            });
}

<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
    return Flowable.generate(emitter -> {
        T msg = takeFromQueue(q); // blocking
        emitter.onNext(msg);
    });
}

我只看到来自单个队列的消息,我错过了什么? 我试过调度程序,但没有帮助。 如何修复上述代码从所有输入队列中消费?

【问题讨论】:

  • 因为您阻塞了第一个队列为所有源提供服务的唯一线程。您必须引入异步,例如在createFlowable 中应用.subscribeOn(Schedulers.io())
  • @akarnokd 没错,我在这篇文章中找到了答案:medium.com/yammer-engineering/…

标签: reactive-programming rx-java2


【解决方案1】:

因为您阻塞了第一个队列为所有源提供服务的唯一线程。您必须引入异步,例如在createFlowable 中应用.subscribeOn(Schedulers.io())

<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
    return Flowable.generate(emitter -> {
        T msg = takeFromQueue(q); // blocking
        emitter.onNext(msg);
    }).subscribeOn(Schedulers.io()); // <----------------------------------
}

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-09-20
  • 2013-02-21
  • 2014-04-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-26
相关资源
最近更新 更多