【问题标题】:Async socket server as a producer with workers consuming it异步套接字服务器作为生产者,工作人员正在使用它
【发布时间】:2020-05-19 15:08:40
【问题描述】:

我开始使用 asyncio 模块,我想知道是否可以构建一个 tcp 服务器,将一些工作放入队列中以便一些工作人员执行它。

我尝试合并 python 文档中示例中的代码。

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

和工人

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

我一开始编码,就想到了很多问题。

服务器是否创建自己的事件循环?

我可以在工作人员使用服务器填充的队列中的作业时为服务器提供服务吗?

对于此类应用程序或引导人们了解异步带来的这些新术语有什么好的指南吗?

【问题讨论】:

    标签: python sockets tcp python-asyncio asyncsocket


    【解决方案1】:

    我不太确定, 但问题在于创建自己的事件循环的队列,所以我必须在主异步函数中创建它。 虽然start_servingserve_forever 没有任何区别。 我仍在尝试和研究文档,所以我暂时接受这个答案。

    from asyncio import Queue, create_task, gather, run, start_server
    
    
    async def do_work(name: str, broker: Queue):
        while True:
            data = await broker.get()
    
            print(f'worker `{name}` is consuming {data}')
    
            broker.task_done()
    
    
    async def main():
        broker = Queue(maxsize=512)
    
        async def handler(reader, writer):
            data = await reader.read()
            message = data.decode()
            addr = writer.get_extra_info('peername')
    
            print(f'Received {message!r} from {addr!r}')
    
            print(f'Send: {message!r}')
            writer.write(data)
            await writer.drain()
    
            print(f'Add work')
            await broker.put(data)
    
            print('Close the connection')
            writer.close()
    
        server = await start_server(handler, '127.0.0.1', 8888)
    
        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')
    
        # await server.start_serving()
    
        workers = []
        for i in range(3):
            worker = create_task(do_work(f'worker-{i}', broker))
            workers.append(worker)
    
        # await gather(*workers)
        async with server:
            await server.serve_forever()
    
    
    if __name__ == '__main__':
        run(main())
    
    
    

    【讨论】:

      猜你喜欢
      • 2011-07-13
      • 2021-04-15
      • 2014-10-02
      • 2011-01-18
      • 2020-05-19
      • 2012-06-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多