【问题标题】:In the GraphQL reference implementation resolvers are expected to return an Iterable, how to return an asynchronous Iterable?在 GraphQL 参考实现中,resolver 期望返回一个 Iterable,如何返回一个异步 Iterable?
【发布时间】:2019-03-05 18:10:59
【问题描述】:

我正在使用Sequelize 访问我的关系数据库并在GraphQL 解析器中提供结果。 Sequelize 框架内的查询是异步执行的 (bluebird)。为了缓冲大型结果集并避免服务器上的高内存需求,例如请求了数百万条记录,我想在我的解析器中返回一个迭代器。考虑一下这个简化的要点:

// root resolver
function allPersons(...) {
  [...]
  return {
    nextId: 1,
    maxId: 10000000, 
    [Symbol.iterator]: () => { return this },
    next: function() {
      let nextRes = { done: true, value: null }
      if (this.nextId <= this.maxId) {
        nextRes.value = sequelize.models.person.findById(this.currId)
        nextRes.done = false
        this.nextId = this.nextId + 1
      }
      return nextRes
    }
}

上述方法有效,因为 Sequelize 构造的 Promise 返回为 next()value。当 this value-Promise 被解析时,它会从底层关系数据库中获取一条记录。因此,我同步构造异步数据获取。这只有效,因为每个单独的提取都独立于所有其他提取。特别是在执行下一个之前,不需要awaited 单个提取。但是,逐行获取关系数据库在技术上效率低下,实际上是一种反模式。因此,我想实现一个缓冲区,它可以获取 10k 行的批次,为它们提供服务,直到批次为空,然后再获取下一个。但是,由于随后引入了异步事件的依赖关系,要实现这一点,需要一个异步迭代器(Symbol.asyncIterator)。

我需要做什么才能使GraphQL's reference implementation(graphql-js 和/或 express-graphql)接受异步迭代器? 请注意,我想避免使用Apollo GraphQL

或者 Object-Stream 是一种可能的解决方案吗?

我们将不胜感激。

【问题讨论】:

    标签: node.js asynchronous graphql graphql-js


    【解决方案1】:

    半途而废:使用流并将其转换为同步迭代器

    由于 GraphQL 解析器预计会返回同步迭代器,因此流可用于将其数据馈送到此类迭代器中。考虑问题中发布的原始示例的以下解决方案。请注意,流行的 ORM Sequelize 不支持流,因此这里使用了另一个节点包 knex

    // Setup:
    const knex = require('knex')
    var dbCon = knex({
      client: 'pg',
      connection: {} // Define host, user, password, db (see knex docu)
    })
    
    // Get records as stream
    var peopleStream = dbCon.select('*').from('people').stream()
    
    // Serve stream within an synchronous iterator
    var iter = {
      [Symbol.iterator]: () => {
        return this
      },
      next: function() {
        let v = peopleStream.read() || null
        console.log(JSON.stringify(v)) // Check, if it works.
        return {
          done: v === null,
          value: v
        }
      }
    } 
    

    然而,这确实只是解决方案的一半,因为只有数据源才能以所示的方式使用来生成流 - 进而可以很容易地转换为同步迭代器,如此处所示。在我看来,GraphQL 的参考实现迫切需要支持异步迭代器作为解析器的结果值。详情请见this feature request

    【讨论】:

      【解决方案2】:

      GraphQL.js 在后台使用iterall。为了支持异步迭代,底层代码必须使用该库中的forAwaitEach 方法,而不是现在使用的forEach 方法。这可能是可能的,但我不确定它是否会破坏其他功能。

      如果您只想获取任意大小的块中的所有people,您不需要做任何特别花哨的事情:

      async function getAllPeople () {
        const chunkSize = 10000
        const startId = 1
        const endId = await sequelize.models.person.max('id')
        const people = []
      
        let lower = startId
        let upper = startId + chunkSize
      
        while (upper < (endId + 1)) {
          const chunk = await sequelize.models.person.findAll({
            where: {
              id: {
                [Op.and]: {
                  [Op.gte]: lower,
                  [Op.lt]: upper,
                }
              }
            },
          })
          people.push(chunk)
          lower = lower + chunkSize
          upper = upper + chunkSize
        }
      
        return people
      }
      

      编辑:要解决内存问题,您必须有效地将有效负载分解为多个响应,并有办法在客户端将它们重新组合在一起。在 Apollo 的路线图中有一个 @stream 指令可以做到这一点,我认为有些人已经尝试过它,但我想我们可能需要一段时间才能看到它的成熟实现。 @defer 具有类似的机制,目前由 Apollo 支持,但在解析器级别工作,因此在这种情况下并没有真正的帮助。

      您可以使用subscriptions 破解它,顺便说一下使用异步迭代器。您仍然可能需要使用查询或突变触发发送数据,但随后可以通过订阅将其发送给客户端。

      不幸的是,考虑到当前工具,我认为最简单的解决方案是对查询实施分页并让客户端将总结果拼凑在一起。

      【讨论】:

      • 亲爱的@Daniel Rearden,感谢您的快速回复和见解。但是,如果我没记错的话,我的内存负载问题似乎仍未解决。在将所有现有的person 记录返回给最终将它们写入 Http 响应的 GraphQL 框架之前,您的代码会不会用所有现有的 person 记录填充内存?使用异步迭代器,仅在其当前批次由其asyncnext 或执行相同操作的对象流提供服务时才加载块,确实可以解决我的问题,对吗?我如何让graphql-js 接受对象流或异步迭代器?
      • 我想象在现有架构下,整个响应需要存储在内存中才能发送。换句话说,据我所知,不可能按照您的要求进行。充其量,它可以分解成单独的响应,然后由客户拼凑在一起。有关更多详细信息,请参阅我上面的更新答案。
      • 亲爱的丹尼尔,非常感谢您的反馈。现在我们将使用经典分页。我希望 GraphQL 的参考实现将来能够实现对异步迭代器的支持。我猜我们不是唯一对此功能感到高兴的人。至于订阅,我是否正确假设它们尚未在参考 graphql-js 包中完全实现?我找不到任何关于它的文档,除了 2015 年的 this blog
      • GraphQL.js 实际上确实实现了订阅(文档通常有点缺乏)。但是,就像它没有提供与 HTTP 服务器集成的特定解决方案并且需要单独的包来执行此操作(apollo-serverexpress-graphql 等)一样,它也需要单独的包来提供传输订阅(以及支持它们的客户端!)。看看subscriptions-transport-ws
      猜你喜欢
      • 2017-08-25
      • 1970-01-01
      • 2021-01-20
      • 2011-07-26
      • 2011-06-12
      • 1970-01-01
      • 2019-10-08
      相关资源
      最近更新 更多