【问题标题】:How to bulk fetch model objects from database handled by django/sqlalchemy如何从 django/sqlalchemy 处理的数据库中批量获取模型对象
【发布时间】:2017-10-27 15:30:51
【问题描述】:

最近我遇到了以下问题:如何迭代真正的大数据查询以执行操作(例如为每个对象创建两个不同的对象)。 如果你处理一个小的查询集,这很简单:

for obj in Mymodel.objects.all():
    create_corresponding_entries(obj)

现在尝试在包含 900k 个对象的查询集中执行此操作。可能你的电脑会死机,因为它会耗尽所有内存。那么我怎样才能懒惰地做到这一点呢?无论您使用 Django ORM 还是 SQLAlchemy,都会出现同样的问题

【问题讨论】:

    标签: python django sqlalchemy lazy-loading


    【解决方案1】:

    虽然 Django ORM 提供了一个“惰性”查询集,但我一直在寻找的是一个 generator,它可以为我提供一种惰性获取对象的方法。 django 中的查询集并不是真正的惰性,它们在您尝试访问它们之前是惰性的,数据库将在其中访问并获取 1M 条目。 SQLAlchemy 也是如此。如果你有 oracle 或 postgre 数据库,你很幸运,你可以使用受支持的服务器端游标。如果您使用 mysqldb 或 pymysql 方言,SQLAlchemy 还支持这些以及 mysql。我不确定服务器端游标是如何在幕后工作的。

    更多信息

    因此,如果您不符合上述任何一种情况,您必须想办法懒惰地获取这些对象。因为 Django ORM 和 SQLAlchemy 都通过将其转换为纯 SQL 查询来支持切片,所以我想我可以使用自定义生成器来切片我需要的批量查询。

    免责声明:该解决方案试图解决在本地转储大量数据的问题,它不会尝试最大限度地提高查询性能或与数据库相关的任何性能。

    警告:与简单的 Mymodel.objects.all() 相比,这将导致对数据库的更多查询,但对 RAM 的挑战会更少。

    def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
        counter = start
        while counter < max_count:
            yield fetch_func()[counter:counter + max_obj]
            counter += max_obj
    

    然后以它为例:

    fetcher = lazy_bulk_fetch(50, Mymodel.objects.count(), lambda: Mymodel.objects.order_by('id'))
    for batch in fetcher:
        make_actions(batch)
    

    这将为每次迭代获取一个包含 50 个对象的列表,直到我达到我想要的最大计数。如果您在 django 中将 make_actions(batch) 更改为 print(batch.query),您将看到如下内容:

    SELECT "services_service"."id" FROM "services_service" LIMIT 50
    SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 50
    SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 100
    SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 150
    

    sliceSQLAlchemy supports 可以使用相同的概念。在这种情况下,解决方案是相同的,但您可以使用 SQLAlchemy 查询对象的 slice 函数来代替 python 切片

    编辑:据我所知,SQLAlchemy Query 类实现了__getitem__ 函数。 所以对于 SQLAlchemy,你可以使用我为 Django 建议的完全相同的函数。如果你想明确地使用 slice 函数,你最终会得到如下的结果:

    def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
        counter = start
        while counter < max_count:
            yield fetch_func().slice(counter, counter + max_obj)
            counter += max_obj
    

    无论如何你都会这样称呼它:

    from sqlalchemy import func
    fetcher = lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)), 
                              lambda: session.query(Mymodel).order_by(Mymodel.id))
    

    这里有两个注意事项:

    1. 您想使用func.count 以便将其转换为服务器中的COUNT SQL 语句。如果您使用len(session.query(Mymodel)),您将在本地转储所有内容,找到它的长度然后将其丢弃
    2. 我使用lambda,这样实现就像django 一样。我也可以有

      lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)), 
                      session.query(Mymodel).order_by(Mymodel.id)) 
      

      但是我必须在我的功能中拥有

      yield fetch_func.slice(counter, counter + max_obj)
      

    编辑#2:我添加了排序,否则您无法确定在第 N 次运行中不会得到相同的结果。订购保证您将获得独特的结果。最好将 id 作为排序键,否则你不能确定你错过了一个结果(因为在第 N 次命中期间,可能已经添加了一个新条目,并且没有 id 的排序可能会导致你错过它或得到双重条目)

    【讨论】:

    • 请注意,如果您的max_obj 小于条目总数,则此方法不能保证集合中的每个对象都将仅获取一次。这是因为这种切片方法不保证结果是有序的。这意味着如果您的批量大小比总对象数小很多,那么您很可能会多次检索同一个对象,而某些对象将永远不会被检索到。这可能不是人们想要的行为!
    • 你是对的。您需要包装 order_by 以确保获得独特的结果(此后偏移量将根据排序起作用)。例如在 Django 中:而不是 lazy_bulk_fetch(50, Mymodel.objects.count(), Mymodel.objects.all) 你应该使用 lazy_bulk_fetch(50, Mymodel.objects.count(), lambda: Mymodel.objects.order_by('id')) 而在 sqlalchemy lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)), lambda: session.query(Mymodel).order_by(Mymodel.id) 我错过了什么还是应该更新答案?
    • 这确实可以解决问题,尽管我不确定这是否是最有效的解决方案。当然,应该确保对于大型数据库,排序列上有一个索引。但至少这个解决方案现在应该是正确的。
    • 正如我在答案开头所说的那样,这个解决方案在数据库方面肯定不是有效的,但我发现它在后端方面非常有效(直到找到更有效的解决方案)。即使这是一个极端情况(尝试获取太多条目并不是通常会做的事情;他们使用过滤器并且获取更少)我发现这是唯一的方法,这样后端的内存就不会受到太大的挑战。感谢您的评论
    【解决方案2】:

    我不知道我是否误解了您的问题,或者答案是否在当前版本的 Django 之前,但对于 Django,请参阅:https://docs.djangoproject.com/en/dev/ref/models/querysets/#iterator

    for i in Mymodel.objects.iterator(chunk_size=2000):
        print(i)
    

    正如在某些数据库的文档中一样,它是通过 RDBMS 上的游标实现的,而其他一些数据库则使用了一些技巧。

    【讨论】:

    • 如果您想在 for 循环的每次迭代中获取模型实例的查询集并以这样的大小从数据库中获取它们,@JohnParaskevopoulos 接受的答案是合适的。如果您尝试逐个处理模型实例(在 for 循环的每次迭代中获取一个实例),但希望从数据库中批量获取它们,则您的答案是合适的。有一些相似之处,只需少量的附加代码就可以实现基本相同的行为。
    【解决方案3】:

    如果你将处理卸载到数据库(通过 Django ORM),整个操作可以在 3 个数据库调用中完成:

    1. 调用values_list 获取所有主键的列表。对于 900K 个 64 字节的密钥,它仍然应该只占用大约 56 MB 的内存,这不会给您的系统带来过大的压力。
    model_ids = MyModel.objects.values_list('id', flat=True)
    
    1. 现在,决定一次加载多少条目。如果你用values_list 的子集调用in_bulk,你可以在你的系统熟悉的块中处理它。对于所有条目,将 CHUNK_SIZE 设置为 len(model_ids)。 (“3 次数据库调用”注释仅在您使用 CHUNK_SIZE &gt; len(model_ids) 调用 in_bulk 时才成立。内存负载取决于 MyModel 的大小,并且 CPU 负载应该最小。)
    for counter in range(0, len(model_ids), CHUNK_SIZE):
         chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
    # Do whatever you wish with this chunk, like create the objects but in place.
    
    1. 最后一部分是创建其他对象的位置。这是使用bulk_create 的理想场所,这将使整个过程更加高效。即使您不使用in_bulkvalues_list,如果您要创建超过2-3 个对象,bulk_create 也会给您带来显着优势。 结合步骤 2 中的代码,您可以执行以下操作:
    objs_to_create = []
    for counter in range(0, len(model_ids), CHUNK_SIZE):
         chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
         # Populate the object(s), either directly or in loop, but using MyModel 
         # constructor, not ORM query. That is, use 
         # m = MyModel(..)
         # instead of 
         # m = MyModel.objects.create(..)
         # Append each of the created MyModel python objects to objs_to_create. Note 
         # that we have not created these objects in the database yet.
         # ...
         # Now create these objects in database using a single call
         MyModel.objects.create_bulk(objs_to_create)
         # Rinse and repeat
         objs_to_create = []
    

    不再有 CPU 挂起,您可以根据自己的喜好微调内存使用情况。

    【讨论】:

      【解决方案4】:

      基于@John Paraskevopoulos 对 Django ORM 的回答,我对其进行了一些调整以使其更通用:

      def bulkFetch(Cls,  batchSize: int = 100, start: int = 0, end: int = None, fetchFunc: Callable = None):
          '''
          Query Django model instances and retrieve the instances lazily in batches.
          Params:
          - Cls: the Django model class
          - batchSize: number of instances to yield each iteration
          - start: start number to yield from the queryset
          - end: end order number to yield from the queryset
          - fetchFunc: a function to retrieve instances. By default set to None: all model instances of the given class will be retrieved.
          '''
          counter = start
          maxCount = Cls.objects.count()
      
          if end is not None and end < maxCount:
              maxCount = end
      
          def defaultFetchFunc():
              qs = Cls.objects.order_by('pk')
              if end is None:
                  return qs
              else:
                  return qs[:end]
      
          if fetchFunc is None:
              fetchFunc = defaultFetchFunc
      
          while counter < maxCount:
              yield fetchFunc()[counter:counter+batchSize]
              counter += batchSize
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-02-14
        • 1970-01-01
        • 1970-01-01
        • 2023-04-09
        • 2010-09-19
        • 1970-01-01
        • 2011-07-11
        • 1970-01-01
        相关资源
        最近更新 更多