【问题标题】:Call blocking method from multiple methods从多个方法调用阻塞方法
【发布时间】:2018-03-28 14:22:43
【问题描述】:

我在线程类中有一个 websocket,调用 on_message 函数 send_conf() 上的方法

但是并行线程上的方法也每 10 秒调用一次相同的函数

我尝试在 asyncio 中使用 run_in_executor,但我收到“事件循环已在运行 " 错误。

在不阻塞的情况下同时从 websocket.on_message 和并行线程调用此 send_conf() 函数的最佳方法是什么?

import websocket, threading, json, base64
import time, requests, threading, asyncio
from concurrent.futures import ThreadPoolExecutor

class Count(threading.Thread):

    def __init__(self, apiKey=None, apiSecret=None, curl=None, wsURL=None):
        threading.Thread.__init__(self)
        self.loop = asyncio.get_event_loop()
        self.apiSecret = apiSecret
        self.apiKey = apiKey
        self.curl = curl
        self.wsURL = wsURL
        self.executor = ThreadPoolExecutor(5)

        self.session = requests.Session()

        self.session.headers.update({'user-agent': 'Stacked'})
        self.session.headers.update({'content-type': 'application/json'})
        self.session.headers.update({'accept': 'application/json'})


        self.confirmation = send_conf(0)


    def curl(self, path, query=None, postdict=None, method=None):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.async_curl(path, query, postdict, method))


    async def async_curl(self, path, query=None, postdict=None, method=None):
        loop = asyncio.get_event_loop()
        URL = self.curl + '/api/v1/' + path
        req = requests.Request(method, URL, json=postdict, params=query)
        prepped = self.session.prepare_request(req)
        def do_prepped():
            return self.session.send(prepped, timeout=20)
        response = await loop.run_in_executor(self.executor, do_prepped)
        return response




    def send_conf(self, param):
        METHOD = 'POST'
        LINK = 'conf'

        return self.curl(LINK, postdict=param, method=METHOD)



    def active_patching(self, time_period):
        while self.ws.keep_running:
            x = 2 + 2

            self.send_conf(x)

            time.sleep(time_period)


    def run(self):

        def on_message(ws, message):
            if len(message) > 10:
                self.send_conf(message['stat'])

        def on_error(ws, error):
            print(error)

        def on_close(ws):
            print("### closed ###")
            exit()

        def on_open(ws):
            args = []
            # args.append()
            args.append("activity")
            request = {"operation": "subscribe", "args": args}
            self.ws.send(json.dumps(request))
            print(request)
            self.acpt = threading.Thread(target=lambda: self.active_patching(10))
            self.acpt.daemon = True
            self.acpt.start()

        def exit():
            self.exited = True
            self.ws.close()


        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        websocket.enableTrace(True)
        self.ws = websocket.WebSocketApp(self.wsURL,
                                  on_message = on_message,
                                  on_error = on_error,
                                  on_close = on_close,
                                  on_open = on_open)

        self.ws.keep_running = True
        self.ws.run_forever(ping_interval=30, ping_timeout=10)
        self.ws.keep_running = False

【问题讨论】:

    标签: python python-3.x asynchronous async-await python-asyncio


    【解决方案1】:

    让我们从定义调用requests的普通同步方法开始:

    def curl(self, path, query=None, postdict=None, method=None):
        URL = self.curl + '/api/v1/' + path
        req = requests.Request(method, URL, json=postdict, params=query)
        prepped = self.session.prepare_request(req)
        return self.session.send(prepped, timeout=20)
    
    def send_conf(self, param):
        return self.curl('conf', postdict=param, method='POST')
    

    显然,您不能从异步回调或协程中调用其中任何一个,因为它们会阻塞事件循环。要从 asyncio 安全地调用此类同步函数(以及相关代码,例如 websockets 回调),请使用 loop.run_in_executor

    async def some_coroutine(self):
        loop = asyncio.get_event_loop()
        resp = await loop.run_in_executor(None, self.send_conf, param)
    

    在幕后,asyncio 会将send_conf 提交到线程池,暂停您的协程,并在send_conf 在单独的线程中运行时继续为其他协程提供服务。当send_conf 完成时,你的协程将被唤醒,响应在手。

    另一方面,如果您想从另一个线程调用curlsend_conf只需调用它们。是的,它们会阻塞该特定线程直到它们完成,但这不会影响 asyncio 事件循环。

    除此之外,其余答案涉及如何改进架构,以便您根本不需要其他线程。


    从 asyncio 线程内部启动 async def 协程,例如从 websockets 回调中,您不需要 run_until_complete - 事实上,如果您尝试使用它,您会从问题中得到“事件循环已在运行”错误。相反,您只需致电loop.create_task(self.some_coroutine())

    这意味着您不需要从threading.Thread 继承或生成自己的线程来在后台运行某些东西。如上所示,asyncio 已经 允许您使用异步函数编写代码,这些函数看起来像在单独的线程中运行的顺序代码,但没有多线程的陷阱。例如,实现active_patching 的惯用方式是使用协程:

    async def active_patching(self, time_period):
        loop = asyncio.get_event_loop()
        while self.ws.keep_running:
            x = 2 + 2
            await loop.run_in_executor(None, self.send_conf, x)
            await asyncio.sleep(time_period)
    
    def on_open(ws):
        # ...
        loop = asyncio.get_event_loop()
        loop.create_task(self.active_patching(10))
    

    线程的剩余使用隐藏在对run_in_executor 的调用之后,这使您可以轻松地对requests 进行阻塞调用,而不会干扰异步事件循环。如果你采用原生支持 asyncio 的 http 库,比如aiohttp,你就根本不需要run_in_executor

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-03-04
      • 2012-07-13
      • 2021-03-01
      • 1970-01-01
      • 2020-01-25
      相关资源
      最近更新 更多