【问题标题】:Asyncio worker class to handle parallel jobs用于处理并行作业的 Asyncio 工作人员类
【发布时间】:2019-12-12 07:43:31
【问题描述】:

我找到了一个示例here,但我不明白如何使代码工作

class Worker:

    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait(coros, return_when='FIRST_COMPLETED')
        if get_task.done():
            return task.result()

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()

我当时试过了:

def work(a1,a2): print('work done',a1,a2)
W = Worker(work,n=3)
W.put(1,2)
W.put(1,2)
W.put(1,2)
result = await W.run() # for Jupyter notebooks
# asyncio.run(W.run()) # for normal python

我得到错误:

NameError: name 'coros' is not defined

【问题讨论】:

    标签: parallel-processing queue python-asyncio


    【解决方案1】:

    我承认,链接的解决方案让我很困惑,而且它似乎没有用。所以,我只是重写了 Worker 类,希望它现在对你有用:

    import asyncio
    
    class Worker:
        def __init__(self, func, n=3):
            self.func = func
            self.queue = asyncio.Queue()
            self.semaphore = asyncio.Semaphore(n)
    
        def put(self, *args):
            self.queue.put_nowait(args)
    
        async def run(self):
            tasks = []
            while True:
                try:
                    args = self.queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                tasks.append(asyncio.ensure_future(self.do_work(args)))
            await asyncio.gather(*tasks)
    
        async def do_work(self, args):
            async with self.semaphore:
                await self.func(*args)
    

    这对我来说似乎是一种更简单的方法。基本上,这会将Worker.run 更改为只为队列中的每个项目启动一个任务,并且每个任务在调用提供的工作函数之前必须首先获取信号量。然后在所有工作完成后结束。

    用法如下:

    async def work(a1, a2):
        print("Starting work...", a1, a2)
        await asyncio.sleep(1)
        print("Finishing work...")
    
    
    W = Worker(work, n=3)
    W.put(1,2)
    W.put(3,4)
    W.put(5,6)
    W.put(7,8)
    W.put(9,10)
    asyncio.get_event_loop().run_until_complete(W.run())
    
    
    """Output
    Starting work... 1 2
    Starting work... 3 4
    Starting work... 5 6
    Finishing work...
    Finishing work...
    Finishing work...
    Starting work... 7 8
    Starting work... 9 10
    Finishing work...
    Finishing work...
    """
    

    需要注意的是,不能同时使用asyncio.run,同时使用asyncio.Semaphore,因为asyncio.run总是启动一个新循环,而asyncio.Semaphore(n)在调用asyncio.run之前将其循环设置为默认循环.这会导致信号量使用与Worker.run 不同的循环。

    所以只使用asyncio.get_event_loop.run_until_complete 可以正常工作,因为它使用默认循环(信号量所期望的)。

    【讨论】:

    • 非常感谢@ParkerD!!!是否也可以使用 4 个工作人员进行异步多处理,每个工作人员都有一个可以异步调用的队列?
    • 当然没有什么可以阻止您创建多个Worker 实例并异步调用它们的run 方法,只要从Workers 被初始化的同一个事件循环中调用run
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-11
    • 1970-01-01
    • 1970-01-01
    • 2019-01-18
    • 2021-08-22
    • 2013-07-11
    相关资源
    最近更新 更多