【发布时间】:2018-03-28 14:22:43
【问题描述】:
我在线程类中有一个 websocket,调用 on_message 函数 send_conf() 上的方法
但是并行线程上的方法也每 10 秒调用一次相同的函数
我尝试在 asyncio 中使用 run_in_executor,但我收到“事件循环已在运行 " 错误。
在不阻塞的情况下同时从 websocket.on_message 和并行线程调用此 send_conf() 函数的最佳方法是什么?
import websocket, threading, json, base64
import time, requests, threading, asyncio
from concurrent.futures import ThreadPoolExecutor
class Count(threading.Thread):
def __init__(self, apiKey=None, apiSecret=None, curl=None, wsURL=None):
threading.Thread.__init__(self)
self.loop = asyncio.get_event_loop()
self.apiSecret = apiSecret
self.apiKey = apiKey
self.curl = curl
self.wsURL = wsURL
self.executor = ThreadPoolExecutor(5)
self.session = requests.Session()
self.session.headers.update({'user-agent': 'Stacked'})
self.session.headers.update({'content-type': 'application/json'})
self.session.headers.update({'accept': 'application/json'})
self.confirmation = send_conf(0)
def curl(self, path, query=None, postdict=None, method=None):
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.async_curl(path, query, postdict, method))
async def async_curl(self, path, query=None, postdict=None, method=None):
loop = asyncio.get_event_loop()
URL = self.curl + '/api/v1/' + path
req = requests.Request(method, URL, json=postdict, params=query)
prepped = self.session.prepare_request(req)
def do_prepped():
return self.session.send(prepped, timeout=20)
response = await loop.run_in_executor(self.executor, do_prepped)
return response
def send_conf(self, param):
METHOD = 'POST'
LINK = 'conf'
return self.curl(LINK, postdict=param, method=METHOD)
def active_patching(self, time_period):
while self.ws.keep_running:
x = 2 + 2
self.send_conf(x)
time.sleep(time_period)
def run(self):
def on_message(ws, message):
if len(message) > 10:
self.send_conf(message['stat'])
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
exit()
def on_open(ws):
args = []
# args.append()
args.append("activity")
request = {"operation": "subscribe", "args": args}
self.ws.send(json.dumps(request))
print(request)
self.acpt = threading.Thread(target=lambda: self.active_patching(10))
self.acpt.daemon = True
self.acpt.start()
def exit():
self.exited = True
self.ws.close()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
websocket.enableTrace(True)
self.ws = websocket.WebSocketApp(self.wsURL,
on_message = on_message,
on_error = on_error,
on_close = on_close,
on_open = on_open)
self.ws.keep_running = True
self.ws.run_forever(ping_interval=30, ping_timeout=10)
self.ws.keep_running = False
【问题讨论】:
标签: python python-3.x asynchronous async-await python-asyncio