【问题标题】:What is right way to call async code in regular callback?在常规回调中调用异步代码的正确方法是什么?
【发布时间】:2017-03-07 13:35:07
【问题描述】:

在常规回调中调用异步代码的正确方法是什么? 这段代码有效,但看起来并不漂亮。我不喜欢如何响应调用:它需要通过所有函数传递调用者地址。如何设置处理程序的超时时间?

我在 cmets 中提出了问题。

import asyncio
import logging
logger = logging.getLogger('protocol')

async def echo(data):
    #external lib emulator
    data = b'>'+data
    await asyncio.sleep(1)
    return data


class Protocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        logger.debug('connection_made called')
        self.transport = transport

    def respond(self,task):
        logger.debug('respond called')
        # i want to get data in attrs, not task

        resp,caller = task.result()

        self.transport.sendto(resp, caller)


    async def handler(self,data, caller):
        logger.debug('handler called')

        # async needed for `await` and `async for` external library such motor, etc
        # do some awaits
        data = await echo(data)

        # simple echo
        return (data, caller)


    def datagram_received(self, data, addr):
        logger.debug('datagram_received called')

        # handler selected by data header
        handler = self.handler

        # how to do run async handler?
        loop = asyncio.get_event_loop()
        c = handler(data, addr) #coroutine

        f = asyncio.ensure_future(c,loop=loop) #attach coroutine to loop
        f.add_done_callback(self.respond)

        # How to get response here?

        # i cant loop.run_until_complete(...) because loop.run_forever() running

        #def wakeup():
        #    pass
        #loop.call_soon(wakeup)
        # without this call_soon future is not executed in first programm code, but works in test and after refactor


def main(HOST,PORT):

    loop = asyncio.get_event_loop()
    t = asyncio.Task(loop.create_datagram_endpoint(
        Protocol, local_addr=(HOST,PORT)))
    transport, server = loop.run_until_complete(t)

    sock = transport.get_extra_info('socket')
    # socket tuning here

    try:
        loop.run_forever()
    finally:
        transport.close()
        loop.close()

logging.basicConfig(level=logging.DEBUG)     
main('0.0.0.0',10012)

使用netcat测试nc -u 127.0.0.1 10012

【问题讨论】:

  • 我认为你是对的:asyncio.ensure_future 是正确的方式。
  • @SergeyBelash 如何就地获取数据(无回调)?
  • ensure_future 仅在datagram_received 返回后运行
  • 较低级别的 asyncio 基于回调,并且 asyncio 不为 UDP 提供基于协程的对象。但是,您可以查看those high-level UDP endpoints for asyncio,它可能会让您对如何实现类似的基于协程的解决方案有所了解。

标签: python python-3.x python-3.5 python-asyncio


【解决方案1】:

您可以使用 asyncio.Queue 将任务放在一个协程(句柄)中,await 将它们放在另一个协程中(响应):

class Protocol(asyncio.DatagramProtocol):

    def __init__(self):
        self.data_queue = asyncio.Queue(loop=asyncio.get_event_loop())
        asyncio.ensure_future(self.respond())

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        asyncio.ensure_future(self.handler(data, addr), loop=asyncio.get_event_loop())

    async def respond(self):
        while True:
            resp, caller = await self.data_queue.get()
            self.transport.sendto(resp, caller)

    async def handler(self, data, caller):
        data = await echo(data)
        self.data_queue.put((data, caller))

【讨论】:

【解决方案2】:

我发现最好的方法是使用异步函数在每个数据报上创建任务

class Protocol(AbstractProtocol):

    async def datagram_received_async(self, data, addr):
        await a()
        await b()

    def datagram_received(self, data, addr):
        self.loop.create_task(self.datagram_received_async( data, addr))

Futures 有未等待的警告。任务还行。在这种情况下不需要排队。

【讨论】:

  • 有没有办法同步等待async def结果datagram_received
  • datagram_received 没有返回,所以同步等待它没有任何用处。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-08-23
  • 2017-04-13
  • 2020-10-17
  • 1970-01-01
  • 2015-06-29
相关资源
最近更新 更多