概念
多线程中对共享资源的访问,需要保证“线程安全”。
另外,如果一个线程需要获取另外一个线程的状态,来判断自身的先一步动作,需要一个线程间信号的传递。
Event ***
Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。
名称 | 含义 |
set() | 标记设置为True |
clear() | 标记设置为False |
is_set() | 标记是否为True |
wait(timeout=None) | 设置等待标记为True的时长,None为无限等待。等到返回True,未等到 超时了返回False |
线程通过调用event.wait(timeout=None) 来阻塞自身,None是永久阻塞,只有Flag=True 时或者timeout超时,wait才会返回,线程继续执行。
在主线程中定义的event事件,在各工作线程中是共享的。
如下:
# encoding = utf-8 __author__ = "mcabana.com" import threading import logging FORMAT = "%(asctime)s %(threadName)s: %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) e = threading.Event() def worker(e:threading.Event): # :是类型说明,py3.5之后使用“说明”时,ide工具可以进行提示 cups = [] while not e.wait(0.5): # 使用wait阻塞等待 cups.append(1) logging.info("I'm working, make cups {}".format(len(cups))) if len(cups) >= 10: logging.info("make end") e.set() # 制作完10个,flag=True def boss(e:threading.Event): logging.info("I'm boss, waiting for you") e.wait() # 等待flag信号,等10个杯子 logging.info("Good job.") t1 = threading.Thread(target=worker, name="worker", args=(e,)) t2 = threading.Thread(target=boss, name="boss", args=(e,)) t2.start() t1.start()
worker线程任务完成后,执行event.set(),将flag设为True,而一直等待的boss线程,就可以继续执行下去。
lock **
threead.locak()可以实例化一个lock实例,lock的作用是保证某一时刻只有一个线程操作某个资源。
使用lock后,A线程访问资源时,B线程是不能访问的,此时B有两种状态:
- 第一种,B线程会被阻塞在这里,阻塞时间可以是特定时间,或者等待A线程访问完毕,释放锁资源
- 第二种,B线程不能访问,但也不受影响,继续执行其他内容
lock.acquire() 阻塞锁
线程拿到 锁后,必须要释放,如果不释放,其他线程拿锁时,就会被阻塞。如果是自己拿阻塞锁,也会被阻塞。
整个阻塞锁,执行流程如下:
拿一次锁,release() 一次。
练习:
订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子
# encoding = utf-8 __author__ = "mcabana.com" # 订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子 import logging import threading import time FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.DEBUG) cups = [] lock = threading.Lock() def worker(count=1000): logging.info("in the worker") while True: lock.acquire() if len(cups) >= count: lock.release() break cups.append(1) time.sleep(0.0001) lock.release() logging.info('finishd cups {}'.format(len(cups))) for i in range(10): t1 = threading.Thread(target=worker, name='worker') t1.start()
阻塞锁锁的上下文管理
在上面的例子中,如果线程在释放锁之前,异常退出了,那这个锁就无法释放,也就形成了死锁。
# encoding = utf-8 __author__ = "mcabana.com" # 订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子 import logging import threading import time FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s" logging.basicConfig(format=FORMAT, level=logging.DEBUG) cups = [] lock = threading.Lock() def worker(count=1000): logging.info("in the worker") while True: with lock as b: # 适用阻塞锁,lock 的__enter__() 会上锁,退出时__exit__() 释放锁 if len(cups) >= count: break cups.append(1) time.sleep(0.0001) logging.info('finishd cups {}'.format(len(cups))) for i in range(10): t1 = threading.Thread(target=worker, name='worker-{}'.format(i)) t1.start()
锁的应用场景:
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
在多线程环境中,多个线程使用阻塞锁其实在拿锁到释放锁在个过程中,是单线程的,会影响程序的执行效率。
使用锁的注意事项:
- 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
- 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
- 加锁时间越短越好,不需要就立即释放锁
- 一定要避免死锁
非阻塞锁的使用
线程不断地判断能不能获得锁。
线程依然要等待锁,另一种死锁
Queue
仅多线程使用,可以替换为kafka,rabbitMQ
包含三个类:
LifoQueue:后进先出
Queue:先进先出
PriorityQueue:优先队列,最小的先出, 小顶堆实现的。
- q.empty()
- True:为空
- False:不为空
- q.get() # 不为空,去拿,默认阻塞
发生线程切换,就被打断了。
https://www.hugbg.com/archives/2806.html
评论