1.IO编程

IO(input/output)。凡是用到数据交换的地方,都会涉及io编程,例如磁盘,网络的数据传输。在IO编程中,stream(流)是一种重要的概念,分为输入流(input stream)和输出流(output stream)。可以把流季节为一个水管,数据相当于水管中的水,但是只能单向流动,所以数据传输过程中需要假设两个水管,一个负责输入,一个负责输出,这样读写就可以实现同步。

2.GIL:global interpreter lock全局解释器锁

在Python中,GIL导致同一时刻只有一个线程进入解释器。

计算机有些操作不依赖于CPU,I/O操作几乎不利用,IO密集型不占用CPU,多线程可完成

计算操作都要用到CPU,适合多进程

  • IO密集型任务或函数,可以用多线程
  • 计算密集型任务函数,sorry,改C
  • 计算密集型程序适合C语言多线程,I/O密集型适合脚本语言开发的多线程

线程和进程的区别:

  • 线程共享创建它的进程的地址空间; 进程有自己的地址空间。
  • 线程可以直接访问其进程的数据段; 进程有自己的父进程的数据段副本。
  • 线程可以直接与其进程的其他线程进行通信; 进程必须使用进程间通信来与兄弟进程进行通信。
  • 新线程很容易创建; 新进程需要父进程的重复。
  • 线程可以对相同进程的线程进行相当的控制;
  • 流程只能控制子进程。
  • 对主线程的更改(取消,优先级更改等)可能会影响进程的其他线程的行为; 对父进程的更改不会影响子进程。

进程优点:同时利用多个CPU,同时进行多个操作;缺点:耗费资源(需要开辟多个内存空间)

线程优点:共享多个资源,IO操作,创造并发操作;缺点:抢占资源

3.threading模块

threading模块对象
Thread    表示一个线程的执行的对象
Lock    锁原语对象(跟thread模块里的锁对象相同)
RLock    可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定)
Condition条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”。如状态的改变或值的改变
Event    通用的条件变量。多个线程可以等待某个时间的发生,在事件发生后,所有的线程都被激活
Semaphore    为等待锁的线程提供一个类似“等候室”的结构
BoundedSemaphore    与Semaphore类似,只是它不允许超过初始值
Timer    与thread类似,只是它要等待一段时间后才开始运行
Barrier    创建一个障碍,必须达到指定数量的线程后才可以继续

3.1线程的两种调用方式

#直接调用
import time
import threading

begin=time.time()
def foo(n):
    print('start-foo%s'%n)
    time.sleep(1)
    print('end foo')

def bar(n):
    print('start-bar%s'%n)
    time.sleep(2)
    print('end bar')

# foo(2)
# bar(2)
t1=threading.Thread(target=foo,args=(1,))
t2=threading.Thread(target=bar,args=(1,))
t1.start()
t2.start()
end=time.time()
print('_________main_________')
print(t1.getName())
print(t2.getName())
print(end-begin)
#继承式调用
import threading
import time
class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num
        
    def run(self):  #定义每个线程要运行的函数
        print("running on number:%s" % self.num)
        time.sleep(4)

if __name__ == '__main__':
    t1 = MyThread(111)
    t2 = MyThread(222)
    t1.start()
    t2.start()

threading的Thread类主要的运行对象

函数
start()                开始线程的执行
run()                定义线程的功能的函数(一般会被子类重写)
join(timeout=None)    程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout秒
getName()            返回线程的名字
setName(name)        设置线程的名字
isAlive()            布尔标志,表示这个线程是否还在运行中
isDaemon()            返回线程的daemon标志
setDaemon(daemonic)    把线程的daemon标志设为daemonic(一定要在调用start()函数前调用)
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

3.2 join and setDaemon

import threading
from time import ctime,sleep

def music(func):
    for i in range(2):
        print ("Begin listening to %s. %s" %(func,ctime()))
        sleep(2)
        print("end listening %s"%ctime())

def move(func):
    for i in range(2):
        print ("Begin watching at the %s! %s" %(func,ctime()))
        sleep(5)
        print('end watching %s'%ctime())

threads = []
t1 = threading.Thread(target=music,args=('七里香',))
threads.append(t1)
t2 = threading.Thread(target=move,args=('阿甘正传',))
threads.append(t2)

# join在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
if __name__ == '__main__':
    t2.setDaemon(True)
    for t in threads:
        t.setDaemon(True)#守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。
        t.start()
        # t.join()#跟线程没有关系了,像正常一样运行,没意义
    t1.join()
    # t2.join()########考虑这三种join位置下的结果?
    print(threading.current_thread())
    print(threading.active_count())
    print ("all over %s" %ctime())

join()会等到线程结束,或者在给了timeout参数的时候,等到超时为止。使用join()比使用一个等待锁释放的无限循环清楚一些(也称“自旋锁”)。

join()的另一个比较重要的方法是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。
如果你的主线程除了等线程结束外,还有其他的事情要做(如处理或等待其他的客户请求),那就不用调用join(),只有在你要等待线程结束的时候才要调用join()。

setDaemon(True):将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法

3.3 同步锁Lock

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

import time
import threading
def addNum():
    global num #在每个线程中都获取这个全局变量
    num-=1
    temp=num
    print('--get num:',num )
    print('看下结果是0')
    print('再看下结果')
    time.sleep(0.001)
    num =temp-1 #对此公共变量进行-1操作

num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )

注意:

1:  why num-=1没问题呢?这是因为动作太快(完成这个动作在切换的时间内)

2: if sleep(1),现象会更明显,100个线程每一个一定都没有执行完就进行了切换,我们说过sleep就等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99.

多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?

有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。

我们可以通过同步锁来解决这种问题

import time
import threading
begin=time.time()
def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1
    lock.acquire()
    temp=num
    # print('--get num:',num )
    time.sleep(0.0001)
    num =temp-1 #对此公共变量进行-1操作
    lock.release()
num = 100  #设定一个共享变量
thread_list = []
lock=threading.Lock()
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
    t.join()
end=time.time()
print('final num:', num )
print(end-begin)

同步锁与GIL的关系?

Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。

GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。

Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,
以及两个基本的方法。可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。
池中的线程处于状态图中的同步阻塞状态。
构造方法:Lock()
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

3.4 线程死锁和递归锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,
处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
构造方法:RLock()
实例方法:acquire([timeout])/release(): 跟Lock差不多。

#__author: greg
#date: 2017/9/17 21:38
# 线程死锁和递归锁
# Thread deadlocks and recursive locks
import threading,time
class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()

if __name__=="__main__":
    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()#等待线程结束
View Code

相关文章:

  • 2022-12-23
  • 2021-10-26
  • 2022-01-14
  • 2022-12-23
猜你喜欢
  • 2021-04-17
  • 2021-10-11
  • 2021-10-23
  • 2021-10-15
  • 2021-07-01
相关资源
相似解决方案