SSE 协议与流式传输实践(前端 AI 场景)
目录
- SSE 是什么,为什么 AI 聊天常用它
- SSE 与 WebSocket 的区别
- SSE 最小协议格式
- 后端实现:Node SSE 接口
- 前端实现:fetch 读流与中断
- 端到端时序图
- 常见故障与排查
SSE 是什么,为什么 AI 聊天常用它
SSE(Server-Sent Events)是一种“服务端单向推送到客户端”的流式协议,基于 HTTP。
在 AI 聊天里,典型场景是模型逐 token 输出,前端边收边展示。
它适合 AI 聊天的原因:
- 语义简单:就是“后端不断推文本事件”
- 对基础设施友好:走标准 HTTP,更容易接入网关和日志体系
- 开发成本低:不需要维护复杂双向状态
SSE 与 WebSocket 的区别
- SSE:单向(服务端 -> 客户端),更适合“流式输出”
- WebSocket:双向,适合实时协作、多人互动、复杂信令
简单记忆:
“只要后端往前端推结果,且前端不需要频繁实时上行,SSE 通常够用。”
补充:实时语音对话(端到端语音模型)通常用 WebRTC/WebSocket 而不是 SSE,因为需要双向、低延迟、可打断(见 WebRTC 实践)。文本聊天流式则 SSE 最省心。
进阶:裸 SSE vs 结构化流式协议
上面是“裸 SSE + 自定义 JSON”。实际项目里,Vercel AI SDK 在 SSE 之上定义了一套 UI Message Stream Protocol:流里不只有文本增量,还能传工具调用、来源引用、错误、推理过程等多种结构化 part,前端 useChat 自动解析。
- 要完全自定义协议/网关 → 手写裸 SSE(理解本文即可)。
- 要快速做带工具/结构化/Generative UI 的标准化前端 → 用 AI SDK 的流式协议。
- 二者底层都是 HTTP 流式,理解了裸 SSE 就理解了 AI SDK 流式的“地基”。
SSE 最小协议格式
SSE 事件按文本行组织,最常见是 data: 行,事件之间用空行分隔:
data: {"type":"delta","text":"你"}
data: {"type":"delta","text":"好"}
data: {"type":"done"}
后端实现:Node SSE 接口
app.post("/api/chat/stream", async (req, res) => {
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const ac = new AbortController();
req.on("close", () => ac.abort());
try {
for await (const delta of callModelStream(req.body, ac.signal)) {
res.write(`data: ${JSON.stringify({ type: "delta", text: delta })}\n\n`);
}
res.write(`data: ${JSON.stringify({ type: "done" })}\n\n`);
} catch (e) {
res.write(`data: ${JSON.stringify({ type: "error", message: String(e) })}\n\n`);
} finally {
res.end();
}
});
前端实现:fetch 读流与中断
export async function startSSE(message: string, onEvent: (e: any) => void) {
const controller = new AbortController();
const res = await fetch("/api/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message }),
signal: controller.signal,
});
const reader = res.body?.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (reader) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parts = buffer.split("\n\n");
buffer = parts.pop() || "";
for (const p of parts) {
const line = p.split("\n").find((x) => x.startsWith("data:"));
if (!line) continue;
onEvent(JSON.parse(line.replace(/^data:\s*/, "")));
}
}
return () => controller.abort();
}
端到端时序图
常见故障与排查
- 前端收不到流:检查
Content-Type是否为text/event-stream - 停止无效:前端是否调用
abort,服务端是否监听close - 文本重复:检查 buffer 分片逻辑是否重复解析
- 响应卡顿:检查是否被代理缓冲,可尝试关闭缓冲或降低 chunk 间隔