1 背景

面试被问到:有用过多线程吗?

2 理论知识

  • 进程是资源分配的最小单位,线程是CPU调度的最小单位
  • 两者在不同维度上的对比:
    Python | 多线程和多进程
  • 各自优缺点和适用场合:
    Python | 多线程和多进程

3 实际项目是如何用多进程的?

3.1 相关概念

再次重申相关概念:

  • 线程:线程是一个基本的 CPU 执行单元。它必须依托于进程存活。一个线程是一个execution context(执行上下文),即一个 CPU 执行时所需要的一串指令
  • 进程:进程是指一个程序在给定数据集合上的一次执行过程,是系统进行资源分配和运行调用的独立单位。可以简单地理解为操作系统中正在执行的程序。也就说,每个应用程序都有一个自己的进程。每一个进程启动时都会最先产生一个线程,即主线程。然后主线程会再创建其他的子线程。

上述两者区别:

  • 线程必须在某个进程中执行。
  • 一个进程可包含多个线程,其中有且只有一个主线程。
  • 多线程共享同个地址空间、打开的文件以及其他资源
  • 多进程共享物理内存、磁盘、打印机以及其他资源。

3.2 实际处理见下

4 多进程实例

4.1 读入数据

此处提供示例性讲解。

4.2 应用场景

场景:

  • 首先通过商户表拿到商户的所有商户号
  • 根据商户号分块去取商户交易流水进行计算
  • 计算的时候就是通过多进程的方式!

4.3 Python实现

步骤:

  • 显示主进程PID
  • 设置进程数以及每个进程处理的样本量
  • 设置子进程
  • 启动子进程
  • 等待所有进程结束
# 日志函数
def log(rec):
    with open("log", 'a') as f:
        f.write(rec+'\n')
# 显示主进程PID
import os
main_pid = os.getpid() 
with open("pid", "w") as f:
    f.write(str(main_pid)+'\n')
import time
# [1].多进程-计算交易特征
# 每个进程计算结果作为一小块放在index_res目录下
log("*** index computation start.")
time_all0 = time.time()
# 进程列表
ps = []
# 样本量
n = 10000
# 进程数
n_thread = 20
# 单个进程处理数据量
block = int(n / n_thread)
# 补充一个进程补完数据
n_thread = n_thread + 1

log("    block size: %d" % block)
block = 50
n_thread = 2

# 设置子进程
for i in range(50, 50+n_thread):
    p = Process(target = score.generate_index, # score.generate_index为计算交易流水的函数
        args=(i, i*block, (1+i)*block,)) # args为上述函数对应的参数
    ps.append(p)

# 启动子进程
for i in range(0, len(ps)):
    time.sleep(2)
    ps[i].daemon = True
    ps[i].start()  
    log("    [ ] thread [%d] launched." % i)
    
# 等待所有进程结束
for i in range(0, len(ps)):
    ps[i].join()

5 多线程实例

5.1 创建多线程

方法1:直接使用threading.Thread()

import threading

# 这个函数名可随便定义
def run(n):
    print("current task:", n)

if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=("thread 1",))
    t2 = threading.Thread(target=run, args=("thread 2",))
    t1.start()
    t2.start()
current task: thread 1
current task: thread 2

方法2:继承threading.Thread来自定义线程类,重写run方法

import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重构run函数必须要写
        self.n = n

    def run(self):
        print("current task:", n)

if __name__ == "__main__":
    t1 = MyThread("thread 1")
    t2 = MyThread("thread 2")

    t1.start()
    t2.start()

5.2 线程合并

Join函数执行顺序是逐个执行每个线程,执行完毕后继续往下执行。主线程结束后,子线程还在运行,join函数使得主线程等到子线程结束时才退出。

import threading

def count(n):
    n = int(n)
    while (n) > 0:
        n -= 1

if __name__ == "__main__":
    t1 = threading.Thread(target=count, args=("100000",))
    t2 = threading.Thread(target=count, args=("100000",))
    t1.start()
    t2.start()
    # 将 t1 和 t2 加入到主线程中
    t1.join()
    t2.join()

5.3 线程同步与互斥锁

线程之间数据共享的。当多个线程对某一个共享数据进行操作时,就需要考虑到线程安全问题。threading模块中定义了Lock 类,提供了互斥锁的功能来保证多线程情况下数据的正确性。

#创建锁
mutex = threading.Lock()

#锁定
mutex.acquire([timeout])
# 其中,锁定方法acquire可以有一个超时时间的可选参数timeout。
# 如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。

#释放
mutex.release()
import threading
import time

num = 0
mutex = threading.Lock()

class MyThread(threading.Thread):
    def run(self):
        global num 
        time.sleep(1)

        if mutex.acquire(1):  
            num = num + 1
            msg = self.name + ': num value is ' + str(num)
            print(msg)
            mutex.release()

if __name__ == '__main__':
    for i in range(5):
        t = MyThread()
        t.start()

5.4 可重入锁(递归锁)

为了满足在同一线程中多次请求同一资源的需求,Python 提供了可重入锁(RLock)。
RLock内部维护着一个Lock和一个counter变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。

#创建 RLock
mutex = threading.RLock()

class MyThread(threading.Thread):
    def run(self):
        if mutex.acquire(1):
            print("thread " + self.name + " get mutex")
            time.sleep(1)
            mutex.acquire()
            mutex.release()
            mutex.release()

5.5 守护线程

如果希望主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。我们可以使用setDaemon(bool)函数,它跟join函数是相反的。它的作用是设置子线程是否随主线程一起结束,必须在start() 之前调用,默认为False。

5.6 定时器

如果需要规定函数在多少秒后执行某个操作,需要用到Timer类。具体用法如下:

from threading import Timer
 
def show():
    print("Pyhton")

# 指定一秒钟之后执行 show 函数
t = Timer(1, show())
t.start()  
Pyhton

7 是选择多线程还是多进程?

在这个问题上,首先要看下你的程序是属于哪种类型的。一般分为两种 CPU 密集型 和 I/O 密集型。

  • CPU 密集型:程序比较偏重于计算,需要经常使用 CPU 来运算。例如科学计算的程序,机器学习的程序等。
  • I/O 密集型:顾名思义就是程序需要频繁进行输入输出操作。爬虫程序就是典型的 I/O 密集型程序。

如果程序是属于 CPU 密集型,建议使用多进程。而多线程就更适合应用于 I/O 密集型程序。

8 参考

相关文章: