【问题标题】:Pass asynchronous websocket.send() to stdout/stderr wrapper class将异步 websocket.send() 传递给 stdout/stderr 包装类
【发布时间】:2017-07-08 04:57:29
【问题描述】:

我有一个类函数可以取消缓冲 stdout 和 stderr,如下所示:

class Unbuffered:
    def __init__(self, stream):
        self.stream = stream
    def write(self, data):
        data = data.strip()
        if data.startswith("INFO: "):
            data = data[6:]
        if '[' in data:
            progress = re.compile(r"\[(\d+)/(\d+)\]")
            data = progress.match(data)
            total = data.group(2)
            current = data.group(1)
            data = '{0}/{1}'.format(current, total)
        if data.startswith("ERROR: "):
            data = data[7:]
        self.stream.write(data + '\n')
        self.stream.flush()
    def __getattr__(self, attr):
        return getattr(self.stream, attr)

当来自 websocket 的入站到达时,输出来自 ProcessPoolExecutor 中运行的函数。

我希望在控制台中打印输出以及发送到我的 websocket 客户端。我尝试异步 Unbuffered 并将 websocket 实例传递给它,但没有运气。


更新:run() 和我的 websocket handler() 的基本内容如下所示:

def run(url, path):
    logging.addLevelName(25, "INFO")
    fmt = logging.Formatter('%(levelname)s: %(message)s')
    #----
    output.progress_stream = Unbuffered(sys.stderr)
    stream = Unbuffered(sys.stdout)
    #----
    level = logging.INFO
    hdlr = logging.StreamHandler(stream)
    hdlr.setFormatter(fmt)
    log.addHandler(hdlr)
    log.setLevel(level)
    get_media(url, opt)

async def handler(websocket, path):
    while True:
        inbound = json.loads(await websocket.recv())
        if inbound is None:
            break
        url = inbound['url']
        if 'path' in inbound:
            path = inbound['path'].rstrip(os.path.sep) + os.path.sep
        else:
            path = os.path.expanduser("~") + os.path.sep
        # blah more code
        while inbound != None:
            await asyncio.sleep(.001)
            await loop.run_in_executor(None, run, url, path)

run()handler()Unbuffered 是相互独立的。

【问题讨论】:

  • 一些 cmets:而不是 data.strip(),应该是 data = data.strip(),稍后再尝试:if data.startswith("INFO: "): data = data[6:]。尝试使用re.match(r"\[([^\]]+)\]") 而不是re.split() 等。
  • 对,当然!谢谢,我会编辑这个。
  • run() 创建一个Unbuffered 实例吗?您能否粘贴run() 的相关部分并显示Unbuffered 的使用方式和位置?
  • 使用run_in_executor()时,无需使用with ...命令。
  • 'progress' in locals() 始终为 False。

标签: python-3.x websocket async-await python-asyncio


【解决方案1】:

重写get_media() 以使用asyncio 而不是在不同的线程中运行它是最好的。否则,有一些选项可以在常规线程和协程之间进行通信,例如,使用套接字对:

import asyncio
import socket
import threading
import time

import random


# threads stuff
def producer(n, writer):
    for i in range(10):
        # print("sending", i)
        writer.send("message #{}.{}\n".format(n, i).encode())
        time.sleep(random.uniform(0.1, 1))


def go(writer):
    threads = [threading.Thread(target=producer, args=(i + 1, writer,))
               for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    writer.send("bye\n".encode())


# asyncio coroutines
async def clock():
    for i in range(11):
        print("The time is", i)
        await asyncio.sleep(1)


async def main(reader):
    buffer = ""
    while True:
        buffer += (await loop.sock_recv(reader, 10000)).decode()
        # print(len(buffer))
        while "\n" in buffer:
            msg, _nl, buffer = buffer.partition("\n")
            print("Got", msg)
            if msg == "bye":
                return


reader, writer = socket.socketpair()
reader.setblocking(False)
threading.Thread(target=go, args=(writer,)).start()
# time.sleep(1.5) # socket is buffering
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([clock(), main(reader)]))
loop.close()

你也可以试试这个 3rd-party thread+asyncio 兼容队列:janus

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多