Hyperf 实现流式输出(SSE)对接 LLM
以下是 Hyperf SSE 流式输出对接 LLM)的完整实现方案 --- 依赖库composerrequire openai-php/client# OpenAI / 兼容接口 composer require anthropics/anthropic-sdk-php # Claude 官方 SDK若有# 或直接用 Hyperf 内置 Guzzle 手动对接任意 LLM API--- 一、SSE 响应头设置 use Hyperf\HttpServer\Contract\ResponseInterface;$response$response-withHeader(Content-Type,text/event-stream; charsetutf-8)-withHeader(Cache-Control,no-cache, no-store)-withHeader(Connection,keep-alive)-withHeader(X-Accel-Buffering,no)// 关键禁止 Nginx 缓冲 -withHeader(Transfer-Encoding,chunked);▎ X-Accel-Buffering: no 是 Nginx 反代场景必须加的否则会攒包。 --- 二、对接 OpenAI 流式openai-php/client?php namespace App\Controller;use Hyperf\HttpServer\Contract\ResponseInterface;use Hyperf\HttpServer\Contract\RequestInterface;use OpenAI\Client;class LlmStreamController{publicfunction__construct(private Client$openai){}publicfunctionstream(RequestInterface$request, ResponseInterface$response){$prompt$request-input(prompt,Hello);$response$response-withHeader(Content-Type,text/event-stream; charsetutf-8)-withHeader(Cache-Control,no-cache)-withHeader(Connection,keep-alive)-withHeader(X-Accel-Buffering,no);// openai-php/clientstreamtrue 返回 Generator$stream$this-openai-chat()-createStreamed([modelgpt-4o,messages[[roleuser,content$prompt]],]);foreach($streamas$chunk){$delta$chunk-choices[0]-delta-content ??;if($delta!){$response-write(data: .json_encode([typedelta,delta$delta,]).\n\n);}// 协程非阻塞让出防止 Swoole 发送缓冲区积压\Swoole\Coroutine::sleep(0);}$response-write(data: .json_encode([typedone]).\n\n);return$response;}}--- 三、对接 Claude / 任意 LLMGuzzle 手动流式?php namespace App\Service;use GuzzleHttp\Client;use Hyperf\HttpServer\Contract\ResponseInterface;class ClaudeSseService{publicfunctionstream(string$prompt, ResponseInterface$response): void{$clientnew Client();// Guzzlestreamtrue 拿到原始流$guzzleResp$client-post(https://api.anthropic.com/v1/messages,[headers[x-api-keyenv(ANTHROPIC_API_KEY),anthropic-version2023-06-01,content-typeapplication/json,],json[modelclaude-sonnet-4-6,max_tokens2048,streamtrue,messages[[roleuser,content$prompt]],],streamtrue, // Guzzle 流式读取]);$body$guzzleResp-getBody();while(!$body-eof()){$line$this-readLine($body);if(str_starts_with($line,data: )){$jsonsubstr($line,6);$eventjson_decode($json,true);// Claude SSE 事件类型if(($event[type]??)content_block_delta){ $text$event[delta][text]??;if($text!){ $response-write(data: . json_encode([ typedelta,delta$text,]). \n\n);\Swoole\Coroutine::sleep(0);//让出协程 } } if(($event[type]??)message_stop){ $response-write(data: . json_encode([typedone]). \n\n);break;} } } } private function readLine(\Psr\Http\Message\StreamInterface $body):string { $line;while(!$body-eof()){$char$body-read(1);if($char\n)break;$line.$char;}returnrtrim($line,\r);}}--- 四、路由注册 // config/routes.php Router::get(/api/llm/stream,[LlmStreamController::class,stream]);--- 五、常见坑与解决方案 ┌────────────────────────────────────────┬─────────────────┬─────────────────────────────────────┐ │ 问题 │ 原因 │ 解决 │ ├────────────────────────────────────────┼─────────────────┼─────────────────────────────────────┤ │ 前端收不到分块全部一次返回 │ Nginx 缓冲开着 │ 加 X-Accel-Buffering: no │ ├────────────────────────────────────────┼─────────────────┼─────────────────────────────────────┤ │ Swoole 发送缓冲区积压 │ 写得太快 │ 每次write后 Coroutine::sleep(0)│ ├────────────────────────────────────────┼─────────────────┼─────────────────────────────────────┤ │ sleep()阻塞整个 Worker │ 用了同步sleep│ 改用\Swoole\Coroutine::sleep()│ ├────────────────────────────────────────┼─────────────────┼─────────────────────────────────────┤ │ openai-php/client 报 stream 方法不存在 │ 版本问题 │ 用 createStreamed()(v0.8)│ ├────────────────────────────────────────┼─────────────────┼─────────────────────────────────────┤ │ 连接断开后服务端还在跑 │ 没有断连检测 │ 在write前判断连接状态或 try/catch │ ├────────────────────────────────────────┼─────────────────┼─────────────────────────────────────┤ │ HTTP/2 下 SSE 帧被合并 │ HTTP/2 多路复用 │ 强制用 HTTP/1.1 或改用 WebSocket │ └────────────────────────────────────────┴─────────────────┴─────────────────────────────────────┘ --- 六、前端接收示例 const esnew EventSource(/api/llm/stream?prompt你好);es.onmessage(e){const dataJSON.parse(e.data);if(data.typedelta){document.getElementById(output).textContentdata.delta;}if(data.typedone){es.close();}};--- 核心要点Hyperf 中 SSE 用$response-write()逐块推送对接 LLM 流式时用 openai-php/client 的 createStreamed()或 Guzzlestreamtrue协程上下文里 必须用 Coroutine::sleep(0)让出执行权避免缓冲区阻塞。