进程 && 线程
进程:是内存中的一个独立的句柄,我们可以理解为一个应用程序在内存中就是一个进程。 各个进程之间是内存相互独立,不可共享的
线程:每个应用运行之后就会对应启动一个主线程,通过主线程可以创建多个字线程,各个线程共享主进程的内存空间。
关于线程、进程的解释有一篇有趣而生动的解释(http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html)

默认应用程序:是单进程、单线程的。

  进程是资源分配的最小单位。与程序相比,程序只是一组指令的有序集合,它本身没有任何运行的含义,只是一个静态实体。进程是程序在某个数据集上 的执行,是一个动态实体。它因创建而产生,因调度而运行,因等待资源或事件而被处于等待状态,因完成任务而被撤消,反映了一个程序在一定的数据集上运行的 全部动态过程。每个正在系统上运行的程序都是一个进程。每个进程包含一到多个线程。进程也可能是整个程序或者是部分程序的动态执行。

  线程是轻量级的进程或子进程,是CPU调度的最小单位,所有的线程都存在于相同的进程。所以线程基本上是轻量级的进程,它负责在单个程序里执行 多任务。通常由操作系统负责多个线程的调度和执行。多线程是为了同步完成多项任务,不是为了提高运行效率,而是为了提高资源使用效率来提高系统的效率。

 

GIL(全局解释器锁)
我们知道多进程(mutilprocess) 和 多线程(threading)的目的是用来被多颗CPU进行访问, 提高程序的执行效率。 但是在python内部存在一种机制(GIL),在多线程 时同一时刻只允许一个线程来访问CPU。
GIL 并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。
Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有 GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把 GIL 归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
python-线程、进程、协程

虽然python支持多线程,但是由于GIL的限制,在实际运行时,程序运行后开启多个线程,但在通过GIL后同时也只能有一个线程被CPU执行。

x进程和线程对比

 

 对比维度

多进程

多线程

总结

数据共享、同步

数据共享复杂,需要用IPC;数据是分开的,同步简单

因为共享进程数据,数据共享简单,但也是因为这个原因导致同步复杂

各有优势

内存、CPU

占用内存多,切换复杂,CPU利用率低

占用内存少,切换简单,CPU利用率高

线程占优

创建销毁、切换

创建销毁、切换复杂,速度慢

创建销毁、切换简单,速度很快

线程占优

编程、调试

编程简单,调试简单

编程复杂,调试复杂

进程占优

可靠性

进程间不会互相影响

一个线程挂掉将导致整个进程挂掉

进程占优

分布式

适应于多核、多机分布式;如果一台机器不够,扩展到多台机器比较简单

适应于多核分布式

进程占优

 

 

如何选用?

单进程,多线程的程序(io操作不占用CPU):如果是CPU密集型,那么则不能提高效率。如果是IO密集型,那么则能提高效率。

  多进程,单线程的程序:CPU密集型的,一般用多进程提高并发效率。

  小结:

  CPU密集型:多进程

  IO密集型:多线程

 

线程的创建方法:

有两种方式来创建线程:

一种是创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入。

一种是通过继承Thread类,重写它的run方法;

第一种:

import threading
def f1(arg):
    print(arg)
t = threading.Thread(target=f1,args=(123,))
t.start()
#什么时候去执行f1方法? 是在执行t.run()方法时执行f1()方法。t.run()不用我们去执行,代码自动执行

从上边可以看出,创建一个多线程程序,只需要3步:

  1. 创建执行函数

  2. 创建threading.Thread对象

        

def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        """This constructor should always be called with keyword arguments. Arguments are:

        *group* should be None; reserved for future extension when a ThreadGroup # 为了将来拓展保留的
        class is implemented.

        *target* is the callable object to be invoked by the run()
        method. Defaults to None, meaning nothing is called. # 一个可调用程序,在线程启动后执行

        *name* is the thread name. By default, a unique name is constructed of
        the form "Thread-N" where N is a small decimal number. # 线程的名字,默认值为“Thread-N“,N是一个数字

        *args* is the argument tuple for the target invocation. Defaults to (). # 参数args表示调用target时的参数列表

        *kwargs* is a dictionary of keyword arguments for the target
        invocation. Defaults to {}. # 参数kwargs表示调用target时的关键字参数。

        If a subclass overrides the constructor, it must make sure to invoke
        the base class constructor (Thread.__init__()) before doing anything
        else to the thread.

        """

 

      3. 启动线程

 

第二种:

import threading
class MyThread(threading.Thread):
    def __init__(self,func,args):
        self.func = func
        self.args = args
        super(MyThread,self).__init__()  #执行父类的构造方法

    def run(self):
        self.func(self.args)

def f2(arg):
    print(arg)

obj = MyThread(f2,123)
obj.start()

多线程的执行方法:

import time
from threading import Thread

def do_thread(num):
    print('this is thread %s' %str(num))
    time.sleep(3)

for i in range(5):
    t = Thread(target=do_thread,args=(i,))
    t.start()

以上方法就开启了一个5个线程,target用来定义开启线程后要执行的方法,args为参数

 线程的其它方法:

1 setName(), getName()

 setName():   给线程设置一个名字
  getName():   获取线程的名称

import time
from threading import Thread
def do_thread(num):
    print('this is thread %s' %str(num))
    time.sleep(2)
for i in range(2):
    t = Thread(target=do_thread,args=(i,))
    t.start()
    t.setName('Mythread_{0}'.format(str(i)))
    print(t.getName())

out:

this is thread 0
Mythread_0
this is thread 1
Mythread_1

2 setDaemon()

setDaemon(True/False): 设置创建的子线程为前台线程或后台线程.设置为True则子线程为后台线程。线程默认为前台线程(不设置此方法)
前台线程: 当子线程创建完成后,主线程和子线程(前台线程)同时运行,如果主线程执行完成,而子线程还未完成则等待子线程执行完成以后整个程序才结束。
后台线程: 当子线程创建完成后,如果子线程还未结束,而主线程运行结束则不管子线程了,程序就结束。
此方法设置必须在 start() 方法前进行设置, 看代码:

import time
from threading import Thread
def do_thread(num):
    print('this is thread %s' %str(num))
    time.sleep(3)
    print('OK',str(num))

for i in range(2):
    t = Thread(target=do_thread,args=(i,))
    #不设置此方法默认为前台线程
    #t.setDaemon(True)
    t.setName('Mythread_{0}'.format(str(i)))
    t.start()
    print(t.getName())

out:

this is thread 0
Mythread_0
this is thread 1
Mythread_1
OK 1
OK 0

后台线程:

import time
from threading import Thread
def do_thread(num):
    print('this is thread %s' %str(num))
    time.sleep(3)
    #执行到此时主线程执行完成了,程序结束,下面的代码不会执行
    print('OK',str(num))

for i in range(2):
    t = Thread(target=do_thread,args=(i,))
    #设置线程为后台线程
    t.setDaemon(True)
    t.setName('Mythread_{0}'.format(str(i)))
    t.start()
    print(t.getName())

out:

this is thread 0
Mythread_0
this is thread 1
Mythread_1

 3 join()
join(timeout) : 多线程的 wait(),当主线程执行 子线程.join() 方法后,主线程将等待子线程执行完再接着执行。当加上timeout参数后,如果超过timeout时间不管子线程有没有执行完都将结束等待
看下面两个例子

 

import time
from threading import Thread

def do_thread(num):
    time.sleep(3)
    print("this is thread %s" % str(num))


for i in range(2):
    t = Thread(target=do_thread, args=(i,))
    t.setName("Mythread_{0}".format(str(i)))
    t.start()
    print("print in main thread: thread name:", t.getName())

out:

print in main thread: thread name: Mythread_0
print in main thread: thread name: Mythread_1
this is thread 0
this is thread 1

 上面无join方法时,主线程执行完print,等待子线程函数中的print执行完成,这个程序退出。 下面我们看看加上join方法后的效果

import time
from threading import Thread

def do_thread(num):
    time.sleep(3)
    print("this is thread %s" % str(num))


for i in range(2):
    t = Thread(target=do_thread, args=(i,))
    t.setName("Mythread_{0}".format(str(i)))
    t.start()
    t.join()
    print("print in main thread: thread name:", t.getName())

out:

this is thread 0
print in main thread: thread name: Mythread_0
this is thread 1
print in main thread: thread name: Mythread_1

当程序运行到join后,将等待子程序执行完成,然后才向下执行。这样真个程序就变成一个单线程的顺序执行了。多线程就没什么鸟用了。

join()与setDaemon()都是等待子线程结束,有什么区别呢:
当执行join()后主线程就停了,直到子线程完成后才开始接着主线程执行,整个程序是线性的
setDaemon() 为前台线程时,所有的线程都在同时运行,主线程也在运行。只不过是主线程运行完以后等待所有子线程结束。这个还是一个并行的执行,执行效率肯定要高于join()方法的。

线程锁

线程锁分类:互斥锁,信号量,条件变量,事件

线程是内存共享的,当多个线程对内存中的同一个公共变量进行操作时,会导致线程争抢的问题,为了解决此问题,可以使用线程锁。

假设这样一种情况,有一个全局的计数num,每个线程获取这个全局的计数,根据num进行一些处理,然后将num加1

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Auther: pangguoping

import threading
import time

NUM = 0
class MyThread(threading.Thread):
    def run(self):
        global NUM
        NUM += 1
        time.sleep(0.5)
        msg = self.name + ' set num to ' + str(NUM)
        print(msg)
if __name__ == '__main__':
    for i in range(5):
        t = MyThread()
        t.start()

out:

Thread-4 set num to 5
Thread-1 set num to 5
Thread-5 set num to 5
Thread-2 set num to 5
Thread-3 set num to 5

      问题产生的原因就是没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。因此有了线程锁的概念。

  线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。 互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将 资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

我们现在引入锁,上面的代码修改为:

互斥锁:

 

import threading
import time

NUM = 0
class MyThread(threading.Thread):
    def run(self,l):
        #加锁
        l.acquire()
        global NUM
        NUM += 1
        time.sleep(0.5)
        msg = self.name + ' set num to ' + str(NUM)
        #解锁
        l.release()
        print(msg)
if __name__ == '__main__':
    lock = threading.Lock()
    for i in range(5):
        t = MyThread()
        t.run(lock)

 

out:

Thread-1 set num to 1
Thread-2 set num to 2
Thread-3 set num to 3
Thread-4 set num to 4
Thread-5 set num to 5

可以看出,加了线程锁之后,尝试任何次数的执行,最终结果都是一样的,消除了线程不安全的隐患。过程是这样的:我们先建立了一个 threading.Lock类对象lock,在run方法里,我们使用lock.acquire()获得了这个锁。此时,其他的线程就无法再获得该锁 了,他们就会阻塞在“l.acquire()”这里,直到锁被另一个线程释放l.release()。

上边的锁机制,叫做“互斥锁”。互斥锁的特点:同一时间仅允许一个线程进行更改数据。如果同一时间有多个线程更改数据的话,我们可以使用 信号量 来解决这个问题

信号量:

信号量(Semaphore)是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

Semaphore信号量管理一个内置的计数器:
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

import threading
import time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print('run the thread: %s' %n)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()

执行之后会发现,每个1秒,会有5个线程执行,有print的操作。

import threading
import time

def do():
    semaphro.acquire()
    print('this is {0} set the semaphore'.format(threading.current_thread().getName()))
    time.sleep(2)
    semaphro.release()
    print("\033[1;30mthi is {0} release the semaphore\033[0m".format(threading.current_thread().getName()))

if __name__ == '__main__':
    semaphro = threading.Semaphore(2)
    for i in range(10):
        t = threading.Thread(target=do)
        t.setName("Thread_{0}".format(str(i)))
        t.start()
    print('finished')

上例中,虽然创建了10个线程,但同时只有2个线程在运行,就是因为在线程中通过Semaphore设置了2个信号量。只有其中一个释放后另其它的线程再能开始执行

out:

this is Thread_0 set the semaphore
this is Thread_1 set the semaphore
finished
thi is Thread_0 release the semaphore
thi is Thread_1 release the semaphore
this is Thread_3 set the semaphore
this is Thread_2 set the semaphore
thi is Thread_3 release the semaphore
this is Thread_4 set the semaphore
thi is Thread_2 release the semaphore
this is Thread_5 set the semaphore
thi is Thread_4 release the semaphore
thi is Thread_5 release the semaphore
this is Thread_7 set the semaphore
this is Thread_6 set the semaphore
this is Thread_8 set the semaphore
thi is Thread_6 release the semaphore
thi is Thread_7 release the semaphore
this is Thread_9 set the semaphore
thi is Thread_8 release the semaphore
thi is Thread_9 release the semaphore

条件变量:

互斥锁是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量, 除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一 些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会 重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

  可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition 对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用 notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

  Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock

除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部 锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

  假设有这样一种场景:有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场 上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。用 Condition解决生产者与消费者问题的代码如下:

  参数说明: con.acquire和con.wait   通常在加锁的时候配合使用,表示在这里等待     

con.acquire() # 这三个必须放一起,是固定用法,通常用于解锁的时候
con.notify(num) # 表示一次放多少个线程过去
con.release()

第一种方式:

import threading
import time

class Producer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count > 1000:
                    con.wait()
                else:
                    count = count+100
                    msg = self.name+' produce 100, count=' + str(count)
                    print(msg)
                    con.notify()
                con.release()
                time.sleep(1)

class Consumer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count < 100:
                    con.wait()
                else:
                    count = count-3
                    msg = self.name+' consume 3, count='+str(count)
                    print(msg)
                    con.notify()
                con.release()
                time.sleep(1)

count = 500
con = threading.Condition()

def test():
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
if __name__ == '__main__':
    test()


out:
Thread-1 produce 100, count=600
Thread-2 produce 100, count=700
Thread-3 consume 3, count=697
Thread-4 consume 3, count=694
Thread-5 consume 3, count=691
Thread-6 consume 3, count=688
Thread-7 consume 3, count=685
Thread-1 produce 100, count=785
Thread-3 consume 3, count=782
Thread-2 produce 100, count=882
Thread-4 consume 3, count=879
Thread-5 consume 3, count=876
Thread-6 consume 3, count=873
Thread-7 consume 3, count=870
Thread-4 consume 3, count=867
Thread-1 produce 100, count=967
Thread-5 consume 3, count=964
Thread-3 consume 3, count=961
Thread-2 produce 100, count=1061
Thread-6 consume 3, count=1058
Thread-7 consume 3, count=1055
Thread-3 consume 3, count=1052
Thread-4 consume 3, count=1049
Thread-5 consume 3, count=1046
Thread-7 consume 3, count=1043
Thread-6 consume 3, count=1040
Thread-3 consume 3, count=1037
Thread-4 consume 3, count=1034
Thread-5 consume 3, count=1031
Thread-7 consume 3, count=1028
Thread-6 consume 3, count=1025
Thread-3 consume 3, count=1022
Thread-4 consume 3, count=1019
Thread-5 consume 3, count=1016
Thread-7 consume 3, count=1013
condition解决同步问题案例代码

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-08-06
  • 2021-10-24
  • 2021-12-24
  • 2022-12-23
猜你喜欢
  • 2021-12-30
  • 2021-05-19
  • 2021-11-06
  • 2021-10-19
  • 2021-11-25
  • 2021-09-03
相关资源
相似解决方案