【发布时间】:2018-06-01 04:37:56
【问题描述】:
我有一个请求对象流:
let request$ = new Subject();
我有一个函数,它接受一个请求对象和当前应用程序状态对象,并返回一个新状态对象流:
function stream(request, state) {
// Return a cold Observable of new state objects
}
当第一个请求发出时,我想调用stream 并传入该请求和一个初始状态对象,然后开始处理它返回的状态。
此后一段时间,发出第二个请求。第一个流需要停止并替换为通过再次调用stream 创建的新流,传入第二个请求和第一个流发出的最新状态。
我觉得也许scan 有一个更高阶的流可以做到,但我被困在这里:
request$.pipe(
scan((state$, request) => {
let previousState = ???
return stream(request, previousState);
}, of(initialState)),
switchAll()
)
编辑 - 示例
在现实生活中,stream 函数建立到服务器的 websockets 连接,发送请求中的部分请求和状态参数,然后在从服务器接收到消息时发出新的状态对象。
为了简化一个例子,假设request 和state 只是数字,而stream 函数只是重复地将request 添加到state,每秒发出一个结果:
function stream(request, state) {
return interval(1000).pipe(
map(i => state + (i+1) * request)
);
}
// e.g. stream(2, 10) will emit 12, 14, 16, 18, ...
期望的输出是:
initialState = 10
request$ 2------------5-----------------
stream(2, 10) --12--14--16-|
stream(5, 16) --21--26--31--36--
...
output --12--14--16---21--26--31--36--
注意在发出5 请求时,5 和最新状态16 都是构建下一个流所必需的。
【问题讨论】:
标签: rxjs