【问题标题】:rxjava chain observalbes dynamicllyrxjava 动态链接 observables
【发布时间】:2022-01-04 15:45:42
【问题描述】:

我有一个这样创建的链式可观察对象:

Disposable disposable = currentUsedAdapter.connect(ip)
        .observeOn(AndroidSchedulers.mainThread())
        .concatMap(fallbackAdapter(ProtocolType.V2))
        .delay(500, TimeUnit.MILLISECONDS)
        .concatMap(fallbackAdapter(ProtocolType.V1))
        .subscribeWith(connectionSubscriber);

这是fallbackAdapter的方法:

private Function<Boolean, Observable<Boolean>> fallbackAdapter(ProtocolType protocolType) {
    return new Function<Boolean, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> apply(@NonNull Boolean isConnected) throws Exception {
            if (isConnected) {
                return Observable.just(true);
            } else {
                TempAdapter adapter = new TempAdapter(context, protocolType);
                return currentUsedAdapter.connect(ipAddress);
            }
        }
    };
}

目前这是静态完成的,并且工作正常。 虽然我想创建一个 fallbackAdapter(ProtocolType.*) 列表,因为我只知道运行时的回退数量。

所以我创建了这个:

ArrayList<Function<Boolean, Observable<Boolean>>> adaptersList = new ArrayList<>();
adaptersList.add(fallbackAdapter(ProtocolType.V2));
...
adaptersList.add(fallbackAdapter(ProtocolType.V9));

Disposable disposable = Observable.fromIterable(adaptersList)
        .concatMap(adapter ->
                adapter.apply(true))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(connectionSubscriber);

我创建了一个可以动态更新的列表。 但是,我不确定如何将isConnected 的值从一个适配器传递到另一个适配器。我目前将true 传递给每个人,但其中一些应该返回false,但我不确定如何使用Observable.fromIterable 将这个值从一个发射器传递到另一个发射器。

所以我的问题是我应该如何更改这个.concatMap(adapter -&gt; adapter.apply(true)),这样我就不会总是发送true,而是发送前一个适配器处理过的值? p>

谢谢

【问题讨论】:

    标签: java android rx-java rx-java2 rx-android


    【解决方案1】:

    如果它对任何人有帮助... 我没有找到解决它的 rxjava 方法,所以我以旧的 java 方式解决了... 我创建了一个构建器类并在我的主要可观察对象中添加了一个可观察对象,最后我返回了所有内容。 类似的东西:

    public class DisposableBuilder {
        Observable<Boolean> observable;
    
        public DisposableBuilder() {
        }
    
        public void build(String ip) {
            observable = currentUsedAdapter.connect(host);
            if (adaptersNames != null) {
                for (int i = 1; i < adaptersNames.size(); i++) { // skip first adapter (currentUsedAdapter adapter)
                    this.append(AdapterFactory.getAdapter(context, adaptersNames.get(i)));
                }
            }
        }
    
        public void append(CustomAdapter adapter) {
            observable = observable
                    .delay(200, TimeUnit.MILLISECONDS)
                    .concatMap(fallbackAdapter(adapter));
        }
    
        public Observable<Boolean> getObservable() {
            return observable;
        }
    }
    

    然后我就这样使用它:

    disposableBuilder.build(ip);
    this.disposable = disposableBuilder.getObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(connectionSubscriber);
    

    【讨论】:

      猜你喜欢
      • 2019-08-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多