SSE服务器推送事件
概述
什么是SSE
Server-Sent Events (SSE) 是一种Web标准,允许服务器向客户端推送实时数据。它是一种基于HTTP的单向通信技术,特别适合需要服务器主动推送数据的场景。
SSE的优势
- 简单易用:基于标准HTTP协议,无需额外协议
- 自动重连:浏览器自动处理连接断开和重连
- 轻量级:相比WebSocket更轻量,适合单向数据推送
- 兼容性好:现代浏览器广泛支持
- 易于调试:使用标准HTTP工具即可调试
适用场景
- 实时通知推送
- 股票价格更新
- 社交媒体动态
- 系统状态监控
- 日志实时查看
- 进度条更新
SSE工作原理
基本概念
SSE基于HTTP协议,使用特殊的响应头和数据格式:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: 这是第一条消息
data: 这是第二条消息
event: custom-event
data: 自定义事件消息
id: 123
data: 带ID的消息
数据格式
SSE使用特定的数据格式,每条消息以data:开头:
data: 消息内容
data: 多行消息
data: 第二行内容
event: 事件类型
data: 事件数据
id: 消息ID
data: 消息内容
: 注释行(客户端忽略)
连接机制
- 建立连接:客户端发起HTTP请求,服务器响应
text/event-stream - 保持连接:连接保持开放状态,服务器可以持续发送数据
- 自动重连:连接断开时,浏览器自动尝试重连
- 事件处理:客户端通过事件监听器处理接收到的数据
SSE API详解
基本用法
// 创建EventSource连接
const eventSource = new EventSource('/api/events');
// 监听消息
eventSource.onmessage = function(event) {
console.log('收到消息:', event.data);
console.log('消息ID:', event.lastEventId);
console.log('事件类型:', event.type);
};
// 监听连接打开
eventSource.onopen = function(event) {
console.log('连接已建立');
};
// 监听错误
eventSource.onerror = function(event) {
console.error('连接错误:', event);
if (event.target.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
}
};
// 关闭连接
eventSource.close();
事件类型处理
// 监听自定义事件
eventSource.addEventListener('notification', function(event) {
console.log('通知事件:', event.data);
});
eventSource.addEventListener('update', function(event) {
console.log('更新事件:', event.data);
});
// 移除事件监听器
const handler = function(event) {
console.log('处理事件:', event.data);
};
eventSource.addEventListener('custom-event', handler);
eventSource.removeEventListener('custom-event', handler);
连接状态管理
// 检查连接状态
function checkConnectionStatus(eventSource) {
switch (eventSource.readyState) {
case EventSource.CONNECTING:
console.log('连接中...');
break;
case EventSource.OPEN:
console.log('连接已建立');
break;
case EventSource.CLOSED:
console.log('连接已关闭');
break;
}
}
// 重连策略
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;
eventSource.onerror = function(event) {
if (event.target.readyState === EventSource.CLOSED) {
if (reconnectAttempts < maxReconnectAttempts) {
setTimeout(() => {
console.log(`尝试重连 (${reconnectAttempts + 1}/${maxReconnectAttempts})`);
reconnectAttempts++;
// 重新创建连接
initSSEConnection();
}, 1000 * Math.pow(2, reconnectAttempts)); // 指数退避
}
}
};
服务器端实现
Node.js实现
const express = require('express');
const app = express();
app.get('/api/events', (req, res) => {
// 设置SSE响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
});
// 发送初始连接消息
res.write('data: 连接已建立\n\n');
// 模拟实时数据推送
const interval = setInterval(() => {
const data = {
timestamp: new Date().toISOString(),
message: `实时更新 ${Date.now()}`,
type: 'update'
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 1000);
// 处理客户端断开连接
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000, () => {
console.log('SSE服务器运行在端口3000');
});
自定义事件发送
// 发送自定义事件
function sendCustomEvent(res, eventName, data) {
res.write(`event: ${eventName}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
// 发送带ID的消息
function sendMessageWithId(res, id, data) {
res.write(`id: ${id}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
// 发送注释
function sendComment(res, comment) {
res.write(`: ${comment}\n`);
}
// 使用示例
app.get('/api/notifications', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 发送不同类型的通知
setTimeout(() => {
sendCustomEvent(res, 'info', { message: '系统信息' });
}, 1000);
setTimeout(() => {
sendCustomEvent(res, 'warning', { message: '系统警告' });
}, 2000);
setTimeout(() => {
sendCustomEvent(res, 'error', { message: '系统错误' });
}, 3000);
});
错误处理和重连
// 服务器端错误处理
app.get('/api/robust-events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
let messageCount = 0;
const maxMessages = 100;
const sendMessage = () => {
try {
if (messageCount >= maxMessages) {
res.write('event: complete\ndata: 消息发送完成\n\n');
res.end();
return;
}
const data = {
id: messageCount,
timestamp: new Date().toISOString(),
message: `消息 ${messageCount}`,
count: messageCount
};
res.write(`id: ${data.id}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
messageCount++;
// 检查连接状态
if (res.destroyed) {
return;
}
setTimeout(sendMessage, 1000);
} catch (error) {
console.error('发送消息错误:', error);
res.write(`event: error\ndata: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
};
sendMessage();
req.on('close', () => {
console.log('客户端断开连接');
});
});
实际应用场景
实时通知系统
// 客户端实现
class NotificationService {
constructor() {
this.eventSource = null;
this.notifications = [];
this.listeners = new Map();
}
connect() {
this.eventSource = new EventSource('/api/notifications');
this.eventSource.onmessage = (event) => {
const notification = JSON.parse(event.data);
this.notifications.push(notification);
this.notifyListeners('new-notification', notification);
};
this.eventSource.addEventListener('notification-read', (event) => {
const data = JSON.parse(event.data);
this.markAsRead(data.id);
});
}
addListener(event, callback) {
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);
}
notifyListeners(event, data) {
const callbacks = this.listeners.get(event) || [];
callbacks.forEach(callback => callback(data));
}
markAsRead(id) {
const notification = this.notifications.find(n => n.id === id);
if (notification) {
notification.read = true;
this.notifyListeners('notification-updated', notification);
}
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
// 使用示例
const notificationService = new NotificationService();
notificationService.addListener('new-notification', (notification) => {
showNotification(notification);
});
notificationService.connect();
实时数据监控
// 系统监控仪表板
class SystemMonitor {
constructor() {
this.eventSource = null;
this.metrics = {};
this.charts = new Map();
}
startMonitoring() {
this.eventSource = new EventSource('/api/system-metrics');
this.eventSource.addEventListener('cpu-usage', (event) => {
const data = JSON.parse(event.data);
this.updateMetric('cpu', data.value);
this.updateChart('cpu-chart', data);
});
this.eventSource.addEventListener('memory-usage', (event) => {
const data = JSON.parse(event.data);
this.updateMetric('memory', data.value);
this.updateChart('memory-chart', data);
});
this.eventSource.addEventListener('network-traffic', (event) => {
const data = JSON.parse(event.data);
this.updateMetric('network', data.value);
this.updateChart('network-chart', data);
});
}
updateMetric(type, value) {
this.metrics[type] = value;
this.updateUI(type, value);
}
updateChart(chartId, data) {
const chart = this.charts.get(chartId);
if (chart) {
chart.addData(data);
}
}
updateUI(type, value) {
const element = document.getElementById(`${type}-value`);
if (element) {
element.textContent = `${value}%`;
element.className = this.getStatusClass(value);
}
}
getStatusClass(value) {
if (value < 50) return 'status-ok';
if (value < 80) return 'status-warning';
return 'status-critical';
}
}
实时聊天应用
// 聊天室实现
class ChatRoom {
constructor(roomId) {
this.roomId = roomId;
this.eventSource = null;
this.messages = [];
this.users = new Set();
}
join() {
this.eventSource = new EventSource(`/api/chat/${this.roomId}`);
this.eventSource.addEventListener('message', (event) => {
const message = JSON.parse(event.data);
this.addMessage(message);
});
this.eventSource.addEventListener('user-joined', (event) => {
const user = JSON.parse(event.data);
this.users.add(user.username);
this.updateUserList();
});
this.eventSource.addEventListener('user-left', (event) => {
const user = JSON.parse(event.data);
this.users.delete(user.username);
this.updateUserList();
});
}
addMessage(message) {
this.messages.push(message);
this.displayMessage(message);
}
displayMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'message';
messageElement.innerHTML = `
<span class="username">${message.username}</span>
<span class="timestamp">${new Date(message.timestamp).toLocaleTimeString()}</span>
<span class="content">${message.content}</span>
`;
document.getElementById('messages').appendChild(messageElement);
this.scrollToBottom();
}
updateUserList() {
const userList = document.getElementById('users');
userList.innerHTML = '';
this.users.forEach(username => {
const userElement = document.createElement('li');
userElement.textContent = username;
userList.appendChild(userElement);
});
}
scrollToBottom() {
const messagesContainer = document.getElementById('messages');
messagesContainer.scrollTop = messagesContainer.scrollHeight;
}
}
性能优化策略
连接管理优化
// 智能重连策略
class SmartEventSource {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxRetries: 5,
retryDelay: 1000,
backoffMultiplier: 2,
...options
};
this.retryCount = 0;
this.retryTimeout = null;
this.eventSource = null;
this.listeners = new Map();
}
connect() {
try {
this.eventSource = new EventSource(this.url);
this.setupEventHandlers();
this.retryCount = 0;
} catch (error) {
console.error('连接失败:', error);
this.scheduleRetry();
}
}
setupEventHandlers() {
this.eventSource.onopen = () => {
console.log('SSE连接已建立');
this.retryCount = 0;
};
this.eventSource.onerror = (event) => {
if (this.eventSource.readyState === EventSource.CLOSED) {
this.scheduleRetry();
}
};
this.eventSource.onmessage = (event) => {
this.notifyListeners('message', event);
};
}
scheduleRetry() {
if (this.retryCount < this.options.maxRetries) {
const delay = this.options.retryDelay * Math.pow(this.options.backoffMultiplier, this.retryCount);
this.retryTimeout = setTimeout(() => {
this.retryCount++;
console.log(`尝试重连 (${this.retryCount}/${this.options.maxRetries})`);
this.connect();
}, delay);
} else {
console.error('达到最大重连次数');
this.notifyListeners('max-retries-reached');
}
}
addEventListener(type, callback) {
if (!this.listeners.has(type)) {
this.listeners.set(type, []);
}
this.listeners.get(type).push(callback);
}
notifyListeners(type, data) {
const callbacks = this.listeners.get(type) || [];
callbacks.forEach(callback => callback(data));
}
disconnect() {
if (this.retryTimeout) {
clearTimeout(this.retryTimeout);
}
if (this.eventSource) {
this.eventSource.close();
}
}
}
数据处理优化
// 消息批处理
class MessageBatcher {
constructor(batchSize = 10, batchTimeout = 1000) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.messages = [];
this.batchTimer = null;
this.processCallback = null;
}
addMessage(message) {
this.messages.push(message);
if (this.messages.length >= this.batchSize) {
this.processBatch();
} else if (this.messages.length === 1) {
this.startBatchTimer();
}
}
startBatchTimer() {
this.batchTimer = setTimeout(() => {
this.processBatch();
}, this.batchTimeout);
}
processBatch() {
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}
if (this.messages.length > 0 && this.processCallback) {
const batch = [...this.messages];
this.messages = [];
this.processCallback(batch);
}
}
onBatchProcess(callback) {
this.processCallback = callback;
}
}
// 使用示例
const messageBatcher = new MessageBatcher(5, 500);
messageBatcher.onBatchProcess((messages) => {
console.log('处理消息批次:', messages);
// 批量更新UI
updateUIWithBatch(messages);
});
// 在SSE消息处理中使用
eventSource.onmessage = (event) => {
const message = JSON.parse(event.data);
messageBatcher.addMessage(message);
};
内存管理优化
// 消息历史管理
class MessageHistory {
constructor(maxMessages = 1000) {
this.maxMessages = maxMessages;
this.messages = [];
this.messageIds = new Set();
}
addMessage(message) {
// 检查是否重复
if (message.id && this.messageIds.has(message.id)) {
return false;
}
this.messages.push(message);
if (message.id) {
this.messageIds.add(message.id);
}
// 限制消息数量
if (this.messages.length > this.maxMessages) {
const removed = this.messages.shift();
if (removed.id) {
this.messageIds.delete(removed.id);
}
}
return true;
}
getMessages(limit = 50, offset = 0) {
return this.messages.slice(offset, offset + limit);
}
searchMessages(query) {
return this.messages.filter(message =>
message.content && message.content.includes(query)
);
}
clear() {
this.messages = [];
this.messageIds.clear();
}
}
错误处理和调试
常见错误类型
// 错误处理工具
class SSEErrorHandler {
static handleConnectionError(error, eventSource) {
console.error('SSE连接错误:', error);
switch (error.type) {
case 'network':
console.log('网络错误,尝试重连...');
break;
case 'timeout':
console.log('连接超时,检查服务器状态...');
break;
case 'server':
console.log('服务器错误,等待恢复...');
break;
default:
console.log('未知错误类型');
}
}
static validateMessage(data) {
try {
const parsed = JSON.parse(data);
if (!parsed.timestamp || !parsed.content) {
throw new Error('消息格式无效');
}
return parsed;
} catch (error) {
console.error('消息解析错误:', error);
return null;
}
}
static logConnectionStatus(eventSource) {
const status = {
readyState: eventSource.readyState,
url: eventSource.url,
withCredentials: eventSource.withCredentials
};
console.log('连接状态:', status);
return status;
}
}
调试工具
// SSE调试器
class SSEDebugger {
constructor(eventSource) {
this.eventSource = eventSource;
this.logs = [];
this.maxLogs = 100;
this.enabled = true;
}
enable() {
this.enabled = true;
this.setupDebugging();
}
disable() {
this.enabled = false;
}
setupDebugging() {
if (!this.enabled) return;
// 监听所有事件
this.eventSource.addEventListener('open', (event) => {
this.log('连接已建立', event);
});
this.eventSource.addEventListener('message', (event) => {
this.log('收到消息', event);
});
this.eventSource.addEventListener('error', (event) => {
this.log('连接错误', event);
});
// 重写close方法
const originalClose = this.eventSource.close;
this.eventSource.close = () => {
this.log('连接已关闭');
originalClose.call(this.eventSource);
};
}
log(message, data = null) {
if (!this.enabled) return;
const logEntry = {
timestamp: new Date().toISOString(),
message,
data,
readyState: this.eventSource.readyState
};
this.logs.push(logEntry);
if (this.logs.length > this.maxLogs) {
this.logs.shift();
}
console.log(`[SSE Debug] ${message}`, data);
}
getLogs() {
return [...this.logs];
}
exportLogs() {
return JSON.stringify(this.logs, null, 2);
}
clearLogs() {
this.logs = [];
}
}
// 使用示例
const eventSource = new EventSource('/api/events');
const debugger = new SSEDebugger(eventSource);
debugger.enable();
// 在控制台查看日志
console.log('SSE日志:', debugger.getLogs());
最佳实践
设计原则
- 单一职责:每个SSE连接只负责一种类型的数据推送
- 错误恢复:实现智能重连和错误处理机制
- 资源管理:及时清理不需要的连接和监听器
- 性能监控:监控连接状态和消息处理性能
安全考虑
// 安全配置
const secureEventSource = new EventSource('/api/secure-events', {
withCredentials: true // 发送认证信息
});
// 服务器端安全验证
app.get('/api/secure-events', authenticateUser, (req, res) => {
// 验证用户身份
if (!req.user) {
res.status(401).end();
return;
}
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 只发送用户相关的数据
const userData = getUserData(req.user.id);
res.write(`data: ${JSON.stringify(userData)}\n\n`);
});
性能优化建议
- 批量处理:将多个小消息合并为批次处理
- 连接复用:在可能的情况下复用SSE连接
- 消息压缩:对大量文本数据进行压缩
- 缓存策略:合理使用缓存减少重复数据传输
与其他技术对比
SSE vs WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务器到客户端) | 双向 |
| 协议 | HTTP | 自定义协议 |
| 自动重连 | 是 | 需要手动实现 |
| 实现复杂度 | 简单 | 复杂 |
| 浏览器支持 | 广泛 | 广泛 |
| 适用场景 | 实时通知、数据推送 | 实时聊天、游戏 |
SSE vs 轮询
| 特性 | SSE | 轮询 |
|---|---|---|
| 实时性 | 高 | 低 |
| 服务器负载 | 低 | 高 |
| 网络开销 | 低 | 高 |
| 实现复杂度 | 简单 | 简单 |
| 浏览器支持 | 现代浏览器 | 所有浏览器 |
总结
SSE服务器推送事件是一种简单而强大的实时通信技术,特别适合需要服务器主动推送数据的场景。通过本文的学习,你应该能够:
- 理解SSE原理:掌握SSE的工作原理和通信机制
- 实现SSE应用:能够在前端和后端实现SSE功能
- 优化性能:了解SSE性能优化的各种策略
- 处理错误:掌握SSE错误处理和调试方法
- 选择技术:根据具体需求选择合适的实时通信技术
SSE是现代Web应用中实现实时功能的重要工具,掌握它将为你的应用开发带来更多可能性。在实际项目中,建议结合具体需求,选择最适合的实时通信方案,并注意性能优化和错误处理,确保应用的稳定性和用户体验。