跳到主要内容

WebSocket实时通信解决方案

目录

介绍

在现代Web应用中,实时通信已经成为不可或缺的功能。从即时聊天、在线游戏,到实时协作工具、股票交易系统,都需要客户端和服务器之间进行快速、双向的数据交换。传统的HTTP协议采用请求-响应模式,客户端必须主动发起请求才能获取数据,这种模式在需要实时更新的场景下效率很低。

WebSocket是HTML5提供的一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

什么是WebSocket?

WebSocket是一种网络通信协议,它提供了在单个TCP连接上进行全双工通信的能力。与HTTP协议不同,WebSocket协议在建立连接后,客户端和服务器可以随时相互发送数据,而不需要重新建立连接。

WebSocket的核心特点:

  1. 全双工通信: 客户端和服务器可以同时发送和接收数据
  2. 持久连接: 一旦建立连接,会保持连接状态,无需重复握手
  3. 低延迟: 没有HTTP请求的开销,数据传输更快
  4. 支持二进制: 可以传输文本和二进制数据
  5. 更少的开销: 相比HTTP轮询,大幅减少网络流量

为什么选择WebSocket?

当您的应用需要满足以下条件时,WebSocket是最佳选择:

  • 需要实时双向通信(客户端和服务器都需要主动发送数据)
  • 要求低延迟的数据传输
  • 需要传输二进制数据(如文件、图片、音视频流)
  • 频繁的数据交换(如在线游戏、协作编辑)
  • 需要保持长连接状态

如果只需要服务器单向推送数据且主要是文本格式,SSE可能是更简单的选择。

WebSocket vs HTTP轮询

HTTP轮询的问题:

  • 客户端需要不断发送请求,造成大量无效请求
  • 每次请求都包含完整的HTTP头部,浪费带宽
  • 服务器压力大,需要处理大量重复请求
  • 实时性差,受轮询间隔限制

WebSocket的优势:

  • 建立一次连接,持续使用
  • 数据帧开销小,传输效率高
  • 服务器可以主动推送,实时性好
  • 减少服务器压力和网络带宽消耗

核心价值

1. 技术优势

  • 真正的实时通信: 毫秒级的消息传输延迟
  • 全双工通信: 客户端和服务器可以同时发送数据
  • 高效传输: 数据帧开销小,相比HTTP节省大量带宽
  • 协议灵活: 支持文本和二进制数据传输
  • 广泛支持: 现代浏览器和服务器都已支持

2. 开发效率

  • 简单的API: 浏览器提供原生WebSocket API,易于使用
  • 丰富的生态: 有众多成熟的库和框架(Socket.IO、ws等)
  • 易于调试: 浏览器开发者工具提供WebSocket调试功能
  • 统一标准: W3C和IETF标准化,跨平台兼容性好

3. 业务价值

  • 提升用户体验: 实时响应,无需等待和刷新
  • 降低成本: 减少服务器资源消耗和带宽费用
  • 支持复杂场景: 可实现实时协作、在线游戏等复杂功能
  • 增强竞争力: 提供现代化的实时交互体验

WebSocket原理详解

工作原理

WebSocket的工作可以分为两个阶段:握手阶段和数据传输阶段。

握手阶段: 客户端首先发送一个HTTP请求,请求升级协议到WebSocket。服务器如果支持WebSocket,则返回同意升级的响应,之后连接就从HTTP协议切换到WebSocket协议。

数据传输阶段: 握手成功后,客户端和服务器之间就建立了一个全双工的通信通道,双方可以随时相互发送数据,数据以帧(frame)的形式传输。

握手过程详解

客户端握手请求:

GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com

关键字段说明:

  • Upgrade: websocket: 告诉服务器要升级到WebSocket协议
  • Connection: Upgrade: 表示需要升级连接
  • Sec-WebSocket-Key: 一个Base64编码的随机密钥,用于安全验证
  • Sec-WebSocket-Version: WebSocket协议版本,当前为13
  • Origin: 请求的源,用于防止跨站攻击

服务器握手响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

关键字段说明:

  • HTTP/1.1 101 Switching Protocols: 状态码101表示协议切换
  • Sec-WebSocket-Accept: 服务器根据客户端的Key计算出的值,用于验证

数据帧格式

WebSocket使用数据帧进行数据传输,每个帧包含了元数据和实际的数据内容。

帧结构:

 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+

主要字段:

  • FIN: 标识是否是消息的最后一个分片
  • opcode: 操作码,表示帧的类型(文本、二进制、关闭、ping、pong等)
  • MASK: 标识数据是否被掩码处理(客户端到服务器必须掩码)
  • Payload len: 数据长度
  • Payload Data: 实际的数据内容

操作码类型:

  • 0x0: 连续帧
  • 0x1: 文本帧
  • 0x2: 二进制帧
  • 0x8: 连接关闭
  • 0x9: ping帧
  • 0xA: pong帧

连接状态

WebSocket连接有四种状态:

WebSocket.CONNECTING  // 0 - 正在连接
WebSocket.OPEN // 1 - 已连接,可以通信
WebSocket.CLOSING // 2 - 正在关闭
WebSocket.CLOSED // 3 - 已关闭或无法打开

技术架构设计

客户端架构

服务端架构

核心功能实现

基础连接实现

最基础的WebSocket连接非常简单,但在生产环境中需要考虑更多细节。

客户端基础实现:

// 创建WebSocket连接
const ws = new WebSocket('ws://localhost:8080');

// 连接打开时触发
ws.onopen = function(event) {
console.log('WebSocket连接已建立');

// 发送消息
ws.send('Hello Server!');

// 发送JSON数据
ws.send(JSON.stringify({
type: 'greeting',
message: 'Hello from client'
}));
};

// 收到消息时触发
ws.onmessage = function(event) {
console.log('收到消息:', event.data);

// 处理JSON数据
try {
const data = JSON.parse(event.data);
console.log('解析后的数据:', data);
} catch (e) {
console.log('文本消息:', event.data);
}
};

// 发生错误时触发
ws.onerror = function(error) {
console.error('WebSocket错误:', error);
};

// 连接关闭时触发
ws.onclose = function(event) {
console.log('WebSocket连接已关闭');
console.log('关闭码:', event.code);
console.log('关闭原因:', event.reason);
console.log('是否正常关闭:', event.wasClean);
};

// 主动关闭连接
// ws.close(1000, '正常关闭');

服务端基础实现(Node.js + ws库):

const WebSocket = require('ws');

// 创建WebSocket服务器
const wss = new WebSocket.Server({ port: 8080 });

// 监听连接事件
wss.on('connection', function connection(ws, req) {
console.log('新客户端连接');
console.log('客户端IP:', req.socket.remoteAddress);

// 发送欢迎消息
ws.send(JSON.stringify({
type: 'welcome',
message: '欢迎连接到WebSocket服务器'
}));

// 监听消息
ws.on('message', function incoming(message) {
console.log('收到消息:', message.toString());

// 回显消息
ws.send(`服务器收到: ${message}`);

// 广播给所有客户端
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(message.toString());
}
});
});

// 监听错误
ws.on('error', function(error) {
console.error('客户端错误:', error);
});

// 监听关闭
ws.on('close', function(code, reason) {
console.log('客户端断开连接');
console.log('关闭码:', code);
console.log('关闭原因:', reason.toString());
});
});

console.log('WebSocket服务器运行在 ws://localhost:8080');

心跳保活机制

WebSocket连接可能因为网络问题、代理服务器超时等原因被断开,但双方可能都不知道连接已断开。心跳机制通过定期发送ping/pong消息来检测连接是否正常。

class WebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.ws = null;

// 心跳配置
this.heartbeatInterval = options.heartbeatInterval || 30000; // 30秒
this.heartbeatTimeout = options.heartbeatTimeout || 10000; // 10秒
this.heartbeatTimer = null;
this.heartbeatTimeoutTimer = null;
this.lastPongTime = null;

// 事件处理器
this.eventHandlers = {
open: [],
message: [],
error: [],
close: []
};
}

// 连接到服务器
connect() {
this.ws = new WebSocket(this.url);

this.ws.onopen = (event) => {
console.log('WebSocket连接成功');
this.startHeartbeat();
this.emit('open', event);
};

this.ws.onmessage = (event) => {
// 检查是否是心跳响应
if (event.data === 'pong') {
this.handlePong();
return;
}

this.emit('message', event);
};

this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
this.emit('error', error);
};

this.ws.onclose = (event) => {
console.log('WebSocket连接关闭');
this.stopHeartbeat();
this.emit('close', event);
};
}

// 启动心跳
startHeartbeat() {
this.stopHeartbeat();

this.heartbeatTimer = setInterval(() => {
this.sendHeartbeat();
}, this.heartbeatInterval);

// 首次立即发送心跳
this.sendHeartbeat();
}

// 停止心跳
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}

if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}

// 发送心跳
sendHeartbeat() {
if (this.ws.readyState === WebSocket.OPEN) {
console.log('发送心跳ping');
this.ws.send('ping');

// 设置超时检测
this.heartbeatTimeoutTimer = setTimeout(() => {
console.warn('心跳超时,连接可能已断开');
this.ws.close();
}, this.heartbeatTimeout);
}
}

// 处理心跳响应
handlePong() {
console.log('收到心跳pong');
this.lastPongTime = Date.now();

// 清除超时检测
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}

// 发送消息
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
} else {
console.warn('WebSocket未连接,无法发送消息');
}
}

// 关闭连接
close() {
this.stopHeartbeat();
if (this.ws) {
this.ws.close();
}
}

// 事件监听
on(event, handler) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].push(handler);
}
}

// 触发事件
emit(event, data) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].forEach(handler => handler(data));
}
}
}

// 使用示例
const client = new WebSocketClient('ws://localhost:8080', {
heartbeatInterval: 30000,
heartbeatTimeout: 10000
});

client.on('open', () => {
console.log('连接已建立');
client.send('Hello Server!');
});

client.on('message', (event) => {
console.log('收到消息:', event.data);
});

client.connect();

服务端心跳处理:

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
// 为每个连接设置心跳检测
let isAlive = true;

ws.on('pong', () => {
isAlive = true;
});

// 监听客户端发来的ping
ws.on('message', (message) => {
const data = message.toString();

if (data === 'ping') {
// 响应pong
ws.send('pong');
} else {
// 处理正常消息
console.log('收到消息:', data);
}
});

// 定期检查连接状态
const heartbeatInterval = setInterval(() => {
if (isAlive === false) {
console.log('客户端无响应,断开连接');
clearInterval(heartbeatInterval);
return ws.terminate();
}

isAlive = false;
ws.ping();
}, 30000);

ws.on('close', () => {
clearInterval(heartbeatInterval);
});
});

自动重连策略

网络不稳定时,WebSocket连接可能会意外断开。实现自动重连可以提升用户体验,避免用户手动刷新页面。

class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxReconnectAttempts: options.maxReconnectAttempts || 10,
reconnectInterval: options.reconnectInterval || 3000,
maxReconnectInterval: options.maxReconnectInterval || 30000,
reconnectDecay: options.reconnectDecay || 1.5,
...options
};

this.ws = null;
this.reconnectAttempts = 0;
this.reconnectTimer = null;
this.forcedClose = false;
this.timedOut = false;

this.eventHandlers = {
open: [],
message: [],
error: [],
close: [],
reconnect: []
};
}

// 连接到服务器
connect() {
this.forcedClose = false;

try {
this.ws = new WebSocket(this.url);
this.setupEventHandlers();
} catch (error) {
console.error('创建WebSocket失败:', error);
this.scheduleReconnect();
}
}

// 设置事件处理器
setupEventHandlers() {
this.ws.onopen = (event) => {
console.log('WebSocket连接成功');
this.reconnectAttempts = 0;
this.emit('open', event);
};

this.ws.onmessage = (event) => {
this.emit('message', event);
};

this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
this.emit('error', error);
};

this.ws.onclose = (event) => {
console.log('WebSocket连接关闭', event);
this.emit('close', event);

// 如果不是主动关闭,则尝试重连
if (!this.forcedClose) {
this.scheduleReconnect();
}
};
}

// 安排重连
scheduleReconnect() {
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
console.error('已达到最大重连次数');
return;
}

// 计算退避时间(指数退避)
const timeout = Math.min(
this.options.reconnectInterval * Math.pow(this.options.reconnectDecay, this.reconnectAttempts),
this.options.maxReconnectInterval
);

console.log(`${timeout/1000}秒后尝试第${this.reconnectAttempts + 1}次重连...`);

this.reconnectTimer = setTimeout(() => {
this.reconnectAttempts++;
this.emit('reconnect', {
attempt: this.reconnectAttempts,
maxAttempts: this.options.maxReconnectAttempts
});
this.connect();
}, timeout);
}

// 发送消息
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
return true;
} else {
console.warn('WebSocket未连接');
return false;
}
}

// 关闭连接
close() {
this.forcedClose = true;

if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}

if (this.ws) {
this.ws.close();
}
}

// 获取连接状态
getState() {
return this.ws ? this.ws.readyState : WebSocket.CLOSED;
}

// 事件监听
on(event, handler) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].push(handler);
}
return this;
}

// 移除事件监听
off(event, handler) {
if (this.eventHandlers[event]) {
const index = this.eventHandlers[event].indexOf(handler);
if (index > -1) {
this.eventHandlers[event].splice(index, 1);
}
}
return this;
}

// 触发事件
emit(event, data) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`事件处理器错误[${event}]:`, error);
}
});
}
}
}

// 使用示例
const ws = new ReconnectingWebSocket('ws://localhost:8080', {
maxReconnectAttempts: 10,
reconnectInterval: 3000,
maxReconnectInterval: 30000,
reconnectDecay: 1.5
});

ws.on('open', () => {
console.log('连接已建立');
});

ws.on('message', (event) => {
console.log('收到消息:', event.data);
});

ws.on('reconnect', (info) => {
console.log(`正在重连(${info.attempt}/${info.maxAttempts})`);
});

ws.on('close', (event) => {
console.log('连接已关闭');
});

ws.connect();

消息队列管理

在连接断开期间,可能有消息需要发送。消息队列可以缓存这些消息,待连接恢复后自动发送。

class WebSocketWithQueue extends ReconnectingWebSocket {
constructor(url, options = {}) {
super(url, options);

this.messageQueue = [];
this.maxQueueSize = options.maxQueueSize || 100;
this.enableQueue = options.enableQueue !== false;

// 连接打开时发送队列中的消息
this.on('open', () => {
this.flushQueue();
});
}

// 发送消息(支持队列)
send(data) {
const message = {
data,
timestamp: Date.now(),
attempts: 0
};

// 如果连接已打开,直接发送
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
try {
this.ws.send(data);
return true;
} catch (error) {
console.error('发送消息失败:', error);

// 发送失败,加入队列
if (this.enableQueue) {
this.enqueue(message);
}
return false;
}
} else {
// 连接未打开,加入队列
if (this.enableQueue) {
this.enqueue(message);
console.log('消息已加入队列,等待连接恢复');
return true;
} else {
console.warn('WebSocket未连接,消息发送失败');
return false;
}
}
}

// 加入队列
enqueue(message) {
if (this.messageQueue.length >= this.maxQueueSize) {
console.warn('消息队列已满,移除最早的消息');
this.messageQueue.shift();
}

this.messageQueue.push(message);
console.log(`消息已加入队列,当前队列长度:${this.messageQueue.length}`);
}

// 发送队列中的所有消息
flushQueue() {
if (this.messageQueue.length === 0) {
return;
}

console.log(`开始发送队列中的${this.messageQueue.length}条消息`);

const queue = [...this.messageQueue];
this.messageQueue = [];

queue.forEach(message => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
try {
this.ws.send(message.data);
console.log('队列消息发送成功');
} catch (error) {
console.error('队列消息发送失败:', error);
// 发送失败,重新加入队列
message.attempts++;
if (message.attempts < 3) {
this.enqueue(message);
}
}
}
});
}

// 清空队列
clearQueue() {
const count = this.messageQueue.length;
this.messageQueue = [];
console.log(`已清空${count}条队列消息`);
}

// 获取队列长度
getQueueLength() {
return this.messageQueue.length;
}
}

// 使用示例
const ws = new WebSocketWithQueue('ws://localhost:8080', {
enableQueue: true,
maxQueueSize: 100
});

ws.on('open', () => {
console.log('连接已建立');
});

ws.connect();

// 即使连接断开,也可以发送消息(会被加入队列)
ws.send('这条消息会被加入队列');

二进制数据传输

WebSocket不仅可以传输文本数据,还可以传输二进制数据,这对于传输文件、图片、音视频流等场景非常有用。

class BinaryWebSocket extends WebSocketWithQueue {
constructor(url, options = {}) {
super(url, options);

// 设置二进制类型
this.binaryType = options.binaryType || 'arraybuffer'; // 或 'blob'

this.on('open', () => {
if (this.ws) {
this.ws.binaryType = this.binaryType;
}
});
}

// 发送文件
sendFile(file) {
if (!(file instanceof File || file instanceof Blob)) {
console.error('参数必须是File或Blob对象');
return Promise.reject(new Error('Invalid file type'));
}

return new Promise((resolve, reject) => {
const reader = new FileReader();

reader.onload = (event) => {
// 发送文件元数据
this.send(JSON.stringify({
type: 'file-meta',
name: file.name,
size: file.size,
mimeType: file.type
}));

// 发送文件数据
setTimeout(() => {
this.send(event.target.result);
resolve();
}, 100);
};

reader.onerror = (error) => {
console.error('读取文件失败:', error);
reject(error);
};

reader.readAsArrayBuffer(file);
});
}

// 发送图片
sendImage(imageData) {
// imageData可以是canvas.toBlob()的结果
if (imageData instanceof Blob) {
return this.sendFile(imageData);
}

// 或者是ImageData对象
if (imageData instanceof ImageData) {
const buffer = imageData.data.buffer;

this.send(JSON.stringify({
type: 'image-meta',
width: imageData.width,
height: imageData.height
}));

setTimeout(() => {
this.send(buffer);
}, 100);
}
}

// 分块发送大文件
sendFileInChunks(file, chunkSize = 64 * 1024) {
const totalChunks = Math.ceil(file.size / chunkSize);
let currentChunk = 0;

// 发送文件元数据
this.send(JSON.stringify({
type: 'chunked-file-start',
name: file.name,
size: file.size,
totalChunks: totalChunks,
chunkSize: chunkSize
}));

const sendNextChunk = () => {
if (currentChunk >= totalChunks) {
// 发送完成信号
this.send(JSON.stringify({
type: 'chunked-file-end',
name: file.name
}));
return;
}

const start = currentChunk * chunkSize;
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end);

const reader = new FileReader();
reader.onload = (event) => {
// 发送块数据
this.send(JSON.stringify({
type: 'chunked-file-data',
chunkIndex: currentChunk,
totalChunks: totalChunks
}));

setTimeout(() => {
this.send(event.target.result);
currentChunk++;

// 延迟发送下一块,避免过载
setTimeout(sendNextChunk, 50);
}, 50);
};

reader.readAsArrayBuffer(chunk);
};

sendNextChunk();
}
}

// 使用示例
const ws = new BinaryWebSocket('ws://localhost:8080', {
binaryType: 'arraybuffer'
});

ws.on('open', () => {
console.log('连接已建立');
});

ws.on('message', (event) => {
if (typeof event.data === 'string') {
// 文本消息
console.log('收到文本消息:', event.data);
} else if (event.data instanceof ArrayBuffer) {
// 二进制消息
console.log('收到二进制数据,大小:', event.data.byteLength);

// 可以将ArrayBuffer转换为其他格式
const blob = new Blob([event.data]);
const url = URL.createObjectURL(blob);
console.log('Blob URL:', url);
}
});

ws.connect();

// 发送文件
document.getElementById('fileInput').addEventListener('change', (e) => {
const file = e.target.files[0];
if (file) {
ws.sendFile(file).then(() => {
console.log('文件发送成功');
});
}
});

// 发送大文件(分块)
document.getElementById('largeFileInput').addEventListener('change', (e) => {
const file = e.target.files[0];
if (file) {
ws.sendFileInChunks(file, 64 * 1024); // 64KB per chunk
}
});

服务端二进制处理:

const WebSocket = require('ws');
const fs = require('fs');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
let fileMetadata = null;
let fileChunks = [];

ws.on('message', (message) => {
// 判断是文本还是二进制
if (Buffer.isBuffer(message)) {
// 二进制数据
console.log('收到二进制数据,大小:', message.length);

if (fileMetadata) {
// 保存文件
const filePath = `./uploads/${fileMetadata.name}`;
fs.writeFile(filePath, message, (err) => {
if (err) {
console.error('保存文件失败:', err);
} else {
console.log('文件保存成功:', filePath);
ws.send(JSON.stringify({
type: 'file-saved',
name: fileMetadata.name,
path: filePath
}));
}
});

fileMetadata = null;
}
} else {
// 文本数据
try {
const data = JSON.parse(message.toString());

if (data.type === 'file-meta') {
// 接收文件元数据
fileMetadata = data;
console.log('准备接收文件:', data.name);
} else if (data.type === 'chunked-file-start') {
// 分块文件开始
fileMetadata = data;
fileChunks = new Array(data.totalChunks);
} else if (data.type === 'chunked-file-data') {
// 分块数据(下一条消息是二进制数据)
} else if (data.type === 'chunked-file-end') {
// 分块文件结束,合并所有块
const buffer = Buffer.concat(fileChunks);
const filePath = `./uploads/${data.name}`;

fs.writeFile(filePath, buffer, (err) => {
if (err) {
console.error('保存文件失败:', err);
} else {
console.log('分块文件保存成功:', filePath);
}
});

fileChunks = [];
fileMetadata = null;
}
} catch (e) {
console.log('收到文本消息:', message.toString());
}
}
});
});

实战应用场景

场景1: 即时聊天应用

class ChatWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.userId = options.userId;
this.username = options.username;
this.currentRoom = null;

this.on('message', (event) => {
this.handleMessage(event.data);
});
}

// 处理消息
handleMessage(data) {
try {
const message = JSON.parse(data);

switch (message.type) {
case 'chat':
this.onChatMessage(message);
break;
case 'user-joined':
this.onUserJoined(message);
break;
case 'user-left':
this.onUserLeft(message);
break;
case 'typing':
this.onUserTyping(message);
break;
default:
console.log('未知消息类型:', message.type);
}
} catch (e) {
console.error('消息解析失败:', e);
}
}

// 加入房间
joinRoom(roomId) {
this.currentRoom = roomId;
this.send(JSON.stringify({
type: 'join-room',
roomId,
userId: this.userId,
username: this.username
}));
}

// 离开房间
leaveRoom() {
if (this.currentRoom) {
this.send(JSON.stringify({
type: 'leave-room',
roomId: this.currentRoom,
userId: this.userId
}));
this.currentRoom = null;
}
}

// 发送聊天消息
sendChatMessage(content) {
this.send(JSON.stringify({
type: 'chat',
roomId: this.currentRoom,
userId: this.userId,
username: this.username,
content,
timestamp: Date.now()
}));
}

// 发送正在输入状态
sendTyping(isTyping) {
this.send(JSON.stringify({
type: 'typing',
roomId: this.currentRoom,
userId: this.userId,
username: this.username,
isTyping
}));
}

// 收到聊天消息
onChatMessage(message) {
console.log(`${message.username}: ${message.content}`);
// 更新UI显示消息
}

// 用户加入
onUserJoined(message) {
console.log(`${message.username}加入了房间`);
}

// 用户离开
onUserLeft(message) {
console.log(`${message.username}离开了房间`);
}

// 用户正在输入
onUserTyping(message) {
console.log(`${message.username}正在输入...`);
}
}

// 使用示例
const chat = new ChatWebSocket('ws://localhost:8080', {
userId: '123',
username: '张三'
});

chat.on('open', () => {
// 加入聊天室
chat.joinRoom('room-001');
});

chat.connect();

// 发送消息
document.getElementById('sendBtn').addEventListener('click', () => {
const input = document.getElementById('messageInput');
chat.sendChatMessage(input.value);
input.value = '';
});

// 输入状态
let typingTimeout;
document.getElementById('messageInput').addEventListener('input', () => {
chat.sendTyping(true);

clearTimeout(typingTimeout);
typingTimeout = setTimeout(() => {
chat.sendTyping(false);
}, 1000);
});

场景2: 实时协作编辑

class CollaborativeEditor extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.documentId = options.documentId;
this.userId = options.userId;
this.pendingChanges = [];
this.version = 0;

this.on('message', (event) => {
this.handleMessage(event.data);
});
}

// 连接到文档
connectToDocument() {
this.send(JSON.stringify({
type: 'join-document',
documentId: this.documentId,
userId: this.userId
}));
}

// 发送编辑操作
sendEdit(operation) {
const change = {
type: 'edit',
documentId: this.documentId,
userId: this.userId,
operation,
version: this.version,
timestamp: Date.now()
};

this.pendingChanges.push(change);
this.send(JSON.stringify(change));
}

// 处理远程编辑
handleMessage(data) {
try {
const message = JSON.parse(data);

switch (message.type) {
case 'document-state':
this.handleDocumentState(message);
break;
case 'edit':
this.handleRemoteEdit(message);
break;
case 'cursor':
this.handleCursorMove(message);
break;
case 'ack':
this.handleAck(message);
break;
}
} catch (e) {
console.error('消息处理失败:', e);
}
}

// 处理文档状态
handleDocumentState(message) {
this.version = message.version;
console.log('文档版本:', this.version);
// 应用文档内容
}

// 处理远程编辑
handleRemoteEdit(message) {
if (message.userId !== this.userId) {
console.log('应用远程编辑:', message.operation);
// 应用操作到本地编辑器
this.applyOperation(message.operation);
}
}

// 处理光标移动
handleCursorMove(message) {
console.log(`用户${message.userId}光标位置:`, message.position);
// 更新UI显示其他用户的光标
}

// 处理确认
handleAck(message) {
// 移除已确认的变更
this.pendingChanges = this.pendingChanges.filter(
c => c.timestamp !== message.timestamp
);
this.version = message.version;
}

// 应用操作
applyOperation(operation) {
// 实现操作转换算法
console.log('应用操作:', operation);
}

// 发送光标位置
sendCursor(position) {
this.send(JSON.stringify({
type: 'cursor',
documentId: this.documentId,
userId: this.userId,
position
}));
}
}

// 使用示例
const editor = new CollaborativeEditor('ws://localhost:8080', {
documentId: 'doc-001',
userId: 'user-123'
});

editor.on('open', () => {
editor.connectToDocument();
});

editor.connect();

场景3: 实时游戏

class GameWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.playerId = options.playerId;
this.gameId = options.gameId;
this.gameState = null;

this.on('message', (event) => {
this.handleMessage(event.data);
});
}

// 加入游戏
joinGame() {
this.send(JSON.stringify({
type: 'join-game',
gameId: this.gameId,
playerId: this.playerId
}));
}

// 发送玩家动作
sendAction(action) {
this.send(JSON.stringify({
type: 'player-action',
gameId: this.gameId,
playerId: this.playerId,
action,
timestamp: Date.now()
}));
}

// 处理消息
handleMessage(data) {
try {
const message = JSON.parse(data);

switch (message.type) {
case 'game-state':
this.handleGameState(message);
break;
case 'player-joined':
this.handlePlayerJoined(message);
break;
case 'player-action':
this.handlePlayerAction(message);
break;
case 'game-over':
this.handleGameOver(message);
break;
}
} catch (e) {
console.error('消息处理失败:', e);
}
}

// 处理游戏状态
handleGameState(message) {
this.gameState = message.state;
console.log('游戏状态更新:', this.gameState);
// 更新游戏画面
}

// 处理玩家加入
handlePlayerJoined(message) {
console.log(`玩家${message.playerId}加入游戏`);
}

// 处理玩家动作
handlePlayerAction(message) {
if (message.playerId !== this.playerId) {
console.log('其他玩家动作:', message.action);
// 更新游戏状态
}
}

// 处理游戏结束
handleGameOver(message) {
console.log('游戏结束,获胜者:', message.winner);
}
}

// 使用示例
const game = new GameWebSocket('ws://localhost:8080', {
playerId: 'player-123',
gameId: 'game-001'
});

game.on('open', () => {
game.joinGame();
});

game.connect();

// 发送玩家移动
document.addEventListener('keydown', (e) => {
const action = {
type: 'move',
direction: e.key
};
game.sendAction(action);
});

场景4: 实时数据监控

class MonitorWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.metrics = new Map();
this.subscriptions = new Set();

this.on('message', (event) => {
this.handleMessage(event.data);
});
}

// 订阅指标
subscribe(metricName) {
this.subscriptions.add(metricName);
this.send(JSON.stringify({
type: 'subscribe',
metric: metricName
}));
}

// 取消订阅
unsubscribe(metricName) {
this.subscriptions.delete(metricName);
this.send(JSON.stringify({
type: 'unsubscribe',
metric: metricName
}));
}

// 处理消息
handleMessage(data) {
try {
const message = JSON.parse(data);

if (message.type === 'metrics') {
this.updateMetrics(message.data);
} else if (message.type === 'alert') {
this.handleAlert(message);
}
} catch (e) {
console.error('消息处理失败:', e);
}
}

// 更新指标
updateMetrics(data) {
Object.entries(data).forEach(([metric, value]) => {
this.metrics.set(metric, value);
});

console.log('指标更新:', data);
// 更新图表显示
}

// 处理告警
handleAlert(message) {
console.warn('告警:', message.message);
// 显示告警通知
}

// 获取指标值
getMetric(metricName) {
return this.metrics.get(metricName);
}
}

// 使用示例
const monitor = new MonitorWebSocket('ws://localhost:8080');

monitor.on('open', () => {
// 订阅CPU和内存指标
monitor.subscribe('cpu');
monitor.subscribe('memory');
monitor.subscribe('network');
});

monitor.connect();

性能优化

1. 连接池管理

对于需要大量WebSocket连接的场景,可以使用连接池来复用连接。

class WebSocketPool {
constructor(url, options = {}) {
this.url = url;
this.maxConnections = options.maxConnections || 5;
this.connections = [];
this.currentIndex = 0;
}

// 初始化连接池
initialize() {
for (let i = 0; i < this.maxConnections; i++) {
const ws = new ReconnectingWebSocket(this.url);
ws.connect();
this.connections.push(ws);
}
}

// 获取一个连接(轮询)
getConnection() {
const connection = this.connections[this.currentIndex];
this.currentIndex = (this.currentIndex + 1) % this.maxConnections;
return connection;
}

// 广播到所有连接
broadcast(data) {
this.connections.forEach(ws => {
ws.send(data);
});
}

// 关闭所有连接
closeAll() {
this.connections.forEach(ws => {
ws.close();
});
this.connections = [];
}
}

2. 消息压缩

对于大量文本数据,可以使用压缩算法减少传输量。

// 使用pako库进行压缩
class CompressedWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);
this.compressionThreshold = options.compressionThreshold || 1024; // 1KB
}

// 发送消息(自动压缩)
send(data) {
if (typeof data === 'string' && data.length > this.compressionThreshold) {
// 压缩文本数据
const compressed = pako.gzip(data);

// 发送压缩标记
super.send(JSON.stringify({
type: '__compressed__',
algorithm: 'gzip',
originalSize: data.length
}));

// 发送压缩数据
setTimeout(() => {
super.send(compressed);
}, 10);
} else {
// 直接发送
super.send(data);
}
}
}

3. 消息批处理

将多个小消息合并成一个大消息发送,减少网络往返。

class BatchedWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.batchInterval = options.batchInterval || 100; // 100ms
this.batchQueue = [];
this.batchTimer = null;
}

// 发送消息(批处理)
sendBatched(data) {
this.batchQueue.push(data);

if (!this.batchTimer) {
this.batchTimer = setTimeout(() => {
this.flushBatch();
}, this.batchInterval);
}
}

// 刷新批次
flushBatch() {
if (this.batchQueue.length === 0) {
return;
}

const batch = {
type: '__batch__',
messages: this.batchQueue,
count: this.batchQueue.length
};

this.send(JSON.stringify(batch));
this.batchQueue = [];
this.batchTimer = null;
}
}

4. 按需连接

只在需要时建立连接,不需要时关闭连接。

class LazyWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.idleTimeout = options.idleTimeout || 60000; // 1分钟
this.idleTimer = null;
this.lastActivityTime = null;
}

// 发送消息(自动连接)
send(data) {
this.resetIdleTimer();

// 如果未连接,先连接
if (this.getState() !== WebSocket.OPEN) {
this.connect();
// 等待连接建立后发送
this.once('open', () => {
super.send(data);
});
} else {
super.send(data);
}
}

// 重置空闲计时器
resetIdleTimer() {
this.lastActivityTime = Date.now();

if (this.idleTimer) {
clearTimeout(this.idleTimer);
}

this.idleTimer = setTimeout(() => {
console.log('连接空闲,自动关闭');
this.close();
}, this.idleTimeout);
}

// 单次事件监听
once(event, handler) {
const wrapper = (data) => {
handler(data);
this.off(event, wrapper);
};
this.on(event, wrapper);
}
}

最佳实践

1. 安全性

使用WSS(WebSocket Secure)

// 生产环境使用加密连接
const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
const ws = new WebSocket(`${protocol}//${location.host}/socket`);

身份认证

// 方式1: 在连接URL中传递token
const ws = new WebSocket(`wss://example.com/socket?token=${authToken}`);

// 方式2: 连接后发送认证消息
ws.onopen = () => {
ws.send(JSON.stringify({
type: 'auth',
token: authToken
}));
};

// 服务端验证
wss.on('connection', (ws, req) => {
const url = new URL(req.url, 'ws://localhost');
const token = url.searchParams.get('token');

if (!verifyToken(token)) {
ws.close(1008, 'Unauthorized');
return;
}

// ... 正常处理
});

防止CSRF攻击

// 服务端检查Origin
wss.on('connection', (ws, req) => {
const origin = req.headers.origin;
const allowedOrigins = ['https://example.com', 'https://www.example.com'];

if (!allowedOrigins.includes(origin)) {
ws.close(1008, 'Origin not allowed');
return;
}
});

2. 错误处理

完善的错误处理

ws.onerror = (error) => {
// 记录错误
console.error('WebSocket错误:', error);

// 发送到错误追踪服务
if (window.errorTracker) {
errorTracker.captureException(error);
}

// 通知用户
showNotification('连接出现问题,正在尝试重新连接...');
};

ws.onclose = (event) => {
// 区分正常关闭和异常关闭
if (event.wasClean) {
console.log(`连接正常关闭: ${event.code} ${event.reason}`);
} else {
console.error('连接异常断开');

// 根据关闭码采取不同措施
switch (event.code) {
case 1000: // 正常关闭
break;
case 1001: // 端点离开
break;
case 1006: // 异常关闭
console.error('连接异常关闭,可能是网络问题');
break;
case 1008: // 策略违规
console.error('认证失败或违反策略');
break;
default:
console.error('未知关闭码:', event.code);
}
}
};

3. 状态管理

维护连接状态

class StatefulWebSocket extends BinaryWebSocket {
constructor(url, options = {}) {
super(url, options);

this.state = {
connected: false,
authenticated: false,
lastMessageTime: null,
messageCount: 0,
reconnectCount: 0
};

this.stateChangeListeners = [];
}

// 状态变更通知
updateState(changes) {
Object.assign(this.state, changes);
this.notifyStateChange();
}

notifyStateChange() {
this.stateChangeListeners.forEach(listener => {
listener(this.state);
});
}

onStateChange(listener) {
this.stateChangeListeners.push(listener);
}
}

4. 资源清理

及时清理资源

// 页面卸载时清理
window.addEventListener('beforeunload', () => {
if (ws) {
ws.close(1000, '页面关闭');
}
});

// React组件中
useEffect(() => {
const ws = new WebSocket('ws://localhost:8080');

return () => {
ws.close();
};
}, []);

// Vue组件中
export default {
mounted() {
this.ws = new WebSocket('ws://localhost:8080');
},
beforeUnmount() {
if (this.ws) {
this.ws.close();
}
}
};

完整代码示例

生产级WebSocket客户端

/**
* 生产级WebSocket客户端
* 包含完整的连接管理、重连、心跳、队列等功能
*/
class ProductionWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
// 重连配置
maxReconnectAttempts: options.maxReconnectAttempts || 10,
reconnectInterval: options.reconnectInterval || 3000,
maxReconnectInterval: options.maxReconnectInterval || 30000,
reconnectDecay: options.reconnectDecay || 1.5,

// 心跳配置
heartbeatInterval: options.heartbeatInterval || 30000,
heartbeatTimeout: options.heartbeatTimeout || 10000,

// 队列配置
enableQueue: options.enableQueue !== false,
maxQueueSize: options.maxQueueSize || 100,

// 二进制配置
binaryType: options.binaryType || 'arraybuffer',

...options
};

// 状态
this.ws = null;
this.reconnectAttempts = 0;
this.reconnectTimer = null;
this.forcedClose = false;

// 心跳
this.heartbeatTimer = null;
this.heartbeatTimeoutTimer = null;
this.lastPongTime = null;

// 消息队列
this.messageQueue = [];

// 事件处理器
this.eventHandlers = {
open: [],
message: [],
error: [],
close: [],
reconnect: []
};
}

/**
* 连接到服务器
*/
connect() {
this.forcedClose = false;

try {
this.ws = new WebSocket(this.url);
this.ws.binaryType = this.options.binaryType;
this.setupEventHandlers();
} catch (error) {
console.error('创建WebSocket失败:', error);
this.scheduleReconnect();
}
}

/**
* 设置事件处理器
*/
setupEventHandlers() {
this.ws.onopen = (event) => {
console.log('WebSocket连接成功');
this.reconnectAttempts = 0;
this.startHeartbeat();
this.flushQueue();
this.emit('open', event);
};

this.ws.onmessage = (event) => {
// 处理心跳
if (event.data === 'pong') {
this.handlePong();
return;
}

this.emit('message', event);
};

this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
this.emit('error', error);
};

this.ws.onclose = (event) => {
console.log('WebSocket连接关闭');
this.stopHeartbeat();
this.emit('close', event);

if (!this.forcedClose) {
this.scheduleReconnect();
}
};
}

/**
* 安排重连
*/
scheduleReconnect() {
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
console.error('已达到最大重连次数');
return;
}

const timeout = Math.min(
this.options.reconnectInterval * Math.pow(this.options.reconnectDecay, this.reconnectAttempts),
this.options.maxReconnectInterval
);

console.log(`${timeout/1000}秒后重连(${this.reconnectAttempts + 1}/${this.options.maxReconnectAttempts})`);

this.reconnectTimer = setTimeout(() => {
this.reconnectAttempts++;
this.emit('reconnect', {
attempt: this.reconnectAttempts,
maxAttempts: this.options.maxReconnectAttempts
});
this.connect();
}, timeout);
}

/**
* 启动心跳
*/
startHeartbeat() {
this.stopHeartbeat();

this.heartbeatTimer = setInterval(() => {
this.sendHeartbeat();
}, this.options.heartbeatInterval);

this.sendHeartbeat();
}

/**
* 停止心跳
*/
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}

if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}

/**
* 发送心跳
*/
sendHeartbeat() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send('ping');

this.heartbeatTimeoutTimer = setTimeout(() => {
console.warn('心跳超时');
this.ws.close();
}, this.options.heartbeatTimeout);
}
}

/**
* 处理心跳响应
*/
handlePong() {
this.lastPongTime = Date.now();

if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}

/**
* 发送消息
*/
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
try {
this.ws.send(data);
return true;
} catch (error) {
console.error('发送消息失败:', error);

if (this.options.enableQueue) {
this.enqueue(data);
}
return false;
}
} else {
if (this.options.enableQueue) {
this.enqueue(data);
return true;
} else {
console.warn('WebSocket未连接');
return false;
}
}
}

/**
* 加入队列
*/
enqueue(data) {
if (this.messageQueue.length >= this.options.maxQueueSize) {
console.warn('消息队列已满');
this.messageQueue.shift();
}

this.messageQueue.push({
data,
timestamp: Date.now()
});
}

/**
* 发送队列中的消息
*/
flushQueue() {
if (this.messageQueue.length === 0) {
return;
}

console.log(`发送${this.messageQueue.length}条队列消息`);

const queue = [...this.messageQueue];
this.messageQueue = [];

queue.forEach(message => {
this.send(message.data);
});
}

/**
* 关闭连接
*/
close() {
this.forcedClose = true;

if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}

this.stopHeartbeat();

if (this.ws) {
this.ws.close();
}
}

/**
* 获取连接状态
*/
getState() {
return this.ws ? this.ws.readyState : WebSocket.CLOSED;
}

/**
* 事件监听
*/
on(event, handler) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].push(handler);
}
return this;
}

/**
* 移除事件监听
*/
off(event, handler) {
if (this.eventHandlers[event]) {
const index = this.eventHandlers[event].indexOf(handler);
if (index > -1) {
this.eventHandlers[event].splice(index, 1);
}
}
return this;
}

/**
* 触发事件
*/
emit(event, data) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`事件处理器错误[${event}]:`, error);
}
});
}
}
}

// 使用示例
const ws = new ProductionWebSocket('ws://localhost:8080', {
maxReconnectAttempts: 10,
reconnectInterval: 3000,
heartbeatInterval: 30000,
enableQueue: true
});

ws.on('open', () => {
console.log('连接已建立');
ws.send('Hello Server!');
});

ws.on('message', (event) => {
console.log('收到消息:', event.data);
});

ws.on('reconnect', (info) => {
console.log(`正在重连(${info.attempt}/${info.maxAttempts})`);
});

ws.on('close', (event) => {
console.log('连接已关闭');
});

ws.connect();

生产级WebSocket服务端(Node.js)

const WebSocket = require('ws');
const http = require('http');

/**
* WebSocket服务器管理器
*/
class WebSocketServer {
constructor(options = {}) {
this.port = options.port || 8080;
this.heartbeatInterval = options.heartbeatInterval || 30000;
this.clients = new Map();
this.rooms = new Map();

this.server = http.createServer();
this.wss = new WebSocket.Server({ server: this.server });

this.setupServer();
}

/**
* 设置服务器
*/
setupServer() {
this.wss.on('connection', (ws, req) => {
const clientId = this.generateClientId();
console.log(`客户端${clientId}连接`);

// 创建客户端对象
const client = {
id: clientId,
ws,
isAlive: true,
rooms: new Set(),
metadata: {
ip: req.socket.remoteAddress,
userAgent: req.headers['user-agent'],
connectedAt: Date.now()
}
};

this.clients.set(clientId, client);

// 发送欢迎消息
this.sendToClient(clientId, {
type: 'welcome',
clientId,
message: '欢迎连接'
});

// 设置事件处理
ws.on('message', (message) => {
this.handleMessage(clientId, message);
});

ws.on('pong', () => {
client.isAlive = true;
});

ws.on('close', () => {
console.log(`客户端${clientId}断开`);
this.removeClient(clientId);
});

ws.on('error', (error) => {
console.error(`客户端${clientId}错误:`, error);
});
});

// 启动心跳检测
this.startHeartbeat();
}

/**
* 生成客户端ID
*/
generateClientId() {
return `client-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}

/**
* 处理消息
*/
handleMessage(clientId, message) {
try {
// 处理心跳
if (message.toString() === 'ping') {
const client = this.clients.get(clientId);
if (client) {
client.ws.send('pong');
}
return;
}

// 解析消息
const data = JSON.parse(message.toString());

switch (data.type) {
case 'join-room':
this.handleJoinRoom(clientId, data);
break;
case 'leave-room':
this.handleLeaveRoom(clientId, data);
break;
case 'room-message':
this.handleRoomMessage(clientId, data);
break;
case 'broadcast':
this.broadcast(data);
break;
default:
console.log(`未知消息类型:${data.type}`);
}
} catch (error) {
console.error('消息处理失败:', error);
}
}

/**
* 加入房间
*/
handleJoinRoom(clientId, data) {
const client = this.clients.get(clientId);
if (!client) return;

const roomId = data.roomId;

if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}

this.rooms.get(roomId).add(clientId);
client.rooms.add(roomId);

console.log(`客户端${clientId}加入房间${roomId}`);

// 通知房间内其他人
this.broadcastToRoom(roomId, {
type: 'user-joined',
clientId,
roomId
}, clientId);
}

/**
* 离开房间
*/
handleLeaveRoom(clientId, data) {
const client = this.clients.get(clientId);
if (!client) return;

const roomId = data.roomId;

if (this.rooms.has(roomId)) {
this.rooms.get(roomId).delete(clientId);
client.rooms.delete(roomId);

console.log(`客户端${clientId}离开房间${roomId}`);

// 通知房间内其他人
this.broadcastToRoom(roomId, {
type: 'user-left',
clientId,
roomId
});

// 如果房间为空,删除房间
if (this.rooms.get(roomId).size === 0) {
this.rooms.delete(roomId);
}
}
}

/**
* 房间消息
*/
handleRoomMessage(clientId, data) {
this.broadcastToRoom(data.roomId, data, clientId);
}

/**
* 发送消息给指定客户端
*/
sendToClient(clientId, data) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
try {
client.ws.send(JSON.stringify(data));
return true;
} catch (error) {
console.error('发送消息失败:', error);
return false;
}
}
return false;
}

/**
* 广播到所有客户端
*/
broadcast(data, excludeClientId) {
this.clients.forEach((client, clientId) => {
if (clientId !== excludeClientId) {
this.sendToClient(clientId, data);
}
});
}

/**
* 广播到房间
*/
broadcastToRoom(roomId, data, excludeClientId) {
const room = this.rooms.get(roomId);
if (!room) return;

room.forEach(clientId => {
if (clientId !== excludeClientId) {
this.sendToClient(clientId, data);
}
});
}

/**
* 移除客户端
*/
removeClient(clientId) {
const client = this.clients.get(clientId);
if (!client) return;

// 从所有房间移除
client.rooms.forEach(roomId => {
this.handleLeaveRoom(clientId, { roomId });
});

this.clients.delete(clientId);
}

/**
* 启动心跳检测
*/
startHeartbeat() {
setInterval(() => {
this.clients.forEach((client, clientId) => {
if (client.isAlive === false) {
console.log(`客户端${clientId}心跳超时,断开连接`);
client.ws.terminate();
this.removeClient(clientId);
return;
}

client.isAlive = false;
client.ws.ping();
});
}, this.heartbeatInterval);
}

/**
* 启动服务器
*/
start() {
this.server.listen(this.port, () => {
console.log(`WebSocket服务器运行在端口${this.port}`);
});
}

/**
* 获取统计信息
*/
getStats() {
return {
clientCount: this.clients.size,
roomCount: this.rooms.size,
clients: Array.from(this.clients.values()).map(c => ({
id: c.id,
rooms: Array.from(c.rooms),
metadata: c.metadata
}))
};
}
}

// 启动服务器
const server = new WebSocketServer({ port: 8080 });
server.start();

总结

WebSocket是现代Web应用中实现实时双向通信的标准技术。本文从基础原理到高级应用,全面介绍了WebSocket的使用方法和最佳实践。

WebSocket的核心优势:

  • 真正的全双工通信
  • 低延迟、高效率
  • 支持文本和二进制数据
  • 广泛的浏览器支持

关键实现要点:

  • 完善的重连机制
  • 心跳保活检测
  • 消息队列管理
  • 错误处理和降级
  • 安全防护措施

适用场景:

  • 即时聊天和通讯
  • 实时协作编辑
  • 在线游戏
  • 实时数据监控
  • 视频直播互动

选择建议:

  • 需要双向实时通信时,WebSocket是首选
  • 只需服务器推送时,可以考虑SSE
  • 需要超低延迟或P2P通信时,考虑WebRTC
  • 需要兼容老旧浏览器时,使用长轮询作为降级方案

通过本文提供的完整解决方案,您可以在项目中快速实现一个生产级的WebSocket系统,为用户提供流畅的实时交互体验。