【Question title】: Cancel asynchronous iterator by timeout
【Posted】: 2016-03-08 23:40:00
【Question】:

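I have a process running under asyncio that is supposed to run forever.

I can interact with this process using a ProcessIterator, which can (omitted here) send data to stdin and fetch data from stdout.

I access the data with async for fd, data in ProcessIterator(...):.

The problem is that the execution of this async iterator must be time-limited. When the time runs out, the timeout() function is called, but the exception it raises does not originate from the __anext__ function, so the iterator is never notified of the timeout.

How can I raise this exception inside the async iterator? I found no way to call awaitable.throw(something) or anything similar for this.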

class ProcessIterator:
    def __init__(self, process, loop, run_timeout):
        self.process = process
        self.loop = loop

        self.run_timeout = run_timeout

        # set the global timer
        self.overall_timer = self.loop.call_later(
            self.run_timeout, self.timeout)

    def timeout(self):
        # XXX: how do I pass this exception into the iterator?
        raise ProcTimeoutError(
            self.process.args,
            self.run_timeout,
        )

    def __aiter__(self):
        return self

    async def __anext__(self):    
        if self.process.exited:
            raise StopAsyncIteration()

        else:
            # fetch output from the process asyncio.Queue()
            entry = await self.process.output_queue.get()
            if entry == StopIteration:
                raise StopAsyncIteration()

            return entry

The async iterator is currently used roughly like this:

async def test_coro(loop):
    code = 'import time; print("rofl"); time.sleep(5); print("lol")'

    proc = Process([sys.executable, '-u', '-c', code])

    await proc.create()

    try:
        async for fd, line in ProcessIterator(proc, loop, run_timeout=1):
            print("%d: %s" % (fd, line))

    except ProcTimeoutError as exc:
        # XXX This is the exception I'd like to get here! How can i throw it?
        print("timeout: %s" % exc)

    await proc.wait()

tl;dr: How do I raise a timed exception so that it originates from the async iterator?
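A minimal sketch of why the approach above cannot work: an exception raised inside a loop.call_later() callback is handed to the event loop's exception handler, not to any coroutine, so the async for loop never sees it. ProcTimeoutError is a placeholder class here, consumer() only simulates the iteration with asyncio.sleep(), and asyncio.run() assumes Python 3.7 or newer.

```python
import asyncio


class ProcTimeoutError(Exception):
    """Placeholder for the question's timeout exception."""


def demo():
    caught = []  # exceptions that reached the loop's exception handler

    def timeout():
        # raised inside a plain timer callback: no coroutine is waiting on it
        raise ProcTimeoutError("run timeout")

    async def consumer():
        loop = asyncio.get_running_loop()
        loop.set_exception_handler(
            lambda _loop, context: caught.append(context["exception"]))
        loop.call_later(0.01, timeout)
        # stands in for the `async for` loop: it keeps running undisturbed
        await asyncio.sleep(0.05)
        return "consumer finished normally"

    return asyncio.run(consumer()), caught


result, caught = demo()
print(result, [type(exc).__name__ for exc in caught])
```

The consumer finishes normally while the exception ends up in the handler, which is why the answers below route the timeout through the data the iterator is already awaiting.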

【Question discussion】:

    Tags: python python-3.5 python-asyncio


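    【Answer 1】:

    Edit: added solution 2

    Solution 1:

    Could the timeout() callback store the ProcTimeoutError exception in an instance variable? Then __anext__() can check the instance variable and raise the exception if it is set.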

    class ProcessIterator:
        def __init__(self, process, loop, run_timeout):
            self.process = process
            self.loop = loop
            self.error = None
    
            self.run_timeout = run_timeout
    
            # set the global timer
            self.overall_timer = self.loop.call_later(
                self.run_timeout, self.timeout)
    
        def timeout(self):
            # XXX: set instance variable
            self.error = ProcTimeoutError(
                self.process.args,
                self.run_timeout,
            )
    
        def __aiter__(self):
            return self
    
        async def __anext__(self): 
            # XXX: if error is set, then raise the exception
            if self.error:
                raise self.error
    
            elif self.process.exited:
                raise StopAsyncIteration()
    
            else:
                # fetch output from the process asyncio.Queue()
                entry = await self.process.output_queue.get()
                if entry == StopIteration:
                    raise StopAsyncIteration()
    
                return entry
    

    Solution 2:

    Put the exception on process.output_queue.

    ....
    def timeout(self):
        # put the exception on the queue; a timer callback cannot await,
        # so use put_nowait() instead of the coroutine put()
        self.process.output_queue.put_nowait(ProcTimeoutError(
            self.process.args,
            self.run_timeout,
        ))
    
    ....
    
    # fetch output from the process asyncio.Queue()
    entry = await self.process.output_queue.get()
    if entry == StopIteration:
        raise StopAsyncIteration()
    
    elif isinstance(entry, ProcTimeoutError):
        raise entry
    ....
    

    If there may already be entries in the queue, use a priority queue. Give ProcTimeoutError a higher priority than the other entries, e.g. (0, ProcTimeoutError) vs (1, other_entry).
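    A sketch of that priority scheme, assuming entries are wrapped as (priority, counter, item) tuples. The counter from itertools.count() is an addition not in the answer: it keeps two entries with equal priority from ever comparing their payloads. ProcTimeoutError is a placeholder, and the 1000 queued lines simulate a process that has flooded the queue before the timer fires.

```python
import asyncio
import itertools


class ProcTimeoutError(Exception):
    """Placeholder for the timeout exception."""


TIMEOUT_PRIORITY = 0  # served before any output
OUTPUT_PRIORITY = 1


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker, so payloads are never compared

    def put(priority, item):
        # put_nowait() because timer callbacks cannot await
        queue.put_nowait((priority, next(counter), item))

    # the process has already flooded the queue with output ...
    for n in range(1000):
        put(OUTPUT_PRIORITY, (1, "line %d" % n))

    # ... when the overall timer fires
    loop.call_later(
        0.01, put, TIMEOUT_PRIORITY, ProcTimeoutError("run timeout"))
    await asyncio.sleep(0.05)

    received = []  # output consumed before the timeout surfaced
    while True:
        _, _, entry = await queue.get()
        if isinstance(entry, ProcTimeoutError):
            return received, entry
        received.append(entry)


received, error = asyncio.run(main())
print(len(received), error)
```

    Despite the 1000 pending lines, the first get() returns the timeout.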

    【Discussion】:

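    • Solution 1 doesn't work, because the process may not produce any output, and then we hang forever in queue.get(). Solution 2 doesn't work, because the queue may be spammed with a nearly unlimited amount of data, which would block the queued StopIteration or make it take a long time to become the next element. The timeout must have the highest priority (but using a priority queue seems wrong) so that it can reliably kill the process, since it runs untrusted code.
    • A better approach might be to wait on a future and the queue at the same time and continue as soon as either is ready, preferring the future if both are. That allows reacting immediately when the exception is set on the future. I think I have an idea, let's see.
    【Answer 2】: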

    Take a look at the timeout context manager from asyncio:

    async with asyncio.timeout(10):
        async for i in get_iter():
            process(i)
    

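    At the time of writing it had not been released yet, and you had to copy-paste the implementation from the asyncio master branch. Python 3.11 added asyncio.timeout() to the standard library; it is an asynchronous context manager, used with async with as shown.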
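    On Python versions without asyncio.timeout(), a similar overall deadline can be sketched with asyncio.wait_for(). This is a swapped-in technique, not the context manager above: the hypothetical helper iterate_with_deadline() gives each __anext__() call only the time that is left and raises asyncio.TimeoutError once the deadline has passed. It assumes the wrapped iterator tolerates cancellation of a pending __anext__(), which holds for one that just awaits asyncio.Queue.get(); slow_source() is a made-up stand-in for the process output.

```python
import asyncio


async def iterate_with_deadline(aiterable, timeout):
    """Yield items from `aiterable` until it ends or `timeout` seconds pass.

    Raises asyncio.TimeoutError once the overall deadline has expired.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    iterator = aiterable.__aiter__()
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError()
        try:
            # wait_for cancels the pending __anext__() when time runs out
            item = await asyncio.wait_for(iterator.__anext__(), remaining)
        except StopAsyncIteration:
            return
        yield item


async def slow_source():
    # hypothetical producer: one line, then silence (like time.sleep(5))
    yield "rofl"
    await asyncio.sleep(10)
    yield "lol"


async def main():
    seen = []
    try:
        async for item in iterate_with_deadline(slow_source(), 0.1):
            seen.append(item)
    except asyncio.TimeoutError:
        return seen, "timeout"
    return seen, "finished"


outcome = asyncio.run(main())
print(outcome)
```

    With the question's code this would read async for fd, line in iterate_with_deadline(ProcessIterator(...), 10):, which keeps the timeout next to the iteration.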

    【Discussion】:

    • Looks promising. If I could do this with a timeout that I specify and set in get_iter(timeout=10), it would be exactly what I want :)
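    【Answer 3】:

    You can use get_nowait, which either returns an entry immediately or raises QueueEmpty. Wrapping it in a while loop on self.error, with some async sleep, should do the trick. Something like: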

    async def __anext__(self):    
        if self.process.exited:
            raise StopAsyncIteration()
    
        else:
            while self.error is None:
                try:
                    entry = self.process.output_queue.get_nowait()
                    if entry == StopIteration:
                        raise StopAsyncIteration()
                    return entry
                except asyncio.QueueEmpty:
                    # some sleep to give back control to ioloop
                    # since we using nowait
                    await asyncio.sleep(0.1)
            else:
                raise self.error
    

    As a hint, here is the approach used in Tornado's Queue.get implementation, which supports a timeout:

    def get(self, timeout=None):
        """Remove and return an item from the queue.
        Returns a Future which resolves once an item is available, or raises
        `tornado.gen.TimeoutError` after a timeout.
        """
        future = Future()
        try:
            future.set_result(self.get_nowait())
        except QueueEmpty:
            self._getters.append(future)
            _set_timeout(future, timeout)
        return future
    

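    【Discussion】:

    • I don't think "busy" waiting like this is an acceptable solution to the problem. The exception should be thrown as soon as the loop processes it, not by polling over and over.
    • You may disagree, but that's how asyncio.Queue.get works, by busy waiting: github.com/python/asyncio/blob/master/asyncio/queues.py#L157
    • Of course it could be written better; as a hint I've added Tornado's implementation.
    • Sorry, but in the queue implementation I only see yield from getter, which waits until an item is available. How is that busy waiting?
    • I misused the term "busy waiting" a bit... But back to the solution: asyncio.sleep uses a different mechanism (an ioloop timer), and it addresses the requirement that "the timeout must have the highest priority".
    【Answer 4】: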

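    This is the solution I've come up with for now.

    For the upstream version, see kevin/process.py in https://github.com/SFTtech/kevin

    It also has line counting and an output timeout, which I stripped from this example.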

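    import asyncio

    # WorkerInteraction, ProcTimeoutError and Process.pwn() are not
    # shown here (see the upstream repo).
    INF = float("inf")  # run_timeout value meaning "no timeout"

    class Process:
        def __init__(self, command, loop=None):
    
            self.loop = loop or asyncio.get_event_loop()
    
            self.args = command  # reported in ProcTimeoutError
            self.created = False
            # resolved when the process ends, failed on timeout/kill
            self.killed = self.loop.create_future()
    
            self.proc = self.loop.subprocess_exec(
                lambda: WorkerInteraction(self),  # see upstream repo
                *command)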
    
            self.transport = None
            self.protocol = None
    
        async def create(self):
            self.transport, self.protocol = await self.proc
    
        def communicate(self, timeout):
            if self.killed.done():
                raise Exception("process was already killed "
                                "and no output is waiting")
    
            return ProcessIterator(self, self.loop, timeout)
    
    class ProcessIterator:
        """
        Asynchronous iterator for the process output.   
        Use like `async for (fd, data) in ProcessIterator(...):`
        """
    
        def __init__(self, process, loop, run_timeout):
            self.process = process
            self.loop = loop
            self.run_timeout = run_timeout
    
            self.overall_timer = None
    
            if self.run_timeout < INF:
                # set the global timer
                self.overall_timer = self.loop.call_later(
                    self.run_timeout, self.timeout)
    
        def timeout(self):
            # called by the timer: fail the "killed" future instead of
            # raising, so that the awaiting __anext__() wakes up
            if not self.process.killed.done():
                self.process.killed.set_exception(ProcTimeoutError(
                    self.process.args,
                    self.run_timeout,
                ))
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            # either the process exits,
            # there's an exception (process killed, timeout, ...)
            # or the queue gives us the next data item.
            # wait for the first of those events.
            # (asyncio.wait() needs futures/tasks, not bare coroutines)
            done, pending = await asyncio.wait(
                [asyncio.ensure_future(self.process.protocol.queue.get()),
                 self.process.killed],
                return_when=asyncio.FIRST_COMPLETED)
    
            # at least one of them is done now:
            for future in done:
                # if something failed, cancel the pending futures
                # and raise the exception
                # this happens e.g. for a timeout.
                if future.exception():
                    for future_pending in pending:
                        future_pending.cancel()
    
                    # kill the process before throwing the error!
                    await self.process.pwn()
                    raise future.exception()
    
                # fetch output from the process
                entry = future.result()
    
                # it can be stopiteration to indicate the last data chunk
                # as the process exited on its own.
                if entry is StopIteration:
                    if not self.process.killed.done():
                        self.process.killed.set_result(entry)
    
                    # drop the queue read that may still be pending
                    for future_pending in pending:
                        future_pending.cancel()
    
                    # raise the stop iteration
                    await self.stop_iter()
    
                return entry
    
            raise Exception("internal fail: no future was done!")
    
        async def stop_iter(self):
            # stop the timer
            if self.overall_timer:
                self.overall_timer.cancel()
    
            raise StopAsyncIteration()
    

    The magic is this part:

    done, pending = await asyncio.wait(
        [asyncio.ensure_future(self.process.protocol.queue.get()),
         self.process.killed],
        return_when=asyncio.FIRST_COMPLETED)
    

    When the timeout fires, the pending queue get is reliably aborted.
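    The same race can be shown in isolation. This is a sketch, not the upstream kevin code: next_entry() and the placeholder ProcTimeoutError are hypothetical, and a plain asyncio.Queue stands in for the protocol's queue. The timer callback fails the killed future instead of raising, and asyncio.wait() wakes up on whichever of the queue read and the future finishes first; if killed is done, it takes precedence and the pending read is cancelled.

```python
import asyncio


class ProcTimeoutError(Exception):
    """Placeholder for the answer's timeout exception."""


async def next_entry(queue, killed):
    """Return the next queue entry, unless `killed` completes first."""
    getter = asyncio.ensure_future(queue.get())
    done, _ = await asyncio.wait(
        [getter, killed], return_when=asyncio.FIRST_COMPLETED)
    if killed in done:
        # the future takes precedence: abort the pending read (an entry
        # dequeued in the same step is dropped) and re-raise the stored
        # exception, or return the future's result if it has one
        getter.cancel()
        return killed.result()
    return getter.result()


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    killed = loop.create_future()

    def timeout():
        # timer callback: fail the future instead of raising
        if not killed.done():
            killed.set_exception(ProcTimeoutError("run timeout"))

    loop.call_later(0.1, timeout)
    queue.put_nowait((1, "rofl"))  # one line of output, then silence

    entries = []
    try:
        while True:
            entries.append(await next_entry(queue, killed))
    except ProcTimeoutError as exc:
        return entries, str(exc)


outcome = asyncio.run(main())
print(outcome)
```

    The first call returns the queued line; the second would block forever on an empty queue, but the timer fails the future and the read is cancelled.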

    【Discussion】:
