【问题标题】:using serial port in python3 asyncio在 python3 asyncio 中使用串口
【发布时间】:2014-02-09 22:42:35
【问题描述】:

我正在尝试,到目前为止,未能使用 python asyncio 访问串行端口。

我非常感谢在简单 fd 上使用新的 python 异步框架的任何提示。

干杯!

詹姆斯

【问题讨论】:

  • 我猜你需要创建自己的传输和协议才能在 tty 上读/写。查看源代码并尝试为您的用例调整套接字/子进程传输/协议。
  • 是的,这就是它的样子,但看起来整个事件循环也需要重新调整,因为它开始创建一个套接字对,这与这里无关。有点困惑,它似乎完全不受支持(我的意思是,读/写 tty 是异步的最简单用例,对吧?)
  • 是的。应该不难做到。你错了。使用事件循环不需要套接字。您至少可以改用子流程
  • 您可以尝试loop.connect_write_pipe()/loop.connect_read_pipe() 连接fd,如the async stdio example 中演示的那样

标签: python serial-port python-3.4 pyserial python-asyncio


【解决方案1】:

这是使用 FD 的另一种方式

import asyncio
import serial

s = serial.Serial('/dev/pts/13', 9600)


def test_serial():
    '''
    read a line and print.
    '''
    text = ""
    msg = s.read().decode()
    while (msg != '\n'):
        text += msg
        msg = s.read().decode()
    print(text)
    loop.call_soon(s.write, "ok\n".encode())

loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()

【讨论】:

  • 我的 /dev/pts/... 终端在第一次按键时关闭,我总是得到 - SerialException('read failed: {}'.format(e)) serial.serialutil.SerialException: read failed: device reports readiness to read but returned no data (device disconnected or multiple access on port?) 。为什么?
  • @jaromrax 这通常意味着另一个进程正在从同一个串行端口读取数据。传入数据仅由一个进程接收。如果另一个进程已读取数据,则您的进程将无法再使用该数据。
【解决方案2】:

pySerial 越来越直接asyncio support。它现在处于实验状态,但对我来说可以正常工作。

来自文档的示例:

class Output(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        print('port opened', transport)
        transport.serial.rts = False
        transport.write(b'hello world\n')

    def data_received(self, data):
        print('data received', repr(data))
        self.transport.close()

    def connection_lost(self, exc):
        print('port closed')
        asyncio.get_event_loop().stop()

loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

【讨论】:

【解决方案3】:

这是一个使用 pyserial-asyncio 的工作示例:

from asyncio import get_event_loop
from serial_asyncio import open_serial_connection

async def run():
    reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
    while True:
        line = await reader.readline()
        print(str(line, 'utf-8'))

loop = get_event_loop()
loop.run_until_complete(run())

【讨论】:

    【解决方案4】:

    另一种选择是使用阻塞调用编写所有串行内容,然后使用 run_in_executor 在不同的线程中运行它:

    import asyncio
    import concurrent
    
    from serial import Serial
    
    # Normal serial blocking reads
    # This could also do any processing required on the data
    def get_byte():
        return s.read(1)
    
    # Runs blocking function in executor, yielding the result
    @asyncio.coroutine
    def get_byte_async():
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            res = yield from loop.run_in_executor(executor, get_byte)
            return res
    
    def get_and_print():
        b = yield from get_byte_async()
        print (b)
    
    s = Serial("COM11", 19200, timeout=10)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_and_print())
    

    【讨论】:

      【解决方案5】:

      感谢大家的建议,最后我用稍微不同的方式解决了这个问题,在asyncio中使用了支持良好的socket连接,然后使用ser2net(http://sourceforge.net/projects/ser2net/)访问串口。

      这需要大约 10 秒来配置,这意味着 python 代码现在也可以处理访问远程串行端口了。

      【讨论】:

      • 有趣的解决方案。虽然如果您需要更改波特率、处理帧错误、处理串行端口硬件握手线路(DTR、DSR、DCD 等),这可能不是一个可行的解决方案。
      【解决方案6】:

      前段时间写了一个AsyncFile类,接口比低级协议简单。

      原代码在这里:https://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20

      class AsyncFile:
          """A local file class for use with the ``asyncio`` module.
          ``loop`` should be the event loop in use.
          ``filename`` is the name of the file to be opened.
          ``fileobj`` should be a regular file-like object.
          ``mode`` is the open mode accepted by built-in function ``open``.
          If ``filename`` is specified, the named file will be opened. And if
          ``fileobj`` is specified, that file object will be used directly. You
          cannot specify both ``filename`` and ``fileobj``.
          This class can be used in a ``with`` statement.
          """
      
          DEFAULT_BLOCK_SIZE = 8192
      
          def __init__(self, loop=None, filename=None,
                       fileobj=None, mode='rb'):
              if (filename is None and fileobj is None) or \
                      (filename is not None and fileobj is not None):
                  raise RuntimeError('Confilicting arguments')
      
              if filename is not None:
                  if 'b' not in mode:
                      raise RuntimeError('Only binary mode is supported')
                  fileobj = open(filename, mode=mode)
              elif 'b' not in fileobj.mode:
                  raise RuntimeError('Only binary mode is supported')
      
              fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
              if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
                  if filename is not None:
                      fileobj.close()
                  errcode = ctypes.get_errno()
                  raise OSError((errcode, errno.errorcode[errcode]))
      
              self._fileobj = fileobj
      
              if loop is None:
                  loop = asyncio.get_event_loop()
              self._loop = loop
              self._rbuffer = bytearray()
      
          def __enter__(self):
              return self
      
          def __exit__(self, exc_type, exc_value, traceback):
              self.close()
      
          def fileno(self):
              return self._fileobj.fileno()
      
          def seek(self, offset, whence=None):
              if whence is None:
                  return self._fileobj.seek(offset)
              else:
                  return self._fileobj.seek(offset, whence)
      
          def tell(self):
              return self._fileobj.tell()
      
          def _read_ready(self, future, n, total):
              if future.cancelled():
                  self._loop.remove_reader(self._fileobj.fileno())
                  return
      
              try:
                  res = self._fileobj.read(n)
              except (BlockingIOError, InterruptedError):
                  return
              except Exception as exc:
                  self._loop.remove_reader(self._fileobj.fileno())
                  future.set_exception(exc)
                  return
      
              if not res:     # EOF
                  self._loop.remove_reader(self._fileobj.fileno())
                  future.set_result(bytes(self._rbuffer))
                  return
      
              self._rbuffer.extend(res)
      
              if total > 0:
                  more_to_go = total - len(self._rbuffer)
                  if more_to_go <= 0:  # enough
                      res, self._rbuffer = self._rbuffer[:n], self._rbuffer[n:]
                      self._loop.remove_reader(self._fileobj.fileno())
                      future.set_result(bytes(res))
                  else:
                      more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
                      self._loop.add_reader(self._fileobj.fileno(),
                                            self._read_ready,
                                            future, more_to_go, total)
              else:   # total < 0
                  # This callback is still registered with total < 0,
                  # nothing to do here
                  pass
      
          @asyncio.coroutine
          def read(self, n=-1):
              future = asyncio.Future(loop=self._loop)
      
              if n == 0:
                  future.set_result(b'')
              else:
                  try:
                      res = self._fileobj.read(n)
                  except (BlockingIOError, InterruptedError):
                      if n < 0:
                          self._rbuffer.clear()
                          self._loop.add_reader(self._fileobj.fileno(),
                                                self._read_ready,
                                                future, self.DEFAULT_BLOCK_SIZE, n)
                      else:
                          self._rbuffer.clear()
                          read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
                          self._loop.add_reader(self._fileobj.fileno(),
                                                self._read_ready,
                                                future, read_block_size, n)
                  except Exception as exc:
                      future.set_exception(exc)
                  else:
                      future.set_result(res)
      
              return future
      
          def _write_ready(self, future, data, written):
              if future.cancelled():
                  self._loop.remove_writer(self._fileobj.fileno())
                  return
      
              try:
                  res = self._fileobj.write(data)
              except (BlockingIOError, InterruptedError):
                  return
              except Exception as exc:
                  self._loop.remove_writer(self._fileobj.fileno())
                  future.set_exception(exc)
                  return
      
              if res < len(data):
                  data = data[res:]
                  self._loop.add_writer(self._fileobj.fileno(),
                                        self._write_ready,
                                        future, data, written + res)
              else:
                  self._loop.remove_writer(self._fileobj.fileno())
                  future.set_result(written + res)
      
          @asyncio.coroutine
          def write(self, data):
              future = asyncio.Future(loop=self._loop)
      
              if len(data) == 0:
                  future.set_result(0)
              else:
                  try:
                      res = self._fileobj.write(data)
                  except (BlockingIOError, InterruptedError):
                      self._loop.add_writer(self._fileobj.fileno(),
                                            self._write_ready,
                                            future, data, 0)
                  except Exception as exc:
                      future.set_exception(exc)
                  else:
                      future.set_result(res)
      
              return future
      
          def stat(self):
              return os.stat(self._fileobj.fileno(), follow_symlinks=True)
      
          def close(self):
              self._loop.remove_reader(self._fileobj.fileno())
              self._loop.remove_writer(self._fileobj.fileno())
              self._fileobj.close()
      

      【讨论】:

      • 您能否给出一些简短的示例代码,将此类用于带有asyncio 的串行端口?
      【解决方案7】:

      这是我在异步串行端口上的尝试。该接口允许您将 serial.Serial 实例包装到 AIOSerial 类中,然后您可以执行 await AIOSerial.readline()await AIOSerial.write(data) 而不必使用 asyncio.Protocol() 样式的回调。

      import asyncio
      import sys
      
      import serial
      
      
      class AIOSerial:
          def __init__(self, serial, ioloop=None):
              self._serial = serial
              # Asynchronous I/O requires non-blocking devices
              self._serial.timeout = 0
              self._serial.write_timeout = 0
      
              if ioloop is not None:
                  self.loop = ioloop
              else:
                  self.loop = asyncio.get_event_loop()
              self.loop.add_reader(self._serial.fd, self._on_read)
              self._rbuf = b''
              self._rbytes = 0
              self._wbuf = b''
              self._rfuture = None
              self._delimiter = None
      
          def _on_read(self):
              data = self._serial.read(4096)
              self._rbuf += data
              self._rbytes = len(self._rbuf)
              self._check_pending_read()
      
          def _on_write(self):
              written = self._serial.write(self._wbuf)
              self._wbuf = self._wbuf[written:]
              if not self._wbuf:
                  self.loop.remove_writer(self._serial.fd)
      
          def _check_pending_read(self):
              future = self._rfuture
              if future is not None:
                  # get data from buffer
                  pos = self._rbuf.find(self._delimiter)
                  if pos > -1:
                      ret = self._rbuf[:(pos+len(self._delimiter))]
                      self._rbuf = self._rbuf[(pos+len(self._delimiter)):]
                      self._delimiter = self._rfuture = None
                      future.set_result(ret)
                      return future
      
          async def read_until(self, delimiter=b'\n'):
              while self._delimiter:
                  await self._rfuture
      
              self._delimiter = delimiter
              self._rfuture = asyncio.Future()
              #future = self._check_pending_read()
              return await self._rfuture
      
          async def readline(self):
              return await self.read_until()
      
          async def write(self, data):
              need_add_writer = not self._wbuf
      
              self._wbuf = self._wbuf + data
              if need_add_writer:
                  self.loop.add_writer(self._serial.fd, self._on_write)
              return len(data)
      

      示例用法:

      async def go_serial():
          ser = serial.Serial(sys.argv[1], 9600) #, rtscts=True, dsrdtr=True)
          print(ser)
          aser = AIOSerial(ser)
      
          written = await aser.write(b'test 1\n')
          print('written', written)
          data = await aser.readline()
          print('got from readline', data)
      
          while True:
              await aser.write(b'.\n')
              data = await aser.readline()
              print('GOT!', data)
              await asyncio.sleep(2.78)
      
      async def main():
          for n in range(120):
              await asyncio.sleep(1)
              print('n=%d' % n)
      
      if __name__ == "__main__":
          loop = asyncio.get_event_loop()
          asyncio.ensure_future(go_serial())
          loop.run_until_complete(main())
      

      这设置了串行端口和两个异步任务:go_serial 和 main。 Main 只运行了 120 秒,然后循环退出。 go_serial 写入和读取串行端口,期望对发送的每一行进行回复。

      然后使用await aser.write(b'blah')await aser.readline()(或await aser.read_until(b'\r\n'),如果您想要不同的分隔符)完成对串行端口的读取和写入。

      请注意,它并没有真正准备好生产,因为人们希望对缓冲区的数量有一些限制。

      为了测试这一点,我用下面的脚本模拟了一个串口,它输出了创建的 pty 的名称,然后是上面例子的参数。

      #!/usr/bin/python3
      import fcntl
      import time
      import os
      import errno
      import pty
      
      
      chars = []
      ser, s = pty.openpty()
      oldflags = fcntl.fcntl(ser, fcntl.F_GETFL)
      # make the PTY non-blocking
      fcntl.fcntl(ser, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
      
      print('Created: %s' % os.ttyname(s))
      
      
      while True:
          time.sleep(0.1)
          c = None
          try:
              c = os.read(ser, 10)
          except OSError as err:
              if err.errno == errno.EAGAIN or err.errno == errno.EWOULDBLOCK:
                  c = None
              else:
                  raise
          if c:
              chars.append(c)
      
          data = b''.join(chars)
          if b'\n' in data:
              one, data = data.split(b'\n', 1)
              b = b'%.6f\n' % time.time()
              os.write(ser, b)
              print(one)
          chars = [data]
      

      【讨论】:

        【解决方案8】:

        考虑使用aioserial

        这是一个例子:

        import aioserial
        import asyncio
        
        
        async def read_and_print(aioserial_instance: aioserial.AioSerial):
            while True:
                print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)
        
        
        aioserial_com1: aioserial.AioSerial = aioserial.AioSerial(port='COM1')
        
        asyncio.run(read_and_print(aioserial_com1))
        

        致版主,

        答案与one 类似,但不重复。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-07-04
          • 2021-08-06
          • 1970-01-01
          相关资源
          最近更新 更多