AI 服务端最低配(Node)
副标题:转发、SSE 流式、鉴权与限流(前端视角)
目标:让你具备“半个后端”的最低配能力——能把 AI 能力安全地接进产品,并做出可上线的稳定性保障。
目录
- 为什么前端+AI 一定要会服务端
- 最小架构:浏览器 → 你的服务端 → 模型 API
- 最小可运行 Demo(服务端版)
- API 转发:保护 Key 与统一鉴权
- 流式输出:SSE 的关键点(实现与坑)
- 鉴权、限流与成本控制:上线前必须有
- 日志与可观测:出了问题能定位
- 完整可运行代码(SSE 服务端)
为什么前端+AI 一定要会服务端
只在浏览器里直连模型 API 会带来一堆硬伤:
- Key 暴露:任何人都能从网络请求里拿到你的 Key
- 无法统一鉴权/限流:很难按用户、按组织、按 IP 控制访问
- 无法做流式体验:很多 SDK/跨域/代理限制会让流式实现变复杂
- 无法做审计与成本控制:你需要在服务端记录使用量与失败原因
因此最低配的后端能力,是“前端+AI”的必修课。
最小架构:浏览器 → 你的服务端 → 模型 API
推荐的最小可上线架构:
- 浏览器
- 只调用你自己的 API(同源)
- 只传业务输入(不传模型 Key)
- 你的服务端(Node 优先)
- 负责:鉴权、限流、转发、流式、日志、成本控制
- 模型 API
- 只对你的服务端开放(Key 放在服务端环境变量)
如果你做 RAG,还会再多一层:
- 文档解析/切分 → 向量库检索 → 拼 Prompt → 调用模型
图示:最小可上线架构(Mermaid)
图示:一次“流式聊天”的时序(Mermaid)
端到端示例(可直接照抄跑通):Express + SSE 流式代理
下面给一个“你自己的服务端 → 模型API”的最小可运行骨架。重点不是 SDK,而是:
- Key 永远留在服务端(环境变量)
- 用 SSE 把流式结果推回浏览器
- 支持前端“停止生成”,服务端同步中止上游请求
- 统一错误格式,前端好兜底
说明:示例以 OpenAI 风格的“流式返回”为例(不同模型 provider 解析 chunk 的方式略不同,但架构一致)。
1)服务端:POST /api/ai/chat/stream(SSE)
import express from "express";
import crypto from "crypto";
const app = express();
app.use(express.json({ limit: "1mb" }));
function getTraceId() {
return crypto.randomUUID?.() || crypto.randomBytes(16).toString("hex");
}
function sseInit(res: express.Response) {
res.status(200);
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache, no-transform");
res.setHeader("Connection", "keep-alive");
// 立刻把 headers 刷到客户端,避免“卡住很久才开始显示”
res.flushHeaders?.();
}
function sseSend(res: express.Response, payload: unknown, event?: string) {
if (event) res.write(`event: ${event}\n`);
res.write(`data: ${JSON.stringify(payload)}\n\n`);
}
function normalizeError(e: unknown) {
// 你可以按 provider 的错误结构再细分,这里给一个最小版本
const message = e instanceof Error ? e.message : String(e);
return { code: "AI_UPSTREAM_ERROR", message };
}
// 示例:最低配鉴权(生产请接入你自己的登录体系)
function requireAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
const token = req.headers.authorization?.replace(/^Bearer\s+/i, "");
if (!token) return res.status(401).json({ code: "UNAUTHORIZED", message: "缺少登录信息" });
// TODO: 校验 token 并写入 req.user
(req as any).user = { id: "demo-user" };
next();
}
app.post("/api/ai/chat/stream", requireAuth, async (req, res) => {
const traceId = getTraceId();
res.setHeader("X-Trace-Id", traceId);
const { messages, model = "gpt-4.1-mini", temperature = 0.3, max_tokens = 600 } = req.body || {};
if (!Array.isArray(messages) || messages.length === 0) {
return res.status(400).json({ code: "BAD_REQUEST", message: "messages 不能为空", traceId });
}
sseInit(res);
sseSend(res, { type: "meta", traceId, model }, "meta");
const ac = new AbortController();
// 前端断开时:立刻中止上游,避免继续消耗 Token
req.on("close", () => {
ac.abort("client_disconnected");
});
try {
const upstream = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
signal: ac.signal,
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model,
temperature,
max_tokens,
stream: true,
messages,
}),
});
if (!upstream.ok || !upstream.body) {
const text = await upstream.text().catch(() => "");
sseSend(res, { type: "error", traceId, code: "UPSTREAM_HTTP_ERROR", message: text || upstream.statusText }, "error");
return res.end();
}
const reader = upstream.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// OpenAI 的流式响应是按行的 SSE(data: ...),这里做一个最小解析器
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed.startsWith("data:")) continue;
const data = trimmed.replace(/^data:\s*/, "");
if (data === "[DONE]") {
sseSend(res, { type: "done", traceId }, "done");
res.end();
return;
}
try {
const json = JSON.parse(data);
const delta = json?.choices?.[0]?.delta?.content;
if (delta) sseSend(res, { type: "delta", traceId, text: delta }, "delta");
} catch {
// 忽略解析失败的行(不同 provider/SDK 可能会有 keep-alive 行)
}
}
}
sseSend(res, { type: "done", traceId }, "done");
res.end();
} catch (e) {
const ne = normalizeError(e);
sseSend(res, { type: "error", traceId, ...ne }, "error");
res.end();
}
});
app.listen(3001, () => {
// eslint-disable-next-line no-console
console.log("SSE server listening on http://localhost:3001");
});
2)前端:fetch 读流 + AbortController 取消
下面示例不依赖 EventSource(因为 EventSource 只支持 GET,很多项目用 POST 更方便带 body)。
type SSEPayload =
| { type: "meta"; traceId: string; model: string }
| { type: "delta"; traceId: string; text: string }
| { type: "done"; traceId: string }
| { type: "error"; traceId: string; code: string; message: string };
export function startChatStream(params: {
messages: Array<{ role: "system" | "user" | "assistant"; content: string }>;
onDelta: (text: string) => void;
onMeta?: (traceId: string) => void;
onError?: (err: { traceId?: string; code?: string; message: string }) => void;
onDone?: () => void;
token: string;
}) {
const ac = new AbortController();
(async () => {
try {
const res = await fetch("/api/ai/chat/stream", {
method: "POST",
signal: ac.signal,
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${params.token}`,
},
body: JSON.stringify({ messages: params.messages }),
});
if (!res.ok || !res.body) {
const text = await res.text().catch(() => "");
params.onError?.({ message: text || `HTTP ${res.status}` });
return;
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
// 解析 SSE:按 \n\n 分隔事件块
const chunks = buf.split("\n\n");
buf = chunks.pop() || "";
for (const block of chunks) {
const lines = block.split("\n");
const dataLine = lines.find((l) => l.startsWith("data:"));
if (!dataLine) continue;
const jsonStr = dataLine.replace(/^data:\s*/, "");
const payload = JSON.parse(jsonStr) as SSEPayload;
if (payload.type === "meta") params.onMeta?.(payload.traceId);
if (payload.type === "delta") params.onDelta(payload.text);
if (payload.type === "error") params.onError?.(payload);
if (payload.type === "done") params.onDone?.();
}
}
} catch (e) {
if ((e as any)?.name === "AbortError") return;
params.onError?.({ message: e instanceof Error ? e.message : String(e) });
}
})();
return {
abort: () => ac.abort(),
};
}
体验建议:把
traceId显示在“调试信息”里(或提供一键复制),线上排障会省很多时间。
3)“停止生成”要同时停止两端
- 前端:调用
abort(),中断 fetch 流 - 服务端:监听
req.close,并AbortController.abort()中止上游
否则会出现最常见的“隐形成本坑”:用户停了,但服务端还在继续拉模型流,Token 继续计费。
最小可运行 Demo(服务端版)
目标:保证你能在本地把“流式 + 取消 + 统一错误码”跑通。
完整 Demo 结构(推荐最小骨架)
demo-ai-sse/
server/
server.ts
.env
web/
src/
App.tsx
api.ts
常见坑排查清单
- 一直没有流式输出:确认
Content-Type: text/event-stream+res.flushHeaders() - 前端无法取消:未使用
AbortController或服务端未监听close - 上游超时频繁:增加服务端超时配置 + 降低 max_tokens
API 转发:保护 Key 与统一鉴权
你应该做到的最低要求
- Key 不出现在前端:放环境变量、密钥管理系统
- 用户鉴权在你的服务端:JWT/Session 都行,关键是“能识别是谁”
- 统一入口:所有模型调用走同一个网关层(便于加限流/日志/灰度)
常见工程要点(前端视角)
- 请求结构统一:前端传
messages/input,服务端补model/temperature/max_tokens - 错误码统一:把不同模型的错误收敛成统一错误码,前端好做兜底
- 超时与重试:服务端控制超时(不要靠浏览器默认)
流式输出:SSE 的关键点(实现与坑)
SSE(Server-Sent Events)非常适合“打字机流式输出”:
- 单向:服务端 → 浏览器
- 基于 HTTP,部署与穿透一般比 WebSocket 简单
关键点 1:前端要支持“可取消”
你需要能在用户点击“停止生成”时:
- 终止浏览器端连接
- 同时让服务端尽快停止向模型继续拉流(避免浪费 Token)
关键点 2:服务端要做“代理转发流”
服务端通常需要把模型的流式响应,转换成 SSE 的 data: 分段:
- 保证逐段 flush
- 保证连接不断
- 处理客户端断开(close)时及时清理资源
关键点 3:流式与渲染要解耦
前端常见坑:
- 每个 token 都触发一次 React render,导致卡顿
- Markdown 渲染频繁重算,导致掉帧
解决思路:
- 做缓冲(比如 50-100ms 合并一次更新)
- 增量拼接 + 节流渲染
鉴权、限流与成本控制:上线前必须有
鉴权(Auth)
最小实现建议:
- 未登录:禁止调用
- 已登录:按用户维度记录用量
如果是企业场景,还需要:
- 组织/租户维度的配额
- 管理端查看用量与账单
限流(Rate Limit)
要回答两个问题:
- 单用户 QPS 限制?
- 同一用户并发对话限制?(比如最多 2 个流式请求同时进行)
成本控制(Token Budget)
你至少要控制:
- max_tokens:限制输出长度
- 输入长度:前端 UI 字数限制 + 服务端二次校验
- 历史对话策略:摘要/截断/只保留关键信息
日志与可观测:出了问题能定位
建议你在服务端至少记录(注意脱敏):
- 请求 id(traceId)
- userId / tenantId
- 选用的模型与参数(temperature、max_tokens)
- 关键路径耗时:鉴权、检索、模型响应、流式总时长
- 错误码与原始错误(脱敏)
前端配合:
- 把 traceId 显示在 debug 面板(或复制按钮)
- 失败时引导用户“带着 traceId 去反馈”
这套东西在面试时非常加分:它证明你能做“可上线的 AI 产品”。
完整可运行代码(SSE 服务端)
源码目录:
docs/demos/ai-chat-demo/server
cd docs/demos/ai-chat-demo/server
npm i
npm run dev