【问题标题】:How to return values ​from an asynchronous function that is executed in a thread in python?如何从python中线程中执行的异步函数返回值?
【发布时间】:2021-09-19 05:05:26
【问题描述】:

我是异步函数和线程的新手,我正在尝试返回从 Web 套接字获取的一系列值,以传递给正在执行同步代码的另一个线程。在代码中,我还使用了多 Web 套接字方法。下面我给你看代码:

"""
This code is designed to run an asynchronous loop
with asyncio in a separate thread. This allows mixing
a synchronous code with an asynchronous one. 
"""

import asyncio
from datetime import datetime
from threading import Thread
import websockets
from typing import Tuple, List, Iterable
import json
import time

URLS = [
    "wss://stream.binance.com:9443/ws/xrpusdt@kline_1m",
    "wss://stream.binance.com:9443/ws/btcusdt@kline_1m",
]

def start_background_loop(loop: asyncio.AbstractEventLoop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def IndividualSubscription(url: str):
    """An individual subscription to each WebSocket is created"""
    async with websockets.connect(url) as websocket:
        data = await websocket.recv()
        data = json.loads(data)

        print('\n', data)
           
        return data

async def Subscriptions(URLS: Iterable[str]):
    """All concurrent tickets are subscribed and all are combined
        in a single coroutine."""

    while True:
        task = [asyncio.create_task(SuscripcionIndividual(url)) for url in URLS]
        
        # All tasks are run in parallel
        await asyncio.gather(*tareas)
        #return tareas

def main():
    loop = asyncio.new_event_loop()
    t = Thread(target=start_background_loop, args=(loop,), daemon=True)
    t.start()

    task = asyncio.run_coroutine_threadsafe(Suscripciones(URLS), loop)
    for i in task.result():
        print(f"{i}")

    #return tareas

def function():
        for i in range(100):
            print("This is out of asynchronous ", i)
            time.sleep(1)

if __name__ == "__main__":
    main()
    T2 = Thread(target=function,)
    T2.start()

我尝试将return 放入异步代码中,但通过这样做,异步循环只运行一次,而不是像我期望的那样连续运行。另外,我已经尝试过 .result() 而不是 .create_task() 的方法。是否可以从异步函数中返回值?

【问题讨论】:

  • 你的 main 方法应该在调用 T2.start() 之后等待线程完成使用 T2.join() 并从你的 async 函数使用 r = await f() 获取返回值
  • 例如result = await asyncio.gather(...)return await asyncio.gather(...)

标签: python-3.x asynchronous websocket python-asyncio python-multithreading


【解决方案1】:

如果您想要同步和异步代码之间的互操作性,您需要设计一些不会阻塞线程运行异步代码的通信机制。队列通常用于线程之间的通信,janus 库实现了与运行异步代码的线程兼容的队列,它通过将同步队列接口暴露给同步代码并将异步队列接口暴露给异步代码来实现。

您的代码有点混乱,所以我清理它只是为了显示同步线程(主线程)和异步线程(运行 asyncio 循环的后台线程)之间的通信

import asyncio
from datetime import datetime
from threading import Thread
import websockets
from typing import Tuple, List, Iterable
import json
import time
import janus # pip install janus

URLS = [
    "wss://stream.binance.com:9443/ws/xrpusdt@kline_1m",
    "wss://stream.binance.com:9443/ws/btcusdt@kline_1m",
]

def start_background_loop(loop: asyncio.AbstractEventLoop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def IndividualSubscription(url: str):
    """An individual subscription to each WebSocket is created"""
    async with websockets.connect(url) as websocket:
        return json.loads(await websocket.recv())

async def Subscriptions(URLS: Iterable[str], results: asyncio.Queue):
    """All concurrent tickets are subscribed and all are combined
        in a single coroutine."""
    while True:
        tasks = [asyncio.create_task(SuscripcionIndividual(url)) for url in URLS]
        for task in await asyncio.gather(*tasks):
            await results.put(task.result())


def async_main(results: asyncio.Queue):
    asyncio.run(Subscriptions(URLS, results))


if __name__ == "__main__":
    results = janus.Queue(100) # max size of 100
    async_thread = Thread(target=async_main, args=(results.async_q,))
    async_thread.daemon = True # exit if main thread exits
    async_thread.start()
    while True:
        print(f"[sync thread] got result from async thread: {results.sync_q.get()}")

【讨论】:

    猜你喜欢
    • 2021-03-04
    • 1970-01-01
    • 1970-01-01
    • 2020-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-14
    相关资源
    最近更新 更多