【问题标题】:Python in Linux: Put user input asynchronously into queueLinux中的Python:将用户输入异步放入队列
【发布时间】:2013-03-08 20:39:07
【问题描述】:

我正在尝试运行一个在工作完成时接收输入的程序。我查看了几种表格,并查看了documentation。我在 Debian 中运行它,我知道我可以使用这个 getch function 来接收字符而无需按回车键。为了打破它,这就是我试图在我的无限循环中实现的

  • 接受输入(线程在我这里不起作用
  • 将输入放入队列
  • 如果没有正在运行的作业,则以队列前面的项目作为变量启动作业

我也在运行线程模块来执行另一条指令。有什么办法可以做到吗?


更新:这是我迄今为止尝试过的:

首先,我尝试使用线程模块中的计时器来阻止它等待,结果是这样的。

def getchnow():    
        def time_up():
            answer= None
            print 'time up...'

    wait = Timer(5,time_up) # x is amount of time in seconds
    wait.start()
    try:
            print "enter answer below"
            answer = getch()
    except Exception:
            print 'pass\n'
            answer = None

    if answer != True:   # it means if variable have somthing 
            wait.cancel()       # time_up will not execute(so, no skip)
    return answer
line = getchnow()
#Add line variable to queue
#Do stuff with queue

这里的问题是它仍在等待用户输入。

然后我尝试将 getch 函数放入另一个线程。

q = Queue.Queue
q.put(getch())  
if q.get() != True:   # it means if variable have somthing
    line = q.get()
    #Add line variable to queue
#Do stuff with queue

这个尝试没有让我做任何事情。

【问题讨论】:

  • 是的,但也许如果您提供您尝试过的想法,我们可以帮助您实现目标。
  • @Michael 刚刚更新
  • 抛开你为什么试图从终端运行这些东西的问题,我建议你使用curses来满足你所有的高级终端输入和输出需求。
  • @9000 你能告诉我使用curses模块的方法吗?
  • 需要使用select()进行轮询不阻塞,也可以使用termios。您可以在 Linux 上将 select() 与 stdin 一起使用,但不能移植到 Windows。

标签: python linux asynchronous queue user-input


【解决方案1】:

我阅读了更多link,在底部有一个我想要的实现。

我将 select 模块用于 Linux 上的非阻塞实现。 如果没有收到输入,这将在(此处为 5 秒)超时。 在线程中使用时特别有用,因此 getch 调用是 非阻塞,并允许线程干净地退出

# This class gets a single character input from the keyboard
class _GetchUnix:
    def __init__(self):
        import tty, sys
        from select import select
    def __call__(self):
        import sys, tty, termios
        from select import select
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
                tty.setraw(sys.stdin.fileno())
                [i, o, e] = select([sys.stdin.fileno()], [], [], 2)
                if i: 
                ch=sys.stdin.read(1)
                else: 
                ch='' 
        finally:
                    termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            return ch
getch = _GetchUnix()
# End Class

【讨论】:

    【解决方案2】:

    我也使用过[i, o, e] = select([sys.stdin.fileno()], [], [], 2),但我听说它可能不适用于 Windows。如果有人仍然需要多线程、非阻塞输入示例:

    import threading
    import sys
    import time
    
    bufferLock=threading.Lock()
    inputBuffer=[]
    
    class InputThread(threading.Thread):
        def run(self):
            global inputBuffer
            print("starting input")
            while True:
                line=sys.stdin.readline()
                bufferLock.acquire()
                inputBuffer.insert(0,line)
                bufferLock.release()
    
    input_thread=InputThread()
    input_thread.start()
    while True:
        time.sleep(4)
        bufferLock.acquire()
        if len(inputBuffer)>0:
            print("Popping: "+inputBuffer.pop())
        bufferLock.release()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-12-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多