【问题标题】:Can asyncio.DatagramTransport.sendto be starved with CPU computation?asyncio.DatagramTransport.send 是否会因 CPU 计算而饿死?
【发布时间】:2020-10-24 00:45:49
【问题描述】:

我正在通过多种传输方式发送数据

for transport in transports:
                transport.sendto(packet)

传输列表的创建方式如下:

for country, ip in countries:
    if country != my_country:
        t, _ = await self._loop.create_datagram_endpoint(
            lambda: ClientProtocol(country), remote_addr=(ip, port)
        )
        transports.add(t)  # type: ignore

Sendto 定义为:

def sendto(self, data, addr=None):
        """Send data to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        addr is target socket address.
        If addr is None use target address pointed on transport creation.
        """

让我感到困惑的是sendto 说不阻塞,但它没有标记为async 功能。如果我的主循环中没有等待并且我正在做大量的 CPU 计算,sendto 会被饿死吗?是否需要添加一些awaits 以确保始终发送数据?

谢谢!

【问题讨论】:

  • 你可能根本不应该使用这个 API。引用docs,“传输和协议由低级事件循环API使用......本质上,传输和协议应该只在库和框架中使用,永远不要在高级异步应用程序。”
  • 不幸的是,这个特殊的应用程序对延迟非常敏感,并且有兴趣使用最有效的可用 API。

标签: python python-3.x async-await python-asyncio


【解决方案1】:

asyncio.DatagramTransport.sendto 可能会被 CPU 计算饿死。文档没有解释这一点,但如果我们看一下这个方法的one of the Python 3.9.0 implementations

def sendto(self, data, addr=None):
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')
    if not data:
        return

    if self._address:
        if addr not in (None, self._address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')
        addr = self._address

    if self._conn_lost and self._address:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            if self._extra['peername']:
                self._sock.send(data)
            else:
                self._sock.sendto(data, addr)
            return
        except (BlockingIOError, InterruptedError):
            self._loop._add_writer(self._sock_fd, self._sendto_ready)
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal write error on datagram transport')
            return

    # Ensure that what we buffer is immutable.
    self._buffer.append((bytes(data), addr))
    self._maybe_pause_protocol()

我们看到如果底层套接字操作会阻塞,该方法使用self._loop._add_writer 告诉事件循环在套接字准备好时调用self._sendto_ready。如果不将控制权交还给事件循环,事件循环将永远没有机会调用_sendto_ready,也不会发送数据。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-01-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-19
    • 1970-01-01
    • 2013-07-20
    相关资源
    最近更新 更多