# Python进程间通信与消息队列

## 一、进程间通信概述

进程间通信（IPC）是多进程协作的基础。Python提供了多种IPC机制：

- 管道（Pipe）
- 队列（Queue）
- 共享内存（SharedMemory）
- 信号（Signal）
- 套接字（Socket）
- 内存映射文件（mmap）

## 二、multiprocessing.Queue

```python
from multiprocessing import Process, Queue
import time


class TaskQueue:
    """基于多进程队列的任务分发系统"""

    def __init__(self, num_workers=4):
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.num_workers = num_workers
        self.workers = []

    def start(self):
        for i in range(self.num_workers):
            p = Process(target=self._worker, args=(i,))
            p.daemon = True
            p.start()
            self.workers.append(p)

    def _worker(self, worker_id):
        while True:
            task = self.task_queue.get()
            if task is None:  # 毒丸信号
                break
            task_id, func, args, kwargs = task
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((task_id, 'success', result))
            except Exception as e:
                self.result_queue.put((task_id, 'error', str(e)))

    def submit(self, task_id, func, *args, **kwargs):
        self.task_queue.put((task_id, func, args, kwargs))

    def get_result(self, timeout=None):
        return self.result_queue.get(timeout=timeout)

    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put(None)
        for p in self.workers:
            p.join()


# 使用
def heavy_computation(n):
    return sum(i * i for i in range(n))


tq = TaskQueue(num_workers=4)
tq.start()

for i in range(10):
    tq.submit(i, heavy_computation, 1000000)

for i in range(10):
    task_id, status, result = tq.get_result()
    print(f"任务 {task_id}: {status} {result}")

tq.shutdown()
```

## 三、Pipe管道

```python
from multiprocessing import Process, Pipe


def sender(conn, messages):
    for msg in messages:
        conn.send(msg)
        time.sleep(0.1)
    conn.send(None)  # 结束信号
    conn.close()


def receiver(conn):
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"收到: {msg}")
    conn.close()


# Pipe返回两个连接对象
parent_conn, child_conn = Pipe()

p1 = Process(target=sender, args=(parent_conn, ['hello', 'world', 'python']))
p2 = Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
```

## 四、共享内存（Python 3.8+）

```python
from multiprocessing import shared_memory, Process
import numpy as np


def worker_with_shared_memory(shm_name, shape, dtype):
    """工作进程访问共享内存"""
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # 直接修改共享数组
    array *= 2
    existing_shm.close()


# 主进程创建共享内存
data = np.array([1, 2, 3, 4, 5], dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:]

# 启动工作进程
p = Process(target=worker_with_shared_memory,
            args=(shm.name, data.shape, data.dtype))
p.start()
p.join()

print(shared_array)  # [2, 4, 6, 8, 10]

# 清理
shm.close()
shm.unlink()
```

## 五、Manager对象

```python
from multiprocessing import Manager, Process


def worker(shared_dict, shared_list, lock, worker_id):
    with lock:
        shared_dict[f'worker_{worker_id}'] = f'result_{worker_id}'
        shared_list.append(worker_id)


# Manager提供进程安全的共享对象
with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list()
    lock = manager.Lock()

    processes = []
    for i in range(5):
        p = Process(target=worker, args=(shared_dict, shared_list, lock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(dict(shared_dict))
    print(list(shared_list))
```

## 六、信号处理

```python
import signal
import sys


class GracefulShutdown:
    """优雅关闭处理"""

    def __init__(self):
        self.shutdown_requested = False
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        print(f"\n收到信号 {signum}，准备关闭...")
        self.shutdown_requested = True

    def should_continue(self):
        return not self.shutdown_requested


# 使用
shutdown = GracefulShutdown()

while shutdown.should_continue():
    # 执行工作
    process_next_task()
    time.sleep(1)

print("清理资源...")
cleanup()
print("已安全关闭")
```

## 七、简易消息队列实现

```python
import json
import threading
from queue import Queue, Empty
from typing import Callable, Any


class MessageBroker:
    """简易发布/订阅消息代理"""

    def __init__(self):
        self._topics = {}
        self._subscribers = {}
        self._lock = threading.Lock()

    def create_topic(self, topic_name, max_size=1000):
        with self._lock:
            if topic_name not in self._topics:
                self._topics[topic_name] = Queue(maxsize=max_size)
                self._subscribers[topic_name] = []

    def publish(self, topic_name, message):
        if topic_name not in self._topics:
            raise ValueError(f"主题不存在: {topic_name}")
        self._topics[topic_name].put(message)
        self._notify_subscribers(topic_name, message)

    def subscribe(self, topic_name, callback: Callable):
        if topic_name not in self._subscribers:
            raise ValueError(f"主题不存在: {topic_name}")
        self._subscribers[topic_name].append(callback)

    def _notify_subscribers(self, topic_name, message):
        for callback in self._subscribers[topic_name]:
            threading.Thread(target=callback, args=(message,)).start()

    def consume(self, topic_name, timeout=None):
        if topic_name not in self._topics:
            raise ValueError(f"主题不存在: {topic_name}")
        try:
            return self._topics[topic_name].get(timeout=timeout)
        except Empty:
            return None


# 使用
broker = MessageBroker()
broker.create_topic('orders')
broker.create_topic('notifications')

# 订阅
broker.subscribe('orders', lambda msg: print(f"处理订单: {msg}"))

# 发布
broker.publish('orders', {'order_id': 1, 'amount': 99.99})
```

## 八、使用Redis作为消息队列

```python
class RedisMessageQueue:
    """基于Redis的消息队列（示意实现）"""

    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue_name = queue_name

    def publish(self, message):
        data = json.dumps(message)
        self.redis.lpush(self.queue_name, data)

    def consume(self, timeout=0):
        result = self.redis.brpop(self.queue_name, timeout=timeout)
        if result:
            _, data = result
            return json.loads(data)
        return None

    def consume_batch(self, batch_size=10):
        pipe = self.redis.pipeline()
        for _ in range(batch_size):
            pipe.rpop(self.queue_name)
        results = pipe.execute()
        return [json.loads(r) for r in results if r]

    def length(self):
        return self.redis.llen(self.queue_name)


class RedisPubSub:
    """Redis发布/订阅"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.pubsub = redis_client.pubsub()

    def subscribe(self, channel, callback):
        self.pubsub.subscribe(**{channel: callback})
        thread = self.pubsub.run_in_thread(sleep_time=0.01)
        return thread

    def publish(self, channel, message):
        self.redis.publish(channel, json.dumps(message))
```

## 九、异步消息处理

```python
import asyncio


class AsyncMessageQueue:
    """异步消息队列"""

    def __init__(self, maxsize=0):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.handlers = []
        self._running = False

    def register_handler(self, handler):
        self.handlers.append(handler)

    async def publish(self, message):
        await self.queue.put(message)

    async def start_consuming(self, num_consumers=3):
        self._running = True
        consumers = [
            asyncio.create_task(self._consumer(f"consumer-{i}"))
            for i in range(num_consumers)
        ]
        await asyncio.gather(*consumers)

    async def _consumer(self, name):
        while self._running:
            try:
                message = await asyncio.wait_for(
                    self.queue.get(), timeout=1.0
                )
                for handler in self.handlers:
                    try:
                        await handler(message)
                    except Exception as e:
                        print(f"{name} 处理消息失败: {e}")
                self.queue.task_done()
            except asyncio.TimeoutError:
                continue

    async def stop(self):
        self._running = False
        await self.queue.join()


# 使用
async def order_handler(message):
    print(f"处理订单: {message}")
    await asyncio.sleep(0.5)


async def main():
    mq = AsyncMessageQueue()
    mq.register_handler(order_handler)

    # 启动消费者
    consumer_task = asyncio.create_task(mq.start_consuming(3))

    # 发布消息
    for i in range(10):
        await mq.publish({'order_id': i, 'amount': i * 10})

    await mq.queue.join()
    await mq.stop()
```

## 十、总结

IPC选择建议：

- 简单数据传递 - Queue或Pipe
- 大量数据共享 - SharedMemory
- 复杂共享对象 - Manager
- 分布式系统 - Redis/RabbitMQ/Kafka
- 异步场景 - asyncio.Queue

设计原则：

1. 优先使用Queue，它是进程安全的
2. 共享内存性能最高，但需要手动同步
3. 消息队列解耦生产者和消费者
4. 考虑消息丢失、重复消费、顺序性问题
5. 生产环境使用成熟的消息中间件