【发布时间】:2019-04-29 22:16:56
【问题描述】:
我需要创建一个同时从 Web 套接字和管道接收的软件,并在另一个通道上发送消息(它从套接字接收,创建一个新线程并发送到管道。以同样的方式从管道,创建一个新线程并发送到套接字)。
我遇到了多线程问题,在程序启动时我必须启动方法socket_receiver 和pipe_receiver,但我只能启动pipe_receiver。我尝试删除所有代码并仅保留socket_receiver 和pipe_receiver,但它只输入pipe_receiver 的while True。
import asyncio
import sys
import json
from concurrent.futures.thread import ThreadPoolExecutor
import websockets
# make the Pool of workers
executor = ThreadPoolExecutor(max_workers=10)
# Make connection to socket and pipe
header = {"Authorization": r"Basic XXXX="}
connection = websockets.connect('wss://XXXXXXXX', extra_headers=header)
async def socket_receiver():
"""Listening from web socket"""
async with connection as web_socket:
while True:
message = await web_socket.recv()
# send the message to the pipe in a new thread
executor.submit(send_to_pipe(message))
async def pipe_receiver():
"""Listening from pipe"""
while True:
message = sys.stdin.readline()
if not message:
break
executor.submit(send_to_socket(message))
# jsonValue = json.dump(str(line), file);
sys.stdout.flush()
def send_to_pipe(message):
# Check if message is CAM or DENM
json_message = json.loads(message)
type = int(json_message["header"]["messageID"])
# 1 is DENM message, 2 is CAM message
if type == 1 or type == 2:
# send the message to the pipe
sys.stdout.print(json_message);
async def send_to_socket(message):
async with connection as web_socket:
json_message = json.dumps(message)
await web_socket.send(json_message)
asyncio.get_event_loop().run_until_complete(
asyncio.gather(socket_receiver(),pipe_receiver()))
这个程序被一个子进程调用,父进程通过连接到stdout和stdin的管道与其通信。
更新:我收到@Martijn Pieters 代码的异常
Traceback (most recent call last):
File "X", line 121, in <module>
main()
File "X", line 119, in main
loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
File "X\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 568, in run_until_complete
return future.result()
File "X", line 92, in connect_pipe
reader, writer = await stdio()
File "X", line 53, in stdio
lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 1421, in connect_read_pipe
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 433, in _make_read_pipe_transport
raise NotImplementedError
NotImplementedError
【问题讨论】:
-
websocket.connect()调用不应该在socket_receiver()协程内吗? -
project documentation 似乎确实表明了这一点。
-
请参阅Both pattern in the intro section of the
websocketsdocumentation,了解如何创建用于发送和接收的 websocket 连接。 -
我在
socket_receiver里面用过,但问题是一样的,而且我什至必须发送消息所以我应该在外面打开套接字 -
sys.stdout.print不是函数,您当前正在“打印”原始 Python 对象(不需要重新编码为 JSON 吗?)。您也不能只以非阻塞方式使用sys.stdout和sys.stdin。
标签: python python-3.x multithreading websocket python-asyncio