Python 后端开发技术博客专栏 | 第 10 篇 asyncio 协程编程全指南 -- 从事件循环到生产实践
难度等级:高级适合读者:有 Python 基础的开发者,准备面试的中高级工程师前置知识:第 09 篇《GIL 深度解析与并发编程实战》导读上一篇文章我们深入剖析了 GIL、多线程和多进程。我们知道,对于 I/O 密集型任务,多线程可以利用 GIL 的释放实现并发,但线程的数量受限于操作系统资源,通常难以突破数百个。如果你的 Web 服务需要同时处理数千甚至数万个并发连接,多线程就力不从心了。这就是asyncio的舞台。asyncio 是 Python 标准库提供的异步 I/O 框架,基于事件循环和协程实现单线程内的高并发。一个协程仅需几百字节的内存(相比线程的 8MB 默认栈),单个线程可以轻松管理数万个并发协程。FastAPI、Starlette、aiohttp 等现代 Python Web 框架的核心都建立在 asyncio 之上。本文将系统讲解 asyncio 的核心机制:从协程的本质、事件循环的工作原理,到gather、create_task、wait、Semaphore、Queue等核心 API 的实战用法,再到异步生态和生产环境中的最佳实践。学习目标读完本文后,你将能够:理解协程的本质:async def定义的协程函数、await表达式的挂起语义掌握事件循环的工作原理:I/O 多路复用、事件驱动调度模型熟练使用 asyncio 核心 API:gather、create_task、wait、Queue、Semaphore、shield了解 Python 3.11+ 的结构化并发:TaskGroup、asyncio.timeout掌握异步编程的陷阱与最佳实践:阻塞调用检测、run_in_executor桥接、异常处理了解异步生态:aiohttp、aiofiles、aiomysql/asyncpg、aioredis的使用模式在面试中准确回答协程、事件循环、异步并发等高频问题一、协程的本质1.1 什么是协程协程(Coroutine)是一种可以暂停和恢复执行的函数。与线程由操作系统抢占式调度不同,协程由程序员通过await显式让出控制权,属于协作式调度。importasyncioimporttime# ========== 协程函数 vs 普通函数 ==========asyncdefasync_hello()-str:"""协程函数:用 async def 定义"""awaitasyncio.sleep(0.1)# 挂起点:让出控制权return"Hello from coroutine"defsync_hello()-str:"""普通函数"""return"Hello from function"# 调用协程函数不会执行,而是返回协程对象coro=async_hello()print(f"协程对象类型:{type(coro)}")# class 'coroutine'# 协程对象必须通过事件循环调度执行result=asyncio.run(async_hello())print(f"协程返回值:{result}")# Hello from coroutine关键概念:async def定义协程函数,调用后返回协程对象(不会立即执行)await是挂起点,将控制权交还给事件循环,等待右侧的可等待对象完成可等待对象(Awaitable):协程对象、asyncio.Task、asyncio.Future1.2 协程 vs 线程 vs 进程importasyncioimportthreadingimporttime# ========== 对比:协程并发 vs 线程并发 ==========asyncdefasync_io_task(task_id:int,delay:float)-str:"""协程版 I/O 任务"""awaitasyncio.sleep(delay)returnf"async-{task_id}"defsync_io_task(task_id:int,delay:float)-str:"""同步版 I/O 任务"""time.sleep(delay)returnf"sync-{task_id}"# 协程并发:单线程内调度asyncdefrun_async():start=time.perf_counter()tasks=[async_io_task(i,0.1)foriinrange(10)]results=awaitasyncio.gather(*tasks)elapsed=time.perf_counter()-startreturnelapsed,results# 线程并发defrun_threaded():start=time.perf_counter()threads=[]results=[None]*10defworker(tid,delay,idx):results[idx]=sync_io_task(tid,delay)foriinrange(10):t=threading.Thread(target=worker,args=(i,0.1,i))threads.append(t)t.start()fortinthreads:t.join()elapsed=time.perf_counter()-startreturnelapsed,results async_elapsed,_=asyncio.run(run_async())thread_elapsed,_=run_threaded()print(f"协程并发 10 个任务:{async_elapsed:.3f}s")print(f"线程并发 10 个任务:{thread_elapsed:.3f}s")# 两者耗时接近 ~0.1s,但协程的内存开销远低于线程协程 vs 线程 vs 进程对比表:特性协程(asyncio)线程(threading)进程(multiprocessing)调度方式协作式(await让出)抢占式(OS 调度)抢占式(OS 调度)内存开销极低(~几百字节/协程)较高(~8MB 栈/线程)很高(独立地址空间)并发数上限数万~数十万数百~数千~CPU 核心数GIL 影响不涉及(单线程)受限不受限数据共享简单(同一线程内)需要锁保护需要 IPC 机制适用场景高并发 I/O中等并发 I/OCPU 密集型代码风格async/await普通函数 + Thread普通函数 + Process1.3 await 的本质:挂起与恢复await表达式是协程的核心。它的语义是:挂起当前协程,将控制权交还事件循环,等待被 await 的对象完成后再恢复执行。importasyncioimporttimeasyncdefstep_1()-str:print(f" [step_1] 开始执行 t={time.perf_counter():.3f}")awaitasyncio.sleep(0.1)# 挂起 step_1,事件循环可以调度其他协程print(f" [step_1] 恢复执行 t={time.perf_counter():.3f}")return"result_1"asyncdefstep_2()-str:print(f" [step_2] 开始执行 t={time.perf_counter():.3f}")awaitasyncio.sleep(0.1)print(f" [step_2] 恢复执行 t={time.perf_counter():.3f}")return"result_2"asyncdefmain():# 串行 await:两个协程依次执行start=time.perf_counter()r1=awaitstep_1()r2=awaitstep_2()serial_time=time.perf_counter()-startprint(f"串行执行:{serial_time:.3f}s\n")# 并发 await:两个协程同时执行start=time.perf_counter()r1,r2=awaitasyncio.gather(step_1(),step_2())concurrent_time=time.perf_counter()-startprint(f"并发执行:{concurrent_time:.3f}s")asyncio.run(main())# 串行约 0.2s,并发约 0.1s底层原理:Python 协程基于生成器(Generator)的yield机制实现。await在底层相当于yield from,将协程的执行帧保存在堆上(而非线程栈上),因此切换开销极低。二、事件循环(Event Loop)机制2.1 事件循环的工作原理事件循环是 asyncio 的调度核心。它的工作流程可以简化为:while True: # 1. 检查就绪的 I/O 事件(通过 select/epoll/kqueue) # 2. 执行就绪的回调和协程 # 3. 检查定时器,执行到期的 scheduled 回调 # 4. 如果没有待处理的事件,等待新的 I/O 事件底层依赖操作系统的I/O 多路复用:Linux:epoll(高效,O(1) 事件通知)macOS:kqueueWindows:IOCP(I/O Completion Port)importasyncioimportselectorsimportsys# 查看当前事件循环使用的选择器loop=asyncio.new_event_loop()print(f"事件循环类型:{type(loop).__name__}")# Windows: ProactorEventLoop 或 SelectorEventLoop# Linux: _UnixSelectorEventLoop# 查看底层选择器print(f"默认选择器:{selectors.DefaultSelector.__name__}")# Linux: EpollSelector, macOS: KqueueSelector, Windows: SelectSelectorloop.close()2.2 事件循环的启动方式importasyncioasyncdefgreet(name:str)-str:awaitasyncio.sleep(0.01)returnf"Hello,{name}!"# ========== 方式 1:asyncio.run()(推荐,Python 3.7+) ==========# 创建事件循环 - 运行协程 - 关闭事件循环result=asyncio.run(greet("World"))print(f"asyncio.run:{result}")# ========== 方式 2:手动管理事件循环(旧版/特殊场景) ==========loop=asyncio.new_event_loop()try:result=loop.run_until_complete(greet("Python"))print(f"run_until_complete:{result}")finally:loop.close()# ========== 事件循环与线程 ==========# 核心规则:一个线程最多只有一个运行中的事件循环# asyncio.run() 会在当前线程创建并运行事件循环# 不能在已有事件循环运行的线程中再次调用 asyncio.run()asyncio.run()vsloop.run_until_complete()vsloop.run_forever():方法用途生命周期asyncio.run(coro)运行顶层入口协程自动创建/关闭循环loop.run_until_complete(coro)运行直到协程完成手动管理循环loop.run_forever()持续运行直到stop()用于服务器等长期运行场景三、核心 API 详解3.1 asyncio.create_task():创建并发任务create_task()将协程包装为Task对象并调度执行。与直接await协程不同,create_task会立即开始调度(不需要等到await)。importasyncioimporttimeasyncdeffetch_data(source:str,delay:float)-dict:"""模拟从不同数据源获取数据"""print(f" 开始获取:{source}")awaitasyncio.sleep(delay)print(f" 完成获取:{source}")return{"source":source,"data":f"data_from_{source}"}asyncdefmain():start=time.perf_counter()# create_task 立即调度,不阻塞task_db=asyncio.create_task(fetch_data("database",0.1))task_cache=asyncio.create_task(fetch_data("cache",0.05))task_api=asyncio.create_task(fetch_data("api",0.08))# 三个任务已经并发执行,await 只是等待结果db_result=awaittask_db cache_result=awaittask_cache api_result=awaittask_api elapsed=time.perf_counter()-startprint(f"\n总耗时:{elapsed:.3f}s (三个任务中最长的 ~0.1s)")print(f"结果:{[db_result['source'],cache_result['source'],api_result['source']]}")asyncio.run(main())3.2 asyncio.gather():并发执行多个协程gather()是最常用的并发工具,它并发运行多个可等待对象,并收集所有结果。importasyncioimporttimeasyncdefprocess_request(request_id:int)-dict:"""模拟处理 HTTP 请求"""delay=0.02*(request_id%5+1)awaitasyncio.sleep(delay)return{"id":request_id