【问题标题】:How can I use threading in Python?如何在 Python 中使用线程?
【发布时间】:2023-04-09 15:36:01
【问题描述】:

我正在尝试理解 Python 中的线程。我查看了文档和示例,但坦率地说,许多示例过于复杂,我无法理解它们。

您如何清楚地显示为多线程划分的任务?

【问题讨论】:

  • 可以在 Jeff Knupp 的 Python's Hardest Problem 中找到有关此主题的一般性讨论。总之,线程似乎不适合初学者。
  • 哈哈,我倾向于认为线程适合所有人,但初学者不适合线程:)))))
  • 只是为了表明人们应该阅读所有答案,因为随着新语言功能的利用,以后的答案可能会更好......
  • 记得用 C 语言编写核心逻辑并通过 ctypes 调用它,以真正利用 Python 线程。
  • 我只是想补充一点,PyPubSub 是发送和接收消息以控制线程流的好方法

标签: python multithreading concurrency python-multithreading


【解决方案1】:

注意:对于 Python 中的实际并行化,您应该使用 multiprocessing 模块来分叉并行执行的多个进程(由于全局解释器锁,Python 线程提供交错,但它们是实际上是串行执行,而不是并行执行,并且仅在交错 I/O 操作时有用)。

但是,如果您只是在寻找交错(或者正在执行可以在全局解释器锁定的情况下进行并行化的 I/O 操作),那么 threading 模块是开始的地方。作为一个非常简单的例子,让我们考虑通过并行求和子范围来对大范围求和的问题:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

请注意,上面是一个非常愚蠢的示例,因为它绝对没有 I/O,并且由于全局解释器锁定,尽管在 CPython 中交错(增加了上下文切换的开销),但它将串行执行。

【讨论】:

  • @Alex,我没有说它实用,但它确实演示了如何定义和生成线程,我认为这是 OP 想要的。
  • 虽然这确实显示了如何定义和生成线程,但它实际上并没有并行地对子范围求和。 thread1 在主线程阻塞时一直运行直到它完成,然后 thread2 发生同样的事情,然后主线程恢复并打印出它们累积的值。
  • 不应该是super(SummingThread, self).__init__()吗?如stackoverflow.com/a/2197625/806988
  • @JamesAndres,假设没有人从“SummingThread”继承,那么任何一个都可以正常工作;在这种情况下, super(SummingThread, self) 只是在方法解析顺序 (MRO) 中查找下一个类的一种奇特方式,即 threading.Thread (然后随后在其上调用 init在这两种情况下)。不过,您是对的,因为对于当前的 Python,使用 super() 是更好的样式。 Super 在我提供此答案时相对较新,因此直接调用超类而不是使用 super()。不过,我会更新它以使用 super。
  • 警告:不要在这样的任务中使用多线程!正如 Dave Beazley 所示:dabeaz.com/python/NewGIL.pdf,2 个 CPU 上的 2 个 python 线程执行 CPU 繁重的任务比 1 个 CPU 上的 1 个线程慢 2 倍,比 1 个 CPU 上的 2 个线程慢 1.5 倍。这种奇怪的行为是由于操作系统和 Python 之间的努力不协调造成的。线程的实际用例是 I/O 繁重的任务。例如。当您通过网络执行读/写时,将等待读取/写入数据的线程置于后台并将 CPU 切换到另一个需要处理数据的线程是有意义的。
【解决方案2】:

这里有一个简单的例子:你需要尝试几个替代的 URL 并返回第一个的内容来响应。

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

这是一个使用线程作为简单优化的情况:每个子线程都在等待 URL 解析和响应,将其内容放入队列;每个线程都是一个守护进程(如果主线程结束,则不会保持进程正常运行——这比没有更常见);主线程启动所有子线程,在队列中执行get 以等待其中一个完成put,然后发出结果并终止(这会取消任何可能仍在运行的子线程,因为它们是守护线程)。

在 Python 中正确使用线程总是与 I/O 操作相关(因为 CPython 无论如何都不使用多核来运行 CPU 密集型任务,线程化的唯一原因是在等待某些进程时不阻塞进程输入/输出)。顺便说一句,队列几乎总是将工作分配给线程和/或收集工作结果的最佳方式,而且它们本质上是线程安全的,因此它们使您不必担心锁、条件、事件、信号量和其他交互-线程协调/通信概念。

【讨论】:

  • 再次感谢 MartelliBot。我更新了示例以等待所有 url 响应: import Queue, threading, urllib2 q = Queue.Queue() urls = '''a.com b.com c.com'''.split() urls_received = 0 def get_url(q, url): req = urllib2.Request(url) resp = urllib2.urlopen(req) q.put(resp.read()) global urls_received urls_received +=1 print urls_received for u in urls: t = threading.Thread(target= get_url, args = (q,u)) t.daemon = True t.start() while q.empty() and urls_received
  • @JRM:如果您查看下面的下一个答案,我认为等待线程完成的更好方法是使用 join() 方法,因为这会使主线程通过不断检查值等到它们完成而不消耗处理器。 @Alex:谢谢,这正是我了解如何使用线程所需要的。
  • 对于 python3,将 'import urllib2' 替换为 'import urllib.request as urllib2'。并在 print 语句中加上括号。
  • 对于 python 3,将 Queue 模块名称替换为 queue。方法名称相同。
  • 我注意到解决方案只会打印出其中一页。要打印队列中的两个页面,只需再次运行命令:s = q.get()print s@krs013 您不需要join,因为 Queue.get() 正在阻塞。
【解决方案3】:

与其他人提到的一样,由于GIL,CPython 只能将线程用于 I/O 等待。

如果您想从多核中受益于 CPU 密集型任务,请使用multiprocessing

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

【讨论】:

  • 你能解释一下这是做什么的吗?
  • @pandita:代码创建一个进程,然后启动它。所以现在同时发生了两件事:程序的主线,以及从目标开始的进程,f 函数。同时,主程序现在只是等待进程退出,joining 结束。如果主要部分刚刚退出,子进程可能会或可能不会运行到完成,因此始终建议使用join
  • 包含map 函数的扩展答案在这里:stackoverflow.com/a/28463266/2327328
  • @philshem 小心 b/c 您发布的链接使用的是线程池(不是进程),如此处提到的stackoverflow.com/questions/26432411/…。但是,这个答案正在使用一个过程。我对这些东西很陌生,但似乎(由于 GIL)在 Python 中使用多线程时,您只会在特定情况下获得性能提升。但是,使用进程池可以通过在一个进程上拥有多个核心工作来利用多核处理器。
  • 这是实际做一些有用的事情并利用多个 CPU 内核的最佳答案
【解决方案4】:

对我来说,线程的完美示例是监控异步事件。看看这段代码。

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

您可以通过打开IPython 会话并执行以下操作来使用此代码:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等几分钟

>>> a[0] = 2
Mon = 2

【讨论】:

  • AttributeError: 'Monitor' 对象没有属性 'stop' ?
  • 在等待事件发生时,您不是在消耗 CPU 周期吗?并不总是一件非常实际的事情。
  • 就像 mogul 所说,这将不断执行。至少您可以添加一个短暂的睡眠,比如 sleep(0.1),这可能会显着降低像这样的简单示例中的 CPU 使用率。
  • 这是一个可怕的例子,浪费了一个核心。至少添加一个睡眠,但正确的解决方案是使用一些信号机制。
  • 我最近读过 GIL,我想知道如何在启动的线程运行时输入 a[0] = 2,这是一个 python CPU 绑定任务。 GIL 是否会阻止您运行任何其他 python 代码,因为它曾经被 Monitor 线程获取?还是python不断在线程之间切换,而GIL只是防止没有线程同时执行但可以并发(但不能并行)执行?
【解决方案5】:

请注意:线程不需要队列。

这是我能想象到的最简单的例子,它显示了 10 个同时运行的进程。

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

【讨论】:

  • 将最后一个引号添加到“完成”以使其打印“完成”
  • 我比 Martelli 更喜欢这个例子,它更容易上手。但是,我建议 printNumber 执行以下操作,以便更清楚地了解发生了什么:它应该在睡觉之前将 randint 保存到一个变量中,然后打印应该更改为 "Thread" + str( number) + " 睡了 " + theRandintVariable + " seconds"
  • 有没有办法知道每个线程何时完成?
  • @Matt 有几种方法可以做到这一点,但这取决于您的需要。一种方法是更新单例或其他一些可公开访问的变量,这些变量在 while 循环中被监视并在线程结束时更新。
  • 不需要第二个for循环,你可以在第一个循环中调用thread.start()
【解决方案6】:

The answer from Alex Martelli 帮助了我。但是,这是一个我认为更有用的修改版本(至少对我来说)。

更新:适用于 Python 2 和 Python 3

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

【讨论】:

  • 为什么不打破异常?
  • 你可以,只是个人喜好
  • 我还没有运行代码,但你不需要守护线程吗?我认为在最后一个 for 循环之后,您的程序可能会退出 - 至少应该退出,因为线程应该是这样工作的。我认为更好的方法不是将工作数据放入队列,而是将输出放入队列,因为这样您就可以拥有一个主循环,它不仅处理从工作人员进入队列的信息,而且现在它也不是线程,你知道它不会过早退出。
  • @dylnmc,这超出了我的用例(我的输入队列是预定义的)。如果您想走自己的路,我建议您查看celery
  • @JimJty 你知道我为什么会收到这个错误吗:import Queue ModuleNotFoundError: No module named 'Queue' 我正在运行 python 3.6.5 一些帖子提到在 python 3.6.5 中它是 queue 但即使在我改变之后还是不行
【解决方案7】:

我发现这非常有用:创建与内核一样多的线程并让它们执行(大量)任务(在这种情况下,调用 shell 程序):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

【讨论】:

  • @shavenwarthog 确信可以根据自己的需要调整“cpus”变量。无论如何,子进程调用将产生子进程,这些子进程将由操作系统分配 cpus(python 的“父进程”并不意味着子进程的“相同 CPU”)。
  • 你是对的,我关于“线程与父进程在同一个 CPU 上启动”的评论是错误的。感谢您的回复!
  • 可能值得注意的是,与使用相同内存空间的多线程不同,多处理不能轻易共享变量/数据。 +1 虽然。
【解决方案8】:

自从 2010 年提出这个问题以来,如何使用 Python 进行简单的多线程处理已经有了真正的简化:mappool

以下代码来自您绝对应该查看的文章/博客文章(无隶属关系) - Parallelism in one line: A Better Model for Day to Day Threading Tasks。我将在下面进行总结 - 它最终只是几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

哪个是多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

说明

Map 是一个很酷的小函数,是轻松将并行性注入 Python 代码的关键。对于那些不熟悉的人来说,map 是从像 Lisp 这样的函数式语言中提炼出来的。它是一个将另一个函数映射到序列上的函数。

Map 为我们处理序列上的迭代,应用函数,并将所有结果存储在最后一个方便的列表中。


实施

map 函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样出色的 step child:multiprocessing.dummy。

multiprocessing.dummy 与多处理模块完全相同,but uses threads insteadan important distinction - 将多个进程用于 CPU 密集型任务;线程用于(和期间)I/O):

multiprocessing.dummy 复制了多处理的 API,但只不过是线程模块的包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

以及计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数(像这样only in Python 3.3 and later):

传递多个数组:

results = pool.starmap(function, zip(list_a, list_b))

或者传递一个常量和一个数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是较早版本的 Python,则可以通过 this workaround 传递多个参数。

(感谢user136036 提供有用的评论。)

【讨论】:

  • 这只是缺少投票,因为它是如此新鲜发布。这个答案效果很好,并演示了“地图”功能,它比这里的其他答案更容易理解语法。
  • 这是线程而不是进程吗?似乎它试图多进程!=多线程
  • 顺便说一句,伙计们,你也可以写with Pool(8) as p: p.map( *whatever* ) 并摆脱簿记行。
  • @BarafuAlbino:虽然很有用,但可能值得注意的是这个only works in Python 3.3+
  • 你怎么能留下这个答案而不提它只对 I/O 操作有用?这仅在单个线程上运行,这在大多数情况下是无用的,并且实际上比以正常方式执行要慢
【解决方案9】:

使用全新的concurrent.futures 模块

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

执行器方法对于所有以前接触过 Java 的人来说似乎都很熟悉。

另外附注:为了保持宇宙的健全,如果您不使用 with 上下文,请不要忘记关闭您的池/执行程序(这太棒了,它为您做到了)

【讨论】:

    【解决方案10】:

    给定一个函数f,像这样线程化它:

    import threading
    threading.Thread(target=f).start()
    

    将参数传递给f

    threading.Thread(target=f, args=(a,b,c)).start()
    

    【讨论】:

    • 这很简单。如何确保线程在完成后关闭?
    • 据我了解,当函数退出时,Thread 对象会被清除。见the docs。如果需要,可以使用is_alive() 方法检查线程。
    • 我看到了is_alive 方法,但我不知道如何将它应用到线程中。我尝试分配thread1=threading.Thread(target=f).start(),然后用thread1.is_alive() 检查它,但thread1 填充有None,所以没有运气。不知道有没有其他方法可以访问线程?
    • 您需要将线程对象分配给一个变量,然后使用该变量启动它:thread1=threading.Thread(target=f),后跟thread1.start()。然后你可以做thread1.is_alive()
    • 那行得通。是的,只要函数退出,使用thread1.is_alive() 进行测试就会返回False
    【解决方案11】:

    这里有一个简单的多线程示例,它会很有帮助。您可以运行它并轻松理解多线程在 Python 中是如何工作的。我使用锁来防止访问其他线程,直到前一个线程完成工作。通过使用这行代码,

    tLock = threading.BoundedSemaphore(value=4)

    您可以一次允许多个进程,并保留将在稍后或完成之前的进程之后运行的其余线程。

    import threading
    import time
    
    #tLock = threading.Lock()
    tLock = threading.BoundedSemaphore(value=4)
    def timer(name, delay, repeat):
        print  "\r\nTimer: ", name, " Started"
        tLock.acquire()
        print "\r\n", name, " has the acquired the lock"
        while repeat > 0:
            time.sleep(delay)
            print "\r\n", name, ": ", str(time.ctime(time.time()))
            repeat -= 1
    
        print "\r\n", name, " is releaseing the lock"
        tLock.release()
        print "\r\nTimer: ", name, " Completed"
    
    def Main():
        t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
        t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
        t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
        t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
        t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))
    
        t1.start()
        t2.start()
        t3.start()
        t4.start()
        t5.start()
    
        print "\r\nMain Complete"
    
    if __name__ == "__main__":
        Main()
    

    【讨论】:

      【解决方案12】:

      以前的解决方案实际上都没有在我的 GNU/Linux 服务器上使用多个内核(我没有管理员权限)。它们只是在一个内核上运行。

      我使用较低级别的os.fork 接口来生成多个进程。这是对我有用的代码:

      from os import fork
      
      values = ['different', 'values', 'for', 'threads']
      
      for i in range(len(values)):
          p = fork()
          if p == 0:
              my_function(values[i])
              break
      

      【讨论】:

        【解决方案13】:

        Python 3 具有launching parallel tasks 的功能。这让我们的工作更轻松。

        它有thread poolingprocess pooling

        下面给出一个见解:

        ThreadPoolExecutor 示例 (source)

        import concurrent.futures
        import urllib.request
        
        URLS = ['http://www.foxnews.com/',
                'http://www.cnn.com/',
                'http://europe.wsj.com/',
                'http://www.bbc.co.uk/',
                'http://some-made-up-domain.com/']
        
        # Retrieve a single page and report the URL and contents
        def load_url(url, timeout):
            with urllib.request.urlopen(url, timeout=timeout) as conn:
                return conn.read()
        
        # We can use a with statement to ensure threads are cleaned up promptly
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            # Start the load operations and mark each future with its URL
            future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    print('%r page is %d bytes' % (url, len(data)))
        

        ProcessPoolExecutor (source)

        import concurrent.futures
        import math
        
        PRIMES = [
            112272535095293,
            112582705942171,
            112272535095293,
            115280095190773,
            115797848077099,
            1099726899285419]
        
        def is_prime(n):
            if n % 2 == 0:
                return False
        
            sqrt_n = int(math.floor(math.sqrt(n)))
            for i in range(3, sqrt_n + 1, 2):
                if n % i == 0:
                    return False
            return True
        
        def main():
            with concurrent.futures.ProcessPoolExecutor() as executor:
                for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                    print('%d is prime: %s' % (number, prime))
        
        if __name__ == '__main__':
            main()
        

        【讨论】:

          【解决方案14】:

          大多数文档和教程都使用 Python 的 ThreadingQueue 模块,对于初学者来说,它们似乎难以承受。

          或许可以考虑 Python 3 的 concurrent.futures.ThreadPoolExecutor 模块。

          结合with 子句和列表理解,它可能是一个真正的魅力。

          from concurrent.futures import ThreadPoolExecutor, as_completed
          
          def get_url(url):
              # Your actual program here. Using threading.Lock() if necessary
              return ""
          
          # List of URLs to fetch
          urls = ["url1", "url2"]
          
          with ThreadPoolExecutor(max_workers = 5) as executor:
          
              # Create threads
              futures = {executor.submit(get_url, url) for url in urls}
          
              # as_completed() gives you the threads once finished
              for f in as_completed(futures):
                  # Get the results
                  rs = f.result()
          

          【讨论】:

            【解决方案15】:

            我在这里看到了很多没有执行实际工作的示例,而且它们大多受 CPU 限制。这是一个 CPU 密集型任务的示例,它计算 1000 万到 1005 万之间的所有素数。我在这里使用了所有四种方法:

            import math
            import timeit
            import threading
            import multiprocessing
            from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
            
            
            def time_stuff(fn):
                """
                Measure time of execution of a function
                """
                def wrapper(*args, **kwargs):
                    t0 = timeit.default_timer()
                    fn(*args, **kwargs)
                    t1 = timeit.default_timer()
                    print("{} seconds".format(t1 - t0))
                return wrapper
            
            def find_primes_in(nmin, nmax):
                """
                Compute a list of prime numbers between the given minimum and maximum arguments
                """
                primes = []
            
                # Loop from minimum to maximum
                for current in range(nmin, nmax + 1):
            
                    # Take the square root of the current number
                    sqrt_n = int(math.sqrt(current))
                    found = False
            
                    # Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
                    for number in range(2, sqrt_n + 1):
            
                        # If divisible we have found a factor, hence this is not a prime number, lets move to the next one
                        if current % number == 0:
                            found = True
                            break
            
                    # If not divisible, add this number to the list of primes that we have found so far
                    if not found:
                        primes.append(current)
            
                # I am merely printing the length of the array containing all the primes, but feel free to do what you want
                print(len(primes))
            
            @time_stuff
            def sequential_prime_finder(nmin, nmax):
                """
                Use the main process and main thread to compute everything in this case
                """
                find_primes_in(nmin, nmax)
            
            @time_stuff
            def threading_prime_finder(nmin, nmax):
                """
                If the minimum is 1000 and the maximum is 2000 and we have four workers,
                1000 - 1250 to worker 1
                1250 - 1500 to worker 2
                1500 - 1750 to worker 3
                1750 - 2000 to worker 4
                so let’s split the minimum and maximum values according to the number of workers
                """
                nrange = nmax - nmin
                threads = []
                for i in range(8):
                    start = int(nmin + i * nrange/8)
                    end = int(nmin + (i + 1) * nrange/8)
            
                    # Start the thread with the minimum and maximum split up to compute
                    # Parallel computation will not work here due to the GIL since this is a CPU-bound task
                    t = threading.Thread(target = find_primes_in, args = (start, end))
                    threads.append(t)
                    t.start()
            
                # Don’t forget to wait for the threads to finish
                for t in threads:
                    t.join()
            
            @time_stuff
            def processing_prime_finder(nmin, nmax):
                """
                Split the minimum, maximum interval similar to the threading method above, but use processes this time
                """
                nrange = nmax - nmin
                processes = []
                for i in range(8):
                    start = int(nmin + i * nrange/8)
                    end = int(nmin + (i + 1) * nrange/8)
                    p = multiprocessing.Process(target = find_primes_in, args = (start, end))
                    processes.append(p)
                    p.start()
            
                for p in processes:
                    p.join()
            
            @time_stuff
            def thread_executor_prime_finder(nmin, nmax):
                """
                Split the min max interval similar to the threading method, but use a thread pool executor this time.
                This method is slightly faster than using pure threading as the pools manage threads more efficiently.
                This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
                """
                nrange = nmax - nmin
                with ThreadPoolExecutor(max_workers = 8) as e:
                    for i in range(8):
                        start = int(nmin + i * nrange/8)
                        end = int(nmin + (i + 1) * nrange/8)
                        e.submit(find_primes_in, start, end)
            
            @time_stuff
            def process_executor_prime_finder(nmin, nmax):
                """
                Split the min max interval similar to the threading method, but use the process pool executor.
                This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
                RECOMMENDED METHOD FOR CPU-BOUND TASKS
                """
                nrange = nmax - nmin
                with ProcessPoolExecutor(max_workers = 8) as e:
                    for i in range(8):
                        start = int(nmin + i * nrange/8)
                        end = int(nmin + (i + 1) * nrange/8)
                        e.submit(find_primes_in, start, end)
            
            def main():
                nmin = int(1e7)
                nmax = int(1.05e7)
                print("Sequential Prime Finder Starting")
                sequential_prime_finder(nmin, nmax)
                print("Threading Prime Finder Starting")
                threading_prime_finder(nmin, nmax)
                print("Processing Prime Finder Starting")
                processing_prime_finder(nmin, nmax)
                print("Thread Executor Prime Finder Starting")
                thread_executor_prime_finder(nmin, nmax)
                print("Process Executor Finder Starting")
                process_executor_prime_finder(nmin, nmax)
            
            main()
            

            这是我的 Mac OS X 四核机器上的结果

            Sequential Prime Finder Starting
            9.708213827005238 seconds
            Threading Prime Finder Starting
            9.81836523200036 seconds
            Processing Prime Finder Starting
            3.2467174359990167 seconds
            Thread Executor Prime Finder Starting
            10.228896902000997 seconds
            Process Executor Finder Starting
            2.656402041000547 seconds
            

            【讨论】:

            • @TheUnfunCat no process executor s far better than threading for cpu bound tasks
            • 老兄的回答很好。我可以确认,在 Windows 上的 Python 3.6 中(至少) ThreadPoolExecutor 对 CPU 繁重的任务没有任何好处。它没有利用核心进行计算。虽然 ProcessPoolExecutor 将数据复制到它产生的每个进程中,但它对于大型矩阵来说是致命的。
            • 非常有用的例子,但我不明白它是如何工作的。在主调用之前我们需要一个if __name__ == '__main__':,否则测量会自行生成并打印An attempt has been made to start a new process before...
            • @Stein 我相信这只是 Windows 上的一个问题。
            【解决方案16】:

            这是一个非常简单的CSV 使用线程导入示例。 (图书馆收录可能因不同目的而有所不同。)

            辅助函数:

            from threading import Thread
            from project import app
            import csv
            
            
            def import_handler(csv_file_name):
                thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
                thr.start()
            
            def dump_async_csv_data(csv_file_name):
                with app.app_context():
                    with open(csv_file_name) as File:
                        reader = csv.DictReader(File)
                        for row in reader:
                            # DB operation/query
            

            驱动功能:

            import_handler(csv_file_name)
            

            【讨论】:

              【解决方案17】:
              import threading
              import requests
              
              def send():
              
                r = requests.get('https://www.stackoverlow.com')
              
              thread = []
              t = threading.Thread(target=send())
              thread.append(t)
              t.start()
              

              【讨论】:

              • @sP_ 我猜是因为你有线程对象,所以你可以等待它们完成。
              • t = threading.Thread(target=send()) 应该是 t = threading.Thread(target=send)
              • 我对这个答案投了反对票,因为除了包含严重的不准确之外,它没有解释它如何改进现有答案。
              【解决方案18】:

              通过借鉴this post,我们知道如何在多线程、多处理和异步/asyncio 之间进行选择以及它们的用法。

              Python 3 有一个新的内置库来实现并发性和并行性:concurrent.futures

              所以我将通过一个实验来演示通过Threading-Pool运行四个任务(即.sleep()方法):

              from concurrent.futures import ThreadPoolExecutor, as_completed
              from time import sleep, time
              
              def concurrent(max_worker):
                  futures = []
                  tic = time()
                  with ThreadPoolExecutor(max_workers=max_worker) as executor:
                      futures.append(executor.submit(sleep, 2))  # Two seconds sleep
                      futures.append(executor.submit(sleep, 1))
                      futures.append(executor.submit(sleep, 7))
                      futures.append(executor.submit(sleep, 3))
                      for future in as_completed(futures):
                          if future.result() is not None:
                              print(future.result())
                  print(f'Total elapsed time by {max_worker} workers:', time()-tic)
              
              concurrent(5)
              concurrent(4)
              concurrent(3)
              concurrent(2)
              concurrent(1)
              

              输出:

              Total elapsed time by 5 workers: 7.007831811904907
              Total elapsed time by 4 workers: 7.007944107055664
              Total elapsed time by 3 workers: 7.003149509429932
              Total elapsed time by 2 workers: 8.004627466201782
              Total elapsed time by 1 workers: 13.013478994369507
              

              [注意]:

              • 如您在上述结果中所见,最好的情况是 3 个工人完成这四项任务。
              • 如果您有一个进程任务而不是 I/O 绑定或阻塞(multiprocessing 而不是 threading),您可以将 ThreadPoolExecutor 更改为 ProcessPoolExecutor

              【讨论】:

                【解决方案19】:

                我想提供一个简单的示例和我发现在我必须自己解决这个问题时有用的解释。

                在此答案中,您将找到有关 Python 的 GIL(全局解释器锁)的一些信息以及使用 multiprocessing.dummy 编写的简单日常示例以及一些简单的基准测试。

                全局解释器锁 (GIL)

                Python 不允许真正意义上的多线程。它有一个多线程包,但是如果你想多线程来加速你的代码,那么使用它通常不是一个好主意。

                Python 有一个称为全局解释器锁 (GIL) 的结构。 GIL 确保在任何时候只有一个“线程”可以执行。一个线程获取 GIL,做一些工作,然后将 GIL 传递给下一个线程。

                这发生得非常快,因此在人眼看来,您的线程似乎是在并行执行,但实际上它们只是轮流使用相同的 CPU 内核。

                所有这些 GIL 传递都会增加执行开销。这意味着如果你想让你的代码运行得更快,那么使用线程 打包通常不是一个好主意。

                使用 Python 的线程包是有原因的。如果你想同时运行一些东西,效率不是问题, 然后就完全没问题了,很方便。或者,如果您正在运行需要等待某事(例如某些 I/O)的代码,那么它可能会很有意义。但是线程库不允许您使用额外的 CPU 内核。

                多线程可以外包给操作系统(通过执行多处理),以及一些调用您的 Python 代码的外部应用程序(例如,SparkHadoop),或者您的 Python 代码的一些代码调用(例如:你可以让你的 Python 代码调用一个 C 函数来执行昂贵的多线程操作)。

                为什么重要

                因为很多人在了解 GIL 是什么之前,会花费大量时间试图找到他们花哨的 Python 多线程代码中的瓶颈。

                一旦这些信息清楚,这是我的代码:

                #!/bin/python
                from multiprocessing.dummy import Pool
                from subprocess import PIPE,Popen
                import time
                import os
                
                # In the variable pool_size we define the "parallelness".
                # For CPU-bound tasks, it doesn't make sense to create more Pool processes
                # than you have cores to run them on.
                #
                # On the other hand, if you are using I/O-bound tasks, it may make sense
                # to create a quite a few more Pool processes than cores, since the processes
                # will probably spend most their time blocked (waiting for I/O to complete).
                pool_size = 8
                
                def do_ping(ip):
                    if os.name == 'nt':
                        print ("Using Windows Ping to " + ip)
                        proc = Popen(['ping', ip], stdout=PIPE)
                        return proc.communicate()[0]
                    else:
                        print ("Using Linux / Unix Ping to " + ip)
                        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
                        return proc.communicate()[0]
                
                
                os.system('cls' if os.name=='nt' else 'clear')
                print ("Running using threads\n")
                start_time = time.time()
                pool = Pool(pool_size)
                website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
                result = {}
                for website_name in website_names:
                    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
                pool.close()
                pool.join()
                print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))
                
                # Now we do the same without threading, just to compare time
                print ("\nRunning NOT using threads\n")
                start_time = time.time()
                for website_name in website_names:
                    do_ping(website_name)
                print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))
                
                # Here's one way to print the final output from the threads
                output = {}
                for key, value in result.items():
                    output[key] = value.get()
                print ("\nOutput aggregated in a Dictionary:")
                print (output)
                print ("\n")
                
                print ("\nPretty printed output: ")
                for key, value in output.items():
                    print (key + "\n")
                    print (value)
                

                【讨论】:

                  【解决方案20】:

                  作为python3版本的第二个anwser:

                  import queue as Queue
                  import threading
                  import urllib.request
                  
                  # Called by each thread
                  def get_url(q, url):
                      q.put(urllib.request.urlopen(url).read())
                  
                  theurls = ["http://google.com", "http://yahoo.com", "http://www.python.org","https://wiki.python.org/moin/"]
                  
                  q = Queue.Queue()
                  def thread_func():
                      for u in theurls:
                          t = threading.Thread(target=get_url, args = (q,u))
                          t.daemon = True
                          t.start()
                  
                      s = q.get()
                      
                  def non_thread_func():
                      for u in theurls:
                          get_url(q,u)
                          
                  
                      s = q.get()
                     
                  

                  你可以测试一下:

                  start = time.time()
                  thread_func()
                  end = time.time()
                  print(end - start)
                  
                  start = time.time()
                  non_thread_func()
                  end = time.time()
                  print(end - start)
                  

                  non_thread_func() 花费的时间应该是 thread_func() 的 4 倍

                  【讨论】:

                    【解决方案21】:

                    这很容易理解。以下是进行线程化的两种简单方法。

                    import time
                    from concurrent.futures import ThreadPoolExecutor, as_completed
                    import threading
                    
                    def a(a=1, b=2):
                        print(a)
                        time.sleep(5)
                        print(b)
                        return a+b
                    
                    def b(**kwargs):
                        if "a" in kwargs:
                            print("am b")
                        else:
                            print("nothing")
                            
                    to_do=[]
                    executor = ThreadPoolExecutor(max_workers=4)
                    ex1=executor.submit(a)
                    to_do.append(ex1)
                    ex2=executor.submit(b, **{"a":1})
                    to_do.append(ex2)
                    
                    for future in as_completed(to_do):
                        print("Future {} and Future Return is {}\n".format(future, future.result()))
                    
                    print("threading")
                    
                    to_do=[]
                    to_do.append(threading.Thread(target=a))
                    to_do.append(threading.Thread(target=b, kwargs={"a":1}))
                    
                    for threads in to_do:
                        threads.start()
                        
                    for threads in to_do:
                        threads.join()
                    

                    【讨论】:

                    • 如何从函数a() 中检索a+b
                    猜你喜欢
                    • 1970-01-01
                    • 2018-06-10
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    相关资源
                    最近更新 更多