SSE服务器推送解决方案
目录
介绍
在现代Web应用中,我们经常需要服务器主动向客户端推送数据。比如,股票价格实时更新、社交媒体的新消息通知、在线协作工具的实时同步等场景。传统的HTTP请求是客户端主动向服务器请求数据,而服务器无法主动推送。这时候就需要用到实时通信技术。
SSE(Server-Sent Events,服务器发送事件)是HTML5标准中定义的一种服务器推送技术。它允许服务器通过HTTP连接主动向客户端推送数据,而客户端只需要简单地监听这些事件即可。
什么是SSE?
SSE是一种基于HTTP的服务器推送技术,它使用了一种特殊的文本格式来传输数据。与WebSocket相比,SSE更加轻量级,使用更简单,但它是单向通信——只能服务器向客户端推送,客户端无法通过SSE连接向服务器发送数据。
SSE的核心特点:
- 单向通信: 只支持服务器到客户端的数据推送
- 基于HTTP: 使用标准HTTP协议,无需特殊协议支持
- 自动重连: 连接断开后会自动尝试重新连接
- 文本格式: 传输的数据是UTF-8编码的文本
- 事件ID: 支持事件标识,断线重连时可以从上次位置继续
为什么选择SSE?
当您的应用需要满足以下条件时,SSE是一个很好的选择:
- 只需要服务器向客户端推送数据(不需要客户端频繁发送数据)
- 需要简单可靠的实现方案
- 希望利用现有的HTTP基础设施
- 传输的数据主要是文本格式
- 需要自动重连功能
如果您需要双向实时通信或传输二进制数据,WebSocket可能是更好的选择。
核心价值
1. 技术优势
- 简单易用: 浏览器原生支持,API简单直观,无需第三方库
- 自动重连: 内置断线重连机制,提高了连接的稳定性
- 轻量级: 相比WebSocket,SSE的实现更轻量,服务器资源消耗更小
- 基于HTTP: 可以复用现有的HTTP基础设施,不需要额外的协议支持
- 穿透代理: 更容易穿透防火墙和代理服务器
2. 开发效率
- 快速集成: 几行代码即可实现基础功能
- 调试友好: 可以直接在浏览器开发者工具中查看消息
- 渐进增强: 可以轻松降级到轮询方案
3. 业务价值
- 提升用户体验: 实时推送减少用户等待,提供即时反馈
- 降低服务器压力: 相比轮询,大幅减少HTTP请求数量
- 节省带宽: 只在有数据更新时才推送,避免无效请求
SSE原理详解
工作原理
SSE的工作原理可以用一个简单的比喻来理解:想象你订阅了一份报纸,报社(服务器)会定期把报纸送到你家(客户端)。你不需要每天去报社询问"今天有新报纸吗?"报社会主动把报纸送给你。
SSE的工作流程:
消息格式
SSE使用一种简单的文本格式来传输数据。每条消息由一个或多个字段组成,字段之间用换行符分隔,消息之间用两个换行符分隔。
基本格式:
data: 这是消息内容
data: 这是第二条消息
完整格式(包含所有字段):
id: 消息ID
event: 事件名称
data: 消息数据
retry: 重连时间(毫秒)
字段说明:
data: 消息的实际内容,可以有多行id: 消息的唯一标识符,用于断线重连时恢复event: 自定义事件类型,默认是"message"retry: 建议客户端重连的等待时间(毫秒)
示例:
id: 1
event: userJoined
data: {"username": "张三", "userId": "123"}
id: 2
event: message
data: {"content": "大家好!", "sender": "张三"}
id: 3
data: 这是一条简单消息
连接建立过程
当客户端创建一个EventSource对象时,浏览器会发起一个特殊的HTTP GET请求:
客户端请求:
GET /events HTTP/1.1
Host: example.com
Accept: text/event-stream
Cache-Control: no-cache
服务器响应:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
data: 连接已建立
关键点在于Content-Type: text/event-stream,这告诉浏览器这是一个SSE连接,需要保持打开状态并持续接收数据。
技术架构设计
一个完善的SSE解决方案需要在客户端和服务端都做好设计。
客户端架构
服务端架构
核心功能实现
基础SSE连接
首先,让我们从最简单的SSE连接开始。这是使用SSE的最基础形式。
客户端实现:
// 创建SSE连接
const eventSource = new EventSource('/api/events');
// 监听消息事件
eventSource.onmessage = function(event) {
console.log('收到消息:', event.data);
// 通常服务器发送的是JSON格式
try {
const data = JSON.parse(event.data);
console.log('解析后的数据:', data);
} catch (e) {
console.error('数据解析失败:', e);
}
};
// 监听连接打开事件
eventSource.onopen = function() {
console.log('SSE连接已建立');
};
// 监听错误事件
eventSource.onerror = function(error) {
console.error('SSE连接错误:', error);
// 检查连接状态
if (eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
} else if (eventSource.readyState === EventSource.CONNECTING) {
console.log('正在重新连接...');
}
};
// 关闭连接(当不再需要时)
// eventSource.close();
服务端实现(Node.js + Express):
const express = require('express');
const app = express();
app.get('/api/events', (req, res) => {
// 设置SSE相关的响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 允许跨域(如果需要)
res.setHeader('Access-Control-Allow-Origin', '*');
// 发送初始连接消息
res.write('data: {"type":"connected","message":"连接成功"}\n\n');
// 每隔3秒发送一次数据
const intervalId = setInterval(() => {
const data = {
type: 'update',
timestamp: new Date().toISOString(),
message: '这是定时推送的消息'
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 3000);
// 客户端断开连接时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('客户端断开连接');
});
});
app.listen(3000, () => {
console.log('SSE服务器运行在 http://localhost:3000');
});
自动重连机制
SSE虽然有内置的自动重连功能,但在实际应用中,我们需要更精细的控制,比如设置重连次数限制、使用指数退避算法等。
class SSEClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxRetries: options.maxRetries || 5, // 最大重试次数
retryInterval: options.retryInterval || 3000, // 初始重试间隔(毫秒)
maxRetryInterval: options.maxRetryInterval || 30000, // 最大重试间隔
...options
};
this.eventSource = null;
this.retryCount = 0;
this.retryTimer = null;
this.isManualClose = false; // 是否手动关闭
}
// 连接到SSE服务器
connect() {
try {
this.eventSource = new EventSource(this.url);
this.setupEventListeners();
this.isManualClose = false;
} catch (error) {
console.error('创建SSE连接失败:', error);
this.handleRetry();
}
}
// 设置事件监听器
setupEventListeners() {
this.eventSource.onopen = () => {
console.log('SSE连接成功');
this.retryCount = 0; // 重置重试计数
if (this.options.onOpen) {
this.options.onOpen();
}
};
this.eventSource.onmessage = (event) => {
if (this.options.onMessage) {
this.options.onMessage(event.data);
}
};
this.eventSource.onerror = (error) => {
console.error('SSE连接错误:', error);
if (this.options.onError) {
this.options.onError(error);
}
// 如果不是手动关闭,则尝试重连
if (!this.isManualClose) {
this.handleRetry();
}
};
}
// 处理重连逻辑
handleRetry() {
// 关闭当前连接
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
// 检查是否超过最大重试次数
if (this.retryCount >= this.options.maxRetries) {
console.error('已达到最大重试次数,停止重连');
if (this.options.onMaxRetriesReached) {
this.options.onMaxRetriesReached();
}
return;
}
// 计算退避时间(指数退避算法)
const backoffTime = Math.min(
this.options.retryInterval * Math.pow(2, this.retryCount),
this.options.maxRetryInterval
);
console.log(`${backoffTime/1000}秒后尝试第${this.retryCount + 1}次重连...`);
this.retryTimer = setTimeout(() => {
this.retryCount++;
this.connect();
}, backoffTime);
}
// 手动关闭连接
close() {
this.isManualClose = true;
if (this.retryTimer) {
clearTimeout(this.retryTimer);
this.retryTimer = null;
}
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
console.log('SSE连接已关闭');
}
// 获取连接状态
getState() {
if (!this.eventSource) return 'CLOSED';
const states = {
0: 'CONNECTING',
1: 'OPEN',
2: 'CLOSED'
};
return states[this.eventSource.readyState];
}
}
// 使用示例
const sseClient = new SSEClient('/api/events', {
maxRetries: 5,
retryInterval: 3000,
onOpen: () => {
console.log('连接建立成功');
},
onMessage: (data) => {
console.log('收到消息:', data);
},
onError: (error) => {
console.error('发生错误:', error);
},
onMaxRetriesReached: () => {
console.error('重连失败次数过多,请检查网络或联系管理员');
}
});
sseClient.connect();
心跳检测
心跳检测用于及时发现连接是否真正可用。有时候连接虽然没有断开,但实际上已经无法正常收发数据了。
class SSEClientWithHeartbeat extends SSEClient {
constructor(url, options = {}) {
super(url, options);
this.heartbeatInterval = options.heartbeatInterval || 30000; // 心跳间隔
this.heartbeatTimeout = options.heartbeatTimeout || 10000; // 心跳超时时间
this.heartbeatTimer = null;
this.heartbeatTimeoutTimer = null;
this.lastHeartbeatTime = null;
}
// 重写connect方法,添加心跳
connect() {
super.connect();
this.startHeartbeat();
}
// 启动心跳检测
startHeartbeat() {
this.stopHeartbeat(); // 先停止之前的心跳
this.heartbeatTimer = setInterval(() => {
this.checkHeartbeat();
}, this.heartbeatInterval);
}
// 停止心跳检测
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}
// 检查心跳
checkHeartbeat() {
const now = Date.now();
// 如果距离上次收到消息时间过长,认为连接可能已断
if (this.lastHeartbeatTime &&
now - this.lastHeartbeatTime > this.heartbeatTimeout) {
console.warn('心跳超时,重新连接');
this.handleRetry();
}
}
// 重写setupEventListeners,记录心跳时间
setupEventListeners() {
super.setupEventListeners();
// 添加心跳事件监听
this.eventSource.addEventListener('heartbeat', (event) => {
this.lastHeartbeatTime = Date.now();
console.log('收到心跳:', event.data);
});
// 记录普通消息的接收时间
const originalOnMessage = this.eventSource.onmessage;
this.eventSource.onmessage = (event) => {
this.lastHeartbeatTime = Date.now();
if (originalOnMessage) {
originalOnMessage(event);
}
};
}
// 重写close方法,停止心跳
close() {
this.stopHeartbeat();
super.close();
}
}
// 服务端心跳实现
app.get('/api/events-with-heartbeat', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 每30秒发送一次心跳
const heartbeatId = setInterval(() => {
res.write('event: heartbeat\n');
res.write(`data: ${new Date().toISOString()}\n\n`);
}, 30000);
// 清理资源
req.on('close', () => {
clearInterval(heartbeatId);
});
});
消息处理与分发
在实际应用中,服务器可能会推送不同类型的消息,我们需要根据消息类型进行不同的处理。
class SSEEventManager extends SSEClientWithHeartbeat {
constructor(url, options = {}) {
super(url, options);
this.eventHandlers = new Map(); // 存储各种事件的处理器
}
// 注册事件处理器
on(eventType, handler) {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, []);
}
this.eventHandlers.get(eventType).push(handler);
// 如果连接已经建立,添加监听器
if (this.eventSource) {
this.addEventListenerToSource(eventType, handler);
}
return this; // 支持链式调用
}
// 移除事件处理器
off(eventType, handler) {
if (!this.eventHandlers.has(eventType)) {
return this;
}
const handlers = this.eventHandlers.get(eventType);
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
// 如果该事件类型没有处理器了,删除
if (handlers.length === 0) {
this.eventHandlers.delete(eventType);
}
return this;
}
// 添加监听器到EventSource
addEventListenerToSource(eventType, handler) {
this.eventSource.addEventListener(eventType, (event) => {
try {
const data = JSON.parse(event.data);
handler(data, event);
} catch (e) {
// 如果不是JSON,直接传递原始数据
handler(event.data, event);
}
});
}
// 重写setupEventListeners,添加自定义事件监听
setupEventListeners() {
super.setupEventListeners();
// 为所有已注册的事件添加监听器
this.eventHandlers.forEach((handlers, eventType) => {
handlers.forEach(handler => {
this.addEventListenerToSource(eventType, handler);
});
});
}
}
// 使用示例
const eventManager = new SSEEventManager('/api/events');
// 监听用户加入事件
eventManager.on('userJoined', (data) => {
console.log(`${data.username}加入了聊天室`);
updateUserList(data.userList);
});
// 监听新消息事件
eventManager.on('newMessage', (data) => {
console.log(`收到来自${data.sender}的消息:`, data.content);
displayMessage(data);
});
// 监听系统通知
eventManager.on('notification', (data) => {
showNotification(data.title, data.message);
});
eventManager.connect();
服务端推送不同类型的事件:
// 存储所有连接的客户端
const clients = new Set();
app.get('/api/events', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 添加到客户端列表
clients.add(res);
// 发送欢迎消息
res.write('event: connected\n');
res.write('data: {"message":"欢迎连接"}\n\n');
// 客户端断开时清理
req.on('close', () => {
clients.delete(res);
});
});
// 广播消息的辅助函数
function broadcast(eventType, data) {
const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
clients.forEach(client => {
try {
client.write(message);
} catch (error) {
console.error('发送消息失败:', error);
clients.delete(client);
}
});
}
// 业务逻辑示例
app.post('/api/send-message', (req, res) => {
const { sender, content } = req.body;
// 广播新消息
broadcast('newMessage', {
sender,
content,
timestamp: new Date().toISOString()
});
res.json({ success: true });
});
错误处理与降级
当SSE不可用时(如浏览器不支持或网络问题),我们需要有降级方案。
class RobustSSEClient extends SSEEventManager {
constructor(url, options = {}) {
// 检查浏览器支持
if (!window.EventSource) {
console.warn('浏览器不支持SSE,将使用轮询方案');
return new PollingFallback(url, options);
}
super(url, options);
this.errorCount = 0;
this.errorThreshold = options.errorThreshold || 3;
}
setupEventListeners() {
super.setupEventListeners();
const originalOnError = this.eventSource.onerror;
this.eventSource.onerror = (error) => {
this.errorCount++;
// 错误次数过多,考虑降级
if (this.errorCount >= this.errorThreshold) {
console.warn('SSE连接错误次数过多,考虑使用降级方案');
if (this.options.onDegradation) {
this.options.onDegradation();
}
}
if (originalOnError) {
originalOnError(error);
}
};
}
}
// 轮询降级方案
class PollingFallback {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.pollingInterval = options.pollingInterval || 5000;
this.pollingTimer = null;
this.lastEventId = null;
this.eventHandlers = new Map();
}
connect() {
this.startPolling();
if (this.options.onOpen) {
this.options.onOpen();
}
}
startPolling() {
this.pollingTimer = setInterval(() => {
this.poll();
}, this.pollingInterval);
// 立即执行一次
this.poll();
}
async poll() {
try {
const response = await fetch(this.url, {
headers: {
'Last-Event-ID': this.lastEventId || ''
}
});
const data = await response.json();
if (data.events && Array.isArray(data.events)) {
data.events.forEach(event => {
this.handleEvent(event);
this.lastEventId = event.id;
});
}
} catch (error) {
console.error('轮询请求失败:', error);
if (this.options.onError) {
this.options.onError(error);
}
}
}
handleEvent(event) {
const handlers = this.eventHandlers.get(event.type) || [];
handlers.forEach(handler => {
handler(event.data);
});
}
on(eventType, handler) {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, []);
}
this.eventHandlers.get(eventType).push(handler);
return this;
}
close() {
if (this.pollingTimer) {
clearInterval(this.pollingTimer);
this.pollingTimer = null;
}
}
}
实战应用场景
场景1: 实时股票行情
// 客户端
const stockSSE = new SSEEventManager('/api/stock-prices');
stockSSE
.on('priceUpdate', (data) => {
updateStockPrice(data.symbol, data.price, data.change);
})
.on('alert', (data) => {
showAlert(data.message);
})
.connect();
// 服务端
const stockClients = new Map();
app.get('/api/stock-prices', (req, res) => {
const symbols = req.query.symbols?.split(',') || [];
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 记录客户端关注的股票
stockClients.set(res, symbols);
req.on('close', () => {
stockClients.delete(res);
});
});
// 当股价更新时推送
function onStockPriceUpdate(symbol, price, change) {
stockClients.forEach((symbols, client) => {
if (symbols.includes(symbol)) {
const message = {
symbol,
price,
change,
timestamp: new Date().toISOString()
};
client.write(`event: priceUpdate\n`);
client.write(`data: ${JSON.stringify(message)}\n\n`);
}
});
}
场景2: 实时通知系统
// 客户端
const notificationSSE = new SSEEventManager('/api/notifications');
notificationSSE
.on('notification', (data) => {
showNotification(data);
updateNotificationBadge();
})
.on('read', (data) => {
markAsRead(data.notificationId);
})
.connect();
function showNotification(data) {
// 浏览器通知API
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(data.title, {
body: data.message,
icon: data.icon
});
}
// 页面内通知
const notificationEl = document.createElement('div');
notificationEl.className = 'notification';
notificationEl.textContent = data.message;
document.body.appendChild(notificationEl);
}
场景3: 协作编辑
// 实时同步其他用户的编辑
const collaborationSSE = new SSEEventManager('/api/document/123/changes');
collaborationSSE
.on('userJoined', (data) => {
addUserCursor(data.userId, data.username);
})
.on('userLeft', (data) => {
removeUserCursor(data.userId);
})
.on('contentChange', (data) => {
applyRemoteChange(data);
})
.on('cursorMove', (data) => {
updateUserCursor(data.userId, data.position);
})
.connect();
场景4: 系统监控
// 服务器状态监控
const monitorSSE = new SSEEventManager('/api/system/monitor');
monitorSSE
.on('metrics', (data) => {
updateChart('cpu', data.cpu);
updateChart('memory', data.memory);
updateChart('network', data.network);
})
.on('alert', (data) => {
if (data.level === 'critical') {
showCriticalAlert(data.message);
}
})
.connect();
性能优化
1. 连接复用
不要为每个功能都创建单独的SSE连接,应该复用一个连接并通过事件类型区分。
// ❌ 不好的做法
const stockSSE = new EventSource('/stock-prices');
const newsSSE = new EventSource('/news');
const notificationSSE = new EventSource('/notifications');
// ✅ 好的做法
const eventManager = new SSEEventManager('/api/events');
eventManager
.on('stock', handleStock)
.on('news', handleNews)
.on('notification', handleNotification)
.connect();
2. 消息压缩
对于大量数据,可以在服务端压缩后再发送。
const zlib = require('zlib');
function sendCompressedData(res, data) {
const json = JSON.stringify(data);
const compressed = zlib.gzipSync(json).toString('base64');
res.write(`data: ${compressed}\n\n`);
}
// 客户端解压
eventManager.on('compressedData', (compressed) => {
// 需要引入pako或其他解压库
const json = pako.inflate(atob(compressed), { to: 'string' });
const data = JSON.parse(json);
handleData(data);
});
3. 消息批处理
将多个小消息合并成一个大消息发送,减少网络开销。
class BatchedSSEServer {
constructor() {
this.messageQueue = [];
this.batchInterval = 100; // 100ms批处理一次
setInterval(() => {
this.flushQueue();
}, this.batchInterval);
}
queueMessage(eventType, data) {
this.messageQueue.push({ eventType, data });
}
flushQueue() {
if (this.messageQueue.length === 0) return;
const batch = {
count: this.messageQueue.length,
messages: this.messageQueue
};
broadcast('batch', batch);
this.messageQueue = [];
}
}
4. 按需订阅
只订阅用户感兴趣的数据,减少不必要的推送。
app.get('/api/events', (req, res) => {
const userId = req.query.userId;
const subscriptions = req.query.subscriptions?.split(',') || [];
const client = {
response: res,
userId,
subscriptions
};
clients.add(client);
// 只推送用户订阅的内容
req.on('close', () => {
clients.delete(client);
});
});
function broadcast(eventType, data, filter) {
clients.forEach(client => {
// 检查客户端是否订阅了这个事件类型
if (client.subscriptions.includes(eventType)) {
if (!filter || filter(client, data)) {
client.response.write(`event: ${eventType}\n`);
client.response.write(`data: ${JSON.stringify(data)}\n\n`);
}
}
});
}
最佳实践
1. 安全性
使用HTTPS
// 生产环境必须使用HTTPS
const sseUrl = process.env.NODE_ENV === 'production'
? 'https://example.com/events'
: 'http://localhost:3000/events';
身份认证
// 客户端:在URL中传递token
const eventSource = new EventSource(`/api/events?token=${authToken}`);
// 或使用withCredentials传递cookie
const eventSource = new EventSource('/api/events', {
withCredentials: true
});
// 服务端:验证token
app.get('/api/events', (req, res) => {
const token = req.query.token || req.cookies.token;
if (!verifyToken(token)) {
res.status(401).send('Unauthorized');
return;
}
// ... SSE逻辑
});
权限控制
function broadcast(eventType, data, requiredPermission) {
clients.forEach(client => {
if (hasPermission(client.userId, requiredPermission)) {
client.response.write(`event: ${eventType}\n`);
client.response.write(`data: ${JSON.stringify(data)}\n\n`);
}
});
}
2. 错误处理
完善的错误处理机制
class ErrorHandledSSE extends SSEEventManager {
constructor(url, options = {}) {
super(url, {
...options,
onError: (error) => {
// 记录错误
this.logError(error);
// 通知用户
this.notifyUser(error);
// 调用用户的错误处理器
if (options.onError) {
options.onError(error);
}
}
});
}
logError(error) {
// 发送到错误监控服务
if (window.errorTracker) {
window.errorTracker.captureException(error);
}
console.error('SSE错误:', error);
}
notifyUser(error) {
// 友好的用户提示
showToast('连接出现问题,正在尝试重新连接...');
}
}
3. 状态管理
维护连接状态
class StatefulSSE extends SSEEventManager {
constructor(url, options = {}) {
super(url, options);
this.state = {
connected: false,
lastMessageTime: null,
messageCount: 0,
errors: []
};
}
setupEventListeners() {
super.setupEventListeners();
this.eventSource.onopen = () => {
this.state.connected = true;
this.state.errors = [];
this.emitStateChange();
};
this.eventSource.onmessage = (event) => {
this.state.lastMessageTime = Date.now();
this.state.messageCount++;
this.emitStateChange();
};
this.eventSource.onerror = (error) => {
this.state.connected = false;
this.state.errors.push(error);
this.emitStateChange();
};
}
emitStateChange() {
if (this.options.onStateChange) {
this.options.onStateChange(this.state);
}
}
getState() {
return { ...this.state };
}
}
4. 内存管理
及时清理资源
// 页面卸载时关闭连接
window.addEventListener('beforeunload', () => {
if (sseClient) {
sseClient.close();
}
});
// SPA路由切换时
router.beforeEach((to, from, next) => {
// 离开需要SSE的页面时关闭连接
if (from.meta.requiresSSE && !to.meta.requiresSSE) {
sseClient.close();
}
next();
});
// React组件中
useEffect(() => {
const sse = new SSEEventManager('/api/events');
sse.connect();
return () => {
sse.close(); // 组件卸载时清理
};
}, []);
完整代码示例
生产级SSE客户端
/**
* 生产级SSE客户端
* 包含完整的重连、心跳、错误处理、事件管理功能
*/
class ProductionSSEClient {
constructor(url, options = {}) {
// 检查浏览器支持
if (!window.EventSource) {
throw new Error('浏览器不支持SSE');
}
this.url = url;
this.options = {
maxRetries: 5,
retryInterval: 3000,
maxRetryInterval: 30000,
heartbeatInterval: 30000,
heartbeatTimeout: 10000,
...options
};
// 状态
this.eventSource = null;
this.retryCount = 0;
this.retryTimer = null;
this.isManualClose = false;
this.heartbeatTimer = null;
this.lastHeartbeatTime = null;
// 事件处理器
this.eventHandlers = new Map();
// 状态回调
this.onOpenCallback = null;
this.onErrorCallback = null;
this.onCloseCallback = null;
}
/**
* 连接到SSE服务器
*/
connect() {
try {
this.eventSource = new EventSource(this.url);
this.setupEventListeners();
this.startHeartbeat();
this.isManualClose = false;
} catch (error) {
console.error('创建SSE连接失败:', error);
this.handleRetry();
}
return this;
}
/**
* 设置事件监听器
*/
setupEventListeners() {
// 连接打开
this.eventSource.onopen = () => {
console.log('SSE连接成功');
this.retryCount = 0;
this.lastHeartbeatTime = Date.now();
if (this.onOpenCallback) {
this.onOpenCallback();
}
};
// 收到消息
this.eventSource.onmessage = (event) => {
this.lastHeartbeatTime = Date.now();
this.handleMessage('message', event);
};
// 连接错误
this.eventSource.onerror = (error) => {
console.error('SSE连接错误:', error);
if (this.onErrorCallback) {
this.onErrorCallback(error);
}
if (!this.isManualClose) {
this.handleRetry();
}
};
// 注册所有自定义事件
this.eventHandlers.forEach((handlers, eventType) => {
this.eventSource.addEventListener(eventType, (event) => {
this.lastHeartbeatTime = Date.now();
this.handleMessage(eventType, event);
});
});
}
/**
* 处理消息
*/
handleMessage(eventType, event) {
const handlers = this.eventHandlers.get(eventType) || [];
handlers.forEach(handler => {
try {
// 尝试解析JSON
let data;
try {
data = JSON.parse(event.data);
} catch (e) {
data = event.data;
}
handler(data, event);
} catch (error) {
console.error(`事件处理器错误[${eventType}]:`, error);
}
});
}
/**
* 处理重连
*/
handleRetry() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
this.stopHeartbeat();
if (this.retryCount >= this.options.maxRetries) {
console.error('已达到最大重试次数');
if (this.onCloseCallback) {
this.onCloseCallback('MAX_RETRIES_REACHED');
}
return;
}
const backoffTime = Math.min(
this.options.retryInterval * Math.pow(2, this.retryCount),
this.options.maxRetryInterval
);
console.log(`${backoffTime/1000}秒后重连(${this.retryCount + 1}/${this.options.maxRetries})`);
this.retryTimer = setTimeout(() => {
this.retryCount++;
this.connect();
}, backoffTime);
}
/**
* 启动心跳检测
*/
startHeartbeat() {
this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => {
const now = Date.now();
if (this.lastHeartbeatTime &&
now - this.lastHeartbeatTime > this.options.heartbeatTimeout) {
console.warn('心跳超时,重新连接');
this.handleRetry();
}
}, this.options.heartbeatInterval);
}
/**
* 停止心跳检测
*/
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
/**
* 注册事件处理器
*/
on(eventType, handler) {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, []);
}
this.eventHandlers.get(eventType).push(handler);
// 如果已连接,添加监听
if (this.eventSource && this.eventSource.readyState === EventSource.OPEN) {
this.eventSource.addEventListener(eventType, (event) => {
this.handleMessage(eventType, event);
});
}
return this;
}
/**
* 移除事件处理器
*/
off(eventType, handler) {
if (!this.eventHandlers.has(eventType)) {
return this;
}
const handlers = this.eventHandlers.get(eventType);
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
if (handlers.length === 0) {
this.eventHandlers.delete(eventType);
}
return this;
}
/**
* 关闭连接
*/
close() {
this.isManualClose = true;
if (this.retryTimer) {
clearTimeout(this.retryTimer);
this.retryTimer = null;
}
this.stopHeartbeat();
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.onCloseCallback) {
this.onCloseCallback('MANUAL_CLOSE');
}
}
/**
* 获取连接状态
*/
getState() {
if (!this.eventSource) return 'CLOSED';
const states = {
[EventSource.CONNECTING]: 'CONNECTING',
[EventSource.OPEN]: 'OPEN',
[EventSource.CLOSED]: 'CLOSED'
};
return states[this.eventSource.readyState];
}
/**
* 设置连接打开回调
*/
onOpen(callback) {
this.onOpenCallback = callback;
return this;
}
/**
* 设置错误回调
*/
onError(callback) {
this.onErrorCallback = callback;
return this;
}
/**
* 设置关闭回调
*/
onClose(callback) {
this.onCloseCallback = callback;
return this;
}
}
// 使用示例
const sseClient = new ProductionSSEClient('/api/events', {
maxRetries: 5,
retryInterval: 3000,
heartbeatInterval: 30000
});
sseClient
.onOpen(() => {
console.log('连接已建立');
})
.onError((error) => {
console.error('发生错误:', error);
})
.onClose((reason) => {
console.log('连接已关闭:', reason);
})
.on('notification', (data) => {
console.log('收到通知:', data);
})
.on('update', (data) => {
console.log('收到更新:', data);
})
.connect();
生产级SSE服务端(Node.js)
const express = require('express');
const app = express();
/**
* SSE客户端管理器
*/
class SSEClientManager {
constructor() {
this.clients = new Map();
this.heartbeatInterval = 30000;
this.startHeartbeat();
}
/**
* 添加客户端
*/
addClient(id, res, metadata = {}) {
const client = {
id,
res,
metadata,
addedAt: Date.now()
};
this.clients.set(id, client);
console.log(`客户端${id}已连接,当前总数:${this.clients.size}`);
return client;
}
/**
* 移除客户端
*/
removeClient(id) {
if (this.clients.has(id)) {
this.clients.delete(id);
console.log(`客户端${id}已断开,当前总数:${this.clients.size}`);
}
}
/**
* 向单个客户端发送消息
*/
send(clientId, eventType, data, id) {
const client = this.clients.get(clientId);
if (!client) {
return false;
}
try {
if (id) {
client.res.write(`id: ${id}\n`);
}
if (eventType) {
client.res.write(`event: ${eventType}\n`);
}
client.res.write(`data: ${JSON.stringify(data)}\n\n`);
return true;
} catch (error) {
console.error(`发送消息到客户端${clientId}失败:`, error);
this.removeClient(clientId);
return false;
}
}
/**
* 广播消息给所有客户端
*/
broadcast(eventType, data, filter) {
let successCount = 0;
this.clients.forEach((client, clientId) => {
if (!filter || filter(client)) {
if (this.send(clientId, eventType, data)) {
successCount++;
}
}
});
return successCount;
}
/**
* 启动心跳
*/
startHeartbeat() {
setInterval(() => {
this.broadcast('heartbeat', {
timestamp: new Date().toISOString()
});
}, this.heartbeatInterval);
}
/**
* 获取客户端信息
*/
getClientInfo(clientId) {
return this.clients.get(clientId);
}
/**
* 获取所有客户端
*/
getAllClients() {
return Array.from(this.clients.values());
}
/**
* 获取客户端数量
*/
getClientCount() {
return this.clients.size;
}
}
// 创建客户端管理器
const sseManager = new SSEClientManager();
/**
* SSE连接端点
*/
app.get('/api/events', (req, res) => {
// 设置SSE响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // 禁用Nginx缓冲
// CORS支持
res.setHeader('Access-Control-Allow-Origin', '*');
// 生成客户端ID
const clientId = `client-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
// 获取客户端元数据
const metadata = {
userAgent: req.headers['user-agent'],
ip: req.ip,
query: req.query
};
// 添加客户端
sseManager.addClient(clientId, res, metadata);
// 发送欢迎消息
sseManager.send(clientId, 'connected', {
clientId,
message: '连接成功',
timestamp: new Date().toISOString()
});
// 客户端断开处理
req.on('close', () => {
sseManager.removeClient(clientId);
});
});
/**
* 发送通知的API
*/
app.post('/api/send-notification', express.json(), (req, res) => {
const { message, targetClientId } = req.body;
if (targetClientId) {
// 发送给特定客户端
const success = sseManager.send(targetClientId, 'notification', {
message,
timestamp: new Date().toISOString()
});
res.json({ success });
} else {
// 广播给所有客户端
const count = sseManager.broadcast('notification', {
message,
timestamp: new Date().toISOString()
});
res.json({ success: true, receiverCount: count });
}
});
/**
* 获取在线客户端数量
*/
app.get('/api/stats', (req, res) => {
res.json({
clientCount: sseManager.getClientCount(),
clients: sseManager.getAllClients().map(c => ({
id: c.id,
connectedAt: c.addedAt,
metadata: c.metadata
}))
});
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`SSE服务器运行在端口${PORT}`);
});
总结
SSE是一种简单而强大的服务器推送技术,特别适合需要服务器主动推送数据但不需要客户端频繁发送数据的场景。
SSE的优势:
- 浏览器原生支持,使用简单
- 基于HTTP协议,兼容性好
- 自动重连机制
- 轻量级实现
SSE的局限:
- 单向通信(仅服务器到客户端)
- 只支持文本数据
- 某些代理服务器可能有兼容性问题
- 浏览器对同域名的SSE连接数有限制(通常6个)
选择SSE的场景:
- 实时通知系统
- 股票行情、体育比分等实时数据展示
- 服务器状态监控
- 进度追踪
- 日志流显示
- 协作应用中的状态同步
不适合SSE的场景:
- 需要双向频繁通信(考虑WebSocket)
- 需要传输二进制数据(考虑WebSocket)
- 需要低延迟的游戏或视频通话(考虑WebRTC)
通过本文提供的完整解决方案,您可以在项目中快速实现一个生产级的SSE系统,为用户提供流畅的实时体验。