【问题标题】:How to cancel an event in reactive stream?如何取消反应流中的事件?
【发布时间】:2017-08-09 16:45:40
【问题描述】:

我是响应式编程的新手,在使用“一切都可以是流”的口头禅时遇到困难。我正在考虑以下场景 - 我有一个 websocket 事件流 定义如下:

Rx.Observable.create((observer) => {

    io.on('connect', function(socket){

      socket.on("enroll", function(player) {
        observer.next({
          event: 'enroll',
          player,
          socket
        });
      });

      socket.on('resign', function(player){
        observer.next({
          event: 'resign',
          player,
          socket
        });
      });

    });

    return {
      dispose: io.close
    };
  });

然后我可以做类似的事情

enrollmentStream = events$
      .filter(find({ event: "enroll" }))
      .map(pick('player'));

同样

resignationStream = events$
      .filter(find({ event: "resign" }))
      .map(pick('player'));

我想将已注册的玩家聚集在一个流中,将他们分成 4 组,但显然这应该只针对在注册流中但不在 resignationStream 中或至少最后一个事件是注册的用户进行。我该怎么做?

这是大理石图。

有 5 名玩家报名。当有 4 名玩家注册时,游戏开始。请注意,第二个玩家(紫色)注册但随后退出,因此游戏不会以蓝色大理石开始,而是以下一个 - 黄色 - 因为只有在那之后才有真正的 4 名玩家准备好。

大概应该有一些类似"without"的流操作...有吗?

【问题讨论】:

  • 我不确定我是否符合您的要求,您能否添加您需要处理的一系列操作的弹珠图或序列图?
  • 请查看更新后的问题。

标签: stream rxjs reactive-programming


【解决方案1】:

我认为在这种情况下,您可以使用 combineLatest()scan() 运算符,然后自己列出未辞职的球员:

const bufferedEnrollment = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []);
const bufferedResignation = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []);

Observable.combineLatest(bufferedEnrollment, bufferedResignation)
  .map(values => {
    const enrolled = values[0];
    const resigned = values[1];

    // remove resigned players from `enrolled` array
    return enrolled;
  })
  .filter(players => players.length === 4)
  .subscribe(...)

scan() 运算符仅用于将玩家收集到一个数组中。例如,如果您希望能够重置数组,您可以将其与另一个 Observable 合并。

enrollmentStream
  .merge(resetStream)
  .scan((acc, val) => {
    if (!val) {
      return [];
    }
    acc.push(val);
    return acc;
  }, []);

(出于显而易见的原因,我没有测试此代码)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-09-21
    • 1970-01-01
    • 1970-01-01
    • 2012-06-08
    • 1970-01-01
    • 2019-08-12
    • 1970-01-01
    相关资源
    最近更新 更多