admin管理员组

文章数量:1547451

第九节:多线程与多进程
特注:本节9.1部分由卢钧轶(cenalulu)发布在GitHub,在此基础进行了改进展示
9.1.GIL(全局解释器锁)是什么?
文章欢迎转载,但转载时请保留本段文字,并置于文章的顶部 作者:卢钧轶(cenalulu) 本文原文地址:http://cenalulu.github.io/python/gil-in-python/

全局解释器锁(英语: GlobalInterpreter Lock,缩写GIL)是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程。常见的使用GIL的解释器有CPython与Ruby MRI。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。
然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
普及知识:
• 所谓IO密集型任务,是指磁盘IO、网络IO占主要的任务,计算量很小。比如请求网页、读写文件等。当然我们在Python中可以利用sleep达到IO密集型任务的目的。
• 所谓计算密集型任务,是指CPU计算占主要的任务,CPU一直处于满负荷状态。比如在一个很大的列表中查找元素(当然这不合理),复杂的加减乘除等。
1)为什么会有GIL?
由于物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即使在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了不少心思,也不可避免的带来了一定的性能损失。
Python当然也逃不开,为了利用多核,Python开始支持多线程。而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。
慢慢的这种实现方式被发现是没用且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,并且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?
所以简单的说GIL的存在更多的是历史原因。如果推到重来,多线程的问题依然还是要面对,但是至少会比目前GIL这种方式会更优雅。

2).GIL的影响
从上文的介绍和官方的定义来看,GIL无疑就是一把全局排他锁。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。 那么读者就会说了,全局锁只要释放的勤快效率也不会差啊。只要在进行耗时的IO操作的时候,能释放GIL,这样也还是可以提升运行效率的嘛。或者说再差也不会比单线程的效率差吧。理论上是这样,而实际上呢?Python比你想的更糟。
下面我们就对比下Python在多线程和单线程下得效率对比。测试方法很简单,一个循环1亿次的计数器函数。一个通过单线程执行两次,一个多线程执行。最后比较执行总时间。测试环境为双核的Mac pro。注:为了减少线程库本身性能损耗对测试结果带来的影响,这里单线程的代码同样使用了线程。只是顺序的执行两次,模拟单线程。

	顺序执行的单线程(single_thread.py)
#! /usr/bin/python
 
from threading import Thread
import time
 
def my_counter():
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True
 
def main():
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = Thread(target=my_counter)
        t.start()
        t.join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))
 
if __name__ == '__main__':
    main()
•	同时执行的两个并发线程(multi_thread.py)
#! /usr/bin/python
 
from threading import Thread
import time
 
def my_counter():
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True
 
def main():
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = Thread(target=my_counter)
        t.start()
        thread_array[tid] = t
    for i in range(2):
        thread_array[i].join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))
 
if __name__ == '__main__':
    main()

线程并发并不如单线程顺序执行快,这是得不偿失的,造成这个情况的原因就是GIL,这里是计算密集型,所以不适用
3).当前GIL设计的缺陷
 基于pcode数量的调度方式
按照Python社区的想法,操作系统本身的线程调度已经非常成熟稳定了,没有必要自己搞一套。所以Python的线程就是C语言的一个pthread,并通过操作系统调度算法进行调度(例如linux是CFS)。为了让各个线程能够平均利用CPU时间,python会计算当前已执行的微代码数量,达到一定阈值后就强制释放GIL。而这时也会触发一次操作系统的线程调度(当然是否真正进行上下文切换由操作系统自主决定)。
伪代码
while True:
acquire GIL
for i in 1000:
do something
release GIL
/* Give Operating System a chance to do thread scheduling */
这种模式在只有一个CPU核心的情况下毫无问题。任何一个线程被唤起时都能成功获得到GIL(因为只有释放了GIL才会引发线程调度)。但当CPU有多个核心的时候,问题就来了。从伪代码可以看到,从release GIL到acquire GIL之间几乎是没有间隙的。所以当其他在其他核心上的线程被唤醒时,大部分情况下主线程已经又再一次获取到GIL了。这个时候被唤醒执行的线程只能白白的浪费CPU时间,看着另一个线程拿着GIL欢快的执行着。然后达到切换时间后进入待调度状态,再被唤醒,再等待,以此往复恶性循环。
PS:当然这种实现方式是原始而丑陋的,Python的每个版本中也在逐渐改进GIL和线程调度之间的互动关系。例如先尝试持有GIL在做线程上下文切换,在IO等待时释放GIL等尝试。但是无法改变的是GIL的存在使得操作系统线程调度的这个本来就昂贵的操作变得更奢侈了。关于GIL影响的扩展阅读
为了直观的理解GIL对于多线程带来的性能影响,这里直接借用的一张测试结果图(见下图)。图中表示的是两个线程在双核CPU上得执行情况。两个线程均为CPU密集型运算线程。绿色部分表示该线程在运行,且在执行有用的计算,红色部分为线程被调度唤醒,但是无法获取GIL导致无法进行有效运算等待的时间。

		多线程并发计算密集型一个线程大多数时间都在堵塞

由图可见,GIL的存在导致多线程无法很好的立即多核CPU的并发处理能力。
那么Python的IO密集型线程能否从多线程中受益呢?我们来看下面这张测试结果。颜色代表的含义和上图一致。白色部分表示IO线程处于等待。可见,当IO线程收到数据包引起终端切换后,仍然由于一个CPU密集型线程的存在,导致无法获取GIL锁,从而进行无尽的循环等待。

		         多线程在IO密集的时候有不错的性能

简单的总结下就是:Python的多线程在多核CPU上,只对于IO密集型计算产生正面效果;而当有至少有一个CPU密集型线程存在,那么多线程效率会由于GIL而大幅下降。

4)如何避免受到GIL的影响?
说了那么多,如果不说解决方案就仅仅是个科普帖,然并卵。GIL这么烂,有没有办法绕过呢?我们来看看有哪些现成的方案。
 用multiprocessing替代Thread(多进程代替多线程)

优点:多进程代替多线程,它使用了多进程而不是多线程。
每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
缺点:多进程的创建和销毁开销也会更大,成本高
进程间无法看到对方数据,需要使用栈或者队列进行获取,
编程复杂度提升。
multiprocessing库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
当然multiprocessing也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。
 用其他解析器
之前也提到了既然GIL只是CPython的产物,那么其他解析器是不是更好呢?没错,像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。毕竟功能和性能大家在初期都会选择前者,Done is better than perfect。

当然Python社区也在非常努力的不断改进GIL,甚至是尝试去除GIL。并在各个小版本中有了不少的进步。有兴趣的读者可以扩展阅读这个Slide 另一个改进Reworking the GIL
将切换颗粒度从基于opcode计数改成基于时间片计数
避免最近一次释放GIL锁的线程再次被立即调度
新增线程优先级功能(高优先级线程可以迫使其他线程释放所持有的GIL锁)
总结
Python GIL其实是功能和性能之间权衡后的产物,它尤其存在的合理性,也有较难改变的客观因素。从本分的分析中,我们可以做以下一些简单的总结:
因为GIL的存在,只有IO Bound场景下得多线程会得到较好的性能
如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现
GIL在较长一段时间内将会继续存在,但是会不断对其进行改进

9.2.多线程.守护线程.非守护线程
1)非守护线程,
主线程会跳过创建的线程接着执行,直到创建的线程运行完毕,程序才结束

代码如下:

# f非守护线程
import threading
import time

def start(i):
	time.sleep(i)
	print(threading.current_thread().name)   #打印名字
	print(threading.current_thread().isAlive) #是否活着
	print(threading.current_thread().ident)   #编号

print("start")
t=threading.Thread(target=start,name="thread",args=(5,)) 
 #target 目标运行函数目标  name名字 如果不写就申请个自定义pid编号 args 可以传参
t.start()  #必须start才运行
print("stop")

结果:
start
stop
thread
<bound method Thread.is_alive of <Thread(thread, started 22192)>>
22192


2)非守护线程,
守护线程会伴随着主线程一起结束
代码和守护基本一样,只不过要在start之前加上t.setDaemon(True)即可
代码如下:
# 守护线程
import threading
import time

def start(i):
	time.sleep(i)
	print(threading.current_thread().name)   #打印名字
	print(threading.current_thread().isAlive) #是否活着
	print(threading.current_thread().ident)   #编号

print("start")
t=threading.Thread(target=start,name="thread",args=(5,)) 
 #target 目标运行函数目标  name名字 如果不写就申请个自定义pid编号 args 可以传参
t.setDaemon(True)  #守护
t.start()  #必须start才运行
print("stop")

结果:
start
stop

9.3进程不安全(lock锁)
我们先运行下如下的代码:

import threading
import time

number = 0

def addNumber():    #进程+1
	global number
	for i in range(1000000):
		number+=1

def dowmNumber():   #进程-1
	global number
	for i in range(1000000):
		number-=1

print("start")
t1 = threading.Thread(target=addNumber)
t2 = threading.Thread(target=dowmNumber)
t1.start()
t2.start()

t1.join()
t2.join()
#join是指阻塞在这里,知道我们的阻塞线程执行完毕才会向后执行
print(number)
print("end")

结果:

start
-372750
End

其结果显示是不固定的乱码数字,按道理讲如果我们正常赋值计算的话得到应该是0,而结果显然不是这样,
这是因为赋值操作一共两步:

计算                                   赋值
正常操作            		              本次操作
number=0 					 				 number=0
a=number+1 0+1=1							 a=number+1 0+1=1		  
number=a                  				 b=number-1 0-1=-1
b=number-1 1-1=0           				 number=b

number=b									 number=a
number=0									 number= 1
正确.                                   错误

这个时候为了得到正确的结果,就要引用lock锁的概念了,我们可以更改代码如下,也就是加入lock锁,让其能够有足够的时间循序完成计算和赋值操作

#进程不安全有lock锁
import threading
import time

lock = threading.Lock()
number = 0

def addNumber():    #进程+1
	global number
	for i in range(1000000):
		lock.acquire()
		number+=1
		#让其能够循序完成计算赋值操作
		lock.release()

def dowmNumber():   #进程-1
	global number
	for i in range(1000000):
		lock.acquire()
		number-=1
		lock.release()

print("start")
t1 = threading.Thread(target=addNumber)
t2 = threading.Thread(target=dowmNumber)
t1.start()
t2.start()

t1.join()
t2.join()
#join是指阻塞在这里,知道我们的阻塞线程执行完毕才会向后执行
print(number)
print("end")

结果:

start
0
End

特别的:递归锁(用于需要更进一步高精度的操作的时候)
特点:只有拿到锁的线程才能释放锁,同一线程可以多次拿到锁
代码如下:

import threading

class Test:
	rlock=threading.RLock()
	def __init__(self):
		self.number=0

	def execute(self,n):
		with Test.rlock:
			self.number+=n

	def add(self):
		with Test.rlock:
			self.execute(1)

	def down(self):
		with Test.rlock:
			self.execute(-1)

def add(test):
	for i in range(1000000):
		test.add()

def down(test):
	for i in range(1000000):
		test.down()

if __name__ == '__main__':
	t=Test()
	t1 =threading.Thread(target=add,args=(t,))
	t2 =threading.Thread(target=down,args=(t,))
	t1.start()
	t2.start()
	t1.join()
	t2.join()
	print(t.number)
输出:
0
[Finished in 2.6s]

9.4.多进程

import multiprocessing
import time

def start(i):
	time.sleep(3)
	print(i)
	print(multiprocessing.current_process().name)
	print(multiprocessing.current_process().pid)  #进程启动描述 一个数字
	print(multiprocessing.current_process().is_alive)

if __name__ == '__main__':
	print('start')
	p = multiprocessing.Process(target=start,args=(1,))
	p.start()
#p.join()
	print("stop")
不加join
会直接执行到stop
之后等待进程结束  start
stop
1
Process-1
16136
<bound method BaseProcess.is_alive of <Process(Process-1, started)>>

加了join则会堵塞在这里:
start
1
Process-1
24228
<bound method BaseProcess.is_alive of <Process(Process-1, started)>>
stop

9.5.进程之间通信
Python多进程之间默认是无法通信的,因为是并发执行的,就比如运行在你cpu里的谷歌浏览器和其他浏览器,他们之间是相互独立的,默认是无法进行沟通的,
所以需要借助其他数据结构来实现进程之间的通信,比如队列和栈都可以实现
我们这里提供的方法就是利用队列:

#进程通信
#
import multiprocessing
import time
from multiprocessing import Queue,Process  #导入队列

def write(q):
	print("Process to write :%s" %Process.pid)
	for i in range(10):
		print("put %d to queue..." %i)
		q.put(i)

def read(q):
	print("Process to read :%s" %Process.pid)
	while True:
		value = q.get()
		print("Get %d to queue..." %value)

if __name__ == '__main__':
	#父进程创建queue,并且传给每个子进程
	q=Queue()
	pw=Process(target=write,args=(q,))
	pr=Process(target=read,args=(q,))
	#启动子进程pw,写入
	pw.start()
	#启动子进程pr,读取
	pr.start()
	#等待pw结束
	pw.join()
#说明是同内内存区不同进程
Process to read :<property object at 0x0000020FC0E2F598> 
Process to write :<property object at 0x0000028444D2F598>
put 0 to queue...
put 1 to queue...
put 2 to queue...
put 3 to queue...
Get 0 to queue...
put 4 to queue...
Get 1 to queue...
put 5 to queue...
Get 2 to queue...
Get 3 to queue...
put 6 to queue...
Get 4 to queue...
put 7 to queue...
Get 5 to queue...
put 8 to queue...
Get 6 to queue...
put 9 to queue...
Get 7 to queue...
Get 8 to queue...
Get 9 to queue...

9.6 进程池
事先创建好的,销毁创建的资源就被节省了,这样我们直接调用就可以了,
#进程池

import multiprocessing

def function_square(data):
	res=data*data
	return res

if __name__ == '__main__':
	inputs=list(range(100))
	pool=multiprocessing.Pool(processes=4) #不写参数的话会根据你电脑创建
	#1.map把任务交给进程池 100
	pool_outputs = pool.map(function_square,inputs)
	#2.apply   1
	#pool_outputs = pool.apply(function_square,args=(10,))
	pool.close()
	pool.join()
	print("pool  :",pool_outputs)

结果:

pool  : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]

9.7 线程池

Pip install threadpool

单线程30秒的任务
10线程池并发3秒完成

代码:

import threadpool
import time
#执行比较耗时的函数,需要开启多线程
def get_html(url):
	time.sleep(3)
	print(url)
#使用多线程执行telnet函数
urls = [i for i in range (100)]
pool = threadpool.ThreadPool(10)
#提交任务給线程池
requests = threadpool.makeRequests(get_html,urls)
#开始执行任务
for req in requests:
	pool.putRequest(req)

pool.wait()

有了线程池,这样原来需要300s的程序很快就可以完成了

本文标签: 爬虫多线程进程系列Python