【问题标题】:RxJs buffer until database insert (promise)RxJs 缓冲区直到数据库插入(承诺)
【发布时间】:2017-10-19 11:30:32
【问题描述】:

我有一个数据流,其中包含快速传入的数据。我想通过保持顺序将它们插入数据库。我有一个数据库,它返回一个 Promise,当插入成功时解决。

我想做一个 Rx 流,它缓冲新数据,直到插入缓冲的数据。

我该怎么做?

【问题讨论】:

  • 还有什么问题?有bufferbufferTogglebufferWhen 运算符。
  • 问题是我不知道如何使用它们。想弄清楚,但还不知道怎么做。
  • 使用concatMap,从项目函数返回承诺。 concatMap 会为你做缓冲,但是 RxJS 没有背压,所以如果你的数据到达的速度比你写的快,你会耗尽内存。
  • 我也可以插入多条记录,这就是为什么缓冲区会很好,它将传入的数据收集到一个数组中,直到上一个承诺完成。

标签: promise rxjs buffer rxjs5


【解决方案1】:

我相信要获得您想要的内容,您需要创建自己的运营商。稍微脱离 RxJS,你可以得到类似(警告,尚未测试)...

export class BusyBuffer<T> {
  private itemQueue = new Subject<T>();
  private bufferTrigger = new Subject<{}>();
  private busy = false;

  constructor(consumerCallback: (items: T[]) => Promise<void>) {
    this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
      this.busy = true;
      consumerCallback(items).then(() => {
        this.busy = false;
        this.bufferTrigger.next(null);
      });
    });
  }

  submitItem(item: T) {
    this.itemQueue.next(item);
    if(!busy) {
      this.bufferTrigger.next(null);
    }
  }

}

然后可以用作

let busyBuffer = new BusyBuffer<T>(items => {
  return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));

虽然它并不完全是被动的,但有人可能会想出更好的东西。

【讨论】:

  • 谢谢!我创建了相同的,但我希望有人能想出一个纯粹的反应解决方案:)
  • 没有汗水,祝你好运。我正在考虑从数据库中获取某种忙/闲信号并将其通过管道传输回缓冲区方法,但是如果数据库不做任何事情,您还必须添加缓冲区应立即发出的逻辑。
  • 如果您想要更好的东西,请不要接受我的回答。我不介意。
猜你喜欢
  • 2023-03-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-12-07
  • 2022-11-11
  • 2014-12-22
相关资源
最近更新 更多