【问题标题】:rxjs rate limit (requests per second) and concurrencyrxjs 速率限制(每秒请求数)和并发
【发布时间】:2017-06-11 08:29:44
【问题描述】:

我正在尝试弄清楚如何在 rxjs 中编写速率限制器。用于访问大多数 api(twitter、facebook 等)如果不支持开箱即用的方法,我会假设可以编写调度程序。例如 highland.js 有ratelimit。我不想丢弃任何项目,如窗口、样本等。

var source = Rx.Observable.create(function (observer) {

  // queue of requests
  _.each(requests, function(r) {
    observer.onNext(r);
  });

  observer.onCompleted();

  // Any cleanup logic might go here
  return function () {
    console.log('disposed');
  }
})
  // what goes here, if built in (e.g. 2 requests per 2 seconds or 15 request per 15 minutes)

// SHOULD ONLY RUN
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

编辑 1: 想想这样的事情,使用令牌桶算法,还是很粗糙但是……

Rx.Observable.prototype.tokenBucket = function(options, scheduler) {
  function time() {
    return new Date().getTime();
  }

  var BUCKET = {
    capacity: options.capacity || Infinity,
    left: options.capacity,
    last: time(),

    tokensPerInterval: options.tokensPerInterval,
    interval: options.interval
  };

  //var BUCKET = _.merge(defaultOptions, options);
  console.log(BUCKET);

  var source = this,
    scheduler = scheduler || (scheduler = Rx.Scheduler.timeout);

  return Rx.Observable.create(function(observer) {
    var d1 = source.subscribe(function(mainValue) {
      return throttle(mainValue);
    });

    function throttle(x, tokens) {
      if (BUCKET.capacity === Infinity) {
        return observer.onNext(x);
      } // return x;

      // the number of tokens to add every S milliseconds = (r*S)/1000.
      var self = BUCKET;
      var now = time();

      var deltaMS = Math.max(now - self.last, 0);
      self.last = now;
      var dripAmount = deltaMS * (self.tokensPerInterval / self.interval);
      self.left = Math.min(self.left + dripAmount, self.capacity);

      if (self.left < 1) {
        var interval = Math.ceil((1 - self.left) * self.interval);
        scheduler.scheduleWithRelative(interval, function (s, i) {
          return throttle(x);
        });
      } else {
        self.left -= tokens || 1;
        console.log('calling');
        return observer.onNext(x);
      }
    }

    return function() {
      d1.dispose();
      console.log('disposed tokenBucket');
    };
  });
};

var start = moment();
var source = Rx.Observable.range(1, 20)
  .tokenBucket({capacity: 2, tokensPerInterval: 2, interval: 2000})

var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); addToDom(x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

function addToDom(x) {
  var ul = document.getElementById('c');
  var li = document.createElement('li');
  li.innerHTML = x + ' - ' + moment().diff(start, 'seconds') + 's ago';
  ul.appendChild(li);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.10.3/moment.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.3/rx.all.js"></script>
<ul id="c"></ul>

【问题讨论】:

标签: rxjs


【解决方案1】:

如果您只想删除发生在两者之间的事件,可以使用 windowWithTimeOrCount + throttleFirst:

var subscription = source

 //Splits the events into 15 minute windows
 .windowWithTimeOrCount(900000 /*15 minutes*/, 15) 

 //Stops us from receiving more than one window in 15 minutes
 .throttleFirst(900000 /*15 minutes*/)

 //Flatten the observable
 .concatAll()
 .subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

工作示例(控制台输出):

var source = Rx.Observable.generateWithRelativeTime(
    0,
    function(x) { return x < 1000; },
    function(x) { return x + 1; },
    function(x) { return x; },
    function(x) { return Math.floor(Math.random() * 100); });


source
 .windowWithTimeOrCount(1000, 3)
 .throttleFirst(1000)
 .concatAll()
 .subscribe(console.log.bind(console));
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.3/rx.all.js"&gt;&lt;/script&gt;

备选方案 1

如果您不想删除任何值,也可以在管道上使用 controlled 以及特别滚动的 regulate 方法:

var subscription = source
  .windowWithTimeOrCount(900000, 15)
  //This will stop the loss of any events from the hot window
  .map(function(x) {
    var c = x.replay(),
        d = c.connect();
    //Shut down the connected observable when you are done.
    return Rx.Observable.using(function() {return d; },
                               function() {return c; });
  })
  //This will prevent more than one window from going through per interval
  //[See snippet]
  .regulate(900000)
  .concatAll()
  .subscribe(
    function (x) { console.log('onNext: %s', x); },
    function (e) { console.log('onError: %s', e); },
    function () { console.log('onCompleted'); });

工作示例(控制台输出):

Rx.Observable.prototype.regulate = function(interval, scheduler) {
  var source = this,
    scheduler = scheduler || (scheduler = Rx.Scheduler.timeout);

  return Rx.Observable.create(function(observer) {
    var controller = source.controlled(scheduler),
      d = new Rx.SerialDisposable();

    function nextSample(x) {

      //This will request a new value after our minimum interval expires
      d.setDisposable(scheduler.scheduleWithRelative(interval, function(s) {
        return controller.request(1);
      }));

      observer.onNext(x);
    }

    return new Rx.CompositeDisposable(
      d,
      controller.subscribe(nextSample,
        observer.onError.bind(observer),
        observer.onCompleted.bind(observer)),
      controller.request(1));

  }, source);

};


var subscription = Rx.Observable.generateWithRelativeTime(
    0,
    function(x) {
      return x < 100;
    },
    function(x) {
      return x + 1;
    },
    function(x) {
      return x;
    },
    function(x) {
      return Math.floor(Math.random() * 200);
    })
  //Divides up windows by count and our interval time
  .windowWithTimeOrCount(2000, 15)
  //Necessary since the windows we receive are hot
  .map(function(x) {
    var c = x.replay();
    var d = c.connect();
    return Rx.Observable.using(function() {
      return d;
    }, function() {
      return c;
    });
  })
  //Only allows one window through every 2 seconds
  .regulate(2000)
  //Flatten everything out
  .concatAll()
  .subscribe(console.log.bind(console), console.error.bind(console), console.log.bind(console, "Finished!"));
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.3/rx.all.js"&gt;&lt;/script&gt;

【讨论】:

  • 我现在只是在看generateWithRelativeTime。我不想在处理队列中删除任何内容。真的只是想防止超过特定 api 的速率限制。
  • 如果您不想删除任何元素,也可以查看controlled
  • @dre,如果你不想放弃任何活动,你想用额外的东西做什么?等下一次开门?取决于您的下一个应用程序可能会开始堆积。或者您只是在某个时间间隔 ping 端点?
  • 是的,基本上是一个处理队列,所以你不想丢弃任何东西,但你必须遵守速率限制。
  • @dre 看看我的替代方案。我认为它可能更接近您正在寻找的东西。
【解决方案2】:

我在我的个人项目中遇到了一个非常相似的问题,并决定将可重用的解决方案发布为 npm 包https://www.npmjs.com/package/rx-op-lossless-throttle

http://www.g9labs.com/2016/03/21/lossless-rate-limiting-with-rxjs/ 不同,它不会强制延迟每个事件。

【讨论】:

    【解决方案3】:

    如果您不想丢失任何通知,可以使用缓冲区或其变体之一(使用时间/计数/时间或计数)。 它基本上将通知组合在一个数组中,并在以下情况下转发该数组:

    • 一个可观察的通知
    • 计时器到期
    • 已达到通知计数
    • 以上的组合。

    因此,您可以将通知缓冲在一个数组中,并且每分钟只接收一次,或者在 100 个通知到达时接收它。

    【讨论】:

    • 这只会让您在每个缓冲区关闭后处理项目。您将无法处理它们来的物品。它本身也无法解决问题,因为缓冲区要么太大,要么进来太快。
    • 这不是他想要的吗?可能我没听懂。
    • 没有汗水。根据最初的问题,OP 正在寻找一种不违反特定端点速率限制的方法。因此,例如,如果速率限制是 15 分钟内的 15 个请求,buffer 将等到 15 分钟结束后再发出(不理想),或者在填满时发出,然后开始处理下一个缓冲区,这可能会在 15 分钟结束前触发(并违反速率限制)。
    • 是的,这确实是硬币的另一面。非常有用的方法,只是我认为它对此没有帮助,也许我遗漏了一些东西,一个例子可以说明任何一种方式。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-12
    • 2018-11-11
    • 1970-01-01
    • 2017-06-13
    相关资源
    最近更新 更多