概念
多线程中对共享资源的访问,需要保证“线程安全”。
另外,如果一个线程需要获取另外一个线程的状态,来判断自身的先一步动作,需要一个线程间信号的传递。
Event ***
Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。
| 名称 | 含义 |
| set() | 标记设置为True |
| clear() | 标记设置为False |
| is_set() | 标记是否为True |
| wait(timeout=None) | 设置等待标记为True的时长,None为无限等待。等到返回True,未等到 超时了返回False |
线程通过调用event.wait(timeout=None) 来阻塞自身,None是永久阻塞,只有Flag=True 时或者timeout超时,wait才会返回,线程继续执行。
在主线程中定义的event事件,在各工作线程中是共享的。
如下:
# encoding = utf-8
__author__ = "mcabana.com"
import threading
import logging
FORMAT = "%(asctime)s %(threadName)s: %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
e = threading.Event()
def worker(e:threading.Event): # :是类型说明,py3.5之后使用“说明”时,ide工具可以进行提示
cups = []
while not e.wait(0.5): # 使用wait阻塞等待
cups.append(1)
logging.info("I'm working, make cups {}".format(len(cups)))
if len(cups) >= 10:
logging.info("make end")
e.set() # 制作完10个,flag=True
def boss(e:threading.Event):
logging.info("I'm boss, waiting for you")
e.wait() # 等待flag信号,等10个杯子
logging.info("Good job.")
t1 = threading.Thread(target=worker, name="worker", args=(e,))
t2 = threading.Thread(target=boss, name="boss", args=(e,))
t2.start()
t1.start()

worker线程任务完成后,执行event.set(),将flag设为True,而一直等待的boss线程,就可以继续执行下去。
lock **
threead.locak()可以实例化一个lock实例,lock的作用是保证某一时刻只有一个线程操作某个资源。
使用lock后,A线程访问资源时,B线程是不能访问的,此时B有两种状态:
- 第一种,B线程会被阻塞在这里,阻塞时间可以是特定时间,或者等待A线程访问完毕,释放锁资源
- 第二种,B线程不能访问,但也不受影响,继续执行其他内容
lock.acquire() 阻塞锁
线程拿到 锁后,必须要释放,如果不释放,其他线程拿锁时,就会被阻塞。如果是自己拿阻塞锁,也会被阻塞。

整个阻塞锁,执行流程如下:

拿一次锁,release() 一次。
练习:
订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子
# encoding = utf-8
__author__ = "mcabana.com"
# 订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子
import logging
import threading
import time
FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s"
logging.basicConfig(format=FORMAT, level=logging.DEBUG)
cups = []
lock = threading.Lock()
def worker(count=1000):
logging.info("in the worker")
while True:
lock.acquire()
if len(cups) >= count:
lock.release()
break
cups.append(1)
time.sleep(0.0001)
lock.release()
logging.info('finishd cups {}'.format(len(cups)))
for i in range(10):
t1 = threading.Thread(target=worker, name='worker')
t1.start()
阻塞锁锁的上下文管理
在上面的例子中,如果线程在释放锁之前,异常退出了,那这个锁就无法释放,也就形成了死锁。
# encoding = utf-8
__author__ = "mcabana.com"
# 订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子
import logging
import threading
import time
FORMAT = "%(asctime)s %(threadName)s %(thread)d: %(message)s"
logging.basicConfig(format=FORMAT, level=logging.DEBUG)
cups = []
lock = threading.Lock()
def worker(count=1000):
logging.info("in the worker")
while True:
with lock as b: # 适用阻塞锁,lock 的__enter__() 会上锁,退出时__exit__() 释放锁
if len(cups) >= count:
break
cups.append(1)
time.sleep(0.0001)
logging.info('finishd cups {}'.format(len(cups)))
for i in range(10):
t1 = threading.Thread(target=worker, name='worker-{}'.format(i))
t1.start()
锁的应用场景:
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
在多线程环境中,多个线程使用阻塞锁其实在拿锁到释放锁在个过程中,是单线程的,会影响程序的执行效率。
使用锁的注意事项:
- 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
- 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
- 加锁时间越短越好,不需要就立即释放锁
- 一定要避免死锁
非阻塞锁的使用
线程不断地判断能不能获得锁。
线程依然要等待锁,另一种死锁

Queue
仅多线程使用,可以替换为kafka,rabbitMQ
包含三个类:
LifoQueue:后进先出
Queue:先进先出
PriorityQueue:优先队列,最小的先出, 小顶堆实现的。

- q.empty()
- True:为空
- False:不为空
- q.get() # 不为空,去拿,默认阻塞
发生线程切换,就被打断了。

https://www.hugbg.com/archives/2806.html


评论