【问题标题】:Python Gevent Shared Queue (Listener Process)Python Gevent 共享队列(监听进程)
【发布时间】:2015-11-04 20:50:17
【问题描述】:

我正在尝试让一些代码工作,我可以在其中使用 gevent 实现登录到多线程程序。我想做的是设置自定义日志处理程序以将日志事件放入队列中,同时侦听器进程不断监视新的日志事件以进行适当处理。我过去曾使用 Multiprocessing 完成此操作,但从未使用 Gevent。

我遇到了一个问题,即程序陷入无限循环(侦听器进程),并且不允许其他线程“工作”...

理想情况下,在工作进程完成后,我可以将任意值传递给监听进程,告诉它中断循环,然后将所有进程连接在一起。这是我目前所拥有的:

import gevent
from gevent.pool import Pool
import Queue
import random
import time

def listener(q):
    while True:
        if not q.empty():
            num = q.get()
            print "The number is: %s" % num
            if num <= 100:
                print q.get()
            # got passed 101, break out
            else:
                break
        else:
            continue
def worker(pid,q):
    if pid == 0:
        listener(q)
    else:
        gevent.sleep(random.randint(0,2)*0.001)
        num = random.randint(1,100)
        q.put(num)

def main():
    q = Queue.Queue()
    all_threads = []
    all_threads = [gevent.spawn(worker, pid,q) for pid in xrange(10)]
    gevent.wait(all_threads[1:])
    q.put(101)
    gevent.joinall(all_threads)

if __name__ == '__main__':
    main()

正如我所说,程序似乎挂断了第一个进程,并且不允许其他工作人员做他们的事情。我也尝试过完全独立地生成侦听器进程(这实际上是我更愿意这样做),但这似乎也不起作用,所以我尝试了这种方式。

任何帮助将不胜感激,感觉我可能只是遗漏了一些关于 gevent 后端的明显内容。

谢谢

【问题讨论】:

    标签: python logging queue gevent


    【解决方案1】:

    第一个问题是,如果队列最初为空,您的侦听器将永远不会让步。你产生的第一个任务是你的听众。当它开始时,有一个while True:,q 将是空的,所以你转到 else 分支,它只是继续,循环回到 while 循环的开头,然后 q 仍然是空的。因此,您只需坐在第一个线程中不断检查 q 是否为空。

    这里的关键是 gevent 不使用“本机”线程或进程。与“真正的”线程不同,它可以在任何时候被幕后的东西(比如你的操作系统调度程序)切换到,gevent 使用“greenlets”,它要求你做一些事情来“让控制”到另一个任务。这就是 gevent 认为会阻塞的任何东西,例如从网络、磁盘读取,或使用阻塞的 gevent 操作之一。

    一个粗略的解决方法是在pid == 9 而不是0 时启动你的监听器。通过让它最后生成,q 中会有项目,它会进入主 if 分支。缺点是这并不能解决逻辑问题,所以第一次队列为空时,你会再次陷入无限循环。

    更正确的解决方法是使用gevent.sleep() 而不是continue。 sleep 是一个阻塞操作,因此您的其他任务将有机会运行。如果没有参数,它不会等待任何时间,但如果 gevent 准备好运行,它仍然有机会决定切换到另一个任务。但是,这仍然不是很有效,好像队列是空的,它将花费大量无意义的时间一遍又一遍地检查并要求尽快再次运行。睡眠时间超过默认值 0 会更有效,但会延迟处理您的日志消息。

    但是,您可以利用许多 gevent 类型(例如 Queue)可以以更多 Pythonic 方式使用这一事实,从而使您的代码更简单、更易于理解以及更高效。

    import gevent
    from gevent.queue import Queue
    
    def listener(q):
        for msg in q:
            print "the number is %d" % msg
    
    def worker(pid,q):
        gevent.sleep(random.randint(0,2)*0.001)
        num = random.randint(1,100)
        q.put(num)
    
    def main():
        q = Queue()
        listener_task = gevent.spawn(listener, q)
        worker_tasks = [gevent.spawn(worker, pid, q) for pid in xrange(1, 10)]
        gevent.wait(worker_tasks)
        q.put(StopIteration)
        gevent.join(listener_task)
    

    这里,Queue 可以作为 for 循环中的迭代器运行。只要有消息,它就会获取一个项目,运行循环,然后等待另一个项目。如果没有物品,它只会阻塞并徘徊,直到下一个到达。但是,由于它会阻塞,gevent 将切换到您的其他任务之一来运行,从而避免您的示例代码存在的无限循环问题。

    因为这个版本使用Queue作为for循环迭代器,所以我们也可以自动将一个漂亮的哨兵值放入队列中以使侦听器任务退出。如果一个 for 循环从它的迭代器中得到StopIteration,它将干净地退出。因此,当我们从 q 读取的 for 循环从 q 中获取 StopIteration 时,它会退出,然后函数退出,生成的任务就完成了。

    【讨论】:

      猜你喜欢
      • 2020-05-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-03
      • 1970-01-01
      • 2017-01-19
      相关资源
      最近更新 更多