【发布时间】:2016-10-06 19:11:11
【问题描述】:
我有来自汽车拍卖的数据流。每个汽车拍卖都有 n 条车道。我想记录每辆车的拍卖情况。
流看起来像这样......
--{lane: 1, action:bid} --- { lane: 2, action: start} --- { lane:1, action:bid} --- {lane: 2, action:bid} --- {lane:1, action: sold} ---
我有以下缓冲每个拍卖通道并关闭销售缓冲区...
const bufferOpen$= auctionWebSocketStream$
.filter(stream => stream.tag === 'CURITEM');
const bufferClose$ = () => auctionWebSocketStream$.filter(stream => stream.tag === 'SOLD');
auctionWebSocketStream$
.bufferToggle(bufferOpen$, bufferClose$)
.subscribe(x => console.log(x));
只要有一个拍卖和一条车道,上述方法就可以正常工作。拥有多条车道,就有关于多条车道的出价/销售信息。
如何按通道将流聚合到缓冲区中?类似的解决方案总是具有已知的聚合参数。但只要有新车道,我就需要分流。
非常感谢您的帮助。
更新
我制作了一个 JSBin 来炫耀我的挫败感和无知。它提供了一个示例输入流并解释了所需的输出。
http://jsbin.com/tuxitev/edit?js,console
(对于奖励积分,它只显示 Babel 下的空数组。不知道为什么需要 Typescript)
【问题讨论】:
-
刚刚偶然发现 groupBy... 我想我要“抱歉浪费大家的时间”了。
-
你有固定数量的车道吗?一块土地可以重新拍卖吗?
-
它没有修复,它可以重新打开。例如。拍卖 1,通道 A、B、C --- 拍卖 2,通道 A、B --- 拍卖 1,通道 D --- 拍卖 1,通道 A 完成 --- 拍卖 3,通道 A
-
那么当它重新打开时,你会再次订阅吗?
-
是的,需要重新订阅。这有点极端,如果它引起很多问题,我可以选择非重新订阅的版本。