【问题标题】:Chain Observable Queue链式可观察队列
【发布时间】:2017-06-26 22:03:00
【问题描述】:

来自 Promise 世界,我可以实现一个队列函数,该函数返回一个 Promise,在前一个 Promise 解决之前不会执行。

var promise = Promise.resolve();
var i = 0;

function promiseQueue() {
  return promise = promise.then(() => {
    return Promise.resolve(++i);
  });
}

promiseQueue().then(result => {
  console.log(result); // 1
});
promiseQueue().then(result => {
  console.log(result); // 2
});
promiseQueue().then(result => {
  console.log(result); // 3
});
// -> 1, 2, 3

我正在尝试使用 Observables 重新创建这个类似队列的函数。

var obs = Rx.Observable.of(undefined);
var j = 0;

function obsQueue() {
  return obs = obs.flatMap(() => {
    return Rx.Observable.of(++j);
  });
}

obsQueue().subscribe(result => {
  console.log(result); // 1
});
obsQueue().subscribe(result => {
  console.log(result); // 3
});
obsQueue().subscribe(result => {
  console.log(result); // 6
});
// -> 1, 3, 6

每次订阅时,它都会重新执行 Observable 的历史记录,因为在订阅时,“当前 Observable”实际上是一个发出多个值的 Observable,而不是等待直到最后一次执行的 Promise完成。

flatMap 不是这个用例的答案,我可以在网上找到的几乎所有“链”和“队列”答案都是关于链接多个 Observable,它们是一个整体 Observable 的一部分,其中 flatMap 是正确答案。

如何使用 Observables 创建上述 Promise 队列函数?

就上下文而言,此队列功能正在对话服务中使用,它规定一次只能显示一个对话。如果多次调用以显示不同的对话框,则它们一次只会按调用顺序出现一个。

【问题讨论】:

  • 所以你想每次都从 observable 中取出下一项?你试过.take(1)吗?
  • 我想等到complete() 被前一个 Observable 的观察者调用后再创建一个新的观察者。我玩过finally(),但还没有找到让它工作的方法。
  • 如果你寻求一个特定的顺序,应该使用 concatMap。内部值可以使用在通知应该调用下一个通知时发出的主题

标签: promise rxjs observable


【解决方案1】:

如果你改变:

return obs = obs.flatMap...

return obs.flatMap...

您将看到与使用 Promise (1, 2, 3) 相同的输出。

要链接可观察对象,以便在前一个完成之前不执行下一个,请使用the concat operator

let letters$ = Rx.Observable.from(['a','b','c']);
let numbers$ = Rx.Observable.from([1,2,3]);
let romans$ = Rx.Observable.from(['I','II','III']);

letters$.concat(numbers$).concat(romans$).subscribe(e=>console.log(e));
//or...
Rx.Observable.concat(letters$,numbers$,romans$).subscribe(e=>console.log(e));
// results...
a   b   c   1   2   3   I   II   III

Live demo

【讨论】:

  • 您的flatMap 建议的问题是可观察对象不会等待彼此完成。它们是从 Rx.Observable.of(undefined) 映射的,而不是以前的映射,因此此时没有指向 flatMap 的意义。至于concat,这会立即创建新的可观察对象并将它们链接起来,而我想延迟创建可观察对象直到前一个对象完成。
【解决方案2】:

想通了!可能不如 Promise 链那么优雅,我绝对愿意接受清理它的建议。

var trigger = undefined;
function obsQueue() {
  if (!trigger || trigger.isStopped) {
    trigger = new Rx.Subject();
    return createObservable(trigger);
  } else {
    var lastTrigger = trigger;
    var newTrigger = trigger = new Rx.Subject();
    return lastTrigger.last().mergeMap(() => {
      return createObservable(newTrigger);
    });
  }
}

var j = 0;
function createObservable(trigger) {
  // In my use case, this creates and shows a dialog and returns an 
  // observable that emits and completes when an option is selected. 
  // We want to make sure we only create the next dialog when the previous
  // one is closed.
  console.log('creating');
  return new Rx.Observable.of(++j).finally(() => {
    trigger.next();
    trigger.complete();
  });
}

obsQueue().subscribe(result => {
  console.log('first', result);
});
obsQueue().subscribe(result => {
  console.log('second', result);
});
obsQueue().subscribe(result => {
  console.log('third', result);
});
var timer = setTimeout(() => {
  obsQueue().subscribe(result => {
    console.log('fourth', result);
  });
}, 1000);

// Output:
// creating
// first 1
// creating
// second 2
// creating
// third 3
// creating
// fourth 4

我没有试图弄清楚如何按顺序链接它们,而是让每个 observable 创建自己的触发器,让下一个 observable 知道何时创建自己。

如果所有的触发器都完成了(setTimeout的情况,我们稍后再排队),那么队列又开始了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-02-12
    • 2011-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-01
    • 1970-01-01
    相关资源
    最近更新 更多