【问题标题】:How to close Threads in Python?如何在 Python 中关闭线程?
【发布时间】:2013-01-10 23:16:14
【问题描述】:

我有太多未完成的线程的问题。 我认为队列命令 .join() 只是关闭队列而不是使用它的线程。

在我的脚本中,我需要检查 280k 域,并为每个域获取他的 MX 记录列表,并获取服务器的 IPv6 地址(如果有的话)。

我使用了线程,感谢它们,脚本的速度要快很多倍。但是有一个问题,虽然队列有join(),但是存活线程的数量一直在增长,直到出现一个错误,提示无法创建任何新线程(操作系统的限制?)。

当我从数据库中检索新域时,如何在每个 For 循环之后终止/关闭/停止/重置线程?

线程类定义...

class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain


    def run(self):
        while True:
            self.mx = self.queue.get()

            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5

            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"

            lock.acquire()

            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

            lock.release()
            self.queue.task_done()

线程类正在使用中... (主要的for循环不在这里,这只是他身体的一部分)

try:
    answers = resolver.query(domain, 'MX')

    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()

    for mx in answers:
        qMX.put(mx.exchange)

    qMX.join()

except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"

print "end of script"

我尝试过:

for thread in threads:
            thread.join()

队列完成后,thread.join() 永远不会停止等待,尽管实际上不需要等待,因为当 queue.join() 执行时,线程没有任何事情可做。

【问题讨论】:

    标签: python multithreading join queue


    【解决方案1】:

    当我的线程涉及这样的无限循环时,我经常做的是将条件更改为我可以从外部控制的东西。比如这样:

    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            # do stuff
    

    这样,我可以从外部更改keepRunning 属性并将其设置为false,以便在下次检查循环条件时优雅地终止线程。

    顺便说一句。由于您似乎为放入队列的每个项目都生成了一个线程,因此您甚至根本不需要线程循环,尽管我认为您应该始终强制执行可以创建的线程的最大限制这样(即for i in range(min(len(answers), MAX_THREAD_COUNT)):

    另类

    在您的情况下,您可以重用线程,而不是在每次 for 循环迭代中终止线程。根据我从您的线程源中收集的信息,使线程对迭代唯一的所有因素都是您在创建时设置的id_domain 属性。但是,您也可以在队列中提供它,因此线程是完全独立的,您可以重用它们。

    这可能看起来像这样:

    qMX = Queue.Queue()
    threads = []
    for i in range(MAX_THREAD_COUNT):
        t = MX_getAAAA_thread(qMX)
        t.daemon = True
        threads.append(t)
        t.start()
    
    for id_domain in enumerateIdDomains():
        answers = resolver.query(id_domain, 'MX')
        for mx in answers:
            qMX.put((id_domain, mx.exchange)) # insert a tuple
    
    qMX.join()
    
    for thread in threads:
        thread.keepRunning = False
    

    当然,你需要稍微改变你的线程:

    class MX_getAAAA_thread(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
    
        def run(self):
            self.keepRunning = True
            while self.keepRunning:
                id_domain, mx = self.queue.get()
                # do stuff
    

    【讨论】:

    • 我的整个项目是获取 IPv6 地址。输入是 280 000 个域。我需要来自每个域的: ■ “www”的 AAAA 记录。 + 域 ■ “@ns.sk-nic.sk www”的粘合记录。 + 域 ■ 对于每个 NS 记录,询问它是否分配了 AAAA 记录 ■ 对于每个 MX 记录,询问它是否分配了 AAAA 记录 所以我以在从 DB 中选择的每个域之后使用线程的脚本结​​束 我认为可以更有效地完成,但我不知道如何。它可以通过并行加载域和每个域并行回答记录来完成。但这对于我的初学者技能来说太复杂了:)
    【解决方案2】:

    我不明白你为什么首先需要Queue
    毕竟在您的设计中,每个线程只处理一项任务。
    您应该能够在创建时将该任务传递给线程。
    这样你就不需要Queue 并且你摆脱了while-loop:

    class MX_getAAAA_thread(threading.Thread):
        def __init__(self, id_domain, mx):
            threading.Thread.__init__(self)
            self.id_domain = id_domain
            self.mx = mx
    

    然后你就可以去掉run-方法中的while-loop:

    def run(self):
        res = dns.resolver.Resolver()
        res.lifetime = 1.5
        res.timeout = 0.5
    
        try:
            answers = res.query(self.mx,'AAAA')
            ip_mx = str(answers[0])
        except:
            ip_mx = "N/A"
    
        with lock:
            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
    
            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
    

    为每个任务创建一个线程

    for mx in answers:
        t = MX_getAAAA_thread(qMX, id_domain, mx)
        t.setDaemon(True)
        threads.append(t)
        t.start()
    

    加入他们

    for thread in threads:
        thread.join()
    

    【讨论】:

    • 不确定我是否更喜欢这个解决方案。为什么要扇出这么多线程?它仍然非常低效。
    • @LorenAbrams,我同意——但这就是 OP 的做法。这个答案解决了他面临的永无止境的线程问题——当然,创建一些有限的线程池并通过队列向它们发送任务会更有效
    • 它可以在没有队列的情况下工作......非常感谢;)但我一直在按照 Loren 的建议设置 stoppageFlag,但我无法让它工作。
    • @user1610458 你最好选择 poke 的答案——他的替代方法——恕我直言,这就是这样做的方法
    • “当然,创建一些有限的线程池并通过队列向它们发送任务会更有效” 当我不知道需要多少线程时,有必要创建有限的线程池吗?这样做是个好主意,但是当我确定时,例如,对于一个域,我永远不需要超过 50 个。我认为创建尽可能多的要求会更好。
    【解决方案3】:

    加入线程可以解决问题,但在您的情况下,连接会无限期地阻塞,因为您的线程永远不会退出您的运行循环。您需要退出 run 方法才能加入线程。

    【讨论】:

    • 另外,没有超时的 Queue.get 将永远阻塞,因为队列保持为空。
    • 你建议如何退出运行循环?也许我不完全理解它是如何在运行方法中工作的......如果我没记错的话,在我看到的每个例子中,都有 while true: 循环。这就是为什么线程永远不会退出运行循环的问题?
    • 我通常在这种情况下使用被动方法。即,在 MX_getAAAA_thread 上设置一个停止标志,并在循环中的不同点检查它的值。如果已设置,则跳出循环或从 run 方法返回。 -- 回答你的问题:是的,线程将无限期运行,直到正在执行的代码完成(或线程被杀死)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-04-12
    • 1970-01-01
    • 1970-01-01
    • 2020-07-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多