【问题标题】:Using Multithreaded queue in python the correct way?在 python 中正确使用多线程队列?
【发布时间】:2015-07-30 07:39:05
【问题描述】:

我正在尝试在 python 中使用多线程的队列。我只是想知道我使用的方法是否正确。如果我在做一些多余的事情,或者如果有更好的方法我应该使用。

我正在尝试从表中获取新请求并使用一些逻辑来安排它们以执行某些操作,例如运行查询。

所以我在这里从主线程为队列生成了一个单独的线程。

if __name__=='__main__':

  request_queue = SetQueue(maxsize=-1)
  worker = Thread(target=request_queue.process_queue)
  worker.setDaemon(True)
  worker.start()


  while True:
    try:
      #Connect to the database get all the new requests to be verified
      db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
      #Get new requests for verification
      verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE     JOB_STATUS='%s' ORDER BY JOB_ID" %
                             (username_testschema, 'INITIATED'))

      #If there are some requests to be verified, put them in the queue.
      if len(verify_these) > 0:
        for row in verify_these:
          print "verifying : %s" % row[0]
          verify_id = row[0]
          request_queue.put(verify_id)
    except Exception as e:
      logger.exception(e)
    finally:
      time.sleep(10)

现在在 Setqueue 类中,我有一个 process_queue 函数,用于处理每次运行中添加到队列中的前 2 个请求。

'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''

class SetQueue(Queue.Queue):
  def _init(self, maxsize):
    Queue.Queue._init(self, maxsize)
    self.all_items = set()

  def _put(self, item):
    if item not in self.all_items:
      Queue.Queue._put(self, item)
      self.all_items.add(item)

  '''
  The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
  This way max two requests per run will be processed.
  '''
  def process_queue(self):
    while True:
      scheduler_obj = Scheduler()

      try:
        if self.qsize() > 0:
          for i in range(2):
            job_id = self.get()
            t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
            t.start()

          for i in range(2):
            t.join(timeout=1)
            self.task_done()

      except Exception as e:
        logger.exception(
          "QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
      finally:
        time.sleep(10)

我想看看我的理解是否正确,是否有任何问题。

所以当主函数中的 True 连接到数据库时运行的主线程获取新请求并将其放入队列中。队列的工作线程(守护进程)不断从队列中获取新请求并派生执行处理的非守护线程,并且由于连接的超时时间为 1,工作线程将继续接受新请求而不会被阻塞,并且它的子线程将继续在后台处理。对吗?

因此,如果主进程退出,这些将在完成工作之前不会被杀死,但工作守护线程将退出。 疑问:如果父进程是守护进程,子进程是非守护进程,如果父进程退出,子进程是否退出?)。


我也在这里阅读:-David beazley multiprocessing

大卫比兹利在使用池作为线程协处理器部分试图解决类似的问题。所以我应该按照他的步骤:- 1. 创建一个进程池。 2.像我为request_queue做的那样打开一个线程 3.在那个线程中

  def process_verification_queue(self):
    while True:
      try:
        if self.qsize() > 0:
          job_id = self.get()
          pool.apply_async(Scheduler.verify_func, args=(job_id,))
      except Exception as e:
        logger.exception("QUEUE EXCEPTION : Exception occured while    processing requests in the VERIFICATION QUEUE")

使用池中的进程并并行运行 verify_func。这会给我带来更好的性能吗?

【问题讨论】:

  • 请您修复缩进?
  • 我认为我无法充分回答您的所有问题,但您可以考虑遵循流行的Celery Project 中的一些架构模式。我并不是要推荐它作为您正在创建的解决方案,但它是一个成熟的项目,可以突出您可以复制的体面方法。
  • 我会看的。谢谢@erik-e

标签: python queue python-multithreading


【解决方案1】:

虽然可以为队列创建一个新的独立线程,并按照您的操作方式单独处理该数据,但我相信每个独立工作线程将消息发布到他们已经“知道”的队列更为常见“ 关于。然后通过从该队列中拉出消息,从其他线程处理该队列。

设计理念

我设想您的应用程序的方式是三个线程。主线程和两个工作线程。 1 个工作线程将从数据库获取请求并将它们放入队列中。另一个工作线程将处理队列中的数据

主线程会使用线程函数 .join() 等待其他线程完成

您可以保护线程可以访问的队列,并通过使用互斥锁使其线程安全。我在其他语言的许多其他设计中也看到了这种模式。

推荐阅读

Brett Slatkin 的“Effective Python”就是这个问题的一个很好的例子。

他没有从 Queue 继承,而是在他的类中为其创建了一个包装器 调用 MyQueue 并添加了 get() 和 put(message) 函数。

他甚至在他的 Github 仓库中提供了源代码

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py

我不隶属于这本书或其作者,但我强烈推荐它,因为我从中学到了很多东西:)

【讨论】:

    【解决方案2】:

    我喜欢这个关于使用线程和进程之间的优点和区别的解释 - “.....但是有一线希望:进程可以同时在多个执行线程上取得进展。由于父进程不与其子进程共享 GIL,所有进程可以同时执行(受硬件和操作系统)......”

    他对绕过 GIL 以及如何提高性能有一些很好的解释

    在这里阅读更多:

    http://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/

    【讨论】:

    • 但是总体上提供的sn-p除了GIL不会有任何问题吧?迁移到流程可能给我带来的唯一好处是性能。
    • 我不这么认为!由于线程使用相同的内存,因此必须采取预防措施,以免出现错误,但只要你在这方面小心,我认为你只是在考虑使用进程可能带来的性能提升 - 它也是如果您不必要地使用它们,可能会失去性能 - 生成进程可能比生成线程花费更长的时间,但一旦它们运行,它们的速度就会大致相同
    • 如果您使用多核,进程至少可以帮助您利用这一点!
    • 线程与进程比这复杂得多。只有在线程运行 python 字节码时,才会或多或少地获取 GIL。在这种特殊情况下,主线程将等待其他线程(GIL-released),第一个工作线程主要在数据库上等待(GIL-released),第二个工作线程是唯一需要 GIL 和GIL 可供它使用。此外,与需要操作系统级共享内存互斥锁或 IPC 的进程相比,线程所需的互斥锁(通常是自旋锁的成本)要便宜得多。
    猜你喜欢
    • 2021-12-14
    • 2013-10-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多