GIL
CPython 在解释器进程级别有一把锁,叫做GIL,即全局解释器锁。 GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也只允许同时只能 有一个CPU核心上运行该进程的一个线程。
- CPython中
- IO密集型,某个线程阻塞,GIL会释放,就会调度其他就绪线程(线程因为IO阻塞,这个线程的GIL锁就会解开)
- CPU密集型,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU(加锁解锁是需要时间的)
- 在CPython中由于有GIL存在,IO密集型,使用多线程较为合算;CPU密集型,使用多进程,要绕开GIL
Python中绝大多数内置数据结构的读、写操作都是原子操作。
由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它 们本身 不是线程安全类型。
cpython中因为GIL的存在,多线程中对CPU利用效率是低下的。IO密集型的影响并不大,CPU密集型对效率影响较大。CPU密集型可采用多进程,每个进程有一个主线程进行计算,绕开这个把锁。
但是多进程间通信的代价很高,所以要减少多进程间的通信。
多进程
python中使用multiprocessing模块实现多进程,有几个类:
Process类
进程的使用方式类似线程,都要创建一个进程实例,然后启动它。
但是这个实例必须在 main中
多进程计算:
# encoding = utf-8 __author__ = "mcabana.com" import multiprocessing import logging import datetime FORMAT = "%(asctime)s %(processName)s %(process)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) def calc(): sum = 0 for _ in range(1000000000): sum += 1 logging.info('sum is {}'.format(sum)) # print(multiprocessing.current_process()) # 当前进程 if __name__ == '__main__': # 进程必须在main中 start = datetime.datetime.now() ps = [] for i in range(4): p = multiprocessing.Process(target=calc, name='calc-{}'.format(i)) # 创建一个进程 p.start() # 启动进程 ps.append(p) print('active_children', multiprocessing.active_children()) # 所有活着的进程 for p in ps: p.join() # join deltime = (datetime.datetime.now() - start).total_seconds() print(deltime)
进程属性:
名称 | 说明 |
pid | 进程id |
exitcode | 进程的退出状态码 |
terminate() | 终止指定的进程 |
进程中执行的函数的返回值目前拿不到。
因为GIL线程是交替执行的,而进程则是真正的并行。
进程间同步
Python在进程间同步提供了和线程同步类似的类,使用的方法一样,使用的效果也类似。
不过,进程间代价要高于线程间,而且系统底层实现是不同的,只不过Python屏蔽了这些不同之处,让用户简单使用多进程。
multiprocessing模块还提供共享内存、服务器进程来共享数据,还提供了用于进程间通讯的Queue队列、Pipe管道。
通信方式不同:
网络通信是进程间通信的主要方式。
- 多进程就是启动多个解释器进程,进程间通信必须序列化、反序列化
- 数据的线程安全性问题,如果每个进程中没有实现多线程,GIL可以说没什么用了
多进程多线程的选择
- 对于IO密集型
- 可使用多线程, 因为需要等待IO的原因, 可以切换到其他线程执行, 真并行并不会调高多少性能, 所以 GIL的影响并不大。
- 而且线程的开销是小于进程的, 所以使用多线程性能更优
- 对于CPU密集型
- 可使用多进程
- 多线程中因为GIL的存在, 相当于单线程执行, 多核优势无法发挥, 并且多线程还会竞争cpu锁, 所以性能不高
- 使用多进程只创建一个主线程,这样就不会参数线程锁竞争, 从而利用多核心并发的优势, 提高执行效率。
池
对于一些创建代价很高,频繁使用的,往往使用 池 来处理,这样可以重复利用,减少开销。
concurrent.futures
3.2 版本引入的模块,处理高并发问题。
提供了2个池执行器:
ThreadPoolExecutor 异步调用的线程池的Executor
ProcessPoolExecutor 异步调用的进程池的Executor
执行器可以分派任务,执行任务,结束任务。
线程池:
方法 | 含义 |
ThreadPoolExecutor(max_workers=1) | 池中至多创建max_workers个线程的池来同时异步执行,返回 Executor实例 支持上下文,进入时返回自己,退出时调用shutdown(wait=True) |
submit(fn, *args, **kwargs) | 提交执行的函数及其参数,如有空闲开启daemon线程,返回Future 类的实例。 fn就是目标函数 |
shutdown(wait=True) | 清理池,wait表示是否等待到任务线程完成 |
future类的方法,线程池中启动一个线程或进程,将其赋给future
方法 | 含义 |
done() | 判断是否执行成功或执行完成。 如果调用被成功的取消或者执行完成,返回True |
cancelled() | 如果调用被成功的取消,返回True |
running() | 如果正在运行且不能被取消,返回True |
cancel() | 尝试取消调用。如果已经执行且不能取消返回False,否则返回True,取消,结果也是done的,running态不可取消 |
result(timeout=None) | 取返回的结果,timeout为None,一直阻塞等待结果返回;timeout设置到期,抛出 concurrent.futures.TimeoutError 异常 一般结合done() 使用 |
exception(timeout=None) | 取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出 concurrent.futures.TimeoutError 异常 |
线程池的使用示例:
# encoding = utf-8 __author__ = "mcabana.com" import threading import datetime import time import logging from concurrent.futures import ThreadPoolExecutor start = datetime.datetime.now() FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) # 先定义一个线程池执行器 executor = ThreadPoolExecutor(4) # 此线程池中最多创建 4 个线程异步执行 # 如果是None 则是 os.cpu_count() * 5 或者 1 * 5 def calc(): sums = 0 logging.info('in the worker') for _ in range(1000000000): sums += 1 return threading.current_thread(), sums # 执行函数的返回值就是 future.result() f = executor.submit(calc) # 如果线程池没有达到最大数,则启动线程,如果满了就等待 # 用线程池启动, 返回一个future给f, f中包含了启动的线程的信息 # 线程池创建线程,其实就是调用的threading.Thread(), daemon=True print(type(f), f) # running状态 while True: # 因为没有阻塞主线程,所以要手动判断线程是否执行完了,多个线程要逐个判断 time.sleep(1) if f.done(): # 因为没有阻塞主线程,所以要手动判断f是否执行完了 print(f.result()) # 如果执行完,打印结果。如果没有执行完就打印,会阻塞在这 break # 此时f状态为finished # 关闭线程池执行器 executor.shutdown() # 关闭 会等待所有线程执行完 deltime = (datetime.datetime.now() - start).total_seconds() print('end ~~~~~~~~~~~~~~~~~~~~~~', deltime)
如果要启动多个线程:
# encoding = utf-8 __author__ = "mcabana.com" import threading import datetime import time import logging from concurrent.futures import ThreadPoolExecutor start = datetime.datetime.now() FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) # 先定义一个线程池执行器 executor = ThreadPoolExecutor(4) # 此线程池中最多创建 4 个线程异步执行 # 如果是None 则是 os.cpu_count() * 5 或者 1 * 5 def calc(): sums = 0 logging.info('in the worker') for _ in range(100000000): sums += 1 return threading.current_thread(), sums # 执行函数的返回值就是 future.result() fs = [] # 多个启动线程 for i in range(4): f = executor.submit(calc) fs.append(f) # 加入线程列表 # 获取多个线程的状态 while True: time.sleep(1) flag = True for f in fs: if f.done(): # 判断每个线程是否done了 logging.info(f.result()) flag = f.done() and True # 有一个线程没有done(),就会是False if flag: # 如果是False,就继续等待 break # 关闭线程池执行器 # executor.shutdown() # 关闭 会等待所有线程执行完 deltime = (datetime.datetime.now() - start).total_seconds() print('end ~~~~~~~~~~~~~~~~~~~~~~', deltime)
因为 执行器是支持上下文的,所以可以使用with管理,如下:
# encoding = utf-8 __author__ = "mcabana.com" import threading import datetime import logging from concurrent.futures import ThreadPoolExecutor start = datetime.datetime.now() FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) # 先定义一个线程池执行器 executor = ThreadPoolExecutor(4) # 此线程池中最多创建 4 个线程异步执行 # 如果是None 则是 os.cpu_count() * 5 或者 1 * 5 def calc(): sums = 0 logging.info('in the worker') for _ in range(100000000): sums += 1 return threading.current_thread(), sums # 执行函数的返回值就是 future.result() with executor: # 离开时要调用ThreadPoolExecutor.__exit__()方法,而源码中是self.shutdown(wait=True) # 要等待 各个线程执行完成。就是主线程会阻塞在这。 fs = [] # 多个启动线程 for i in range(4): f = executor.submit(calc) fs.append(f) # 加入线程列表 for f in fs: # 主线程执行到这时,所有线程肯定都执行完成了是done的,所以可以直接获取result() print(f.result()) # # 获取多个线程的状态 # while True: # time.sleep(1) # flag = True # for f in fs: # if f.done(): # 判断每个线程是否done了 # logging.info(f.result()) # # flag = f.done() and True # 有一个线程没有done(),就会是False # if flag: # 如果是False,就继续等待 # break # # 关闭线程池执行器 # executor.shutdown() # 关闭 会等待所有线程执行完 deltime = (datetime.datetime.now() - start).total_seconds() print('end ~~~~~~~~~~~~~~~~~~~~~~', deltime)
进程池:
进程池的使用方法与线程池完全相同,只需要更改下 使用的类名称即可。
其他不变,如下:
# encoding = utf-8 __author__ = "mcabana.com" import multiprocessing import datetime import logging from concurrent.futures import ProcessPoolExecutor start = datetime.datetime.now() FORMAT = "%(asctime)s %(processName)s %(process)d %(threadName)s %(thread)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) def calc(): sums = 0 logging.info('in the worker') for _ in range(1000000000): sums += 1 return "{}, {}".format(multiprocessing.current_process(), sums) # 执行函数的返回值就是 future.result() if __name__ == '__main__': # 进程要写在 main 中 # 先定义一个进程池执行器 executor = ProcessPoolExecutor(4) # 此进程池中最多创建 4 个进程同步执行 # 如果是None 则是 os.cpu_count() * 5 或者 1 * 5 with executor: fs = [] # 多个启动进程 for i in range(4): f = executor.submit(calc) fs.append(f) # 加入进程列表 # 获取多个进程的执行结果 for f in fs: print(f.result()) # 关闭进程池执行器 executor.shutdown() # 关闭 会等待所有进程程执行完 delta = (datetime.datetime.now() - start).total_seconds() print('end ~~~~~~~~~~~~~~~~~~~~~~', delta)
https://www.hugbg.com/archives/2824.html
评论