【问题标题】:RX: Handling Errors in combineListRX:处理 combineList 中的错误
【发布时间】:2019-05-24 23:35:40
【问题描述】:

我在 RxDart 中使用combinedLatest2,但我仍然对此感到困惑。这是我的代码:

final validator = StreamTransformer<String, String>.fromHandlers(
    handleData: (data, sink) =>
        data.isNotEmpty ? sink.add(data) : sink.addError('Cannot be empty.'));

final _subject1 = BehaviorSubject<String>();
final stream1 = _subject1.stream.transform(validator);
final changeSubject1 = _subject1.sink.add;

final _subject2 = BehaviorSubject<String>();
final stream2 = _subject2.stream.transform(validator);
final changeSubject2 = _subject2.sink.add;

final combined =
    Observable.combineLatest2(stream1, stream2, (a, b) => '$a, $b');

我有两个主题引用了它们的流和sink.add 函数。在将流分配给它们各自的变量之前,我添加了一个转换器,以确保发出非空字符串,否则将向接收器添加错误。最后,我通过使用Observable.combineLatest2 组合前两个流创建了另一个流combined

combined 流仅在其“子”流发出有效值时才会发出。我在这里面临的问题发生在两个流已经发出有效值,然后都发出无效值,然后其中一个发出有效值时。有趣的是,对于最终的发射,combined 流还发射了新更新的流的新值和另一个流的前一个有效值(尽管在前一个有效值之后已经发射了一个无效值)。我能防止这种情况发生吗?换句话说,运行这段代码:

combined.listen((data) => print(data), onError: (error) => print('error'));

changeSubject1('Hello');
changeSubject2('World');
changeSubject1('');
changeSubject2('');
changeSubject1('NewWorld');

会产生这个输出:

Hello, World
error
error
NewHello, World
NewHello, NewWorld

我想要达到的输出:

Hello, World
error
error
NewHello, NewWorld

总之,我试图让combined 流仅在每个流的最新值有效时才发出。

【问题讨论】:

    标签: dart rxjs reactivex rxdart


    【解决方案1】:

    通过创建“错误感知”combineList 变体函数,我能够得到我想要的。原始的combineList 函数通过使用CombineLatestStream 流创建一个新的Observable 来工作。我创建了一个新的流类,即ErrorAwareCombineLatestStream,它的实现与CombineLatestStream 几乎相同。我只添加了几行来保留每个流的错误状态,并且仅在解决所有错误时才发出。

    这是我的实现:

    ErrorAwareCombineLatestStream 类:

    class ErrorAwareCombineLatestStream<T, A, B, C, D, E, F, G, H, I>
        extends Stream<T> {
      final StreamController<T> controller;
    
      ErrorAwareCombineLatestStream(Iterable<Stream<dynamic>> streams,
          T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]))
          : controller = _buildController(streams, combiner);
    
      @override
      StreamSubscription<T> listen(void Function(T event) onData,
          {Function onError, void Function() onDone, bool cancelOnError}) {
        return controller.stream.listen(onData,
            onError: onError, onDone: onDone, cancelOnError: cancelOnError);
      }
    
      static StreamController<T> _buildController<T, A, B, C, D, E, F, G, H, I>(
          Iterable<Stream<dynamic>> streams,
          T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i])) {
        final List<StreamSubscription<dynamic>> subscriptions =
            new List<StreamSubscription<dynamic>>(streams.length);
        StreamController<T> controller;
    
        controller = new StreamController<T>(
            sync: true,
            onListen: () {
              final List<dynamic> values = new List<dynamic>(streams.length);
              final List<bool> triggered =
                  new List<bool>.generate(streams.length, (_) => false);
              final List<bool> completedStatus =
                  new List<bool>.generate(streams.length, (_) => false);
              final List<bool> hasError =
                  new List<bool>.generate(streams.length, (_) => false);
    
              for (int i = 0, len = streams.length; i < len; i++) {
                Stream<dynamic> stream = streams.elementAt(i);
                subscriptions[i] = stream.listen((dynamic value) {
                  values[i] = value;
                  triggered[i] = true;
                  hasError[i] = false;
    
                  final allStreamsHaveEvents =
                      triggered.reduce((bool a, bool b) => a && b) &&
                          !hasError.reduce((a, b) => a || b);
    
                  if (allStreamsHaveEvents)
                    updateWithValues(combiner, values, controller);
                }, onError: (e) {
                  hasError[i] = true;
                  controller.addError(e);
                }, onDone: () {
                  completedStatus[i] = true;
    
                  if (completedStatus.reduce((bool a, bool b) => a && b))
                    controller.close();
                });
              }
            },
            onCancel: () => Future.wait<dynamic>(subscriptions
                .map((StreamSubscription<dynamic> subscription) =>
                    subscription.cancel())
                .where((Future<dynamic> cancelFuture) => cancelFuture != null)));
    
        return controller;
      }
    
      static void updateWithValues<T, A, B, C, D, E, F, G, H, I>(
          T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]),
          Iterable<dynamic> values,
          StreamController<T> controller) {
        try {
          final int len = values.length;
          final A a = values.elementAt(0);
          final B b = values.elementAt(1);
          T result;
    
          switch (len) {
            case 2:
              result = combiner(a, b);
              break;
            case 3:
              final C c = values.elementAt(2);
    
              result = combiner(a, b, c);
              break;
            case 4:
              final C c = values.elementAt(2);
              final D d = values.elementAt(3);
    
              result = combiner(a, b, c, d);
              break;
            case 5:
              final C c = values.elementAt(2);
              final D d = values.elementAt(3);
              final E e = values.elementAt(4);
    
              result = combiner(a, b, c, d, e);
              break;
            case 6:
              final C c = values.elementAt(2);
              final D d = values.elementAt(3);
              final E e = values.elementAt(4);
              final F f = values.elementAt(5);
    
              result = combiner(a, b, c, d, e, f);
              break;
            case 7:
              final C c = values.elementAt(2);
              final D d = values.elementAt(3);
              final E e = values.elementAt(4);
              final F f = values.elementAt(5);
              final G g = values.elementAt(6);
    
              result = combiner(a, b, c, d, e, f, g);
              break;
            case 8:
              final C c = values.elementAt(2);
              final D d = values.elementAt(3);
              final E e = values.elementAt(4);
              final F f = values.elementAt(5);
              final G g = values.elementAt(6);
              final H h = values.elementAt(7);
    
              result = combiner(a, b, c, d, e, f, g, h);
              break;
            case 9:
              final C c = values.elementAt(2);
              final D d = values.elementAt(3);
              final E e = values.elementAt(4);
              final F f = values.elementAt(5);
              final G g = values.elementAt(6);
              final H h = values.elementAt(7);
              final I i = values.elementAt(8);
    
              result = combiner(a, b, c, d, e, f, g, h, i);
              break;
          }
    
          controller.add(result);
        } catch (e, s) {
          controller.addError(e, s);
        }
      }
    }
    

    errorAwareCombineLatest2 函数:

    Observable<T> errorAwareCombineLatest2<A, B, T>(
            Stream<A> streamOne, Stream<B> streamTwo, T combiner(A a, B b)) =>
        new Observable<T>(new ErrorAwareCombineLatestStream<T, A, B, Null, Null,
                Null, Null, Null, Null, Null>(
            <Stream<dynamic>>[streamOne, streamTwo],
            (A a, B b, [Null c, Null d, Null e, Null f, Null g, Null h, Null i]) =>
                combiner(a, b)));
    

    【讨论】:

      猜你喜欢
      • 2016-03-03
      • 2016-03-05
      • 1970-01-01
      • 2016-12-20
      • 1970-01-01
      • 1970-01-01
      • 2016-10-04
      • 2021-12-25
      • 1970-01-01
      相关资源
      最近更新 更多