【问题标题】:How to create an observable of a stream of infinite items如何创建一个无限项流的可观察对象
【发布时间】:2019-08-16 23:07:41
【问题描述】:

我有一个发射器,可以无限发射物品。如何将发射器发出的项目流转换为 RxJava 2 中的 Observable(或其中一种)。

【问题讨论】:

  • 您可以使用主题或使用 Observable.create 包装发射器....您可以添加代码吗?
  • 谢谢,亚历克斯!我还没有任何代码,但你的评论回答了我的问题。找到解决方案后,我将发布我的代码。

标签: rx-java2


【解决方案1】:

您是否正在寻找流式传输数据?假设我正在尝试从数据库中流式传输数据。

return Observable.using(
            () -> getQueryConnectionSubscription(sql),

            connectionSubscription -> Observable.create((subscriber) -> {

                ResultSet resultSet = connectionSubscription.getResultSet();
                int rowNumber = 0;
                while (!subscriber.isDisposed() && resultSet.next()) {

                    T row = rowMapper.mapRow(resultSet, rowNumber);
                    subscriber.onNext(row);
                }
                subscriber.onComplete();

            }),

            (queryConnectionSubscription) -> {
                queryConnectionSubscription.close();
            });

我不确定您的数据来源是什么。但是,只要您有数据,您就会一直调用subscriber.onNext(data)。 如果您想要完整的详细信息,请查看链接 https://www.developerthoughtsonline.com/2019/02/02/streaming-with-reactive-java-and-spring-jdbctemplate/

【讨论】:

    【解决方案2】:

    为了解决这个问题,我选择了 Subject 而不是 Observable。下面是代码示例。

    public class ItemEmitter {
      private BehaviorSubject<Object> subject = BehaviorSubject.create();
    
      public void onEvent(Object item) {
        subject.onNext(item);
      }
    
      public Flowable<Object> getObservable() {
        return subject.toFlowable(BackpressureStrategy.LATEST);
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-14
      • 1970-01-01
      • 1970-01-01
      • 2016-10-06
      相关资源
      最近更新 更多