PHP · PHP与消息队列RabbitMQ集成
# PHP与消息队列RabbitMQ集成

RabbitMQ是流行的消息中间件。PHP通过AMQP扩展或php-amqplib库连接RabbitMQ。今天说说PHP与RabbitMQ的集成。

连接RabbitMQ。

```php
<?php
// composer require php-amqplib/php-amqplib
require 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

echo "已连接到RabbitMQ\n";

$channel->close();
$connection->close();
?>
```

发送消息到队列。

```php
<?php
function sendToQueue(string $queueName, array $data): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare($queueName, false, true, false, false);

    $message = new AMQPMessage(json_encode($data), [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]);

    $channel->basic_publish($message, '', $queueName);
    echo "消息已发送到队列: $queueName\n";

    $channel->close();
    $connection->close();
}

sendToQueue('task_queue', ['task' => 'send_email', 'to' => 'user@example.com']);
?>
```

消费消息。

```php
<?php
function consumeQueue(string $queueName, callable $handler): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare($queueName, false, true, false, false);

    echo "等待消息...\n";

    $channel->basic_consume($queueName, '', false, false, false, false, function ($msg) use ($handler) {
        echo "收到消息\n";
        $data = json_decode($msg->body, true);

        try {
            $handler($data);
            $msg->ack();
            echo "处理完成\n";
        } catch (Exception $e) {
            echo "处理失败: {$e->getMessage()}\n";
            $msg->nack(true);
        }
    });

    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

consumeQueue('task_queue', function ($data) {
    echo "处理: {$data['task']}\n";
});
?>
```

发布订阅模式。

```php
<?php
// 发布者
function publish(string $exchangeName, array $data): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare($exchangeName, 'fanout', false, false, false);

    $message = new AMQPMessage(json_encode($data));
    $channel->basic_publish($message, $exchangeName);

    echo "已发布到交换器: $exchangeName\n";

    $channel->close();
    $connection->close();
}

// 订阅者
function subscribe(string $exchangeName, string $queueName, callable $handler): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare($exchangeName, 'fanout', false, false, false);
    $channel->queue_declare($queueName, false, false, false, false);
    $channel->queue_bind($queueName, $exchangeName);

    $channel->basic_consume($queueName, '', false, true, false, false, function ($msg) use ($handler) {
        $handler(json_decode($msg->body, true));
    });

    while ($channel->is_consuming()) {
        $channel->wait();
    }
}
?>
```

延迟队列的实现。

```php
<?php
function sendDelayed(string $queueName, array $data, int $delayMs): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $message = new AMQPMessage(json_encode($data), [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'application_headers' => new AMQPTable([
            'x-delay' => $delayMs,
        ]),
    ]);

    $channel->basic_publish($message, '', $queueName);
    echo "延迟消息已发送 ({$delayMs}ms后执行)\n";
}
?>
```

注意：上面的示例使用了 `AMQPTable`，需要额外引入 `use PhpAmqpLib\Wire\AMQPTable;`；并且 `x-delay` 头依赖 rabbitmq_delayed_message_exchange 插件，消息必须发布到类型为 `x-delayed-message` 的交换器才会真正延迟，直接发布到默认交换器时消息会被立即投递。

RabbitMQ是功能完整的消息队列系统。支持多种消息模式、消息持久化、ACK确认、延迟队列。PHP通过php-amqplib库可以方便地集成RabbitMQ，适合需要可靠消息传递的场景。