【问题标题】:RxJS: Producer-consumer with abortRxJS:带有中止的生产者-消费者
【发布时间】:2015-02-26 17:22:20
【问题描述】:

我在 RxJS 中有一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,并且通常必须等待生产者。这可以通过压缩生产者和请求流来实现:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);

有时请求会中止。生成的元素只应在未中止的请求后使用:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->

第一个请求r1 将消耗第一个生成的元素p1,但r1 在消耗p1 之前被a(r1) 中止。 p1 在第二次请求 r2 时产生并被消耗 c(p1, r2)。第二个中止a(?) 被忽略,因为之前没有发生未响应的请求。第三个请求r3 必须等待下一个生成的元素p2,并且在生成p2 之前不会中止。因此,p2 在生成后立即被使用 c(p2, r3)

如何在 RxJS 中实现这一点?

编辑: 我在 jsbin 上创建了一个带有 QUnit 测试的example。您可以编辑函数createConsume(produce, request, abort) 来尝试/测试您的解决方案。

示例包含previously accepted answer的函数定义。

【问题讨论】:

  • 好问题。 Here 是我的尝试,但 @Brandon 的要好得多。
  • 顺便说一句。我注意到你的测试有一些小问题:1.你产生12,但期望:p1p2; 2.你们曾经交换过expectedactual
  • 对不起。我匆忙构建了这个测试,并以错误的方式分享了 jsbin 示例。因此,我后来在我自己的方法中不小心改变了它。我将元素更改为其原始值:产生:p1p2;请求:r1r2r3;中止:a1.

标签: javascript reactive-programming rxjs reactive-extensions-js


【解决方案1】:

这个(核心思想减去细节)通过了你的 JSBin 测试:

var consume = request
  .zip(abort.merge(produce), (r,x) => [r,x])
  .filter(([r,x]) => isNotAbort(x))
  .map(([r,p]) => p);

还有JSBin code

【讨论】:

  • 感谢您的解决方案,尽管它的行为不像我想要的那样。我编辑了我的问题,以便更清楚我想要什么样的行为。我编辑了图表,在 jsbin 上的测试示例/环境中添加了描述和 link。我在那个例子中使用了你的解决方案。它失败了,因为它跳过了第一个生成的元素,而选择了第二个元素,第二个元素被消耗了两次。
  • 更新了我的答案。这能解决你所有的情况吗? @MegaMuetzenMike
  • 聪明。我知道必须有一种方法来使用现有的运营商。
  • 它本质上是“请求与相应的[abort orproduce]结合,忽略结果何时中止”。
  • 它是如此简洁和优雅。
【解决方案2】:

我无法完全思考如何使用现有的操作员来做到这一点。以下是使用Observable.create() 的方法:

return Rx.Observable.create(function (observer) {
  var rsub = new Rx.SingleAssignmentDisposable();
  var asub = new Rx.SingleAssignmentDisposable();
  var psub = new Rx.SingleAssignmentDisposable();
  var sub = new Rx.CompositeDisposable(rsub, asub, psub);
  var rq = [];
  var pq = [];
  var completeCount = 0;
  var complete = function () {
    if (++completeCount === 2) {
      observer.onCompleted();
    }
  };
  var consume = function () {
    if (pq.length && rq.length) {
      var p = pq.shift();
      var r = rq.shift();
      observer.onNext('p' + p);
    }
  };

  rsub.setDisposable(request.subscribe(
    function (r) {
      rq.push(r);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));

  asub.setDisposable(abort.subscribe(
    function (a) {
      rq.shift();
    },
    function (e) { observer.onError(e); }
  ));

  psub.setDisposable(produce.subscribe(
    function (p) {
      pq.push(p);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));


  return sub;
});

http://jsbin.com/zurepesijo/1/

【讨论】:

  • 不错的方法。只是想确认一下:调用sub.dispose()会处理所有Rx.SingleAssignmentDisposables吗?
  • 是 CompositeDisposable 可让您将多个一次性用品组合成一个一次性用品。
【解决方案3】:

此解决方案会忽略未遵循未响应请求的中止:

const {merge} = Rx.Observable;

Rx.Observable.prototype.wrapValue = function(wrapper) {
    wrapper = (wrapper || {});
    return this.map(function (value) {
        wrapper.value = value;
        return wrapper;
    });
};

function createConsume(produce, request, abort) {
  return merge(
            produce.wrapValue({type: 'produce'}),
            request.wrapValue({type: 'request'}),
            abort.wrapValue({type: 'abort'})
         )
         .scan(
            [false, []],
            ([isRequest, products], e) => {
                // if last time the request was answered
                if (isRequest && products.length) {
                    // remove consumed product
                    products.shift();
                    // mark request as answered
                    isRequest = false;
                }
                if (e.type === 'produce') {
                    // save product to consume later
                    products.push(e.value);
                } else {
                    // if evaluated to false, e.type === 'abort'
                    isRequest = (e.type === 'request');
                }
                return [isRequest, products];
            }
         )
         .filter( ([isRequest, products]) => (isRequest && products.length) )
         .map( ([isRequest, products]) => products[0] ); // consume
}

Code 在 JSBin 的最新测试中。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-14
    • 2018-04-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多