【问题标题】:Pymongo multiprocessingPymongo 多处理
【发布时间】:2017-04-27 12:56:16
【问题描述】:

我必须在 MongoDB 上进行大量插入和更新。

我正在尝试测试多处理来完成这些任务。为此,我创建了这个简单的代码。我的虚拟数据是:

documents = [{"a number": i} for i in range(1000000)]

没有多处理:

time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
    col.insert_one(doc)
time1f = time.time()
print(time1f-time1s)

我有 150 秒。

通过多处理,我根据需要定义了以下工作函数,并在Pymongo's FAQs 中进行了描述。

def insert_doc(document):
    client = MongoClient()
    db = client.mydb
    col = db.mycol
    col.insert_one(document)

但是,当我运行我的代码时:

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)

我收到一个错误:

pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 99] 无法分配请求的地址

在引发错误之前,总共处理了 26447 个文档。这个错误在here 进行了解释,尽管遇到该错误的人没有使用多处理。解决方案是只打开一个 MongoClient,但是当我想做多处理时这是不可能的。有什么解决方法吗?感谢您的帮助。

【问题讨论】:

    标签: python mongodb pymongo


    【解决方案1】:

    您的代码为示例中的数百万个文档中的每一个创建一个新的 MongoClient(就像您链接到的问题一样)。这要求您为每个新查询打开一个新套接字。这破坏了 PyMongo 的连接池,除了速度极慢之外,这还意味着您打开和关闭套接字的速度比 TCP 堆栈能够跟上的速度快:您将太多的套接字留在 TIME_WAIT 状态,因此最终会耗尽端口。

    如果您为每个客户端插入大量文档,您可以创建更少的客户端,从而打开更少的套接字:

    import multiprocessing as mp
    import time
    from pymongo import MongoClient
    
    documents = [{"a number": i} for i in range(1000000)]
    
    def insert_doc(chunk):
        client = MongoClient()
        db = client.mydb
        col = db.mycol
        col.insert_many(chunk)
    
    chunk_size = 10000
    
    def chunks(sequence):
        # Chunks of 1000 documents at a time.
        for j in range(0, len(sequence), chunk_size):
            yield sequence[j:j + chunk_size]
    
    time2s = time.time()
    pool = mp.Pool(processes=16)
    pool.map(insert_doc, chunks(documents))
    pool.close()
    pool.join()
    time2f = time.time()
    print(time2f - time2s)
    

    【讨论】:

    • col.insert_many() 是这里的诀窍。 如果要遍历 chunk 并使用 col.insert(),那么它将用完 tcp_socket 并抛出 KeyError: 'pop from an empty set_get_socket_no_auth : sock_info, from_pool = self.sockets.pop(), True。其他选项是使用Bulk Write Operations
    • 仅将集合实例发送到多线程函数会向我提出 TypeError: can't pickle _thread.lock objects
    • 是否推荐全部使用 cpu_count() ?当服务器同时做其他工作时
    猜你喜欢
    • 2020-07-13
    • 2021-03-08
    • 1970-01-01
    • 2015-08-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-11
    相关资源
    最近更新 更多