【问题标题】:How to know when a fetch ends without locking the body stream?如何在不锁定正文流的情况下知道提取何时结束?
【发布时间】:2020-09-14 22:00:00
【问题描述】:

我正在向 API 发出请求,但他们的服务器只允许一定数量的活动连接,因此我想限制正在进行的提取次数。就我而言,只有在 HTTP 响应正文到达客户端时才完成(而不是正在进行)提取。

我想创建一个这样的抽象:

const fetchLimiter = new FetchLimiter(maxConnections);
fetchLimiter.fetch(url, options); // Returns the same thing as fetch()

这会使事情变得简单得多,但似乎无法知道其他代码正在使用的流何时结束,因为流在​​被读取时被锁定。可以使用ReadableStream.tee() 将流一分为二,使用一个并将另一个返回给调用者(可能还用它构造一个Response),但这会降低性能,对吧?

【问题讨论】:

  • 你可以为你的调用者构造一个假的Response,你可以控制调用者将使用的流。
  • @Bergi 然后将真实的流通过管道传输到其中?听起来是个好主意,但我不确定它对性能的影响。
  • 我很确定它的性能会比tee()ing 流甚至clone()ing 响应更好。但可以肯定的是,如果您足够关心,就必须进行基准测试。
  • 另一种方法是将回调函数传递给应该处理响应的fetchLimiter,并等待其结果排队等待。这是处理异步“资源”使用的一种非常标准的方法。

标签: javascript fetch-api whatwg-streams-api


【解决方案1】:

由于fetch 使用承诺,您可以利用它来制作一个简单的队列系统。

这是我以前用于对基于 promise 的内容进行排队的一种方法。它通过创建 Promise 将项目排入队列,然后将其解析器添加到数组中。当然,在该 Promise 解决之前,await 会阻止任何后续的 Promise 被调用。

当一个完成时,我们所要做的就是抓住下一个解析器并调用它。承诺解决,然后fetch 开始!

最好的部分,因为我们实际上并没有消耗fetch 结果,所以不必担心必须clone 或任何东西......我们只是将它原封不动地传递,以便您可以在以后使用它@ 987654328@什么的。

*编辑: 因为在 fetch 承诺解决后正文仍在流式传输,所以我添加了第三个选项,以便您可以传入正文类型,并让 FetchLimiter 为您检索和解析正文.

这些都返回一个最终用实际内容解决的承诺。

这样您就可以让 FetchLimiter 为您解析正文。我这样做是为了让它返回一个[response, data] 的数组,这样你仍然可以检查响应代码、标题等内容。

就此而言,如果您需要执行更复杂的操作,例如根据响应代码解析正文的不同方法,您甚至可以传入回调或在该 await 中使用的其他内容。

示例

我添加了 cmets 来指示 FetchLimiter 代码的开始和结束位置...其余的只是演示代码。

它使用了一个假的fetch,使用了一个 setTimeout,它将在 0.5-1.5 秒之间解决。它将立即启动前三个请求,然后活动将被填满,并等待一个解决。

当这种情况发生时,您将看到承诺已解决的注释,然后队列中的下一个承诺将开始,然后您将看到来自 for 循环解析中的 then。我添加了 then 只是为了让您可以看到事件的顺序。

(function() {
  const fetch = (resource, init) => new Promise((resolve, reject) => {
    console.log('starting ' + resource);
    setTimeout(() => {
      console.log(' - resolving ' + resource);
      resolve(resource);
    }, 500 + 1000 * Math.random());
  });

  function FetchLimiter() {
    this.queue = [];
    this.active = 0;
    this.maxActive = 3;
    this.fetchFn = fetch;
  }
  FetchLimiter.prototype.fetch = async function(resource, init, respType) {
    // if at max active, enqueue the next request by adding a promise
    // ahead of it, and putting the resolver in the "queue" array.
    if (this.active >= this.maxActive) {
      await new Promise(resolve => {
        this.queue.push(resolve); // push, adds to end of array
      });
    }
    this.active++; // increment active once we're about to start the fetch
    const resp = await this.fetchFn(resource, init);
    let data;
    if (['arrayBuffer', 'blob', 'json', 'text', 'formData'].indexOf(respType) >= 0)
      data = await resp[respType]();
    this.active--; // decrement active once fetch is done
    this.checkQueue(); // time to start the next fetch from queue
    return [resp, data]; // return value from fetch
  };

  FetchLimiter.prototype.checkQueue = function() {
    if (this.active < this.maxActive && this.queue.length) {
      // shfit, pulls from start of array. This gives first in, first out.
      const next = this.queue.shift();
      next('resolved'); // resolve promise, value doesn't matter
    }
  }

  const limiter = new FetchLimiter();
  for (let i = 0; i < 9; i++) {
    limiter.fetch('/mypage/' + i)
      .then(x => console.log(' - .then ' + x));
  }
})();

注意事项:

  • 我不能 100% 确定当 Promise 解决时,body 是否仍在流式传输……这似乎是你的担忧。但是,如果这是一个问题,您可以使用像 blobtextjson 这样的 Body mixin 方法之一,在完全解析正文内容之前无法解决 (see here)

    李>
  • 我有意将其保持得很短(如 15 行实际代码)作为一个非常简单的概念证明。您希望在生产代码中添加一些错误处理,以便如果 fetch 由于连接错误或其他原因而拒绝,您仍然会减少活动计数器并启动下一个 fetch

  • 当然它也使用async/await 语法,因为它更容易阅读。如果您需要支持较旧的浏览器,则需要使用 Promises 重写或使用 babel 或等效项进行转换。

【讨论】:

  • 谢谢,但这不能解决问题。 “我不能 100% 确定当承诺解决时,身体是否还在流动……” 是的,除非身体是空的或真的很小。 fetch() 返回的承诺在标头到达后立即解决。 “但是,如果这是一个问题,您可以使用像 blobtextjson 这样的 Body mixin 方法之一,在完全解析正文内容之前无法解决”这就是问题所在,因为他们不能被多次调用。
  • 我编辑了我的答案。无论如何,您不会调用其中一种方法来解析正文吗?如果是这样,为什么不使用我上面建议的方法之一将该调用合并到 FetchLimiter 中。然后只需将解析后的正文数据与响应一起返回,如上。
猜你喜欢
  • 2013-04-12
  • 2020-11-01
  • 2013-02-01
  • 2021-05-01
  • 1970-01-01
  • 1970-01-01
  • 2015-03-22
  • 2011-03-27
相关资源
最近更新 更多