【发布时间】:2015-02-26 17:22:20
【问题描述】:
我在 RxJS 中有一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,并且通常必须等待生产者。这可以通过压缩生产者和请求流来实现:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
有时请求会中止。生成的元素只应在未中止的请求后使用:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
第一个请求r1 将消耗第一个生成的元素p1,但r1 在消耗p1 之前被a(r1) 中止。 p1 在第二次请求 r2 时产生并被消耗 c(p1, r2)。第二个中止a(?) 被忽略,因为之前没有发生未响应的请求。第三个请求r3 必须等待下一个生成的元素p2,并且在生成p2 之前不会中止。因此,p2 在生成后立即被使用 c(p2, r3)。
如何在 RxJS 中实现这一点?
编辑:
我在 jsbin 上创建了一个带有 QUnit 测试的example。您可以编辑函数createConsume(produce, request, abort) 来尝试/测试您的解决方案。
示例包含previously accepted answer的函数定义。
【问题讨论】:
-
好问题。 Here 是我的尝试,但 @Brandon 的要好得多。
-
顺便说一句。我注意到你的测试有一些小问题:1.你产生
1,2,但期望:p1,p2; 2.你们曾经交换过expected和actual。 -
对不起。我匆忙构建了这个测试,并以错误的方式分享了 jsbin 示例。因此,我后来在我自己的方法中不小心改变了它。我将元素更改为其原始值:产生:
p1和p2;请求:r1、r2和r3;中止:a1.
标签: javascript reactive-programming rxjs reactive-extensions-js