这有点复杂,但可以使用 forkpty(3) 创建一个新的 TTY,您可以在其中完全控制 less,将输入和输出转发到原始 TTY,以便感觉无缝。
以下代码使用 Python 3 及其标准库。 pexpect 可以做很多繁重的工作,但 Python 没有。此外,这种方式更有教育意义。
import contextlib
import fcntl
import io
import os
import pty
import select
import signal
import struct
import termios
import time
import tty
假设其余代码缩进以在此上下文管理器中运行。
with contextlib.ExitStack() as stack:
我们需要获取真正的 TTY 并将其设置为原始模式。这可能会混淆 TTY 的其他用户(例如,此程序退出后的 shell),因此请确保将其恢复到相同的状态。
tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)
stack.callback(os.close, tty_fd)
tc = termios.tcgetattr(tty_fd)
stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)
tty.setraw(tty_fd, when=termios.TCSANOW)
然后我们可以调用forkpty,在Python中被命名为pty.fork()。这做了几件事:
- 创建一个pseudoterminal。
- 分叉一个新的孩子。
- 将子节点连接到 PTY 的从端。
- 将子进程的PID和PTY的master端返回给原进程。
孩子应该跑less。请注意_exit(2) 的使用,因为在fork 之后继续执行其他代码可能是不安全的。
child_pid, master_fd = pty.fork()
if child_pid == 0:
os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))
os._exit(0)
stack.callback(os.close, master_fd)
然后需要做一些工作来设置一些异步信号处理程序。
-
SIGCHLD 在子进程更改状态(例如退出)时收到。我们可以使用它来跟踪孩子是否仍在跑步。
-
当控制终端改变大小时收到
SIGWINCH。我们将此大小转发给 PTY(它将自动向附加的进程发送另一个窗口更改信号)。我们也应该设置 PTY 的窗口大小以匹配启动。
转发SIGINT、SIGTERM等信号也可能有意义。
child_is_running = True
def handle_chld(signum, frame):
while True:
pid, status = os.waitpid(-1, os.P_NOWAIT)
if not pid:
break
if pid == child_pid:
child_is_running = False
def handle_winch(signum, frame):
tc = struct.pack('HHHH', 0, 0, 0, 0)
tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)
fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)
handler = signal.signal(signal.SIGCHLD, handle_chld)
stack.callback(signal.signal, signal.SIGCHLD, handler)
handler = signal.signal(signal.SIGWINCH, handle_winch)
stack.callback(signal.signal, signal.SIGWINCH, handler)
handle_winch(0, None)
现在是真肉:在真假 TTY 之间复制数据。
target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1
has_sent_q = False
with contextlib.suppress(OSError):
while child_is_running:
now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
if now < target_time:
timeout = target_time - now
else:
timeout = None
if not has_sent_q:
os.write(master_fd, b'q')
has_sent_q = True
rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)
if tty_fd in rfds:
data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
os.write(master_fd, data)
if master_fd in rfds:
data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
os.write(tty_fd, data)
它看起来很简单,尽管我在掩饰一些事情,例如正确的短写和SIGTTIN/SIGTTOU 处理(通过抑制OSError 部分隐藏)。