做AI 聊天、Agent 对话或长任务反馈时用户最先感知到的通常不是模型能力而是结果返回的方式。一次性整段返回体验像“卡住”边生成边返回、边解析边更新体验才更接近实时交互。前端实现这类能力时绕不开三个概念SSEEventSourcefetch ReadableStream这三个词经常被混用但它们不在同一层。真正需要想清楚的是前端怎么发请求后端怎么持续返回前端怎么持续读取和更新页面用户点击“停止”时哪一层需要停网络断开后是否重试、如何续接这篇文章就按这条主线来讲。阅读导图先看整篇文章的结构基础概念 - SSE 是什么 - EventSource 是什么 - fetch ReadableStream 是什么 请求与返回 - 前端怎么发 - 后端怎么持续回 - 前端怎么持续读 控制能力 - 停止生成 - 断线重试 - 断点续接 选型建议 - 什么时候选 EventSource - 什么时候选 fetch ReadableStream一、先把三个概念拆开先说结论SSE是一种事件流格式EventSource是浏览器原生的 SSE 客户端 APIfetch ReadableStream是更底层的流式请求与读取方式一句话理解SSE 是“怎么返回”EventSource 是“怎么接 SSE”fetch ReadableStream 是“怎么自己掌控整条流式链路”。再看一张关系图---------------------- | SSE | | 一种事件流格式 | --------------------- | ------------------------ | | v v ------------------------ ----------------------------- | EventSource | | fetch ReadableStream | | 浏览器原生 SSE 客户端 | | 通用流式请求与读取方式 | ------------------------ ----------------------------- 结论 EventSource 常用于“直接接 SSE” fetch ReadableStream 常用于“自己发请求 自己读流 自己控流程”1. SSE 是什么定义SSE全称Server-Sent Events。可以把它理解成一种持续返回的 HTTP 响应客户端发起请求服务端不立即关闭连接服务端持续推送文本事件它本质上是服务端单向推送很适合文本流式输出。格式特点一个典型的 SSE 数据格式如下data: {type:delta,content:你} data: {type:delta,content:好} data: {type:done}关键点只有两个每条事件通常以data:开头事件之间用空行分隔也就是\n\n所以SSE 解决的是“服务端如何分段返回事件”。2. EventSource 是什么它解决什么问题EventSource不是 SSE 本身而是浏览器提供的 SSE 客户端 API。它的优点很明显原生支持不用自己写 SSE 解析逻辑浏览器会处理基础连接和部分断线重连它的边界它也有明确限制只支持GET不能带请求体不能自由设置自定义请求头不适合复杂鉴权和上下文传递最小示例constesnewEventSource(/api/stream)es.onmessage(event){document.getElementById(output).innerTextevent.data}es.onerror(err){console.error(连接异常,err)}更适合什么场景如果只是订阅一个现成事件流EventSource很方便。如果要发起一条复杂的 AI 聊天请求再接收流式响应它通常不够灵活。3. fetch ReadableStream 是什么它的本质如果说EventSource是“浏览器帮你接好了 SSE”那fetch ReadableStream更像是你自己发请求自己拿流自己解析自己更新 UI为什么它更常用于 AI 聊天它的优势就在控制力支持GET也支持POST可以带复杂body可以带聊天上下文可以加自定义headers更适合 AI 聊天这种“复杂请求 流式响应”场景代价也很明确你要自己读流自己做协议解析自己处理done / error / abort自己处理重试和恢复复杂项目里fetch ReadableStream往往更常见原因也在这里。4. 三者关系一张表看懂概念它是什么能不能发复杂请求要不要自己解析流更适合什么场景SSE服务端事件流格式取决于客户端实现取决于客户端实现标准化文本事件流EventSource浏览器原生 SSE 客户端不适合复杂请求通常不用简单订阅、日志流、状态推送fetch ReadableStream通用流式请求与读取方式可以需要AI 聊天、复杂流式交互二、为什么 AI 聊天更常用 fetch而不是 EventSource这个问题不用从规范出发从接口设计出发就够了。请求为什么会变复杂一条真实的 AI 聊天请求通常不只是一句文本还会带上当前用户输入历史对话会话 ID模型参数鉴权信息业务开关租户信息为什么 EventSource 不够自然这时候如果用EventSource会比较别扭。因为它更像“订阅一个GET流”而不是“提交一份复杂请求体再接收增量结果”。为什么 fetch 更贴近业务fetch天然适合这种模式constresponseawaitfetch(/api/chat/stream,{method:POST,headers:{Content-Type:application/json,},body:JSON.stringify({message:inputValue,sessionId,history,}),})所以更准确的结论不是“EventSource 不好”而是简单 SSE 订阅用EventSource更省事AI 聊天流式交互用fetch ReadableStream更实用三、一条真实的 AI 流式交互链路是怎样的先看主链路用户输入问题 - 前端发起 POST /api/chat/stream - 服务端调用大模型流式接口 - 服务端边读上游结果边写回响应 - 前端持续读取 response.body - 前端解析 chunk / event - 前端更新同一条 assistant 消息 - 页面持续显示增量结果如果从角色分工来理解可以看成这样[User] | v [Frontend] - 发送请求 - 读取响应流 - 维护消息状态 | v [Backend] - 接收聊天参数 - 调用上游模型 - 转发增量事件 | v [LLM Provider] - 持续产出 delta真正关键不在“流”而在“同一条消息”流式体验的关键不只是“持续返回”而是同一条 AI 消息被持续追加而不是每收到一段就新建一条消息。一个典型消息结构如下typeChatMessage{id:stringrole:user|assistantcontent:stringstatus?:streaming|done|error}用户发送后前端通常会先做两件事插入用户消息插入一条content: 的 assistant 占位消息后面每收到一个增量片段就把它拼到这条 assistant 消息上。四、后端如何返回流Node.js 里的最小可用做法如果后端用 Node.js / Express常见做法是返回text/event-stream然后持续res.write()。最小代码示例app.post(/api/chat/stream,async(req,res){res.setHeader(Content-Type,text/event-stream; charsetutf-8)res.setHeader(Cache-Control,no-cache, no-transform)res.setHeader(Connection,keep-alive)constuserMessagereq.body.messageconstmodelStreamawaitcallLLMStream(userMessage)forawait(constchunkofmodelStream){consttextchunk.delta||if(!text)continueres.write(data:${JSON.stringify({type:delta,content:text,})}\n\n)}res.write(data:${JSON.stringify({type:done})}\n\n)res.end()})这段代码在做什么callLLMStream(userMessage)调用上游模型流for await ... of modelStream持续读取增量res.write(...)立即转发给前端done告诉前端流结束了如果只想把“后端如何流式返回”讲清楚这段代码已经够用。五、前端如何解析并更新页面不要把“流”当成一次性响应前端调用fetch后不能直接await response.json()而是要持续读取response.body。这也是最容易踩的坑请求发出去了但响应不是一次性到达的。一个典型解析过程constreaderresponse.body?.getReader()constdecodernewTextDecoder(utf-8)letbufferwhile(true){const{done,value}awaitreader.read()if(done)breakbufferdecoder.decode(value,{stream:true})constpartsbuffer.split(\n\n)bufferparts.pop()||for(constpartofparts){if(!part.startsWith(data: ))continueconstjsonStrpart.slice(6)constdataJSON.parse(jsonStr)if(data.typedelta){setMessages(prevprev.map(msgmsg.idassistantMsg.id?{...msg,content:msg.contentdata.content}:msg))}if(data.typedone){setMessages(prevprev.map(msgmsg.idassistantMsg.id?{...msg,status:done}:msg))}}}这段逻辑的三个关键动作持续读reader.read()会不断拿到新的字节块。维护buffer一次read()不一定刚好读到完整事件所以必须做拼接。更新同一条消息delta是追加不是新增消息。为什么一定要有 buffer收到字节块 - TextDecoder 解码 - 追加到 buffer - 按 \n\n 切分完整事件 - 解析 data: 后面的 JSON - 判断是 delta / done / error - 更新同一条 assistant 消息六、除了 SSE流式接口还常见哪些格式很多人会把“流式输出”等同于 SSE其实不完全准确。SSE 只是最常见的一种。常见的三种格式1. SSEdata: {type:delta,content:你} data: {type:delta,content:好}优点是标准化、可扩展适合事件驱动的流式响应。2. NDJSON{text:你} {text:好}一行一个 JSON通常按\n切分。实现简单也适合日志流和结构化输出。3. 纯文本分块服务端不封装 JSON也不封装事件只是不断返回文本片段。这种方式最简单但扩展性最差。因为一旦要加入done、error、eventId、cursor等信息协议很快就会混乱。如果是 AI 聊天页面通常更建议用 SSE 风格 JSON 事件或者用 NDJSON七、停止生成怎么做前端停接收不等于后端停任务流式链路跑起来后第二个必须解决的问题就是“停止生成”。先区分两层停止“停止”实际上分成两层前端不再接收后端不再继续生成很多实现只做了第一层所以页面停了服务端还在跑。为什么这个问题容易被忽略用户点击“停止” | - 前端层 | - 中止 fetch / 关闭 EventSource | - 停止继续读取响应 | - 后端层 - 取消当前任务 - 停止调用或转发上游模型结果1. fetch ReadableStream 的打断常用方案在fetch这条线上主方案是AbortController。letcontrollernullletreadernullasyncfunctionstartStream(){controllernewAbortController()try{constresawaitfetch(/api/chat/stream,{signal:controller.signal,})readerres.body.getReader()while(true){const{done,value}awaitreader.read()if(done)break// 解析 value 并更新页面}}catch(err){if(err.name!AbortError){console.error(err)}}finally{readernullcontrollernull}}asyncfunctionstopStream(){controller?.abort()awaitreader?.cancel(user stopped)}怎么理解这两个动作controller.abort()从请求层终止reader.cancel()从读取层停止消费大多数聊天页面里前者已经够用后者通常是补充。2. EventSource 的打断为什么通常要拆接口EventSource没有AbortController它的停止方式是close()。但如果既想让前端停接收又想让后端停任务通常要拆成三步POST /api/chat/create创建任务返回taskIdGET /api/chat/stream?taskIdxxx用EventSource订阅任务流POST /api/chat/cancel带上taskId通知后端取消一个典型调用过程constcreateRespawaitfetch(/api/chat/create,{method:POST,headers:{Content-Type:application/json},body:JSON.stringify({prompt:解释一下 SSE 的打断机制}),})const{taskId}awaitcreateResp.json()constesnewEventSource(/api/chat/stream?taskId${taskId})es.addEventListener(message,(e){constdataJSON.parse(e.data)console.log(delta:,data.delta)})asyncfunctionstop(){es.close()awaitfetch(/api/chat/cancel,{method:POST,headers:{Content-Type:application/json},body:JSON.stringify({taskId}),})}这个设计的关键是把“前端断开连接”和“后端取消任务”拆开。八、断线重试EventSource 和 fetch 不是一个量级的工作量1. EventSource 的重连浏览器会帮你做一部分EventSource的断线重连浏览器会帮你处理一部分。onerror会触发但它通常不是手动重连的主要入口。很多时候浏览器已经在自动重连业务层更多是在做兜底。2. fetch ReadableStream 的重连为什么这部分工作量更大fetch没有内建断线重连所以这些事都要自己做重试逻辑退避策略错误分类断点续接用户主动停止时的终止控制比较实用的做法是把“是否重试”和“如何重试”拆开。先看重试决策请求失败 - 用户是不是主动停止的 - 是不重试 - 否继续判断 - 是否超过最大重试次数 - 是不重试 - 否继续判断 - 是否属于可重试错误 - 否结束 - 是退避等待后重发请求shouldRetry决定要不要重试functionshouldRetry({error,status,attempt,maxRetries,signal}){if(signal?.aborted)returnfalseif(error?.nameAbortError)returnfalseif(attemptmaxRetries)returnfalseif(typeofstatusnumber){if([400,401,403,404,422].includes(status))returnfalseif([408,429,500,502,503,504].includes(status))returntrue}returntrue}streamWithRetry负责真正重发asyncfunctionstreamWithRetry({url,methodPOST,headers{Content-Type:application/json},buildBody,parseChunk,onDone,onRetry,onOpen,signal,maxRetries5,}){letattempt0letfinishedfalseconststate{cursor:0,buffer:,}while(!finished){if(signal?.aborted){thrownewDOMException(Aborted,AbortError)}letresponseletstatustry{responseawaitfetch(url,{method,headers,body:buildBody?JSON.stringify(buildBody(state)):undefined,signal,})statusresponse.statusif(!response.ok){thrownewError(HTTP${response.status})}onOpen?.({attempt,state})constreaderresponse.body.getReader()constdecodernewTextDecoder()while(true){const{done,value}awaitreader.read()if(done){finishedtrueonDone?.(state)break}state.bufferdecoder.decode(value,{stream:true})parseChunk?.(state.buffer,{state,setBuffer(next){state.buffernext},markDone(){finishedtrue},})if(finished){onDone?.(state)break}}}catch(error){constretryshouldRetry({error,status,attempt,maxRetries,signal,})if(!retry){throwerror}attempt1constdelayMath.min(1000*2**(attempt-1),10000)onRetry?.({attempt,delay,error,status,state,})awaitnewPromise(resolvesetTimeout(resolve,delay))}}}这套结构的重点不是代码多复杂而是职责清楚shouldRetry决定是否重试streamWithRetry负责重发请求state.cursor记录续接位置signal保证用户停止时能直接终止九、如果要做断点续接前后端要怎么配合只重试、不续接意义有限。为什么一定要做续接流式输出最怕两件事已显示的内容又重复一次中途断开的那一小段内容彻底丢失所以更完整的做法是前后端一起维护“已经处理到哪里”。前后端各自负责什么前端保存进度 - cursor / chunkIndex / eventId - 重试时把这些信息带给服务端 服务端理解进度 - 知道客户端已经收到哪里 - 跳过已发送片段 - 从下一个片段继续推送前端可以保存cursorchunkIndexeventId服务端则需要支持根据断点恢复流跳过已经发送过的片段从下一个片段继续发送cursor 和 buffer 不是一回事这也是为什么前面的streamWithRetry里要维护conststate{cursor:0,buffer:,}其中buffer处理半包拼接cursor处理业务续接如果已经做到“断线重连”那就不要只盯着前端。真正决定体验的往往是服务端有没有恢复能力。十、这类实现最常见的几个坑先看一眼常见误区1. 把每个增量都渲染成一条消息错误结果会像这样AI你 AI好 AI我 AI来 AI解 AI释正确做法是始终更新同一条 assistant 消息。2. 忽略半包和粘包一次reader.read()拿到的不一定就是一整条事件。所以buffer不是装饰而是必需品。3. 前端停了但后端还在跑无论是AbortController.abort()还是EventSource.close()前端停止接收都不等于后端任务真的结束。如果服务端调用上游模型是长耗时操作最好把取消信号继续向上传递。4. 只做重试不做恢复没有cursor / eventId / chunkIndex的重试通常只能“重新开始”不能“从断点继续”。十一、最终建议到底该怎么选先按场景划分如果你的场景是简单的服务端单向通知比如日志推送状态推送轻量消息订阅那么EventSource很省事。如果你的场景是 AI 聊天流式交互而且满足这些条件需要POST需要复杂请求体需要自定义headers需要处理上下文需要打断需要重试需要断点续接那更建议从一开始就走fetch ReadableStream。原因不是它更“高级”而是它更接近真实业务的控制面。最后用一张图快速判断需求是否只是简单订阅一个现成事件流 - 是优先考虑 EventSource - 否继续判断 是否需要 POST、请求体、鉴权头、停止、重试、续接 - 是优先考虑 fetch ReadableStream - 否两者都可但 fetch 通常更通用结语回到最开始的问题AI 聊天里的流式交互到底该怎么理解和实现可以把答案压缩成一句话后端按增量持续返回前端持续读取、解析、更新状态再把停止、重试和续接补齐。在这条链路里SSE解决“如何以事件流形式返回”EventSource解决“如何简单接收 SSE”fetch ReadableStream解决“业务变复杂后如何掌控整条流式链路”如果目标是做一个真正可用、可打断、可重试、可恢复的 AI 聊天页面fetch ReadableStream通常会是更稳的路线。