【问题标题】:Most effective way to turn Observable into ObservableValue/Binding/EventStream?将 Observable 转换为 ObservableValue/Binding/EventStream 的最有效方法?
【发布时间】:2015-05-14 15:31:54
【问题描述】:

我将更多地使用 RxJava 和 ReactFX,但我想了解的是如何协调两者,因为 ReactFX 没有 RxJava 依赖项,那么两者如何在同一个 monad 中相互交谈?对于 JavaFX 的 ObservableValue、RxJava 的 Observable 和 ReactFX 的 StreamEvent 之间没有大量样板的桥接尤其如此。

我想用 RxJava 编写我的核心业务逻辑,因为它们并不总是支持 JavaFX 应用程序。但我希望 JavaFX UI 使用ReactFX 并使用EventStream。所以我的问题是将EventStream 转换为ObservableObservable 转换为EventStreamBindingObservableValue 的最有效方法是什么?我知道我可以全面使用 RxJava,但我想利用 ReactFX 的平台线程安全性和便利性......

//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();

//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();

【问题讨论】:

    标签: java javafx reactive-programming rx-java reactfx


    【解决方案1】:

    将 ReactFX EventStream 转换为 RxJava Observable

    Observable<Foo> toRx(EventStream<Foo> es) {
        PublishSubject<Foo> sub = PublishSubject.create();
        es.subscribe(sub::onNext);
        return sub;
    }
    

    将 RxJava Observable 转换为 ReactFX EventStream

    EventStream<Foo> fromRx(Observable<Foo> obs) {
        EventSource<Foo> es = new EventSource<>();
        obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
        return es;
    }
    

    注意后一种情况下的Platform.runLater(...)。这使得生成的 EventStream 在 JavaFX 应用程序线程上发出事件。

    另请注意,在这两种情况下,我们都忽略了从 subscribe 方法返回的 Subscriptions。如果您正在为应用程序的生命周期建立绑定,这很好。另一方面,如果它们之间的绑定应该是短暂的,在第一种情况下,你会让你的 RxJava 组件公开Subject,你的 ReactFX 组件公开EventStream,然后执行subscribe/ unsubscribe 必要时。第二种情况也是如此。

    【讨论】:

    • 完美!谢谢托马斯。
    • 顺便说一句,我发现这在 TableView TableColumn cellValueFactory 中不起作用。 FX 线程和调度程序会发生一些奇怪的事情。不太清楚该怎么做。
    【解决方案2】:

    我不熟悉 ReactFX,但查看 API 我可以推断出这些转换:

    public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
        return Observable.create(child -> {
            Subscription s = es.subscribe(child::onNext);
            child.add(Subscriptions.create(s::unsubscribe));
        });
    }
    public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
        return new EventStream<T>() {
            final Vector<Consumer<? super T>> observers = new Vector<>();
            @Override
            public void addObserver(Consumer<? super T> observer) {
                observers.add(observer);
            }
    
            @Override
            public void removeObserver(Consumer<? super T> observer) {
                observers.remove(observer);
            }
            @Override
            public Subscription subscribe(Consumer<? super T> subscriber) {
                addObserver(subscriber);
    
                rx.Subscriber<T> s = new rx.Subscriber<T>() {
                    @Override
                    public void onNext(T t) {
                        subscriber.accept(t);
                    }
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        removeObserver(subscriber);
                    }
                    @Override
                    public void onCompleted() {
                        removeObserver(subscriber);
                    }
                };
                o.subscribe(s);
    
                return () -> {
                    s.unsubscribe();
                    removeObserver(subscriber);
                };
            }
        };
    }
    

    两者都应该为您提供取消订阅功能,尽管 ReactFX 不支持同步取消订阅,而且我真的不知道 EventStream 是否可以用作热或冷的 observable。我无法访问 Binding,所以无法帮助您。

    【讨论】:

      猜你喜欢
      • 2015-04-14
      • 1970-01-01
      • 2010-09-18
      • 1970-01-01
      • 2015-02-25
      • 2015-07-31
      • 2013-04-15
      • 2011-07-02
      • 2010-10-02
      相关资源
      最近更新 更多