事件循环

五十岚2022年10月8日大约 5 分钟

asyncio ———— 不同线程的事件循环

https://www.cnblogs.com/yanzi-meng/p/8533734.htmlopen in new window

不同线程的事件循环

1.同一线程:

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.run_forever()
    end = time.time()
    print(end-begin)

【发现问题】:

上述代码在一个线程中执行的事件循环,除非只有我们主动关闭事件 close,事件循环才会结束。否则输出完 534 结果一致会进行阻塞,等待其他协程 任务到达,那么此时不想让线程阻塞,而去完成别的工作呢?

【解决办法】:

在当前线程中创建一个事件循环(不启用,单纯获取标识),开启一个新的线程,在新的线程中启动事件循环。在当前线程依据事件循环标识, 可以向事件中添加协程对象。当前线程不会由于事件循环而阻塞了。

###2.不同线程事件循环(不涉及协程): 【场景】:

事件循环用来注册协程,协程需要动态地添加到事件循环中,然而还不想主线程因此而阻塞卡死(block),此时可以利用多线程

import asyncio,time,threading

def func1(num):
    print(num,'before---func1----')
    time.sleep(num)
    return "recv num %s"%num

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

if __name__ == "__main__":
    begin = time.time()

    new_loop = asyncio.new_event_loop() #在当前线程下创建时间循环,(未启用)
    t = threading.Thread(target=start_loop,args=(new_loop,))    #开启新的线程去启动事件循环
    t.start()

    new_loop.call_soon_threadsafe(func1,3)
    new_loop.call_soon_threadsafe(func1,2)
    new_loop.call_soon_threadsafe(func1,6)

    end = time.time()
    print(end-begin)    #当前线程未阻塞,耗时0.02800154685974121


输出结果:

3 before---func1----
0.02800154685974121
2 before---func1----
6 before---func1----

由于 func1() 中 time.sleep 操作是同步阻塞的, 因此运行完毕 所有的 func 需要大致 6 + 2 + 3 秒

loop 调用回调函数相关方法

  • loop.call_soon(callback, *args): 立即执行, call_soon 比 call_later 优先执行(非线程安全的异步 API)

  • loop.call_later(time, callback, *args): 指定时间之后再运行,执行的顺序和指定的时间有关(非线程安全的异步 API)

  • loop.call_at(loop.time() + x, callback, *args): 也是指定时间后执行,但这里的时间为 loop 里面的时间(非线程安全的异步 API)

  • loop.call_soon_threadsafe(callback, *args): 用于调度不同 OS 线程的回调函数

###3.新线程协程:run_coroutine_threadsafe

import asyncio,time,threading

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(n):
    print(f'work_waiting {n}')
    await asyncio.sleep(n)
    print(f'work_done {n}')


async def more_work(n):
    print(f'more_work {n} start')
    time.sleep(n)
    print(f'more_work {n} end')


if __name__ == "__main__":
    start = time.time()

    new_loop = asyncio.new_event_loop() # 在当前线程下创建时间循环,(未启用)

    t = threading.Thread(target=start_loop, args=(new_loop,)) # 开启新的线程去启动事件循环
    t.start()

    asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)  # 传参必须是协程对象
    asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)

    end = time.time()

    print(f'当前IO所消耗时间{start - end}秒') #当前线程未阻塞,耗时-0.0019998550415039062秒

输出结果:

work_waiting 4
当前IO所消耗时间-0.0019998550415039062秒
work_waiting 6
work_done 4
work_done 6

【描述】:

同样,主线程创建一个 new_loop, 然后在另外的子线程中开启一个无限事件循环。主线程通过 run_coroutine_threadsafe 新注册协程对象。 这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被 block。一共执行的时间大概在 6s 左右。

若要从不同的 OS 线程调度一个协程对象,应该使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future 。

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()
  • asyncio.run_coroutine_threadsafe(coro, loop):
    • 此方法提交一个协程任务到循环中,loop 作为参数
    • 返回 Future 供查询结果
    • 当事件循环运行时, 必须在不同线程下添加协程任务到此循环中
    • 其内部也用到了 call_soon_threadsafe

##由异步爬虫引发的协程问题

在写异步爬虫时,发现很多请求莫名其妙地超时。

【现象】:解析网页耗费了太多时间,使得部分请求超过预定时间。

【根本原因】:asyncio 的协程是非抢占式的。协程如果不主动交出控制权,就会一直执行下去。假如一个协程占用了太多时间,那么其他协程就有可能超时挂掉。

模拟实验:

import asyncio
import time


async def long_calc():
    print('long calc start')
    time.sleep(3)
    print('long calc end')


async def waiting_task(i):
    print(f'waiting task {i} start')
    try:
        await asyncio.wait_for(asyncio.sleep(1), 1.5)
    except asyncio.TimeoutError:
        print(f'waiting task {i} timeout')
    else:
        print(f'waiting task {i} end')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coros = [long_calc()]
    coros.extend(waiting_task(x) for x in range(3))
    loop.run_until_complete(asyncio.wait(coros))
  • long_calc:模拟高耗时的同步处理
  • waiting_task:模拟有时间限制的异步处理,时间设为 1 秒,时限设为 1.5 秒。

运行三个异步,一个同步,结果是这样的:

waitting task 0 start
waitting task 2 start
long_calc start
long_calc end
waitting task 1 start
waiting task 0 timeout
waiting task 2 timeout
waiting task 1 end

【运行现象】:

在 long_calc 启动前的 waiting_task 超时报错,long_calc 完成后的 waiting_task 正常结束。很明显,开过多同步任务和这种情况是一样的。 说白了就是当有同步任务执行时间过长时,会把有时间限制的异步任务卡死让其未执行而超时异常。

【解决办法】:

在异步爬虫的场景来说,就是过多或过长的解析让请求超时。合理的架构设计当然能避免这个冲突,这里提供另外的解决办法。

利用新线程协程解决:

import asyncio
import time
import threading
import datetime


def print_with_time(fmt):
    print(str(datetime.datetime.now()) + ': ' + fmt)


async def long_calc():
    print_with_time('long calc start')
    time.sleep(8)
    print_with_time('long calc end')


async def waiting_task(i):
    print_with_time(f'waiting task {i} start')
    try:
        await asyncio.wait_for(asyncio.sleep(3), 5)
    except asyncio.TimeoutError:
        print_with_time(f'waiting task {i} timeout')
    else:
        print_with_time(f'waiting task {i} end')


if __name__ == '__main__':
    sub_loop = asyncio.new_event_loop()
    thread = threading.Thread(target=sub_loop.run_forever)
    thread.start()

    loop = asyncio.get_event_loop()
    task = loop.create_task(long_calc())
    futs = [asyncio.run_coroutine_threadsafe(waiting_task(x), loop=sub_loop) for x in range(3)]
    futs = [asyncio.wrap_future(f, loop=loop) for f in futs]

    loop.run_until_complete(asyncio.wait([task, *futs]))

    sub_loop.call_soon_threadsafe(sub_loop.stop)
    thread.join()
上次编辑于: 2022/10/8 01:58:06