【发布时间】:2018-09-12 08:55:15
【问题描述】:
我发布了一个与old 相关的新问题,以解决从队列中获取的问题。 这是代码(感谢 Martijn Pieters)
import asyncio
import sys
import json
import os
import websockets
async def socket_consumer(socket, outgoing):
# take messages from the web socket and push them into the queue
async for message in socket:
await outgoing.put(message)
file = open(r"/home/host/Desktop/FromSocket.txt", "a")
file.write("From socket: " + ascii(message) + "\n")
file.close()
async def socket_producer(socket, incoming):
# take messages from the queue and send them to the socket
while True:
message = await incoming.get()
file = open(r"/home/host/Desktop/ToSocket.txt", "a")
file.write("To socket: " + ascii(message) + "\n")
file.close()
await socket.send(message)
async def connect_socket(incoming, outgoing, loop=None):
header = {"Authorization": r"Basic XXX="}
uri = 'XXXXXX'
async with websockets.connect(uri, extra_headers=header) as web_socket:
# create tasks for the consumer and producer. The asyncio loop will
# manage these independently
consumer_task = asyncio.ensure_future(
socket_consumer(web_socket, outgoing), loop=loop)
producer_task = asyncio.ensure_future(
socket_producer(web_socket, incoming), loop=loop)
# start both tasks, but have the loop return to us when one of them
# has ended. We can then cancel the remainder
done, pending = await asyncio.wait(
[consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
# pipe support
async def stdio(loop=None):
if loop is None:
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
await loop.connect_read_pipe(
lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
writer_transport, writer_protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), 'wb'))
writer = asyncio.streams.StreamWriter(
writer_transport, writer_protocol, None, loop)
return reader, writer
async def pipe_consumer(pipe_reader, outgoing):
# take messages from the pipe and push them into the queue
while True:
message = await pipe_reader.readline()
if not message:
break
file = open(r"/home/host/Desktop/FromPipe.txt", "a")
file.write("From pipe: " + ascii(message.decode('utf8')) + "\n")
file.close()
await outgoing.put(message.decode('utf8'))
async def pipe_producer(pipe_writer, incoming):
# take messages from the queue and send them to the pipe
while True:
json_message = await incoming.get()
file = open(r"/home/host/Desktop/ToPipe.txt", "a")
file.write("Send to pipe message: " + ascii(json_message) + "\n")
file.close()
try:
message = json.loads(json_message)
message_type = int(message.get('header', {}).get('messageID', -1))
except (ValueError, TypeError, AttributeError):
# failed to decode the message, or the message was not
# a dictionary, or the messageID was convertable to an integer
message_type = None
file = open(r"/home/host/Desktop/Error.txt", "a")
file.write(" Error \n")
file.close()
# 1 is DENM message, 2 is CAM message
file.write("Send to pipe type: " + type)
if message_type in {1, 2}:
file.write("Send to pipe: " + json_message)
pipe_writer.write(json_message.encode('utf8') + b'\n')
await pipe_writer.drain()
async def connect_pipe(incoming, outgoing, loop=None):
reader, writer = await stdio()
# create tasks for the consumer and producer. The asyncio loop will
# manage these independently
consumer_task = asyncio.ensure_future(
pipe_consumer(reader, outgoing), loop=loop)
producer_task = asyncio.ensure_future(
pipe_producer(writer, incoming), loop=loop)
# start both tasks, but have the loop return to us when one of them
# has ended. We can then cancel the remainder
done, pending = await asyncio.wait(
[consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
task.result()
def main():
loop = asyncio.get_event_loop()
pipe_to_socket = asyncio.Queue(loop=loop)
socket_to_pipe = asyncio.Queue(loop=loop)
socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)
loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
main()
这段代码是父进程调用的子进程
subprocess.Popen(["python3", test], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)
问题是对象在队列中由socket_consumer(从套接字接收)但pipe_producer 没有从incoming.get() 继续。
文件写入仅用于测试目的。
此刻的父级是这个(仅供测试)
test = r"/home/host/PycharmProjects/Tim/Tim.py"
process = subprocess.Popen(["python3", test],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)
for i in range(5):
message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
jsonValueBytes = message.encode("utf-8")
process.stdin.write(jsonValueBytes + b"\n")
process.stdin.close()
process.wait()
我使用以下代码而不是发送到网络套接字:
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
header = {"Authorization": r"Basic XXXX="}
message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5,"referencePosition":{"latitude":451114425,"longitude":76720957,"positionConfidenceEllipse":{"semiMajorConfidence":4095,"semiMinorConfidence":4095,"semiMajorOrientation":3601},...other fields}}';
async with websockets.connect(uri, extra_headers=header) as websocket:
await websocket.send(message)
asyncio.get_event_loop().run_until_complete(
hello('XXX'))
它通过管道发送并工作,因为我在管道上接收并发送到套接字(FromPipe.txt 和 ToSocket.txt 文件是正确的)。
然后我有代码发送到一个打开的网络套接字的服务器,这个服务器将消息发送给孩子。当孩子从套接字接收时,文件 FromSocket.txt 被创建,但 ToPipe.txt 直到我把它放在 awit incoming.get() 之前才创建。
FromSocket.txt 有这个内容:
From socket: '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1, ... other field}}'
但是如果类型检索出现问题,它会创建文件,因为它是json_message = await incoming.get() 之后的第一条指令
我认为是队列的问题。
为了测试,我在等待outgoing.put(message) 之后将incoming.get() 放在socket_consumer 中,它可以工作。
更新:如果我只运行孩子(所以没有管道),ToPipe.txt 是正确的,并且从套接字传输到管道的消息很好。 对于我的测试,我运行父级,它将子级发送到套接字的一条消息发送到管道,然后我向套接字发送一条消息,子级捕获此消息,但它没有发送到管道和 ToPipe.txt未创建。可能是main方法有问题
【问题讨论】:
-
你是如何从父进程 Popen 读写的?您使用的是
select还是多个线程?你可以试试pexpect吗?这将更容易确保这里的问题不是父进程,如pexpecthandles all the non-blocking reading and writing details for you。 -
接下来,我们需要更多关于此处交换的实际消息的详细信息,以确保
pipe_producer解析消息的方式没有问题。你能把ToPipe.txt的输出与父进程和websocket发送和接收的数据一起显示吗? -
我添加了更多细节
-
您从未从管道标准输入中读取。这可能会导致整个 I/O 层完全停止,并且您的写入永远不会到达子进程。请在此处使用
pexpect以避免此类问题。 -
我在下面的回答中概述了还有什么问题。问题在于您从未指定管道和 websocket 发送和接收什么样的数据;我以为你已经弄清楚了这些部分。
标签: python python-3.x asynchronous queue python-asyncio