【问题标题】:better solution for setting max threads to hold main thread?设置最大线程以保持主线程的更好解决方案?
【发布时间】:2013-05-10 03:07:09
【问题描述】:

我有一个连接到许多 serverlet 之一的 Web 服务器。 Web 服务器最多可以排队 40 个作业,每个作业可能需要 20 分钟或 30 小时才能运行。

Web 服务器使用套接字连接到一个 serverlet,该 serverlet 运行使用线程发送的作业。

我想对可以一次运行的线程(作业)数量设置一个上限,比如 3 个,一旦达到该限制,它就会保留主线程。当其中一个线程结束时,它允许主线程继续并拾取另一项工作。

# Wait for thread count to reduce before continuing
while threading.active_count() >= self.max_threads:
    pass 

我目前正在使用循环让我的主线程等待,直到有空闲线程可用。它有效,但感觉像是一个快速而肮脏的解决方案。我想知道是否有更好的方法来做到这一点?

server.py

import socket
import sys
import urllib, urllib2
import threading
import cPickle

from supply import supply


class supply_thread(threading.Thread):

    def __init__(self, _sock):
        threading.Thread.__init__(self)
        self.__socket = _sock

    def run(self):
        data = self.readline()
        self.__socket.close()
        new_supply = supply.supply(data)
        new_supply.run()

    def readline(self):
        """ read data sent from webserver and decode it """

        data = self.__socket.recv( 1024 )
        if data:
            data = cPickle.loads(data)
            return data



class server:

    def __init__(self):
        ## Socket Vars
        self.__socket = None
        self.HOST = ''
        self.PORT = 50007
        self.name = socket.gethostname()

        self.max_jobs = 3


    def listen(self):
        """ Listen for a connection from the webserver """

        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allows quick connection from the same address
        self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.__socket.bind((self.HOST, self.PORT))
        return self.__socket.listen(1)

    def connect(self):
        webserver = self.__socket.accept()[0]
        print 'Connected by', webserver

        new_thread = supply_thread(webserver)
        print 'Starting thread' , new_thread.getName()

        new_thread.start()

    def close(self):
        return self.__socket.close()


    def run(self):
        import time

        while True:
            print(sys.version)

            # Wait for connection from Webserver
            self.listen()

            time.sleep(3)

            # Let the Webserver know I'm avilable
            self.status(status='Available')

            print 'Waiting for connection...'
            self.connect()

            print 'thread count:', threading.enumerate()
            print 'thread count:', threading.active_count()

            while threading.active_count() >= self.max_jobs:
                pass


    def status(self, status='Available'):
        computer_name = socket.gethostname()
        svcURL = "http://localhost:8000/init/default/server"
        params = {
            'computer_name':computer_name,
            'status':status,
            'max_jobs':self.max_jobs
        }
        svcHandle = urllib2.urlopen(svcURL, urllib.urlencode(params))

【问题讨论】:

  • 有关如何启动线程的信息将有助于建议提供池实现的方法。您是否设置了站点列表及其参数?您可以轻松地添加一个Queue 甚至是一个简单的deque,并让每个线程都从中消耗一定数量的线程,但这完全取决于您从什么开始。
  • 一种限制并发作业数量的简单方法是使用恰好包含 3 个线程的池来运行所有作业,例如 to download a file using exactly 4 threads you could use multiprocessing.dummy.Pool

标签: python python-multithreading


【解决方案1】:

这听起来像是 Celery 的一个很好的用例。

基本上,您可以在 tasks.py 文件中创建一个 Celery 任务,然后使用 taskname.delay() 调用它。如果工人准备好接受另一个任务,它会将任务分派给芹菜工人并开始处理它。这是an example from the docs

默认情况下,Celery 将创建一个具有等于您机器中的核心数量according to the documentation 的工作线程。如果需要,您可以更改它。

或者,您可以使用QueuesHere's 另一个例子。

【讨论】:

  • 感谢您的建议,我想尽可能多地使用原生 Python。
  • 好的,在这种情况下,我认为队列会更合适。我将很快添加一个示例。
  • 我提供了一个队列如何工作的示例。您会将工作放入队列并设置一个线程来监视队列。当队列填满时,您将无法再放入其中。作为一个想法,您可以创建三个单独的队列,每个队列都有一个线程。
  • 谢谢凯文。据我了解,在队列上执行 .join() 会暂停主线程,直到队列中的所有作业都完成?
  • 理想情况下,我希望主线程在作业完成后恢复,而不是等到所有作业都完成,否则如果最后一个作业需要额外的 20 小时,效率会很低。谢谢你的建议。
猜你喜欢
  • 1970-01-01
  • 2017-07-24
  • 2017-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-01-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多