【问题标题】:Zip list of observables into another Zip observable RxJava2将 observables 列表压缩到另一个 Zip observable RxJava2
【发布时间】:2023-04-03 14:31:01
【问题描述】:

我正在尝试zipzipObservables 的列表,但问题是我每次只能从压缩的 observables 中获得相同的值。我这样做的原因是从 ble 执行两个操作 1st reading index2nd reading data 一定次数 - 在下面的示例中是 6 次。

不确定如何使用RxJava2 处理此问题

这里是代码sn-p

 private Observable<Pair<byte[],byte[]>> getValueFromIndication(RxBleConnection rxBleConnection){

         final PublishSubject<Boolean> unsubscribeSubject = PublishSubject.create();


        return Observable.zip(

                rxBleConnection.setupIndication(Data.INDEX,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),
                rxBleConnection.setupIndication(Data.DATA,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),

                (bytes, bytes2) -> {

                    unsubscribeSubject.onNext(true);

                    return Pair.create(bytes,bytes2);
                }
        );
}

从我的主流中,我首先创建Observables 列表并将其压缩并传递它

 .flatMap(rxBleConnection -> {


        List<Observable<Pair<byte[],byte[]>>> observableList = new ArrayList<>();

        for(int i=0;i<6;i++){

            //Creating list of observables so that 6 times this function gets fire
            observableList.add(getValueFromIndication(rxBleConnection));

        }

        // Zipping Zipped list of observables 
        return Observable.zip(observableList,Data::OperationReadings);
    }).subscribe(bytes->{


    })

在这里,我总是在Data::OperationReadings 中得到相同的值。目前,我得到了以下我不想要的数据。

每次相同的索引和值

INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]

预期数据如下

每次不同的索引和值

INDEX [1] DATA [10,30,20,30,33,0]
INDEX [2] DATA [11,11,2,0,3,0]
INDEX [3] DATA [0,0,0,0,33,0]
INDEX [4] DATA [10,30,0,30,3,0]
INDEX [5] DATA [10,0,0,30,3,0]
INDEX [6] DATA [10,0,20,30,3,9]

【问题讨论】:

  • 每个要发出的索引是否需要单独打开指示?或者指示可以一直持续到所有 6 个指示都收到?
  • 发射需要单独开启和关闭指示。 On -> read->off , On->read->Off 等等。每次从 ble 设备发出新的索引和数据时

标签: android rx-java2 rxandroidble rxbluetooth


【解决方案1】:

您获得相同数据重复 6 次的原因是您同时订阅了个人 getValueFromIndication()。实际上每个 Observable 都是并行运行的。您希望按顺序运行每个订阅。解决方案可能是替换它:

        return Observable.zip(observableList,Data::OperationReadings);

与:

        return Observable.concat(observableList) // we want to subscribe each Observable from list after the previous one will complete
            .toList() // we want to gather all results from individual Observables from the list — this returns a Single
            .toObservable() // get back to the Observable class so the types will match
            .map(Data::OperationReadings); // we map it into the OperationReadings class

【讨论】:

  • 如果 observableList 中的任何列表项导致任何错误,那么会传播到最终订阅者的 onError 吗?
  • 是的。它应该传播到订阅者的 onError。为了防止这种情况发生,需要对添加到链中的错误进行显式处理。
猜你喜欢
  • 2015-02-05
  • 1970-01-01
  • 2021-04-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-09
  • 2019-09-11
相关资源
最近更新 更多