【问题标题】:Python Asyncio queue get doesn't receive the messagePython Asyncio queue get 没有收到消息
【发布时间】:2018-09-12 08:55:15
【问题描述】:

我发布了一个与old 相关的新问题,以解决从队列中获取的问题。 这是代码(感谢 Martijn Pieters)

import asyncio
import sys
import json
import os
import websockets


async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)
        file = open(r"/home/host/Desktop/FromSocket.txt", "a")
        file.write("From socket: " + ascii(message) + "\n")
        file.close()


async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        file = open(r"/home/host/Desktop/ToSocket.txt", "a")
        file.write("To socket: " + ascii(message) + "\n")
        file.close()
        await socket.send(message)


async def connect_socket(incoming, outgoing, loop=None):
    header = {"Authorization": r"Basic XXX="}
    uri = 'XXXXXX'
    async with websockets.connect(uri, extra_headers=header) as web_socket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            socket_consumer(web_socket, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            socket_producer(web_socket, incoming), loop=loop)

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()


# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), 'wb'))
    writer = asyncio.streams.StreamWriter(
        writer_transport, writer_protocol, None, loop)

    return reader, writer


async def pipe_consumer(pipe_reader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipe_reader.readline()
        if not message:
            break
        file = open(r"/home/host/Desktop/FromPipe.txt", "a")
        file.write("From pipe: " + ascii(message.decode('utf8')) + "\n")
        file.close()

        await outgoing.put(message.decode('utf8'))


async def pipe_producer(pipe_writer, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        file = open(r"/home/host/Desktop/ToPipe.txt", "a")
        file.write("Send to pipe message: " + ascii(json_message) + "\n")
        file.close()
        try:
            message = json.loads(json_message)
            message_type = int(message.get('header', {}).get('messageID', -1))

        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was convertable to an integer
            message_type = None
            file = open(r"/home/host/Desktop/Error.txt", "a")
            file.write(" Error \n")
            file.close()
        # 1 is DENM message, 2 is CAM message
        file.write("Send to pipe type: " + type)
        if message_type in {1, 2}:
            file.write("Send to pipe: " + json_message)
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()


async def connect_pipe(incoming, outgoing, loop=None):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.ensure_future(
        pipe_consumer(reader, outgoing), loop=loop)
    producer_task = asyncio.ensure_future(
        pipe_producer(writer, incoming), loop=loop)

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()


def main():
    loop = asyncio.get_event_loop()
    pipe_to_socket = asyncio.Queue(loop=loop)
    socket_to_pipe = asyncio.Queue(loop=loop)

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)

    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))

main()

这段代码是父进程调用的子进程

subprocess.Popen(["python3", test], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

问题是对象在队列中由socket_consumer(从套接字接收)但pipe_producer 没有从incoming.get() 继续。 文件写入仅用于测试目的。

此刻的父级是这个(仅供测试)

test = r"/home/host/PycharmProjects/Tim/Tim.py"
process = subprocess.Popen(["python3", test],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

for i in range(5):
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
    jsonValueBytes = message.encode("utf-8")
    process.stdin.write(jsonValueBytes + b"\n")

process.stdin.close()
process.wait()

我使用以下代码而不是发送到网络套接字:

#!/usr/bin/env python

import asyncio
import websockets

async def hello(uri):
    header = {"Authorization": r"Basic XXXX="}
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5,"referencePosition":{"latitude":451114425,"longitude":76720957,"positionConfidenceEllipse":{"semiMajorConfidence":4095,"semiMinorConfidence":4095,"semiMajorOrientation":3601},...other fields}}';
    async with websockets.connect(uri, extra_headers=header) as websocket:
        await websocket.send(message)


asyncio.get_event_loop().run_until_complete(
    hello('XXX'))

它通过管道发送并工作,因为我在管道上接收并发送到套接字(FromPipe.txt 和 ToSocket.txt 文件是正确的)。
然后我有代码发送到一个打开的网络套接字的服务器,这个服务器将消息发送给孩子。当孩子从套接字接收时,文件 FromSocket.txt 被创建,但 ToPipe.txt 直到我把它放在 awit incoming.get() 之前才创建。

FromSocket.txt 有这个内容:

From socket: '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1, ... other field}}'

但是如果类型检索出现问题,它会创建文件,因为它是json_message = await incoming.get() 之后的第一条指令 我认为是队列的问题。 为了测试,我在等待outgoing.put(message) 之后将incoming.get() 放在socket_consumer 中,它可以工作。

更新:如果我只运行孩子(所以没有管道),ToPipe.txt 是正确的,并且从套接字传输到管道的消息很好。 对于我的测试,我运行父级,它将子级发送到套接字的一条消息发送到管道,然后我向套接字发送一条消息,子级捕获此消息,但它没有发送到管道和 ToPipe.txt未创建。可能是main方法有问题

【问题讨论】:

  • 你是如何从父进程 Popen 读写的?您使用的是select 还是多个线程?你可以试试pexpect 吗?这将更容易确保这里的问题不是父进程,如pexpect handles all the non-blocking reading and writing details for you
  • 接下来,我们需要更多关于此处交换的实际消息的详细信息,以确保pipe_producer 解析消息的方式没有问题。你能把ToPipe.txt的输出与父进程和websocket发送和接收的数据一起显示吗?
  • 我添加了更多细节
  • 您从未从管道标准输入中读取。这可能会导致整个 I/O 层完全停止,并且您的写入永远不会到达子进程。请在此处使用pexpect 以避免此类问题。
  • 我在下面的回答中概述了还有什么问题。问题在于您从未指定管道和 websocket 发送和接收什么样的数据;我以为你已经弄清楚了这些部分。

标签: python python-3.x asynchronous queue python-asyncio


【解决方案1】:

您正在向子进程写入双编码 JSON:

message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
jsonValue = json.dumps(message)

message 已经是一个 JSON 字符串,所以 jsonValue 是一个双重编码的 JSON 字符串。

管道使用者将此双编码字符串推入套接字队列。接下来,socket_producer() 中的 websocket 生产者再次对消息进行编码:

while True:
    message = await incoming.get()
    # ...
    json_message = json.dumps(message)
    await socket.send(json_message)

所以现在json_message 是一个三重编码的 JSON 值,一个 JSON 文档包含一个 JSON 文档,其中包含一个 JSON 文档:

>>> import json
>>> message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'  # valid JSON
>>> json_message = json.dumps(message)
>>> print(json_message)  # double-encoded
"{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"
>>> json_message = json.dumps(json_message)  # encode *again*
>>> print(json_message)  # triple-encoded
"\"{\\\"header\\\":{\\\"protocolVersion\\\":1,\\\"messageID\\\":2,\\\"stationID\\\":400}}}\""

我不确切知道您的网络套接字对此做了什么,但我们假设它使用一次json.loads(),然后回显解码后的消息。这意味着socket_consumer() 接收到一个仅编码两次的 JSON 文档。您的FromSocket.txt 日志当然暗示这就是发生的情况,因为它包含一个 double 编码的 JSON 消息:

您可以在 FromSocket.txt 日志中看到这一点:

From socket: "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400},\"cam\":{\"generationDeltaTime\":1,...other fields}}"

注意那些\" 条目,整个文档用引号括起来,但值中没有\\\ 三反斜杠。

不过,这种额外的 JSON 编码分层破坏了 pipe_producer() 协程,它期望消​​息解码为字典,而不是另一个字符串(即使该字符串包含另一个 JSON 文档):

message = json.loads(json_message)
type = int(message.get('header', {}).get('messageID', -1))

message 将解码为字符串,因此message.get 将失败并返回AttributeError,从而导致协程退出:

>>> json_message = "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"  # double encoded
>>> message = json.loads(json_message)
>>> message  # Back one stop, single-encoded JSON
'{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'
>>> type(message)  # it's a string with JSON, not a dictionary
<class 'str'>
>>> message.get('header')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute 'get'

您需要确保不要对数据进行太多次编码!如果您的管道接收到 JSON 数据,在将数据发送到套接字时不要再次对数据进行编码。当从父进程向管道发送数据时,不要对数据进行双重编码,如果您已经有一个 JSON 字符串,那么再次通过 json.dumps() 传递它是没有价值的。

在协程中添加更多故障保险也是谨慎的做法。我没有使 JSON 解码足够健壮,所以让我们来弥补这部分:

async def pipe_producer(pipe_writer, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        try:
            message = json.loads(json_message)
            type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was convertable to an integer
            type = None
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()

您可能希望在某处记录解码失败(将消息推送到日志队列,由单独的任务提取以写入日志)。

接下来,我们可以更新 connect_* 函数以不忽略已完成任务中的异常:

done, pending = await asyncio.wait(
    [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
    task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
    task.result()

done.result() 检查可以重新引发消费者或生产者抛出的异常。由于connect_* 协程通过asyncio.gather() 运行,而后者又由loop.run_until_complete() 运行,因此该异常会一直传播到main() 函数,因此它将退出Python,你可以看到追溯打印。我已经更新了我的其他答案以包含 for task in done: task.result() 循环,因为无论如何这是一个好习惯。

just 在我的原始答案代码中使用 task.result() 循环,以及只回显消息并输入有效 JSON 文档(非双重编码)的 websocket,我可以看到立即出错;这里的父进程是我的终端,所以我只是将 JSON 消息复制到我的终端窗口中以将数据发送到管道中:

$ python3.7 so52291218.py
{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
Traceback (most recent call last):
  File "so52291218.py", line 140, in <module>
    main()
  File "so52291218.py", line 137, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "/.../lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "so52291218.py", line 126, in connect_pipe
    task.result()
  File "so52291218.py", line 104, in pipe_producer
    type = int(message.get("header", {}).get("messageID", -1))
AttributeError: 'str' object has no attribute 'get'

当我从socket_producer() 中删除json.dumps() 调用时我将我的websocket 服务器更改为在传入消息上使用json.loads() 并将其作为结果发送出去,然后代码就可以工作了并且我收到了相同的消息回显到我的终端。

请注意,当 stdinstdout 都是管道时,不能只使用循环写入 subprocess.Popen() 管道。您可以通过仅在循环中写入来轻松地导致子进程挂在 I/O 上。您还必须确保从 stdout 管道中读取数据,但由于子进程将以有效的随机顺序从这些句柄中读取和写入,您的父进程必须异步处理 Popen() 管道的 I/O。

我没有写出如何做到这一点(在 Stack Overflow 的其他地方已经介绍过),而是告诉您使用 pexpect project,因为它是 already has done all that work for you(通过生成一个单独的线程,该线程不断地从stdout 管道);使用pexpect.popen_spawn.PopenSpawn() 来保持与您的原始设置接近,如下所示:

import sys
import pexpect

test = '...'
process = pexpect.popen_spawn.PopenSpawn([sys.executable, test])

for i in range(5):
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}';
    jsonValueBytes = message.encode("utf-8")
    process.send(jsonValueBytes + b"\n")

    # echo anything coming back
    while True:
        index = process.expect([process.crlf, pexpect.EOF, pexpect.TIMEOUT], timeout=0.1)
        if not process.before:
            break
        print('>>>', process.before.decode('utf8', errors='replace'), flush=True)

# send EOF to close the pipe, then terminate the process
process.sendeof()
process.kill(1)
process.wait()

所以每次我们向管道发送一个完整的行时,我们也会寻找来自另一个方向的行,超时时间很短,并回显任何这样的行。

所有修复都到位(确保避免多重编码 JSON 消息)和一个非常简单的echoing websocket server,上面的pexpect 代码打印:

>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}

表明存在从父进程到子进程到 websocket 并返回的完整往返路径。

【讨论】:

  • 感谢您的解释,我删除了 json 编码重复但它仍然不起作用。此外,ToPipe.txt 文件的创建是在检索类型字段之前,但它甚至没有创建。
  • @luca 你还添加了for task in done: task.result() 循环吗?因为接下来可能出错的是 pipe_consumer 中的解码。
  • 帮我们一个忙,在您写入日志文件的对象上使用ascii()。共享所有日志文件的结果。我仍然在这里盲目航行,因为您不会共享所有四个点发送或接收的数据,只有 ascii() 可以保证我们将获得这些对象的准确表示。
  • 您的问题越来越混乱,我不清楚您正在运行什么代码以及生成了什么消息。请编辑没有更新部分的问题;想象一下未来的访客正在阅读它,他们不需要整个历史。我总是可以阅读帖子历史以了解发生了什么变化。这里的目标是用 W 正确的 minimal reproducible example 创建一个问题,这样我就可以重新创建整个问题。
  • 请听从我的建议,改用pexpect;我认为你有一个挂起的 I/O 层。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-09-07
  • 2017-10-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-06-03
相关资源
最近更新 更多