【问题标题】:Is there a way to use asyncio.Queue in multiple threads?有没有办法在多个线程中使用 asyncio.Queue ?
【发布时间】:2015-10-01 13:58:54
【问题描述】:

假设我有以下代码:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

这段代码的问题是async协程内部的循环永远不会完成第一次迭代,而queue的大小正在增加。

为什么会发生这种情况,我可以做些什么来解决它?

我无法摆脱单独的线程,因为在我的真实代码中,我使用单独的线程与串行设备通信,而我还没有找到使用asyncio 的方法。

【问题讨论】:

  • "I can't get rid of separate thread, because in my real code I use a separate thread to communicate with a serial device" -- 您是否尝试过使用loop.run_in_executor 与串行设备进行任何阻塞交互?

标签: python python-3.x python-asyncio


【解决方案1】:

asyncio.Queueis not thread-safe,所以你不能直接从多个线程中使用它。相反,您可以使用janus,这是一个提供线程感知asyncio 队列的第三方库:

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()

还有aioprocessing(完全披露:我写的),它也提供进程安全(并且作为副作用,线程安全)队列,但如果你不想这样做,那就太过分了使用multiprocessing

编辑

正如在其他答案中指出的那样,对于简单的用例,您也可以使用 loop.call_soon_threadsafe 添加到队列中。

【讨论】:

  • NameError: name 'async' is not definedasyncio.Task(async(queue.async_q)) 给出。我该怎么办?
  • @StamKaly 抱歉,请使用asyncio.async,或者更好的是asyncio.ensure_future,因为asyncio.async 现在已被弃用。
【解决方案2】:

BaseEventLoop.call_soon_threadsafe 就在眼前。详情请见asyncio doc

只需像这样更改您的threaded()

def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))

这是一个示例输出:

0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943

【讨论】:

    【解决方案3】:

    如果您不想使用其他库,您可以从线程中调度协程。将queue.put_nowait 替换为以下内容可以正常工作。

    asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
    

    变量loop代表主线程中的事件循环。

    编辑:

    你的 async 协程没有做任何事情的原因是 事件循环永远不会给它这样做的机会。队列对象是 不是线程安全的,如果您深入研究 cpython 代码,您会发现 这意味着put_nowait 通过以下方式唤醒队列的消费者 通过事件循环的call_soon 方法使用未来。如果 我们可以让它使用call_soon_threadsafe 它应该可以工作。专业 但是,call_sooncall_soon_threadsafe 之间的区别是 call_soon_threadsafe 通过调用 loop._write_to_self() 唤醒事件循环。所以让我们自己称它为:

    import asyncio
    import threading
    
    queue = asyncio.Queue()
    
    def threaded():
        import time
        while True:
            time.sleep(2)
            queue.put_nowait(time.time())
            queue._loop._write_to_self()
            print(queue.qsize())
    
    @asyncio.coroutine
    def async():
        while True:
            time = yield from queue.get()
            print(time)
    
    loop = asyncio.get_event_loop()
    asyncio.Task(async())
    threading.Thread(target=threaded).start()
    loop.run_forever()
    

    然后,一切都按预期进行。

    关于线程安全方面 访问共享对象,asyncio.queue 在后台使用 collections.deque 具有线程安全的 appendpopleft。 也许检查队列不是空的并且 popleft 不是原子的,但是如果 您仅在一个线程中使用队列(事件循环之一) 应该没问题。

    其他建议的解决方案,来自华作的loop.call_soon_threadsafe 高的回答和我的asyncio.run_coroutine_threadsafe只是在做 这个,唤醒事件循环。

    【讨论】:

    • 这对我很有效。我测试过,它在线程和 courutine 之间共享消息。
    【解决方案4】:

    只使用 threading.Lock 和 asyncio.Queue 怎么样?

    class ThreadSafeAsyncFuture(asyncio.Future):
        """ asyncio.Future is not thread-safe
        https://stackoverflow.com/questions/33000200/asyncio-wait-for-event-from-other-thread
        """
        def set_result(self, result):
            func = super().set_result
            call = lambda: func(result)
            self._loop.call_soon_threadsafe(call)  # Warning: self._loop is undocumented
    
    
    class ThreadSafeAsyncQueue(queue.Queue):
        """ asyncio.Queue is not thread-safe, threading.Queue is not awaitable
        works only with one putter to unlimited-size queue and with several getters
        TODO: add maxsize limits
        TODO: make put corouitine
        """
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.lock = threading.Lock()
            self.loop = asyncio.get_event_loop()
            self.waiters = []
    
        def put(self, item):
            with self.lock:
                if self.waiters:
                    self.waiters.pop(0).set_result(item)
                else:
                    super().put(item)
    
        async def get(self):
            with self.lock:
                if not self.empty():
                    return super().get()
                else:
                    fut = ThreadSafeAsyncFuture()
                    self.waiters.append(fut)
            result = await fut
            return result
    

    另请参阅 - asyncio: Wait for event from other thread

    【讨论】:

    • 我喜欢这个想法,我有一个类似的情况,我想要一个来自不同线程的阻塞,然后是异步事件循环。请注意,您的 sn-p 无法正确处理 job_complete。如果正在等待的协程调用task_done(),队列可能会上升,因为最初从未通知它有项目在队列中(它被直接放入未来)。
    猜你喜欢
    • 2021-07-05
    • 1970-01-01
    • 2021-12-28
    • 1970-01-01
    • 1970-01-01
    • 2022-01-25
    • 1970-01-01
    • 2011-07-31
    • 2018-12-16
    相关资源
    最近更新 更多