事件循环
asyncio ———— 不同线程的事件循环
https://www.cnblogs.com/yanzi-meng/p/8533734.html
不同线程的事件循环
1.同一线程:
import asyncio,time
async def func1(num):
print(num,'before---func1----')
await asyncio.sleep(num)
return "recv num %s"%num
if __name__ == "__main__":
begin = time.time()
coroutine1 = func1(5)
coroutine2 = func1(3)
coroutine3 = func1(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.run_forever()
end = time.time()
print(end-begin)
【发现问题】:
上述代码在一个线程中执行的事件循环,除非只有我们主动关闭事件 close,事件循环才会结束。否则输出完 534 结果一致会进行阻塞,等待其他协程 任务到达,那么此时不想让线程阻塞,而去完成别的工作呢?
【解决办法】:
在当前线程中创建一个事件循环(不启用,单纯获取标识),开启一个新的线程,在新的线程中启动事件循环。在当前线程依据事件循环标识, 可以向事件中添加协程对象。当前线程不会由于事件循环而阻塞了。
###2.不同线程事件循环(不涉及协程): 【场景】:
事件循环用来注册协程,协程需要动态地添加到事件循环中,然而还不想主线程因此而阻塞卡死(block),此时可以利用多线程
import asyncio,time,threading
def func1(num):
print(num,'before---func1----')
time.sleep(num)
return "recv num %s"%num
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
begin = time.time()
new_loop = asyncio.new_event_loop() #在当前线程下创建时间循环,(未启用)
t = threading.Thread(target=start_loop,args=(new_loop,)) #开启新的线程去启动事件循环
t.start()
new_loop.call_soon_threadsafe(func1,3)
new_loop.call_soon_threadsafe(func1,2)
new_loop.call_soon_threadsafe(func1,6)
end = time.time()
print(end-begin) #当前线程未阻塞,耗时0.02800154685974121
输出结果:
3 before---func1----
0.02800154685974121
2 before---func1----
6 before---func1----
由于 func1() 中 time.sleep 操作是同步阻塞的, 因此运行完毕 所有的 func 需要大致 6 + 2 + 3 秒
loop 调用回调函数相关方法
loop.call_soon(callback, *args): 立即执行, call_soon 比 call_later 优先执行(非线程安全的异步 API)
loop.call_later(time, callback, *args): 指定时间之后再运行,执行的顺序和指定的时间有关(非线程安全的异步 API)
loop.call_at(loop.time() + x, callback, *args): 也是指定时间后执行,但这里的时间为 loop 里面的时间(非线程安全的异步 API)
loop.call_soon_threadsafe(callback, *args): 用于调度不同 OS 线程的回调函数
###3.新线程协程:run_coroutine_threadsafe
import asyncio,time,threading
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_some_work(n):
print(f'work_waiting {n}')
await asyncio.sleep(n)
print(f'work_done {n}')
async def more_work(n):
print(f'more_work {n} start')
time.sleep(n)
print(f'more_work {n} end')
if __name__ == "__main__":
start = time.time()
new_loop = asyncio.new_event_loop() # 在当前线程下创建时间循环,(未启用)
t = threading.Thread(target=start_loop, args=(new_loop,)) # 开启新的线程去启动事件循环
t.start()
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop) # 传参必须是协程对象
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
end = time.time()
print(f'当前IO所消耗时间{start - end}秒') #当前线程未阻塞,耗时-0.0019998550415039062秒
输出结果:
work_waiting 4
当前IO所消耗时间-0.0019998550415039062秒
work_waiting 6
work_done 4
work_done 6
【描述】:
同样,主线程创建一个 new_loop, 然后在另外的子线程中开启一个无限事件循环。主线程通过 run_coroutine_threadsafe 新注册协程对象。 这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被 block。一共执行的时间大概在 6s 左右。
若要从不同的 OS 线程调度一个协程对象,应该使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future 。
async def coro_func():
return await asyncio.sleep(1, 42)
# Later in another OS thread:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()
- asyncio.run_coroutine_threadsafe(coro, loop):
- 此方法提交一个协程任务到循环中,loop 作为参数
- 返回 Future 供查询结果
- 当事件循环运行时, 必须在不同线程下添加协程任务到此循环中
- 其内部也用到了 call_soon_threadsafe
##由异步爬虫引发的协程问题
在写异步爬虫时,发现很多请求莫名其妙地超时。
【现象】:解析网页耗费了太多时间,使得部分请求超过预定时间。
【根本原因】:asyncio 的协程是非抢占式的。协程如果不主动交出控制权,就会一直执行下去。假如一个协程占用了太多时间,那么其他协程就有可能超时挂掉。
模拟实验:
import asyncio
import time
async def long_calc():
print('long calc start')
time.sleep(3)
print('long calc end')
async def waiting_task(i):
print(f'waiting task {i} start')
try:
await asyncio.wait_for(asyncio.sleep(1), 1.5)
except asyncio.TimeoutError:
print(f'waiting task {i} timeout')
else:
print(f'waiting task {i} end')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coros = [long_calc()]
coros.extend(waiting_task(x) for x in range(3))
loop.run_until_complete(asyncio.wait(coros))
- long_calc:模拟高耗时的同步处理
- waiting_task:模拟有时间限制的异步处理,时间设为 1 秒,时限设为 1.5 秒。
运行三个异步,一个同步,结果是这样的:
waitting task 0 start
waitting task 2 start
long_calc start
long_calc end
waitting task 1 start
waiting task 0 timeout
waiting task 2 timeout
waiting task 1 end
【运行现象】:
在 long_calc 启动前的 waiting_task 超时报错,long_calc 完成后的 waiting_task 正常结束。很明显,开过多同步任务和这种情况是一样的。 说白了就是当有同步任务执行时间过长时,会把有时间限制的异步任务卡死让其未执行而超时异常。
【解决办法】:
在异步爬虫的场景来说,就是过多或过长的解析让请求超时。合理的架构设计当然能避免这个冲突,这里提供另外的解决办法。
利用新线程协程解决:
import asyncio
import time
import threading
import datetime
def print_with_time(fmt):
print(str(datetime.datetime.now()) + ': ' + fmt)
async def long_calc():
print_with_time('long calc start')
time.sleep(8)
print_with_time('long calc end')
async def waiting_task(i):
print_with_time(f'waiting task {i} start')
try:
await asyncio.wait_for(asyncio.sleep(3), 5)
except asyncio.TimeoutError:
print_with_time(f'waiting task {i} timeout')
else:
print_with_time(f'waiting task {i} end')
if __name__ == '__main__':
sub_loop = asyncio.new_event_loop()
thread = threading.Thread(target=sub_loop.run_forever)
thread.start()
loop = asyncio.get_event_loop()
task = loop.create_task(long_calc())
futs = [asyncio.run_coroutine_threadsafe(waiting_task(x), loop=sub_loop) for x in range(3)]
futs = [asyncio.wrap_future(f, loop=loop) for f in futs]
loop.run_until_complete(asyncio.wait([task, *futs]))
sub_loop.call_soon_threadsafe(sub_loop.stop)
thread.join()