【问题标题】:How to dynamically route RxJS streams如何动态路由 RxJS 流
【发布时间】:2016-10-06 19:11:11
【问题描述】:

我有来自汽车拍卖的数据流。每个汽车拍卖都有 n 条车道。我想记录每辆车的拍卖情况。

流看起来像这样......

--{lane: 1, action:bid} --- { lane: 2, action: start} --- { lane:1, action:bid} --- {lane: 2, action:bid} --- {lane:1, action: sold} ---

我有以下缓冲每个拍卖通道并关闭销售缓冲区...

const bufferOpen$= auctionWebSocketStream$
    .filter(stream => stream.tag === 'CURITEM');

const bufferClose$ = () => auctionWebSocketStream$.filter(stream => stream.tag === 'SOLD');

auctionWebSocketStream$
  .bufferToggle(bufferOpen$, bufferClose$)
  .subscribe(x => console.log(x));

只要有一个拍卖和一条车道,上述方法就可以正常工作。拥有多条车道,就有关于多条车道的出价/销售信息。

如何按通道将流聚合到缓冲区中?类似的解决方案总是具有已知的聚合参数。但只要有新车道,我就需要分流。

非常感谢您的帮助。

更新

我制作了一个 JSBin 来炫耀我的挫败感和无知。它提供了一个示例输入流并解释了所需的输出。

http://jsbin.com/tuxitev/edit?js,console

(对于奖励积分,它只显示 Babel 下的空数组。不知道为什么需要 Typescript)

【问题讨论】:

  • 刚刚偶然发现 groupBy... 我想我要“抱歉浪费大家的时间”了。
  • 你有固定数量的车道吗?一块土地可以重新拍卖吗?
  • 它没有修复,它可以重新打开。例如。拍卖 1,通道 A、B、C --- 拍卖 2,通道 A、B --- 拍卖 1,通道 D --- 拍卖 1,通道 A 完成 --- 拍卖 3,通道 A
  • 那么当它重新打开时,你会再次订阅吗?
  • 是的,需要重新订阅。这有点极端,如果它引起很多问题,我可以选择非重新订阅的版本。

标签: rxjs rxjs5


【解决方案1】:

如果有人知道在哪里可以回答 RxJS 问题,请告诉我。我会接受答案。这是我遇到的第三个未回答的 RxJS 问题。

对于任何对答案感到好奇的人,这里就是。

stream$
.groupBy(stream => stream.lane)
.mergeMap(stream =>
        stream.scan((acc, cur) => {
                if (cur.action === 'start') {
                    acc = [];
                }
                acc.push(cur)
                return acc;
        }, [])
        .filter(stream => stream[stream.length-1].action === 'sold')
)
.subscribe(
    x => console.log(x), 
    (e) => console.error(e), 
    () => console.log('complete')
)

http://jsbin.com/tuxitev/edit?js,console

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-10
    • 1970-01-01
    • 2022-11-12
    • 1970-01-01
    相关资源
    最近更新 更多