【问题标题】:ExpressJS queue requests(ExpressJS 队列请求)
【发布时间】:2022-02-17 17:04:04
【问题描述】:

我希望对 ExpressJs 的请求进行排队,以便一次在端点上只处理一个请求。我发现了一些这样的例子:ExpressJS backend put requests into a queue

似乎它们需要为每个端点提供一个单独的函数。我正在尝试创建一个函数,该函数允许我传递队列名称,然后在指定队列中堆叠项目。

它是一个监听请求的 API:收到请求后,会向另一个在线 API 发起请求,然后通过原始的 express 端点把结果转发回用户。最终,我会为每个端点添加非常基础的缓存,只把一小段 JSON 字符串保存 3 秒,之后即过期。这样在 3 秒的时限内会直接返回缓存的字符串,而不是再次在线获取数据。

以下是我目前写出的代码,我很想知道是否有更好的方法:

//UI request -> check cache -> return response || call request then return response

// Queue items on endpoint
class QueueUnique {
  func;
  q;
  requestCache = [];

  constructor(func) {
    this.q = Promise.resolve();
    this.func = func;
  }

  add(request) {
    // Fetch all cached items related to the current endpoint queue
    const cachedItem = this.requestCache.find(
      (itm) => itm.queueName === request.queueName
    );
    // If the current request is within X seconds of the last successful requesst, return the cache
    // otherwise make a new request
    if (cachedItem && new Date().getTime() - cachedItem.runtime > 3000) {
      console.log(
        "Cache is over 3 seconds old. Doing new request. Queue name: " +
          request.queueName
        // no cahe, forward request:
        //seperate this in to function
        //res.sendResponse = res.send
        // res.send = (body) => {
        //  request.body = body
        //  this.updateCache(request);
        // res.sendResponse(body)
        //}
        //next()
      );
      this.updateCache(request);
    } else if (cachedItem) {
      console.log("Valid cache, return cache");
      // res.send(request.body)
      this.updateCache(request);
    } else {
      console.log("no cache");
      //continue as normal as if no cache
      // no cahe, forward request: Same as first run
      this.addToCache(request);
    }

    // Do I need to use await before setting datetime?

    //then cache

    // Set the current time as a value in the item Array
    request.runtime = new Date().getTime();

    const queuedFunc = this.queue(request);
    queuedFunc();
  }

  addToCache(request) {
    // Add the new item to the permanent cache
    this.requestCache.push(request);
  }

  updateCache(request) {
    // Update the permanent request cache entry
    const arrayIndex = this.requestCache.findIndex(
      (itm) => itm.queueName === request.queueName
    );
    this.requestCache[arrayIndex] = request;
  }

  queue(item) {
    return () => {
      this.q = this.q
        .then(() => this.func(item))
        .catch((err) => {
          console.log(err);
        });
      return this.q;
    };
  }
}

const response = (item) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log("say", item.payload);
      resolve();
    }, item.delay);
  });
};

// Registry of QueueUnique instances, looked up by the key passed to test()
const queue = [];

/**
 * Adds `payload` to the queue registered under `bar`, creating that queue on
 * first use, then logs and returns the whole registry.
 */
function test(bar, payload) {
  const target = queue[bar] || (queue[bar] = new QueueUnique(response));
  target.add(payload);
  console.log(queue);
  return queue;
}

// Demo run: three items for queue "te" and two for queue "te2", all using the
// queue name "ping". Each row is [queue key, payload, delay in ms].
const demoCalls = [
  ["te", "one", 3000],
  ["te", "one", 3000],
  ["te", "one", 3000],
  ["te2", "two", 1000],
  ["te2", "two", 1000],
];

for (const [bar, payload, delay] of demoCalls) {
  // A fresh object per call: add() mutates and stores the item it receives
  test(bar, { queueName: "ping", payload, delay });
}

【问题讨论】:

    标签: java typescript express


    【解决方案1】:

    这是我现在所拥有的。很高兴听到有关改进或问题的信息。它被转换为 Typescript,因为它是我最终将要使用的。

    import queueCache from './middleware/queueCache'
    
    ...
    
    // Cache lifetime in milliseconds; the queueCache middleware reads it from req.app.locals
    app.locals.cacheTimeout = 3000

    // Demo endpoints: each passes through queueCache and answers with a fixed JSON body
    const demoEndpoints = [
      { path: '/test', body: { test: 'test' } },
      { path: '/test1', body: { test1: 'test1' } }
    ]

    for (const { path, body } of demoEndpoints) {
      app.get(path, queueCache, (_req, res) => {
        res.json(body)
      })
    }
    

    queueCache.ts:

    import { Request, Response, NextFunction } from 'express'
    
    /** One cached response, identified by the request URL that produced it. */
    interface requestData {
      /** Cache key; the middleware stores `req.url` here. */
      queueName?: string
      /**
       * Body that was handed to `res.json`. Typed `unknown` because handlers
       * send arbitrary JSON values (objects, arrays, ...), not only strings.
       */
      cachedData?: unknown
      /** Epoch milliseconds at which the entry was stored. */
      runtime?: number
    }
    
    // Scratch slot: checkCache stores the entry it looked up most recently at index 0.
    // Declared as a real array instead of asserting an object literal to be one.
    const cachedItemList: requestData[] = []
    // One QueueUnique per route path, created on first use by sortQueues.
    // Keys are strings, so this is a dictionary rather than an array.
    const queue: Record<string, QueueUnique> = {}
    // Cached responses; each entry carries the request URL in queueName
    const requestCache: requestData[] = []
    
    // Class to queue items and cache results
    /**
     * Serialises Express requests: each queued request runs `func` only after
     * the previous one in this queue has settled. Requests that checkCache
     * answers are never queued.
     */
    class QueueUnique {
      func
      q: Promise<unknown>

      /**
       * @param func Task run for every queued request; must return a promise.
       */
      constructor(
        func: (req: Request, res: Response, next: NextFunction) => Promise<unknown>
      ) {
        this.func = func
        // Tail of the promise chain; starts out already settled
        this.q = Promise.resolve()
      }

      /** Answers from the cache when possible, otherwise appends the request to the queue. */
      add(req: Request, res: Response, next: NextFunction) {
        const servedFromCache = checkCache(req, res)
        if (!servedFromCache) {
          const enqueue = this.queue(req, res, next)
          enqueue()
        }
      }

      /**
       * Builds a thunk that chains `func(req, res, next)` behind everything
       * already queued and returns the new tail of the chain.
       */
      queue(req: Request, res: Response, next: NextFunction) {
        const runTask = () => this.func(req, res, next)
        // Failures are logged and absorbed so later requests still run
        const reportFailure = (err: unknown) => {
          console.log(err)
        }

        return () => {
          const tail = this.q.then(runTask).catch(reportFailure)
          this.q = tail
          return tail
        }
      }
    }
    
    /**
     * Queue task for one request.
     *
     * First re-checks the cache, because a request that finished earlier in the
     * queue may have produced a usable entry. Otherwise, after a fixed 4000 ms
     * delay, it wraps `res.json` so the body sent by the route handler is also
     * written to `requestCache`, then hands over to the handler with `next()`.
     *
     * NOTE(review): the promise resolves as soon as `next()` returns, so a handler
     * that responds asynchronously may still be running when the next queued
     * request starts. Confirm whether that is acceptable for the callers.
     *
     * @returns A promise that resolves to `true` once the request has been
     *          answered from the cache or handed to the route handler.
     */
    const response = (req: Request, res: Response, next: NextFunction) => {
      return new Promise((resolve) => {
        if (checkCache(req, res)) {
          resolve(true)
          return
        }
        setTimeout(() => {
          // Keep the original sender reachable, then intercept res.json
          res.sendResponse = res.json
          res.json = (body) => {
            // Return the response to the caller
            res.sendResponse(body)
            // Only use cache on GET requests. When not GET, this middleware only acts as a queue.
            if (req.method === 'GET') {
              const storedAt = new Date().getTime()
              // Look the entry up at write time. The shared cachedItemList[0] slot may
              // have been overwritten by another request's checkCache call while this
              // one was waiting on the timer, so it cannot say whether an entry exists.
              const existing = requestCache.find(
                (itm) => itm.queueName === req.url
              )
              if (existing) {
                // Refresh the stale entry in place
                existing.runtime = storedAt
                existing.cachedData = body
              } else {
                // First response for this URL. Inserting only when absent also avoids
                // duplicate entries (the original cites expressjs/express issue 4826).
                requestCache.push({
                  cachedData: body, // Body sent to the caller
                  queueName: req.url, // Request URL, used as the cache key
                  runtime: storedAt // Time the response was stored
                })
              }
            }
            return res
          }
          next()
          resolve(true)
        }, 4000)
      })
    }
    
    /**
     * Answers the request from the cache when a fresh entry exists for its URL.
     *
     * Side effect: the looked-up entry (or `{}` when there is none) is stored in
     * `cachedItemList[0]`.
     *
     * @returns `true` when a cached body was sent with `res.json`, otherwise `false`.
     */
    function checkCache(req: Request, res: Response): boolean {
      // Cache entries are named after the request URL
      const entry = requestCache.find((itm) => itm.queueName === req.url) ?? {}
      // Keep publishing the lookup result: other code in this module reads cachedItemList[0]
      cachedItemList[0] = entry

      // The cache is meant for GET only. Answering another method from it would
      // skip that method's handler and return a GET body instead.
      if (req.method !== 'GET') {
        return false
      }

      // No timestamp means nothing has been cached for this URL yet
      if (!entry.runtime) {
        return false
      }

      // Serve the cached body while it is younger than the configured timeout
      const age = new Date().getTime() - entry.runtime
      if (age < req.app.locals.cacheTimeout) {
        res.json(entry.cachedData)
        return true
      }
      return false
    }
    
    // Create multipule queues, one for each endpoint
    /**
     * Express middleware entry point: routes each request into the queue that
     * belongs to its route path, creating that queue the first time the path is seen.
     */
    function sortQueues(req: Request, res: Response, next: NextFunction) {
      // The route path doubles as the queue's key
      const routeKey = req.route.path
      let endpointQueue = queue[routeKey]
      if (!endpointQueue) {
        endpointQueue = new QueueUnique(response)
        queue[routeKey] = endpointQueue
      }
      endpointQueue.add(req, res, next)
    }
    
    export default sortQueues
    

    【讨论】:

      猜你喜欢
      • 2023-04-04
      • 2021-01-13
      • 2016-01-22
      • 1970-01-01
      • 2016-09-19
      • 1970-01-01
      • 2011-05-18
      • 1970-01-01
      • 2015-10-16
      相关资源
      最近更新 更多