【发布时间】:2014-02-22 11:44:34
【问题描述】:
我在 MongoDB 中有一个非常大的集合(约 7M 项),主要由具有三个字段的文档组成。
我希望能够以一种方便的方式迭代其中一个字段的所有唯一值。
目前,我只查询该字段,然后通过迭代游标来处理返回的结果以获得唯一性。这行得通,但它相当慢,我怀疑一定有更好的方法。
我知道 mongo 有 db.collection.distinct() 函数,但这受到最大 BSON 大小 (16 MB) 的限制,我的数据集超过了这个值。
有什么方法可以迭代类似于db.collection.distinct() 的东西,但是使用游标或其他方法,所以记录大小限制不是什么大问题?
我认为也许像 map/reduce 功能这样的东西可能适合这种事情,但我一开始并不真正了解 map-reduce 范式,所以我不知道我在做什么.我正在进行的项目部分是为了学习使用不同的数据库工具,所以我相当缺乏经验。
我正在使用 PyMongo,如果它是相关的(我不认为它是)。这应该主要依赖于 MongoDB。
例子:
对于这个数据集:
{"basePath" : "foo", "internalPath" : "Neque", "itemhash": "49f4c6804be2523e2a5e74b1ffbf7e05"}
{"basePath" : "foo", "internalPath" : "porro", "itemhash": "ffc8fd5ef8a4515a0b743d5f52b444bf"}
{"basePath" : "bar", "internalPath" : "quisquam", "itemhash": "cf34a8047defea9a51b4a75e9c28f9e7"}
{"basePath" : "baz", "internalPath" : "est", "itemhash": "c07bc6f51234205efcdeedb7153fdb04"}
{"basePath" : "foo", "internalPath" : "qui", "itemhash": "5aa8cfe2f0fe08ee8b796e70662bfb42"}
我想做的是迭代只是 basePath 字段。对于上述数据集,这意味着我将迭代 foo、bar 和 baz 各一次。
我不确定它是否相关,但我拥有的数据库是结构化的,因此虽然每个字段都不是唯一的,但所有三个字段的聚合都是唯一的(这是通过索引强制执行的)。
我当前正在使用的查询和过滤操作(注意:我将查询限制为项目的子集以减少处理时间):
self.log.info("Running path query")
itemCursor = self.dbInt.coll.find({"basePath": pathRE}, fields={'_id': False, 'internalPath': False, 'itemhash': False}, exhaust=True)
self.log.info("Query complete. Processing")
self.log.info("Query returned %d items", itemCursor.count())
self.log.info("Filtering returned items to require uniqueness.")
items = set()
for item in itemCursor:
# print item
items.add(item["basePath"])
self.log.info("total unique items = %s", len(items))
使用self.dbInt.coll.distinct("basePath") 运行相同的查询会得到OperationFailure: command SON([('distinct', u'deduper_collection'), ('key', 'basePath')]) failed: exception: distinct too big, 16mb cap
好的,这是我最终使用的解决方案。我会将其添加为答案,但我不想减损让我来到这里的实际答案。
reStr = "^%s" % fqPathBase
pathRE = re.compile(reStr)
self.log.info("Running path query")
pipeline = [
{ "$match" :
{
"basePath" : pathRE
}
},
# Group the keys
{"$group":
{
"_id": "$basePath"
}
},
# Output to a collection "tmp_unique_coll"
{"$out": "tmp_unique_coll"}
]
itemCursor = self.dbInt.coll.aggregate(pipeline, allowDiskUse=True)
itemCursor = self.dbInt.db.tmp_unique_coll.find(exhaust=True)
self.log.info("Query complete. Processing")
self.log.info("Query returned %d items", itemCursor.count())
self.log.info("Filtering returned items to require uniqueness.")
items = set()
retItems = 0
for item in itemCursor:
retItems += 1
items.add(item["_id"])
self.log.info("Recieved items = %d", retItems)
self.log.info("total unique items = %s", len(items))
与我以前的解决方案相比,一般性能大约是挂钟时间的 2 倍。在返回 834273 个项目的查询中,具有 11467 个唯一值:
原始方法(检索,填充到 python set 以强制唯一性):
real 0m22.538s
user 0m17.136s
sys 0m0.324s
聚合管道方法:
real 0m9.881s
user 0m0.548s
sys 0m0.096s
因此,虽然总体执行时间仅缩短了约 2 倍,但聚合管道在实际 CPU 时间方面的性能要高得多。
更新:
我最近重温了这个项目,重写了 DB 层以使用 SQL 数据库,一切都变得简单多了。复杂的处理管道现在是一个简单的SELECT DISTINCT(colName) WHERE xxx 操作。
实际上,MongoDB 和 NoSQL 数据库通常与我在这里尝试做的错误数据库类型相差很大。
【问题讨论】:
-
可能有一些示例数据?如果我们能看到我们正在尝试做的事情会有所帮助。
-
@NeilLunn - 这行得通吗?
-
那种。所以要弄清楚这一点。 “迭代”意味着,您试图将 ("basePath", "internalPath", "itemHash") 的 unique 值组合在一起。甚至限制说
foo炸毁了16MB 的限制。比方说,在aggregate 甚至。这意味着 result 大小。 -
好吧,我想查询唯一值,然后然后对其进行迭代(有没有一种方法可以在不超过非迭代的 BSON 限制的情况下检索数据? )。是的,即使限制为
basePath字段也超过了 16 MB 的限制。 -
澄清我的观点和指定答案的方向,您的 result 集是否可能大于 16MB,您认为 working 设置介于these tolerances 之间或者您已经尝试过吗?
标签: mongodb mongodb-query aggregation-framework pymongo