【问题标题】:Efficiently create new fields across a MongoDB collection using pymongo使用 pymongo 在 MongoDB 集合中高效地创建新字段
【发布时间】:2015-11-11 00:10:43
【问题描述】:

我有一个包含具有字段的文档的集合,称之为field1,我想在每个field1 条目上调用一个(复杂的)python 函数fxn,并将其存储在一个新的field2 中。我的集合非常大,fxn 需要几秒钟才能运行,所以我想在几个作业中并行处理。到目前为止,这是我的方法:

for i, entry in enumerate(collection.find().sort('_id')):
    if i % nJobs != jobID: continue
    field1 = entry['field1']
    field2 = fxn(field1)
    collection.update({'_id': entry['_id']}, {'$set': {'field2':field2})

其中nJobs 是作业总数,jobID 是当前作业的索引(例如,假设我并行运行此脚本 5 次,则 nJobs=5jobID 的范围为 0 到 4)

有没有更快或更可靠的方法来实现这一点?我宁愿把所有东西都保存在 python 中,因为fxn 需要保存在 python 中。

【问题讨论】:

    标签: mongodb pymongo


    【解决方案1】:

    您基本上需要使用 Bulk API 在 for 循环中,您可以利用写入命令 Bulk API 来执行批量更新操作,这些操作只是对服务器顶部,以便轻松构建批量操作。这些批量操作主要有两种形式:

    • 有序批量操作。这些操作按顺序执行所有操作,并在第一次写入错误时出错。
    • 无序批量操作。这些操作并行执行所有操作并聚合所有错误。无序批量操作不保证执行顺序。

    这非常有效,因为您不会向服务器发送“每个”请求,而是每 1000 个请求中发送一次,并且 api 实际上会在后台为您解决这个问题。注意,对于早于 2.6 的服务器,API 将下转换操作。但是,不可能进行 100% 的下转换,因此可能存在一些无法正确报告正确数字的极端情况。

    在非分片集群上实现此功能需要使用 snapshot 参数,以便您可以将查找光标与更新后再次看到相同的文档隔离开:

    bulk = db.collection.initialize_ordered_bulk_op()
    counter = 0;
    
    for entry in collection.find(snapshot = True):
        # process in bulk
        # calc field2 value first
        field2 = fxn(entry.field1)
        bulk.find({ '_id': entry._id }).update({ '$set': { 'field2': field2 } })
        counter++
    
        if ( counter % 1000 == 0 ):
            bulk.execute()
            bulk = db.collection.initialize_ordered_bulk_op()
    
    if (counter % 1000 != 0):
        bulk.execute()
    

    【讨论】:

    • 非常感谢!至于从jobIDnJobs 确定要处理的条目,你觉得我现在做的有什么问题吗?我还可以想象使用skiplimit 从光标中选择正确的条目。
    • 这完全取决于您的应用程序逻辑,后台的 Bulk API 批量执行写入操作,从而减少网络往返次数,提高写入吞吐量,因此您可能不需要过滤器但值得尝试使用微基准来查看差异。
    • 哦,我更需要过滤器,因为 fxn 运行时间而不一定是查询/更新的开销
    • 好吧,在那种情况下一定要包括它:)
    猜你喜欢
    • 1970-01-01
    • 2016-04-14
    • 2020-12-07
    • 2020-09-22
    • 1970-01-01
    • 2015-04-19
    • 2021-07-04
    • 2015-03-21
    • 1970-01-01
    相关资源
    最近更新 更多