当 asyncio.Lock 遇上多线程:一个看似简单却三次修错的并发 Bug
在开发 MiQi Desktop一个多通道 AI Agent 桌面应用时我们遇到了一个经典的并发问题两个对话同时进行只有一个能收到回复。排查和修复这个 bug 的过程暴露了 Python asyncio 在多线程场景下一个容易被忽视的陷阱。现象用户反馈“同时对话两个只有一个有回答”日志显示Failed to activate sandbox for desktop:1781234544206: asyncio.locks.Lock object at 0x000002B54A5F7830 [unlocked, waiters:1] is bound to a different event loop一个对话正常响应另一个卡死不动。架构背景MiQi Desktop 的 Bridge 进程通过 stdin/stdout 与 Electron 前端通信。每当用户发送一条消息Bridge 的处理流程是defhandle_chat_send(req_id,params):def_run_in_thread():asyncdef_run():agentbuild_agent(session_key)resultawaitagent.process_direct(content)send_result(result)asyncio.run(_run())# 每次创建新的 event loop# 每条消息启一个新的 daemon threadtthreading.Thread(target_run_in_thread,daemonTrue)t.start()关键设计决策每条消息 一个新 thread 一个新 event loopSandboxManager 是进程级单例所有消息共享同一个实例第一次修延迟创建 Lock失败最初SandboxManager.__init__在构造时就创建了asyncio.Lock()classSandboxManager:def__init__(self):self._lockasyncio.Lock()# 构造时创建我以为问题是构造时没有 event loop于是改成延迟创建classSandboxManager:def__init__(self):self._lockNone# 延迟asyncdefinitialize(self):ifself._lockisNone:self._lockasyncio.Lock()# 第一次 await 时创建测试单条消息发送 → 通过。结果用户再次报错。原因很明显——第一条消息把 Lock 绑定到了 Loop A第二条消息从 Loop B 调用时还是报错。教训只测了串行场景没测并发。第二次修检测 Loop 变化并重建 Lock失败既然问题是Lock 绑错了 Loop那我每次检测当前 Loop 是否跟上次一样不一样就重建asyncdefinitialize(self):current_loopasyncio.get_running_loop()ifself._lockisNoneorself._lock_loopisnotcurrent_loop:self._lockasyncio.Lock()# 重建self._lock_loopcurrent_loop测试两个串行的asyncio.run()调用 → 通过。结果并发时直接挂死。# 真实并发场景挂死defreq1():asyncio.run(...)# thread1, loop Adefreq2():asyncio.run(...)# thread2, loop Bt1.start();t2.start()# 同时跑t1.join();t2.join()# 永远等不到根因分析时间线 t0 thread1: initialize() → 创建 lock_A绑到 loop_A t1 thread2: initialize() → 检测到 loop 不同 → 重建 lock_B绑到 loop_B ↑ 此时 self._lock 从 lock_A 变成了 lock_B t2 thread1: async with self._lock: ← 读到的是 lock_Bloop_B 的锁 → bound to a different event loop 或死等thread2 的重建操作踩掉了thread1 正在使用的锁对象。这是一个经典的 TOCTOUTime-of-check to time-of-use竞态。教训并发场景下检测重建不是原子操作两个 thread 对同一个self._lock字段的读写本身就需要同步用 asyncio 的工具来修 asyncio 的跨线程问题本质上是死局第三次修用 threading.Lock 替代 asyncio.Lock成功回到本质我们需要的是什么保护_sandboxes字典的并发读写微秒级操作允许多个 thread 的 event loop 同时调用 manager 的方法不阻塞 event loop 太久threading.Lock完美满足这三个需求importthreadingclassSandboxManager:def__init__(self):self._lockthreading.Lock()# 无 loop 亲和性self._creating:set[str]set()# 防重复创建asyncdefget_or_create(self,session_key):# 快路径已存在直接返回微秒级锁withself._lock:ifsession_keyinself._sandboxes:returnself._sandboxes[session_key]ifsession_keyinself._creating:returnNone# 另一个 thread 正在创建self._creating.add(session_key)# 慢路径创建 sandbox锁外执行不阻塞其他 threadsandboxBwrapSandbox(session_keysession_key,...)try:awaitsandbox.start()# 可能耗时数秒withself._lock:self._sandboxes[session_key]sandbox self._creating.discard(session_key)returnsandboxexceptException:withself._lock:self._creating.discard(session_key)returnNone设计要点threading.Lock不绑定任何 event loop——哪个 thread 都能用临界区极短——只保护字典操作读写_sandboxes微秒级慢操作在锁外——sandbox.start()是异步操作启动 WSL 子进程放在锁外执行_creating集合防重复——两个 thread 同时对同一个 session 创建 sandbox 时第二个直接返回 None“在 async 代码里用 threading.Lock 不是大忌吗”是的asyncio 社区通常不推荐在协程里使用threading.Lock因为它会阻塞 event loop。但这里有几个关键前提临界区只有字典读写——耗时在微秒级event loop 感知不到每个 thread 有自己独立的 event loop——不存在一个 thread 持锁同 loop 上的其他 task 被阻塞的问题真正的慢操作subprocess、网络在锁外——锁内绝不await如果你的场景是单线程 单 event loop 多个协程并发那asyncio.Lock是正确选择。但我们的场景是多线程 多 event loop 共享对象——这本质上是一个线程安全问题应该用线程安全的工具。并发测试验证importasyncio,threading,timefrommiqi.sandbox.managerimportSandboxManager# 共享单例模拟 BridgeState._sandbox_managermgrSandboxManager(workspacePath(/tmp/test))defreq1():asyncdefrun():awaitmgr.initialize()sandboxawaitmgr.activate(session_A)assertsandboxisnotNoneasyncio.run(run())defreq2():time.sleep(0.05)asyncdefrun():awaitmgr.initialize()sandboxawaitmgr.activate(session_B)assertsandboxisnotNoneasyncio.run(run())t1threading.Thread(targetreq1)t2threading.Thread(targetreq2)t1.start();t2.start()t1.join(timeout30);t2.join(timeout30)# 不再挂死两个 session 都成功创建总结asyncio.Lockthreading.Lock适用场景单 loop 内多协程并发多线程含多 loop并发Loop 亲和性有绑定到创建时的 loop无阻塞行为挂起当前协程不阻塞 loop阻塞当前线程跨 thread 安全不安全安全关键教训asyncio.run()创建新 event loop。如果你的代码路径会被多次asyncio.run()调用每次新 loopasyncio 原语Lock、Queue、Event、Semaphore都不能跨调用共享。“测试通过不等于正确”。串行通过不代表并发安全。如果你的代码会被多线程调用必须写并发测试。asyncio 和 threading 不是互斥的。在多线程 每线程独立 loop的架构下线程间的共享状态应该用threading.Lockloop 内的协程协作才用asyncio.Lock。把慢操作移到锁外。无论用哪种锁临界区越短越好。检查 dict → 创建对象 → 启动子进程 → 写回 dict这种长临界区是并发问题的温床。正确做法锁内只做 dict 操作subprocess 启动放在锁外。