Python29_并发编程
Python29_并发编程文章目录Python29_并发编程[toc]基本概念1 并发 vs 并行2 Python 的并发模型多线程编程1 基本使用2 线程同步3 线程间通信多进程编程1 基本使用2 进程池3 进程间通信异步IO(asyncio)1 基本概念2 基本使用3 高级特性并发编程选择指南实际应用示例1 并发下载器2 并行计算常见问题并发编程 是现代软件开发中提高程序性能的重要手段。Python提供了多种并发编程方式包括多线程、多进程和异步IO。基本概念1 并发 vs 并行并发(Concurrency): 多个任务交替执行看起来像是同时运行并行(Parallelism): 多个任务真正同时执行需要多核CPU支持2 Python 的并发模型I/O密集型任务: 适合使用多线程或异步IOCPU密集型任务: 适合使用多进程多线程编程Python通过threading模块提供线程支持但由于GIL(全局 解释器 锁)的存在多线程不适合CPU密集型任务。1 基本使用importthreadingimporttimedeftask(name):print(f任务{name}开始)time.sleep(2)# 模拟I/O操作print(f任务{name}完成)# 创建线程threads[]foriinrange(3):tthreading.Thread(targettask,args(fThread-{i},))threads.append(t)t.start()# 等待所有线程完成fortinthreads:t.join()print(所有任务完成)2 线程同步使用锁Lock/RLock# 使用锁Lock/RLockimportthreadingimporttime counter0lockthreading.Lock()defincrement():globalcounterfor_inrange(100000):withlock:# 自动获取和释放锁counter1if__name____main__:thread_num5print(f{thread_num}个线程开始时间:{time.time():.2f})threads[]for_inrange(thread_num):tthreading.Thread(targetincrement)threads.append(t)t.start()fortinthreads:t.join()print(f最终计数器值:{counter})# 应该是500000print(f{thread_num}个线程结束时间:{time.time():.2f})使用Semaphore信号量注意如果信号量不等于1还是不能解决并发导致的【不安全的数据类型操作】问题# 使用Semaphore信号量importthreadingimporttime counter0lockthreading.Lock()defincrement(thread_name):globalcounterfor_inrange(20):withsemaphore:# 自动获取和释放锁print(f{thread_name}正在使用资源 )counter1print(f{thread_name}释放资源 )if__name____main__:# 使用信号量(如果信号量不等于1还是不能解决并发导致的【不安全的数据类型操作】问题)semaphorethreading.Semaphore(3)# 最多3个线程同时访问thread_num10threads[]print(f{thread_num}个线程开始时间:{time.time():.2f})foriinrange(thread_num):tthreading.Thread(targetincrement,args(fThreadName-{i},))threads.append(t)t.start()fortinthreads:t.join()print(f最终计数器值:{counter})# 应该是500000print(f{thread_num}个线程结束时间:{time.time():.2f})3 线程间通信# 使用队列importqueuedefproducer(q):foriinrange(5):print(f生产物品{i})q.put(i)time.sleep(0.5)q.put(None)# 结束信号defconsumer(q):whileTrue:itemq.get()ifitemisNone:breakprint(f消费物品{item})time.sleep(1)qqueue.Queue()threading.Thread(targetproducer,args(q,)).start()threading.Thread(targetconsumer,args(q,)).start()多进程编程多进程可以绕过GIL限制适合CPU密集型任务但进程间通信开销较大。1 基本使用frommultiprocessingimportProcessimportosdefcpu_bound_task(n):print(f进程{os.getpid()}计算{n}的平方)returnn*nif__name____main__:processes[]foriinrange(4):pProcess(targetcpu_bound_task,args(i,))processes.append(p)p.start()forpinprocesses:p.join()2 进程池frommultiprocessingimportPooldefsquare(x):returnx*xif__name____main__:withPool(4)aspool:# 4个工作进程# map方法resultspool.map(square,range(10))print(results)# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]# apply_async方法resultpool.apply_async(square,(10,))print(result.get())# 1003 进程间通信frommultiprocessingimportProcess,Queuedefworker(q):whileTrue:itemq.get()ifitemisNone:breakprint(f处理:{item})if__name____main__:qQueue()pProcess(targetworker,args(q,))p.start()foriinrange(5):q.put(i)q.put(None)# 结束信号p.join()异步IO(asyncio)asyncio是Python3.4引入的 标准库 适合I/O密集型任务使用单线程实现高并发。1 基本概念协程(Coroutine): 使用async def定义的函数事件循环(Event Loop): 协程的调度器Future/Task: 表示异步操作的结果2 基本使用importasyncioasyncdeffetch_data(url):print(f开始获取{url})awaitasyncio.sleep(2)# 模拟I/O操作print(f完成获取{url})returnf{url}的数据asyncdefmain():# 顺序执行result1awaitfetch_data(url1)result2awaitfetch_data(url2)print(result1,result2)# 并发执行task1asyncio.create_task(fetch_data(url3))task2asyncio.create_task(fetch_data(url4))awaittask1awaittask2# 使用gatherresultsawaitasyncio.gather(fetch_data(url5),fetch_data(url6),fetch_data(url7))print(results)asyncio.run(main())3 高级特性# 超时控制asyncdefslow_operation():awaitasyncio.sleep(5)return完成asyncdefmain():try:resultawaitasyncio.wait_for(slow_operation(),timeout3.0)exceptasyncio.TimeoutError:print(操作超时)# 事件循环控制asyncdefperiodic_task():whileTrue:print(执行周期性任务)awaitasyncio.sleep(1)asyncdefmain():taskasyncio.create_task(periodic_task())awaitasyncio.sleep(5)task.cancel()try:awaittaskexceptasyncio.CancelledError:print(任务已取消)并发编程选择指南方法适用场景优点缺点多线程I/O密集型GUI应用轻量级共享内存方便受GIL限制不适合CPU密集型多进程CPU密集型任务绕过GIL利用多核内存开销大进程间通信复杂异步IO高并发I/O操作网络应用高效单线程高并发需要特殊库支持学习曲线陡峭实际应用示例1 并发下载器importaiohttpimportasyncioasyncdefdownload(url):asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresponse:contentawaitresponse.read()print(f下载{url}完成长度:{len(content)})returncontentasyncdefmain():urls[https://www.python.org,https://www.google.com,https://www.github.com]tasks[download(url)forurlinurls]awaitasyncio.gather(*tasks)asyncio.run(main())2 并行计算frommultiprocessingimportPoolimportmathdefis_prime(n):ifn2:returnFalseforiinrange(2,int(math.sqrt(n))1):ifn%i0:returnFalsereturnTrueif__name____main__:numbersrange(1000000,1000100)withPool(4)aspool:resultspool.map(is_prime,numbers)primes[nforn,primeinzip(numbers,results)ifprime]print(f找到{len(primes)}个质数)常见问题GIL限制使用多进程代替多线程处理CPU密集型任务使用C扩展释放GIL死锁问题按固定顺序获取锁使用带超时的锁资源竞争使用线程安全的数据结构尽量减少共享状态协程阻塞避免在协程中使用阻塞I/O使用专门的异步库(aiohttp, asyncpg等)通过合理选择并发模型并正确实现可以显著提高Python程序的性能特别是在处理I/O密集型或CPU密集型任务时。