【问题标题】:implement a queueing system with a fixed stack in RxJs在 RxJs 中实现具有固定堆栈的排队系统
【发布时间】:2020-06-27 09:41:49
【问题描述】:

假设我想要一个队列,其中任何时候只异步处理 3 个项目,我该怎么做?

这就是我的意思:如果我有一组项目要上传到后端,即将一些人工制品上传到云存储,然后创建/更新一个文档以反映每个人工制品的 url 和我不想:

  1. 在下一个上传操作之前异步/等待每个上传操作 - 因为这会很慢
  2. 同时发送所有内容 - 这可能导致写入热点或速率限制
  3. 做一个 promise.race - 这最终导致 (2)
  4. 做一个 promise.all - 如果有一个长时间运行的上传,这个过程会变慢。

而我想做的是:

  1. 拥有所有上传的队列,例如使用 RxJs 创建方法from(array-of-upload-items) 随时处理一叠 3 个项目。
  2. 当一个项目离开堆栈(即完成)时,我们将一个新项目添加到队列中
  3. 确保在任一时刻,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。

我将如何使用 RxJs 来解决这个问题?

编辑:2020 年 6 月 27 日

这是我的想法:

 const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so

      rxQueue
        .pipe(
          mergeMap((item) =>
            of(item).pipe(
              tap(async (item) => {
                  await Promise.race([
                      processUpload(item[0]),
                      processUpload(item[1]),
                      processUpload(item[2]),
                  ])
              }),
            ),
            3
          ),
        )
        .subscribe()

目标是确保在任何时候都处理(上传)3 个文件,以至于如果一个文件上传过程结束,则添加另一个文件以使堆栈保持在 3 个上传过程中。同理,如果 2 个文件上传同时结束,则将 2 个新文件添加到堆栈中,以此类推,直到文件数组中的所有文件都上传完毕。

【问题讨论】:

  • 你考虑过mergeMap的第三个论点吗? mergeMap(() => obs$, null, 3)
  • 感谢@AndreiGătej 我感谢您的快速响应。有关更多上下文,请参阅问题中的编辑/更新。

标签: javascript rxjs queue rxjs6


【解决方案1】:

我想你可以试试这个:

from(filesArray)
  .pipe(
    mergeMap(file => service.uploadFile(file), 3)
  )

这假设 service.uploadFile 返回一个 promise 或 observable。

假设您有 5 个文件,那么将从前 3 个文件中创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。

【讨论】:

    【解决方案2】:

    使用Subject 作为队列,使用mergeMap 有一个并发参数,您可以限制最大并发数

    const queue=new Subject()
    queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)
    queue.next(item)
    

    【讨论】:

    • 感谢@FanCheung 我感谢您的快速响应。有关更多上下文,请参阅问题中的编辑/更新。
    • 非常感谢这个答案,它完全按照我的需要解决了。请允许我选择@AndreiGătej 回复作为答案,因为他最初的回复首先出现并且是正确的,只是我当时并没有真正理解他的意思。在选择他的回复作为答案之前,我将等待您的确认。非常感谢
    • 无忧无虑
    猜你喜欢
    • 1970-01-01
    • 2016-03-10
    • 2011-08-05
    • 2014-07-21
    • 2021-07-08
    • 2011-04-19
    • 2021-12-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多