【问题标题】:Chained subscription to an array of observables using rxjs使用 rxjs 对一系列可观察对象的链式订阅
【发布时间】:2018-04-30 21:51:20
【问题描述】:

我有一组文件我想在 Angular 5 中上传(或至少 try 上传,个别失败是可以的),一次一个,然后知道所有订阅的时间已完成。

在 Javascript 的旧时代,我会使用链式回调来完成此操作,但由于我们现在拥有像 rxjs 这样的出色工具,我觉得有一种更好、更“反应性”的方式来做到这一点。

那么:用 rxJS 最好的方法是什么?

我有一个 Angular 提供程序,它上传单个文件并返回一个 Observable;我想尝试单独上传数组中的每个文件,并知道它们何时全部完成。

在下面的示例中,我将提供程序替换为一个简单的主题,该主题在随机时间后以随机成功或错误完成,试图模拟不稳定的互联网连接。

问题:当我使用 Observable.combineLatest() 时,只有在所有 Observables 都具有 next() 的结果时,我才会得到最终结果:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]。如果不是所有的 Observables 都完成了,我就永远不会得到结果。

此外,Observable 不会一次运行一个,而是一次运行。与 AJAX 请求一起使用时,它可能会使单元连接过载。

关于如何解决这个问题的任何想法?

constructor() {
    let observables = [];

    for (var i = 0; i < 100; i++) {
        observables.push(this.testObservable(i));
    }


    Observable.combineLatest(observables)
        .subscribe(res => {
            console.log('success', res);
        }, err => {
            console.log('errors', err);
        })
}



testObservable(param) {
    let subject = new Subject;

    let num = Math.random() * 10000;
    console.log('starting', param)
    setTimeout(() => {

        if (Math.random() > 0.5) {
            console.log('success', param);
            subject.next(param);
        } else {
            console.log('error', param);
            subject.error(param);
        }

        subject.complete();
    }, num);

    return subject;
}

【问题讨论】:

  • If not all the Observables complete, I never get a result at all. 在这种情况下你想做什么?一段时间后放弃?您的上传不应该在某个时候失败吗?此外,您声明您不喜欢combineLatest,因为您在全部完成之前不会得到结果。您希望在每个完成后得到一个结果吗?或者,无论是否发生任何变化,您更愿意每 X 秒获得一次结果?
  • 我想知道所有 Observables 何时完成,无论它们是否“解析”了一个值。这个想法是知道何时以一种或另一种方式向用户提供流程已完成的反馈。这不是 RxJS 的方式吗?我可能需要重新考虑我的设计。

标签: angular rxjs


【解决方案1】:

您可以使用 combineLatest 等待 Observable 数组完成。它会在它提供的每个 Observable 至少发出一次后发出。

let files = [uri1, uri2, uri3];
const observables = files.map(file => this.addMedia({ uri: file, post_id: res.post.Post.id }));

Observable.combineLatest(observables).subscribe(() => console.log('All complete'));

如果您希望 Observables 一个接一个地执行,您可以使用concat(用于有序)或merge(如果顺序不重要)。

为了捕获错误,您可以为每个 Observable 添加一个 catch 运算符并返回一个空的 Observable 或更合适的东西。

this.addMedia({ uri: file, post_id: res.post.Post.id })
    .do(val => console.log(`emitting: ${val}`))
    .catch(err => {
      console.log(`error: ${err}`);
      return Observable.empty();
    });

【讨论】:

  • 这种方法的问题是所有的 Observable 都被一次性订阅,这可能会使网络连接过载。
  • 另外,如果某些 Observable 由于错误而从未发出 next(),那么生成的 Observable 永远不会完成。
【解决方案2】:

子 observables 应该发出 next() 或者它们应该抛出错误。在这两种情况下,子 observable 都应该完成。如果子 observable 没有完成,那么您的子 observable 中有一个错误。如果您知道子 observable 有错误并且永远无法完成,那么您可以使用超时,但您真的不应该这样做。

由于子 observables 中的错误很好,你应该抓住它们。最后,如果您想缓冲项目以便它们不会一次全部发送到服务器,那么您也可以这样做。

把它们放在一起 (RxJS Playground Link):

const start = Date.now();

function elapsed() {
  return Date.now() - start;
}

function dummyObservable(delay, error) {
  // An observable that fails
  if (error) {
    return Rx.Observable.throw(error);
  }
  // An observable that succeeds after some amount of time
  if (delay) {
    return Rx.Observable.create(observer => {
      console.log(elapsed() + ': Request to server emitted');
      setTimeout(() => observer.next(), delay);
    });
  }
  // An observable that never completes (you really shouldn't have these)
  return Rx.Observable.create(() => {});
}

function formatResult(result) {
  if (result.failed) {
    return 'FAIL(' + result.error + ')';
  } else {
    return 'PASS';
  }
}

const obs1 = dummyObservable(1000);
const obs2 = dummyObservable(500);
const fails = dummyObservable(null, new Error('This one fails'));
const neverFinishes = dummyObservable();

const observables = [obs1, obs2, fails, neverFinishes];
// We only want the first response.  Only needed if your source observables aren't completing after emitting one item
const firstOnly = observables.map(obs => obs.first());
// Only allow 5 seconds and abort if no response after 5 seconds
const timeoutsHandled = firstOnly.map(obs => obs.timeout(5000));
// If any failures occur then handle them
const failuresHandled = timeoutsHandled.map(obs => obs.map(() => ({ failed: false })).catch((err) => Rx.Observable.of({ failed: true, error: err })));

const buffered = [];
// Buffer the request so 200 ms pass between each.
for(let i = 0; i < failuresHandled.length; i++) {
  const delay = i * 200;
  buffered.push(Rx.Observable.of([null]).delay(delay).first().switchMap(() => failuresHandled[i]));
}

const combined = Rx.Observable.combineLatest(buffered);
combined.first().subscribe(
  (values) => console.log(elapsed() + ': values: ' + values.map(v => formatResult(v))),
  err => console.log(err)
);

【讨论】:

    猜你喜欢
    • 2018-07-28
    • 1970-01-01
    • 2022-10-17
    • 1970-01-01
    • 1970-01-01
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    • 2020-09-16
    相关资源
    最近更新 更多