【问题标题】:How can I make a background, non-blocking input loop in python?如何在 python 中创建背景、非阻塞输入循环?
【发布时间】:2017-12-15 19:25:24
【问题描述】:

所以,我正在开发一个小型控制台应用程序,它运行一个网络抓取过程,我希望能够在执行过程中为其提供控制台命令来控制它。为此,我需要某种形式的非阻塞键盘输入,因为程序可能会由于意外错误而自行终止,并且我不希望某些线程在终止时挂起并等待输入。

我已经讨论了以下内容:

import threading
import time
import queue

input_queue = queue.Queue()
command_input_event = threading.Event()

def kbdListener():
    global input_queue, command_input_event
    kbdInput = ''
    while kbdInput.lower() not in ['quit', 'exit', 'stop']:
        kbdInput = input("> ")
        input_queue.put(kbdInput)
        command_input_event.set()
        input_queue.join()

listener = threading.Thread(target=kbdListener)
listener.start()
stop = False
while not stop:
    if command_input_event.is_set():
        while not input_queue.empty():
            command = input_queue.get()
            if command.lower() in ['quit', 'exit', 'stop']:
                print('Stopping')
                while not input_queue.empty():
                    input_queue.get()
                    input_queue.task_done()
                input_queue.task_done()
                stop = True
                break
            else:
                print('Command "{}" received and processed'.format(command))
                input_queue.task_done()

我的问题是,在while not stop: 行上,我的程序中将检查另一个条件,确定主循环是否已终止。如果发生这种情况,则主线程将停止,但后台 listener 线程仍将等待输入;我试图避免的情况。

我不喜欢这种方法,所以如果有一些替代方法可以获得非阻塞输入,那么我也会接受这个建议。

【问题讨论】:

  • 我认为你应该在后台线程中进行抓取,而在 main 中进行输入循环。
  • 我已经在后台线程上进行了抓取,但它可能会以我能够在主线程上检测到的方式终止。我仍然存在后台线程完成后主线程将等待输入的问题。这是由于 python 中 input 方法的阻塞性质,所以我相当肯定这需要改变,但我不确定该怎么做。
  • 你使用 Linux 吗? stackoverflow.com/q/3762881/1025391
  • 不用担心,虽然我自己更喜欢 Linux,但这是在运行 Windows 10 的工作 PC 上。
  • 我还应该提到,我查看了msvcrt 模块,但它似乎没有提供我想要的功能。它似乎一次只提供一个字符,而不是一行输入,虽然我知道我可以用它来组合一个输入行,但这可能比一切都值得更麻烦。

标签: python multithreading python-3.x console-application


【解决方案1】:

因此,经过一些工作后,我想出了以下方法,它允许后台线程输入,然后由前台线程处理,但完全是非阻塞的,并且随着用户实时输入而更新。

import threading
import queue
import sys
from msvcrt import getch, kbhit

"""
Optional extra that I found necessary to get ANSI commands working on windows:

import colorama
colorama.init()
"""

class ClassWithNonBlockingInput:

    def __init__(self):
        self.command_queue = queue.Queue()
        self.command_available_event = threading.Event()
        self.stop_event = threading.Event()

    def run(self):
        main_loop = threading.Thread(target=self.main_loop)
        main_loop.start()
        listener = threading.Thread(target=self.keyboard_listener)
        listener.start()
        stop = False
        while main_loop.is_alive() and not stop:
            if self.command_available_event.is_set():
                while not self.command_queue.empty():
                    command = self.command_queue.get()
                    #Process command here, long jobs should be on a seperate thread.
                    self.command_queue.task_done()
                self.command_available_event.clear()

    def main_loop(self):
        #Main processing loop, may set self.stop_event at some point to terminate early.
        pass

    def keyboard_listener(self):
        line = []
        line_changed = False
        while not self.stop_event.is_set():
            while kbhit():
                c = getch()
                if c == b'\x00' or c == b'\xe0':
                    #This is a special function key such as F1 or the up arrow.
                    #There is a second identifier character to clear/identify the key.
                    id = getch()
                    #Process the control key by sending commands as necessary.
                elif c == b'\x08':
                    #Backspace character, remove last character.
                    if len(line) > 0:
                        line = line[:-1]
                    line_changed = True
                elif c == b'\x7f':
                    #ctrl-backspace, remove characters until last space.
                    while len(line) > 0 and line[-1] != b' ':
                        line = line[:-1]
                    line_changed = True
                elif c == b'\x1b':
                    #Escacpe key, process as necessary.
                    pass
                elif c == b'\r':
                    #Enter key, send command to main thread.
                    print()
                    command = b''.join(line).decode('utf-8')
                    self.command_queue.put(command)
                    self.command_available_event.set()
                    self.command_queue.join()
                    line = []
                    line_changed = True
                else:
                    #Append all other characters to the current line.
                    #There may be other special keys which need to be considered,
                    #  this is left as an exercise for the reader :P
                    line.append(c)
                    line_changed = True

                if line_changed:
                    #Clear the current output line
                    print('\033[2K', end='\r')
                    print(b''.join(line).decode('utf-8'), end='')
                    sys.stdout.flush()
                    line_changed = False

这应该会给以后遇到这个问题并偶然发现这个问题的人一个良好的开端。

【讨论】:

    猜你喜欢
    • 2015-05-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-20
    • 2018-03-27
    • 2015-09-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多