跳到主要内容

SSE服务器推送事件

概述

什么是SSE

Server-Sent Events (SSE) 是一种Web标准,允许服务器向客户端推送实时数据。它是一种基于HTTP的单向通信技术,特别适合需要服务器主动推送数据的场景。

SSE的优势

  1. 简单易用:基于标准HTTP协议,无需额外协议
  2. 自动重连:浏览器自动处理连接断开和重连
  3. 轻量级:相比WebSocket更轻量,适合单向数据推送
  4. 兼容性好:现代浏览器广泛支持
  5. 易于调试:使用标准HTTP工具即可调试

适用场景

  • 实时通知推送
  • 股票价格更新
  • 社交媒体动态
  • 系统状态监控
  • 日志实时查看
  • 进度条更新

SSE工作原理

基本概念

SSE基于HTTP协议,使用特殊的响应头和数据格式:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: 这是第一条消息

data: 这是第二条消息

event: custom-event
data: 自定义事件消息

id: 123
data: 带ID的消息

数据格式

SSE使用特定的数据格式,每条消息以data:开头:

data: 消息内容

data: 多行消息
data: 第二行内容

event: 事件类型
data: 事件数据

id: 消息ID
data: 消息内容

: 注释行(客户端忽略)

连接机制

  1. 建立连接:客户端发起HTTP请求,服务器响应text/event-stream
  2. 保持连接:连接保持开放状态,服务器可以持续发送数据
  3. 自动重连:连接断开时,浏览器自动尝试重连
  4. 事件处理:客户端通过事件监听器处理接收到的数据

SSE API详解

基本用法

// 创建EventSource连接
const eventSource = new EventSource('/api/events');

// 监听消息
eventSource.onmessage = function(event) {
console.log('收到消息:', event.data);
console.log('消息ID:', event.lastEventId);
console.log('事件类型:', event.type);
};

// 监听连接打开
eventSource.onopen = function(event) {
console.log('连接已建立');
};

// 监听错误
eventSource.onerror = function(event) {
console.error('连接错误:', event);
if (event.target.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
}
};

// 关闭连接
eventSource.close();

事件类型处理

// 监听自定义事件
eventSource.addEventListener('notification', function(event) {
console.log('通知事件:', event.data);
});

eventSource.addEventListener('update', function(event) {
console.log('更新事件:', event.data);
});

// 移除事件监听器
const handler = function(event) {
console.log('处理事件:', event.data);
};
eventSource.addEventListener('custom-event', handler);
eventSource.removeEventListener('custom-event', handler);

连接状态管理

// 检查连接状态
function checkConnectionStatus(eventSource) {
switch (eventSource.readyState) {
case EventSource.CONNECTING:
console.log('连接中...');
break;
case EventSource.OPEN:
console.log('连接已建立');
break;
case EventSource.CLOSED:
console.log('连接已关闭');
break;
}
}

// 重连策略
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;

eventSource.onerror = function(event) {
if (event.target.readyState === EventSource.CLOSED) {
if (reconnectAttempts < maxReconnectAttempts) {
setTimeout(() => {
console.log(`尝试重连 (${reconnectAttempts + 1}/${maxReconnectAttempts})`);
reconnectAttempts++;
// 重新创建连接
initSSEConnection();
}, 1000 * Math.pow(2, reconnectAttempts)); // 指数退避
}
}
};

服务器端实现

Node.js实现

const express = require('express');
const app = express();

app.get('/api/events', (req, res) => {
// 设置SSE响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
});

// 发送初始连接消息
res.write('data: 连接已建立\n\n');

// 模拟实时数据推送
const interval = setInterval(() => {
const data = {
timestamp: new Date().toISOString(),
message: `实时更新 ${Date.now()}`,
type: 'update'
};

res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 1000);

// 处理客户端断开连接
req.on('close', () => {
clearInterval(interval);
res.end();
});
});

app.listen(3000, () => {
console.log('SSE服务器运行在端口3000');
});

自定义事件发送

// 发送自定义事件
function sendCustomEvent(res, eventName, data) {
res.write(`event: ${eventName}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}

// 发送带ID的消息
function sendMessageWithId(res, id, data) {
res.write(`id: ${id}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}

// 发送注释
function sendComment(res, comment) {
res.write(`: ${comment}\n`);
}

// 使用示例
app.get('/api/notifications', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});

// 发送不同类型的通知
setTimeout(() => {
sendCustomEvent(res, 'info', { message: '系统信息' });
}, 1000);

setTimeout(() => {
sendCustomEvent(res, 'warning', { message: '系统警告' });
}, 2000);

setTimeout(() => {
sendCustomEvent(res, 'error', { message: '系统错误' });
}, 3000);
});

错误处理和重连

// 服务器端错误处理
app.get('/api/robust-events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});

let messageCount = 0;
const maxMessages = 100;

const sendMessage = () => {
try {
if (messageCount >= maxMessages) {
res.write('event: complete\ndata: 消息发送完成\n\n');
res.end();
return;
}

const data = {
id: messageCount,
timestamp: new Date().toISOString(),
message: `消息 ${messageCount}`,
count: messageCount
};

res.write(`id: ${data.id}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
messageCount++;

// 检查连接状态
if (res.destroyed) {
return;
}

setTimeout(sendMessage, 1000);
} catch (error) {
console.error('发送消息错误:', error);
res.write(`event: error\ndata: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
};

sendMessage();

req.on('close', () => {
console.log('客户端断开连接');
});
});

实际应用场景

实时通知系统

// 客户端实现
class NotificationService {
constructor() {
this.eventSource = null;
this.notifications = [];
this.listeners = new Map();
}

connect() {
this.eventSource = new EventSource('/api/notifications');

this.eventSource.onmessage = (event) => {
const notification = JSON.parse(event.data);
this.notifications.push(notification);
this.notifyListeners('new-notification', notification);
};

this.eventSource.addEventListener('notification-read', (event) => {
const data = JSON.parse(event.data);
this.markAsRead(data.id);
});
}

addListener(event, callback) {
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);
}

notifyListeners(event, data) {
const callbacks = this.listeners.get(event) || [];
callbacks.forEach(callback => callback(data));
}

markAsRead(id) {
const notification = this.notifications.find(n => n.id === id);
if (notification) {
notification.read = true;
this.notifyListeners('notification-updated', notification);
}
}

disconnect() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}

// 使用示例
const notificationService = new NotificationService();
notificationService.addListener('new-notification', (notification) => {
showNotification(notification);
});
notificationService.connect();

实时数据监控

// 系统监控仪表板
class SystemMonitor {
constructor() {
this.eventSource = null;
this.metrics = {};
this.charts = new Map();
}

startMonitoring() {
this.eventSource = new EventSource('/api/system-metrics');

this.eventSource.addEventListener('cpu-usage', (event) => {
const data = JSON.parse(event.data);
this.updateMetric('cpu', data.value);
this.updateChart('cpu-chart', data);
});

this.eventSource.addEventListener('memory-usage', (event) => {
const data = JSON.parse(event.data);
this.updateMetric('memory', data.value);
this.updateChart('memory-chart', data);
});

this.eventSource.addEventListener('network-traffic', (event) => {
const data = JSON.parse(event.data);
this.updateMetric('network', data.value);
this.updateChart('network-chart', data);
});
}

updateMetric(type, value) {
this.metrics[type] = value;
this.updateUI(type, value);
}

updateChart(chartId, data) {
const chart = this.charts.get(chartId);
if (chart) {
chart.addData(data);
}
}

updateUI(type, value) {
const element = document.getElementById(`${type}-value`);
if (element) {
element.textContent = `${value}%`;
element.className = this.getStatusClass(value);
}
}

getStatusClass(value) {
if (value < 50) return 'status-ok';
if (value < 80) return 'status-warning';
return 'status-critical';
}
}

实时聊天应用

// 聊天室实现
class ChatRoom {
constructor(roomId) {
this.roomId = roomId;
this.eventSource = null;
this.messages = [];
this.users = new Set();
}

join() {
this.eventSource = new EventSource(`/api/chat/${this.roomId}`);

this.eventSource.addEventListener('message', (event) => {
const message = JSON.parse(event.data);
this.addMessage(message);
});

this.eventSource.addEventListener('user-joined', (event) => {
const user = JSON.parse(event.data);
this.users.add(user.username);
this.updateUserList();
});

this.eventSource.addEventListener('user-left', (event) => {
const user = JSON.parse(event.data);
this.users.delete(user.username);
this.updateUserList();
});
}

addMessage(message) {
this.messages.push(message);
this.displayMessage(message);
}

displayMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'message';
messageElement.innerHTML = `
<span class="username">${message.username}</span>
<span class="timestamp">${new Date(message.timestamp).toLocaleTimeString()}</span>
<span class="content">${message.content}</span>
`;

document.getElementById('messages').appendChild(messageElement);
this.scrollToBottom();
}

updateUserList() {
const userList = document.getElementById('users');
userList.innerHTML = '';
this.users.forEach(username => {
const userElement = document.createElement('li');
userElement.textContent = username;
userList.appendChild(userElement);
});
}

scrollToBottom() {
const messagesContainer = document.getElementById('messages');
messagesContainer.scrollTop = messagesContainer.scrollHeight;
}
}

性能优化策略

连接管理优化

// 智能重连策略
class SmartEventSource {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxRetries: 5,
retryDelay: 1000,
backoffMultiplier: 2,
...options
};

this.retryCount = 0;
this.retryTimeout = null;
this.eventSource = null;
this.listeners = new Map();
}

connect() {
try {
this.eventSource = new EventSource(this.url);
this.setupEventHandlers();
this.retryCount = 0;
} catch (error) {
console.error('连接失败:', error);
this.scheduleRetry();
}
}

setupEventHandlers() {
this.eventSource.onopen = () => {
console.log('SSE连接已建立');
this.retryCount = 0;
};

this.eventSource.onerror = (event) => {
if (this.eventSource.readyState === EventSource.CLOSED) {
this.scheduleRetry();
}
};

this.eventSource.onmessage = (event) => {
this.notifyListeners('message', event);
};
}

scheduleRetry() {
if (this.retryCount < this.options.maxRetries) {
const delay = this.options.retryDelay * Math.pow(this.options.backoffMultiplier, this.retryCount);
this.retryTimeout = setTimeout(() => {
this.retryCount++;
console.log(`尝试重连 (${this.retryCount}/${this.options.maxRetries})`);
this.connect();
}, delay);
} else {
console.error('达到最大重连次数');
this.notifyListeners('max-retries-reached');
}
}

addEventListener(type, callback) {
if (!this.listeners.has(type)) {
this.listeners.set(type, []);
}
this.listeners.get(type).push(callback);
}

notifyListeners(type, data) {
const callbacks = this.listeners.get(type) || [];
callbacks.forEach(callback => callback(data));
}

disconnect() {
if (this.retryTimeout) {
clearTimeout(this.retryTimeout);
}
if (this.eventSource) {
this.eventSource.close();
}
}
}

数据处理优化

// 消息批处理
class MessageBatcher {
constructor(batchSize = 10, batchTimeout = 1000) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.messages = [];
this.batchTimer = null;
this.processCallback = null;
}

addMessage(message) {
this.messages.push(message);

if (this.messages.length >= this.batchSize) {
this.processBatch();
} else if (this.messages.length === 1) {
this.startBatchTimer();
}
}

startBatchTimer() {
this.batchTimer = setTimeout(() => {
this.processBatch();
}, this.batchTimeout);
}

processBatch() {
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}

if (this.messages.length > 0 && this.processCallback) {
const batch = [...this.messages];
this.messages = [];
this.processCallback(batch);
}
}

onBatchProcess(callback) {
this.processCallback = callback;
}
}

// 使用示例
const messageBatcher = new MessageBatcher(5, 500);
messageBatcher.onBatchProcess((messages) => {
console.log('处理消息批次:', messages);
// 批量更新UI
updateUIWithBatch(messages);
});

// 在SSE消息处理中使用
eventSource.onmessage = (event) => {
const message = JSON.parse(event.data);
messageBatcher.addMessage(message);
};

内存管理优化

// 消息历史管理
class MessageHistory {
constructor(maxMessages = 1000) {
this.maxMessages = maxMessages;
this.messages = [];
this.messageIds = new Set();
}

addMessage(message) {
// 检查是否重复
if (message.id && this.messageIds.has(message.id)) {
return false;
}

this.messages.push(message);
if (message.id) {
this.messageIds.add(message.id);
}

// 限制消息数量
if (this.messages.length > this.maxMessages) {
const removed = this.messages.shift();
if (removed.id) {
this.messageIds.delete(removed.id);
}
}

return true;
}

getMessages(limit = 50, offset = 0) {
return this.messages.slice(offset, offset + limit);
}

searchMessages(query) {
return this.messages.filter(message =>
message.content && message.content.includes(query)
);
}

clear() {
this.messages = [];
this.messageIds.clear();
}
}

错误处理和调试

常见错误类型

// 错误处理工具
class SSEErrorHandler {
static handleConnectionError(error, eventSource) {
console.error('SSE连接错误:', error);

switch (error.type) {
case 'network':
console.log('网络错误,尝试重连...');
break;
case 'timeout':
console.log('连接超时,检查服务器状态...');
break;
case 'server':
console.log('服务器错误,等待恢复...');
break;
default:
console.log('未知错误类型');
}
}

static validateMessage(data) {
try {
const parsed = JSON.parse(data);
if (!parsed.timestamp || !parsed.content) {
throw new Error('消息格式无效');
}
return parsed;
} catch (error) {
console.error('消息解析错误:', error);
return null;
}
}

static logConnectionStatus(eventSource) {
const status = {
readyState: eventSource.readyState,
url: eventSource.url,
withCredentials: eventSource.withCredentials
};

console.log('连接状态:', status);
return status;
}
}

调试工具

// SSE调试器
class SSEDebugger {
constructor(eventSource) {
this.eventSource = eventSource;
this.logs = [];
this.maxLogs = 100;
this.enabled = true;
}

enable() {
this.enabled = true;
this.setupDebugging();
}

disable() {
this.enabled = false;
}

setupDebugging() {
if (!this.enabled) return;

// 监听所有事件
this.eventSource.addEventListener('open', (event) => {
this.log('连接已建立', event);
});

this.eventSource.addEventListener('message', (event) => {
this.log('收到消息', event);
});

this.eventSource.addEventListener('error', (event) => {
this.log('连接错误', event);
});

// 重写close方法
const originalClose = this.eventSource.close;
this.eventSource.close = () => {
this.log('连接已关闭');
originalClose.call(this.eventSource);
};
}

log(message, data = null) {
if (!this.enabled) return;

const logEntry = {
timestamp: new Date().toISOString(),
message,
data,
readyState: this.eventSource.readyState
};

this.logs.push(logEntry);

if (this.logs.length > this.maxLogs) {
this.logs.shift();
}

console.log(`[SSE Debug] ${message}`, data);
}

getLogs() {
return [...this.logs];
}

exportLogs() {
return JSON.stringify(this.logs, null, 2);
}

clearLogs() {
this.logs = [];
}
}

// 使用示例
const eventSource = new EventSource('/api/events');
const debugger = new SSEDebugger(eventSource);
debugger.enable();

// 在控制台查看日志
console.log('SSE日志:', debugger.getLogs());

最佳实践

设计原则

  1. 单一职责:每个SSE连接只负责一种类型的数据推送
  2. 错误恢复:实现智能重连和错误处理机制
  3. 资源管理:及时清理不需要的连接和监听器
  4. 性能监控:监控连接状态和消息处理性能

安全考虑

// 安全配置
const secureEventSource = new EventSource('/api/secure-events', {
withCredentials: true // 发送认证信息
});

// 服务器端安全验证
app.get('/api/secure-events', authenticateUser, (req, res) => {
// 验证用户身份
if (!req.user) {
res.status(401).end();
return;
}

res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});

// 只发送用户相关的数据
const userData = getUserData(req.user.id);
res.write(`data: ${JSON.stringify(userData)}\n\n`);
});

性能优化建议

  1. 批量处理:将多个小消息合并为批次处理
  2. 连接复用:在可能的情况下复用SSE连接
  3. 消息压缩:对大量文本数据进行压缩
  4. 缓存策略:合理使用缓存减少重复数据传输

与其他技术对比

SSE vs WebSocket

特性SSEWebSocket
通信方向单向(服务器到客户端)双向
协议HTTP自定义协议
自动重连需要手动实现
实现复杂度简单复杂
浏览器支持广泛广泛
适用场景实时通知、数据推送实时聊天、游戏

SSE vs 轮询

特性SSE轮询
实时性
服务器负载
网络开销
实现复杂度简单简单
浏览器支持现代浏览器所有浏览器

总结

SSE服务器推送事件是一种简单而强大的实时通信技术,特别适合需要服务器主动推送数据的场景。通过本文的学习,你应该能够:

  1. 理解SSE原理:掌握SSE的工作原理和通信机制
  2. 实现SSE应用:能够在前端和后端实现SSE功能
  3. 优化性能:了解SSE性能优化的各种策略
  4. 处理错误:掌握SSE错误处理和调试方法
  5. 选择技术:根据具体需求选择合适的实时通信技术

SSE是现代Web应用中实现实时功能的重要工具,掌握它将为你的应用开发带来更多可能性。在实际项目中,建议结合具体需求,选择最适合的实时通信方案,并注意性能优化和错误处理,确保应用的稳定性和用户体验。

相关资源

官方文档

技术标准

学习资源