辅助机制
一、线程辅助
一.信号量(Semaphore):
首先,它也是一把锁。和 RLock()类似的是它内部也是维护这多把锁,但区别在于它是并行的锁,而不是像递归锁那样嵌套的。假设现在的锁的数量
count = 4 那么此时开辟一百个线程只有 4 个线程可以进来。(这不是真正的并行,也有先后进入,就好似 CPU 只让它在先进入的四个线程中切换执行,
因此达到控制 100 个线程中的 4 个线程进行并发执行)一旦 count 变为 0,此时其他的线程不能在进去(CPU 只执行这四个线程的切换)。
线程不是越多越好,撑死开到一千多个了,再开下去效率反而会大打折扣
信号量用来控制线程并发数的,BoundedSemaphore 或 Semaphore 管理一个内置的计数 器,每当调用 acquire()时-1,调用 release()时+1。
计数器不能小于 0,当计数器为 0 时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用 release()。(类似于停车位的概念)
(同步锁: 同时只允许一个线程更改数据,而 Semaphore 是同时允许一定数量的线程更改数据)
BoundedSemaphore 与 Semaphore 的唯一区别在于前者将在调用 release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
数据库的连接问题:(线程池&数据库连接池)
1.传统的数据库连接:
用户访问---> 申请连接---> 查询并返回结果---> 断开连接 的这种形式。
2.利用信号量机制限制多线程(线程池)
若同时有一百个线程(即一百个访问者)进行连接数据库的请求,此时的开销对于数据库来说较大,承受不了。此时设置信号量,允许5个线程
的同时访问。此时即可缓解多线程同时访问造成的开销。
3.创建连接池再优化:
虽然利用了信号量控制了多线程的并发数。但传统的访问模式需要创建连接。假设一天10万的访问量,那么数据库服务器就要创建10万次的连接。
这极大的浪费了数据库的资源。并且容易使数据库内存溢出&宕机。
N用户--->访问数据库---->向数据库请求--->开辟N次连接
数据库连接是一种关键的有限的昂贵的资源。建立一个数据库连接池(如JAVA中用LinkedList<Connection>来存放数据库连接构成连接池)。
先向数据库开辟10个连接在连接池中存储起来,此后便一直维护这是个连接,而不是传统的向数据库申请连接、断开。
用户每次请求都向连接池发起请求集合取连接get()。每次访问完再释放回连接池中集合再put(),即可极大的减少开销。
N用户---->访问数据库----->向数据库连接池请求---->线程池中获取连接
4.数据库连接池:
负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是重新建立一个。
<1>数据库连接池在初始化时将创建一定数量的数据库连接放到连接池中,这些数据库连接的数量是由最小数据库连接数来设定的。
<2>无论这些数据库连接是否被使用,连接池都将一直保证至少拥有这么多的连接数量。
<3>连接池的最大数据库连接数量限定了这个连接池能占有的最大连接数,当应用程序向连接池请求的连接数超过最大连接数量时,这些请求
将被加入到等待队列中。
数据库连接池的最小连接数和最大连接数的设置要考虑到以下几个因素:
1.最小连接数:是连接池一直保持的数据库连接,若创建过多则系统启动较慢,但创建后的响应速度就很快,反之。
2.最大连接数:是连接池能申请的最大连接数,如果数据库连接请求超过次数,后面的数据库连接请求将被加入到等待队列中,这会影响以后
的数据库操作
3.如何确保连接池中的最小连接数呢?有动态和静态两种策略。动态即每隔一定时间就对连接池进行检测,如果发现连接数量小于最小连接数,
则补充相应数量的新连接,以保证连接池的正常运转。静态是发现空闲连接不够时再去检查。
条件变量同步(Condition):
条件变量是什么呢,还是一把锁,也是最复杂的一把锁,除了是最后一把锁以外,还能实现线程间的通信,因此是条件变量。
场景:比如一个线程要继续走,需要等待其他线程给出一个通知。(通过标识符进行一个通信)
有一类线程需要满足条件之后才能够继续执行,Python 提供了 threading.Condition 对象用于条件变量线程的支持,它除了能提供 RLock()或 Lock()的方
法外,还提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传入锁,对象自动创建一个RLock()。
wait():条件不满足时调用,线程会释放锁并进入等待阻塞;(进入等待池时,注意wait是有一个锁的释放的过程)
notify():条件创造后调用,通知等待池激活一个线程;(而notify不会释放锁,要配合release才行)
notifyAll():条件创造后调用,通知等待池激活所有线程。
生产者和消费者问题:
假设现在是五个生产者一个消费者,当消费者拿到锁时,判断是否还有包子,若是没包子则wait进入等待池等待并释放锁给五个生产者去做,生产者做
包子时第一个生产者做好了会告诉消费者我做好了,激活了消费者,此后释放了锁又回到了抢占的状态,其他生产者和消费者抢锁,若是消费者抢到了锁
则开始吃包子,中途判断是否有包子,没有再进入等待池,有则吃了一个便再次释放锁,每次抢锁都是五个生产者和一个消费者抢锁,然而只要消费者的
吃包子速度快于五个生产者的做包子速度,则包子数永远不对多于生产者的数量。
因此条件同步变量良好的保持了生产和消费的线程不会混乱,合理进行。
同步条件对象(Event):
先说说同步:
当你进程处于一个 io 操作的情况下,你的进程只能乖乖的等着,不能进行任何其他的操作。而异步则是我跑我的,你什么时候数据过来了,我再返回来
处理即可。两个线程按理说是同步的吗?当然不是,你走你的我走我的,互不影响。而同步则是为了让两个线程之间处于一个同步状态。
这回不是一把锁了,它的功能类似于Condition,他也可以进行线程之间的信息交换,只不过不需要锁了(用锁其实是为了保证公共数据的)若没有锁也想相
互通信,此时就需要Event,Event比Condition用的更广。
条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。Event里面有一个标志位(变量Flag)
event=threading.Event():条件环境对象,初始值 为False;
event.isSet():返回event的状态值;(返回标志位)
event.wait():如果 event.isSet()==False将阻塞线程;(就是若标志位为Flase则阻塞线程)
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;(把标志位改为True)
event.clear():恢复event的状态值为False。(把标志位改为Flase)
我在一个线程里通过一定的控制,让另一个线程走或是阻塞
加班示例:(很简单,看代码)
老板说加班,工人喊命苦,但老板要先说加班后才会触发工人命苦
线程变量利器----ThreadLocal:
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,
而全局变量的修改必须加锁。
但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。
用全局变量也不行,因为每个线程处理不同的对象,不能共享。
如果用一个全局 dict 存放所有的对象,然后以 thread 自身作为 key 获得线程对应的对象如何?
这种方式理论上是可行的,它最大的优点是消除了对象在每层函数中的传递问题,但是,每个函数获取对象的代码有点丑。
有没有更简单的方式?
ThreadLocal 应运而生,不用查找 dict,ThreadLocal 帮你自动做这件事:
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
执行结果:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全局变量 local_school 就是一个 ThreadLocal 对象,每个 Thread 对它都可以读写 student 属性,但互不影响。你可以把 local_school 看成全局变量,
但每个属性如 local_school.student 都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal 内部会处理。
ThreadLocal 最常用的地方就是为每个线程绑定一个数据库连接,HTTP 请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便
地访问这些资源。
多线程利器----队列(queue):(非常重要!!!)
其实和数据结构的队列差不多。与栈不同 FIFO:先进先出。这就像一个管道
队列只在多线程的情况下才有用,因为单线程时队列若是 get 不到或是 put 超了都会阻塞,而没救了。列表则是一个线程不安全的,因为任何线程都可以对列表
进行更改,对于多线程来说不好控制(主要就是不安全)
而队列就和列表不同,无论是几个线程,当并发get()时绝对不会像list那样获取到同一个值,这是因为队列的数据结构内部自带一把锁
import queue(这里是线程队列,还有进程队列)
q = queue.Queue(maxsize = 10) # 创建一个“队列”对象
queue.Queue
类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示
队列长度无限。
q.put(item, block=True, timeout=None) # 将一个值放入队列中,超出队列长度阻塞
调用队列对象的put()方法在队尾插入一个项目。
item:为必需传入参的,插入队列的值。
block:为可选参数,默认为True。如果队列当前为空,且block为True,put()方法就使调用线程暂停,直到空出一个数据单元。
如果block为Flase,put方法将打印Full异常。
timeout:可选参数,阻塞时间
q.get(block=True, timeout=None) # 将一个值从队列中取出,超出获取范围阻塞
调用队列对象的get()方法从队头删除并返回一个项目。
block:默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
timeout:可选参数,等待时间
Python queue模块有三种队列及构造函数:
1、FIFO队列先进先出。 class queue.Queue(maxsize)
2、LIFO类似于栈,即先进后出。 class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False。 q.full 与 maxsize 大小对应
q.get_nowait() 相当q.get(False),即get时为空则报错
q.put_nowait(item) 相当q.put(item, False),即put时为满则报错
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作。接收task_done()发送来的信号,后面的才会继续执行
生产者消费者-队列模型:
此时则无需 condition 条件同步变量来控制资源为空时的线程挂起。而是 get 空、put 满时自动挂起。
concurrent.futures -- 启动并行任务
concurrent.futures
模块提供异步执行回调高层接口。
使用 线程池ThreadPoolExecutor
,进程池ProcessPoolExecutor
来实现异步操作,两者都是抽像类 Executor
的实现。
创建线程池
ThreadPoolExecutor
def func(value):
time.sleep(1)
print("value:", value)
return 111
def main():
pool = ThreadPoolExecutor(max_workers=5)
for i in range(10):
# 将函数放在池里面并返回一个future对象
fut = pool.submit(func, i)
print("future:", fut, type(fut))
main()