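Distributed Systems
Overview of Distributed Systems
A distributed system consists of multiple independent computers (nodes) that communicate and cooperate over a network to achieve a common goal. From the outside it presents itself as a single, coherent system, and users do not need to be aware of how it is distributed internally.
Distributed systems have become the norm in modern web applications, especially for applications that must be highly available, fast, and scalable. Node.js is a lightweight, high-performance runtime, and it is well suited to building many of the components of such a system.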
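Core Concepts of Distributed Systems
1. Characteristics of Distributed Systems
Concurrency
Multiple nodes in a distributed system execute tasks at the same time, which raises the system's overall throughput. Within a single node, Node.js's event-driven, non-blocking I/O model is a natural fit for handling many concurrent requests.
No shared memory
Nodes in a distributed system generally do not share memory. They communicate by passing messages, so the design has to address data consistency, concurrency control, and related problems explicitly.
Independent failures
Nodes can fail independently of one another. The system must therefore tolerate the failure of some nodes without losing overall availability.
Transparency
A distributed system should hide its distributed nature from users and applications and offer a unified interface and experience. Transparency covers location, migration, replication, concurrency, and failure transparency.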
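2. Challenges of Distributed Systems
Network communication
Network latency, limited bandwidth, and network partitions are among the main challenges a distributed system faces.
Data consistency
Keeping data consistent across nodes is difficult, particularly when the network is partitioned.
Distributed coordination
Nodes must coordinate their behavior to complete shared tasks.
Failure detection and recovery
Node failures in a distributed system are routine rather than exceptional. Detecting failures quickly and recovering from them is therefore an important challenge.
Security
Security is more complex in a distributed system. Communication security, authentication and authorization, and data protection all have to be considered.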
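To a caller, a slow node and a partitioned node look the same. For that reason, remote calls in Node.js are usually bounded by a timeout. The following is a minimal sketch using only built-ins. `callRemote` is a hypothetical stand-in for any network call.

```javascript
// Reject if `promise` does not settle within `ms` milliseconds.
// The timer is cleared either way so it does not keep the process alive.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical stand-in for a network call that answers after `delayMs`
function callRemote(delayMs, value) {
  return new Promise(resolve => setTimeout(() => resolve(value), delayMs));
}

withTimeout(callRemote(10, 'fast'), 100).then(v => console.log(v)); // fast
withTimeout(callRemote(200, 'slow'), 50).catch(e => console.log(e.message)); // Timed out after 50ms
```

A timeout does not cancel the remote work. It only stops the caller from waiting, so the operation may still complete on the other side.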
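Foundational Theory
1. The CAP Theorem
The CAP theorem states that a distributed system cannot guarantee all three of the following properties at the same time:
- Consistency: every read sees the most recent write, so all nodes see the same data at the same time
- Availability: every request received by a non-failed node gets a response
- Partition tolerance: the system keeps operating even when the network splits the nodes into groups that cannot communicate with each other
In practice the trade-off is driven by business requirements. Network partitions cannot be ruled out in a distributed environment, so the real choice is what to give up while a partition is in progress. Most systems are therefore either AP (available and partition tolerant) or CP (consistent and partition tolerant), rather than CA (consistent and available).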
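The trade-off can be made concrete with a toy model (not a real database) of a replica that has lost contact with the primary. In `CP` mode it refuses to answer rather than risk returning stale data. In `AP` mode it answers with whatever it last replicated. The `Replica` class and its fields are illustrative assumptions.

```javascript
// Toy replica: `partitioned` means it cannot reach the primary right now
class Replica {
  constructor(mode) {
    this.mode = mode;        // 'CP' or 'AP'
    this.data = new Map();   // last values replicated from the primary
    this.partitioned = false;
  }
  replicate(key, value) {
    this.data.set(key, value);
  }
  read(key) {
    if (this.partitioned && this.mode === 'CP') {
      // Consistency over availability: refuse instead of serving possibly stale data
      throw new Error('Unavailable: cannot confirm latest value during partition');
    }
    // Availability over consistency: answer from local state, which may be stale
    return this.data.get(key);
  }
}

const cp = new Replica('CP');
const ap = new Replica('AP');
for (const r of [cp, ap]) {
  r.replicate('balance', 100);
  r.partitioned = true; // the primary now accepts balance = 50, but the update never arrives
}
console.log(ap.read('balance')); // 100 (stale, but available)
try {
  cp.read('balance');
} catch (e) {
  console.log(e.message);
}
```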
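2. The BASE Model
BASE extends the reasoning behind CAP. It holds that a distributed system can give up strong consistency in exchange for availability and performance:
- Basically Available: the system still provides its core functionality when failures occur
- Soft State: the system may pass through intermediate states, and these states do not affect its overall availability
- Eventually Consistent: all nodes eventually converge to the same state, but they do not have to be consistent at every moment
BASE underlies the design of many distributed systems, such as NoSQL databases and message queues.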
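Eventual consistency needs a rule that lets replicas converge no matter in which order they receive updates. One common rule is last-writer-wins (LWW). BASE itself does not prescribe it, and it is only one of several options. Each write carries a timestamp, and on merge the higher timestamp wins, with the node id as a tie-breaker. The following is a minimal sketch, assuming the caller supplies the timestamps.

```javascript
// Last-writer-wins register: applying writes is commutative and idempotent,
// so replicas that have seen the same set of writes end in the same state
class LWWRegister {
  constructor() {
    this.value = undefined;
    this.ts = -Infinity;
    this.node = '';
  }
  // Apply a write, whether local or received from another replica
  apply({ value, ts, node }) {
    const newer = ts > this.ts || (ts === this.ts && node > this.node);
    if (newer) {
      this.value = value;
      this.ts = ts;
      this.node = node;
    }
  }
}

const writes = [
  { value: 'a', ts: 1, node: 'n1' },
  { value: 'b', ts: 3, node: 'n2' },
  { value: 'c', ts: 2, node: 'n3' }
];
// Two replicas receive the same writes in different orders
const r1 = new LWWRegister();
const r2 = new LWWRegister();
writes.forEach(w => r1.apply(w));
[...writes].reverse().forEach(w => r2.apply(w));
console.log(r1.value, r2.value); // b b
```

The cost of LWW is that concurrent writes are silently discarded. In the example, write `c` loses to `b` on every replica.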
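3. Distributed Transactions
A distributed transaction spans multiple nodes, and keeping such a transaction consistent is a challenge in its own right. Common approaches include:
- Two-phase commit (2PC): a coordinator and the participants interact in two phases (prepare, then commit or roll back) to guarantee atomicity
- Three-phase commit (3PC): adds a pre-commit phase and timeouts to 2PC, so participants are less likely to block indefinitely when the coordinator fails
- SAGA: splits a large transaction into a sequence of local transactions, each owned by one service; if a step fails, compensating actions undo the steps already completed
- TCC (Try-Confirm-Cancel): implements a distributed transaction in three phases: reserve resources (Try), confirm the operation (Confirm), or cancel it and release the reservation (Cancel)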
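The two-phase commit flow can be sketched in a few lines. This is an in-memory illustration only, and the `Participant` class is hypothetical. It omits durable logging and coordinator timeouts, which a real implementation needs. Those omissions also mark exactly where 2PC can block if the coordinator crashes.

```javascript
// Hypothetical participant: votes in phase 1, applies or discards in phase 2
class Participant {
  constructor(name, canCommit) {
    this.name = name;
    this.canCommit = canCommit;
    this.state = 'INIT';
  }
  async prepare() {
    // Phase 1: reserve resources and vote; a real participant would persist its vote
    this.state = this.canCommit ? 'PREPARED' : 'ABORTED';
    return this.canCommit;
  }
  async commit() { this.state = 'COMMITTED'; }
  async rollback() { this.state = 'ABORTED'; }
}

async function twoPhaseCommit(participants) {
  // Phase 1 (prepare/vote): every participant must vote yes
  const votes = await Promise.all(participants.map(p => p.prepare()));
  const decision = votes.every(Boolean) ? 'COMMIT' : 'ABORT';
  // Phase 2 (commit/rollback): send the single decision to everyone
  await Promise.all(participants.map(p => (decision === 'COMMIT' ? p.commit() : p.rollback())));
  return decision;
}

twoPhaseCommit([new Participant('orders', true), new Participant('inventory', true)])
  .then(d => console.log(d)); // COMMIT
twoPhaseCommit([new Participant('orders', true), new Participant('payments', false)])
  .then(d => console.log(d)); // ABORT
```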
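4. Distributed Locks
A distributed lock provides mutually exclusive access to a resource in a distributed environment. Common implementations include:
- Database-based locks: use a database unique constraint to enforce mutual exclusion
- Redis-based locks: use Redis atomic operations to implement the lock
- ZooKeeper-based locks: use ZooKeeper's ephemeral nodes and watches to implement the lock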
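The database-based approach relies on the fact that only one `INSERT` of a given unique key can succeed. The sketch below uses a `Map` as a stand-in for a hypothetical `locks` table with a unique `resource` column, so it runs without a database. With a real database, the check-and-set in `tryLock` would be a single `INSERT` that fails on the unique constraint, and that is what makes it atomic. The `owner` check on release stops one client from deleting another client's lock. Real implementations usually also store an expiry time so that a lock held by a crashed client can be cleaned up. That part is omitted here.

```javascript
// In-memory stand-in for a table: locks(resource UNIQUE, owner)
const lockTable = new Map();

// Equivalent of: INSERT INTO locks(resource, owner) VALUES (?, ?)
// which fails with a unique-constraint violation if the row already exists
function tryLock(resource, owner) {
  if (lockTable.has(resource)) return false;
  lockTable.set(resource, owner);
  return true;
}

// Equivalent of: DELETE FROM locks WHERE resource = ? AND owner = ?
function unlock(resource, owner) {
  if (lockTable.get(resource) !== owner) return false;
  lockTable.delete(resource);
  return true;
}

console.log(tryLock('order-42', 'worker-A')); // true
console.log(tryLock('order-42', 'worker-B')); // false
console.log(unlock('order-42', 'worker-B'));  // false: not the owner
console.log(unlock('order-42', 'worker-A'));  // true
```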
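Node.js and Distributed Systems
1. Advantages of Node.js in Distributed Systems
- Performance: the event-driven, non-blocking I/O model lets Node.js handle large numbers of concurrent connections efficiently
- Lightweight: Node.js applications usually have a modest resource footprint, which makes them easy to deploy across many nodes
- JavaScript ecosystem: the large collection of npm packages and tools makes distributed systems easier to build
- Cross-platform: Node.js runs on all major operating systems, which adds deployment flexibility
- Asynchronous programming model: Node.js's async model fits naturally with the network latency and concurrency of distributed systems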
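The asynchronous model matters most when one request fans out to several services. The sketch below uses only built-ins. `Promise.allSettled` lets a slow or failing dependency degrade the response instead of failing it outright. The three `fetch*` functions are hypothetical stand-ins for remote calls.

```javascript
const delay = (ms, value, fail = false) =>
  new Promise((resolve, reject) =>
    setTimeout(() => (fail ? reject(new Error(value)) : resolve(value)), ms));

// Hypothetical remote calls to three different services
const fetchUser = () => delay(30, { id: 1, name: 'Ada' });
const fetchOrders = () => delay(20, [{ id: 'o1' }]);
const fetchRecommendations = () => delay(10, 'recommendation service down', true);

async function buildDashboard() {
  // All three requests are in flight at the same time on a single thread
  const [user, orders, recs] = await Promise.allSettled([
    fetchUser(), fetchOrders(), fetchRecommendations()
  ]);
  return {
    user: user.status === 'fulfilled' ? user.value : null,
    orders: orders.status === 'fulfilled' ? orders.value : [],
    // Degrade gracefully: an empty list instead of failing the whole response
    recommendations: recs.status === 'fulfilled' ? recs.value : []
  };
}

buildDashboard().then(d => console.log(d));
```

The total wait is roughly that of the slowest call (about 30 ms here), rather than the sum of all three.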
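2. Architectural Patterns for Node.js Distributed Systems
Microservices
A microservice architecture splits a complex application into small, independent services, each responsible for a specific business capability. Node.js is a good fit for microservices because it is lightweight and fast and has broad framework support (Express, Koa, NestJS, and others).
API gateway
An API gateway is the single entry point to a distributed system. It handles request routing, load balancing, authentication and authorization, rate limiting, and circuit breaking. Frameworks such as Express or Fastify make it quick to build a high-performance API gateway in Node.js.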
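At its core, a gateway maps an incoming path to an upstream service and picks one of that service's instances. The following sketch covers only that routing step. It uses a hypothetical route table and round-robin selection. Forwarding the request (for example with `http.request`), authentication, and rate limiting are left out.

```javascript
// Hypothetical route table: path prefix -> upstream instances
const routes = [
  { prefix: '/api/users', upstreams: ['http://user-1:3000', 'http://user-2:3000'] },
  { prefix: '/api/orders', upstreams: ['http://order-1:3000'] }
];
const cursors = new Map(); // per-route round-robin position

function resolveUpstream(path) {
  // The longest matching prefix wins, so more specific routes take precedence
  const route = routes
    .filter(r => path === r.prefix || path.startsWith(r.prefix + '/'))
    .sort((a, b) => b.prefix.length - a.prefix.length)[0];
  if (!route) return null; // the gateway would answer 404
  const i = cursors.get(route.prefix) || 0;
  cursors.set(route.prefix, (i + 1) % route.upstreams.length);
  return route.upstreams[i] + path;
}

console.log(resolveUpstream('/api/users/42')); // http://user-1:3000/api/users/42
console.log(resolveUpstream('/api/users/42')); // http://user-2:3000/api/users/42
console.log(resolveUpstream('/health'));       // null
```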
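Event-driven architecture
An event-driven architecture decouples components by having them publish and subscribe to events. Node.js's built-in events module and its many message-queue clients (such as kafka-node and amqplib) make it well suited to event-driven distributed systems.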
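Within a single process, the decoupling idea can be shown with Node's built-in `EventEmitter`. Across processes, a broker such as Kafka or RabbitMQ provides the same shape through client libraries like the ones mentioned above. The following is a minimal sketch with a hypothetical `order.created` event.

```javascript
const { EventEmitter } = require('events');

const bus = new EventEmitter();
const log = [];

// Subscribers know nothing about the publisher or about each other
bus.on('order.created', order => log.push(`inventory: reserve items for ${order.id}`));
bus.on('order.created', order => log.push(`email: confirm ${order.id} to ${order.email}`));

// The publisher only emits an event; adding a subscriber requires no change here
function createOrder(id, email) {
  const order = { id, email };
  bus.emit('order.created', order);
  return order;
}

createOrder('o-1', 'a@example.com');
console.log(log);
```

Unlike a broker, `EventEmitter` calls listeners synchronously, in registration order, and keeps nothing if the process crashes.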
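Service mesh
A service mesh is an infrastructure layer that handles service-to-service communication, service discovery, load balancing, traffic management, and security. Mesh components are usually written in languages such as Go, C++ (Envoy), or Rust. Node.js applications can still take part in a mesh, both as service providers and as consumers.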
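Core Components of a Node.js Distributed System
1. Service Discovery
Service discovery plays a central role in a distributed system: it lets services locate one another so they can communicate.
Service discovery with etcd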
const { promisify } = require('util');
const Etcd = require('node-etcd');

const etcd = new Etcd(['localhost:2379']);
// node-etcd's methods take Node-style callbacks instead of returning Promises,
// so wrap the ones used here before calling them with await
const etcdSet = promisify(etcd.set.bind(etcd));
const etcdGet = promisify(etcd.get.bind(etcd));
const etcdDel = promisify(etcd.del.bind(etcd));

const SERVICE_TTL_SECONDS = 60;    // the record expires if not refreshed within 60 seconds
const REFRESH_INTERVAL_MS = 30000; // refresh every 30 seconds, well within the TTL

class ServiceRegistry {
  constructor(serviceName, servicePort) {
    this.serviceName = serviceName;
    this.servicePort = servicePort;
    this.serviceId = `${serviceName}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
    this.registered = false;
  }

  get key() {
    return `/services/${this.serviceName}/${this.serviceId}`;
  }

  // Write this instance's record with a TTL. The first write carries the TTL too,
  // so the record disappears even if the process dies before the first refresh
  async _writeRecord() {
    await etcdSet(
      this.key,
      JSON.stringify({
        id: this.serviceId,
        name: this.serviceName,
        address: 'localhost',
        port: this.servicePort,
        updatedAt: new Date().toISOString()
      }),
      { ttl: SERVICE_TTL_SECONDS }
    );
  }

  async register() {
    try {
      // Register the service
      await this._writeRecord();
      // Refresh the record periodically to keep the registration alive
      this.ttlInterval = setInterval(async () => {
        try {
          await this._writeRecord();
        } catch (error) {
          console.error('Failed to refresh service TTL:', error);
        }
      }, REFRESH_INTERVAL_MS);
      this.registered = true;
      console.log(`Service ${this.serviceId} registered with etcd`);
    } catch (error) {
      console.error('Failed to register service:', error);
      throw error;
    }
  }

  async discoverService(serviceName) {
    try {
      const services = await etcdGet(`/services/${serviceName}`);
      // An empty directory has no "nodes" array
      const serviceInstances = (services.node.nodes || []).map(node => JSON.parse(node.value));
      // Simple load balancing: pick a random instance
      if (serviceInstances.length > 0) {
        const randomIndex = Math.floor(Math.random() * serviceInstances.length);
        return serviceInstances[randomIndex];
      }
      return null;
    } catch (error) {
      console.error(`Failed to discover service ${serviceName}:`, error);
      return null;
    }
  }

  async unregister() {
    if (this.registered) {
      // Stop refreshing first so no further refresh re-creates the key
      clearInterval(this.ttlInterval);
      try {
        await etcdDel(this.key);
        this.registered = false;
        console.log(`Service ${this.serviceId} unregistered from etcd`);
      } catch (error) {
        console.error('Failed to unregister service:', error);
      }
    }
  }
}

// Usage example
async function startService() {
  const registry = new ServiceRegistry('user-service', 3000);
  try {
    // Register this service
    await registry.register();
    // Discover another service
    const productService = await registry.discoverService('product-service');
    if (productService) {
      console.log(`Found product service at ${productService.address}:${productService.port}`);
    }
    // Unregister when the service shuts down
    process.on('SIGTERM', async () => {
      await registry.unregister();
      process.exit(0);
    });
  } catch (error) {
    console.error('Service startup failed:', error);
  }
}
startService();
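The registry above relies on etcd deleting a key once its TTL lapses. The in-memory model below shows why the refresh interval must be shorter than the TTL: an instance that stops heartbeating drops out of discovery once its TTL expires. The model is hypothetical and involves no etcd. Time is passed in explicitly so the behavior is deterministic.

```javascript
// In-memory model of TTL-based registration; `now` is passed in explicitly
class TtlRegistry {
  constructor() {
    this.entries = new Map(); // key -> { value, expiresAt }
  }
  set(key, value, ttlMs, now) {
    this.entries.set(key, { value, expiresAt: now + ttlMs });
  }
  // List live instances under a prefix, dropping expired entries as etcd would
  list(prefix, now) {
    const live = [];
    for (const [key, entry] of this.entries) {
      if (entry.expiresAt <= now) {
        this.entries.delete(key);
      } else if (key.startsWith(prefix)) {
        live.push(entry.value);
      }
    }
    return live;
  }
}

const reg = new TtlRegistry();
const TTL = 60000;
reg.set('/services/user-service/a', { port: 3000 }, TTL, 0);
reg.set('/services/user-service/b', { port: 3001 }, TTL, 0);
// At t = 30s only instance "a" refreshes (its heartbeat); "b" has crashed
reg.set('/services/user-service/a', { port: 3000 }, TTL, 30000);
console.log(reg.list('/services/user-service/', 59000).length); // 2
console.log(reg.list('/services/user-service/', 61000).length); // 1
```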
Service discovery with Consul
const Consul = require('consul');
const consul = new Consul({
host: 'localhost',
port: 8500
});
class ConsulServiceRegistry {
constructor(serviceName, servicePort) {
this.serviceName = serviceName;
this.servicePort = servicePort;
this.serviceId = `${serviceName}-${Date.now()}`;
}
async register() {
try {
// Register the service together with an HTTP health check that Consul polls every 10 seconds
await consul.agent.service.register({
id: this.serviceId,
name: this.serviceName,
address: 'localhost',
port: this.servicePort,
check: {
http: `http://localhost:${this.servicePort}/health`,
interval: '10s',
timeout: '5s',
deregisterCriticalServiceAfter: '1m'
}
});
console.log(`Service ${this.serviceId} registered with Consul`);
} catch (error) {
console.error('Failed to register service with Consul:', error);
throw error;
}
}
async discoverService(serviceName) {
try {
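// Fetch only instances whose health checks are passing; the passing filter
// belongs inside the options object together with the service name
const result = await consul.health.service({ service: serviceName, passing: true });
// Each entry's Service object uses Consul's field names (ID, Service, Address, Port)
const healthyInstances = result.map(item => item.Service);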
if (healthyInstances.length > 0) {
// Simple load balancing: pick a random instance
const randomIndex = Math.floor(Math.random() * healthyInstances.length);
return healthyInstances[randomIndex];
}
return null;
} catch (error) {
console.error(`Failed to discover service ${serviceName}:`, error);
return null;
}
}
async unregister() {
try {
await consul.agent.service.deregister(this.serviceId);
console.log(`Service ${this.serviceId} unregistered from Consul`);
} catch (error) {
console.error('Failed to unregister service from Consul:', error);
}
}
}
// Usage example
const express = require('express');
const app = express();
const PORT = 3000;
app.get('/health', (req, res) => {
res.status(200).json({ status: 'UP' });
});
app.listen(PORT, async () => {
console.log(`Server running on port ${PORT}`);
// Register with Consul once the server (and its /health endpoint) is listening
const registry = new ConsulServiceRegistry('user-service', PORT);
await registry.register();
// Deregister when the process is terminated
process.on('SIGTERM', async () => {
await registry.unregister();
process.exit(0);
});
});
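Both registries pick a random instance. Round-robin is a common alternative that spreads load more evenly across a small number of instances. The sketch below works on a list of instances, such as the array each `discoverService` builds before it picks one. The `RoundRobin` class is hypothetical.

```javascript
// Round-robin selection over an instance list returned by discovery.
// The cursor is kept per service name so each service rotates independently.
class RoundRobin {
  constructor() {
    this.cursors = new Map();
  }
  pick(serviceName, instances) {
    if (instances.length === 0) return null;
    // Modulo keeps the cursor valid if the instance list shrinks
    const i = (this.cursors.get(serviceName) || 0) % instances.length;
    this.cursors.set(serviceName, i + 1);
    return instances[i];
  }
}

const rr = new RoundRobin();
const instances = [{ port: 3000 }, { port: 3001 }, { port: 3002 }];
const picked = [1, 2, 3, 4].map(() => rr.pick('user-service', instances).port);
console.log(picked); // [ 3000, 3001, 3002, 3000 ]
```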
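2. Distributed Configuration Management
Distributed configuration management keeps the configuration of every service in one central place and lets it be updated at runtime.
Configuration management with etcd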
const { promisify } = require('util');
const Etcd = require('node-etcd');

const etcd = new Etcd(['localhost:2379']);
// Promise wrappers around node-etcd's callback-style API
const etcdGet = promisify(etcd.get.bind(etcd));
const etcdSet = promisify(etcd.set.bind(etcd));

class ConfigManager {
  constructor(serviceName) {
    this.serviceName = serviceName;
    this.config = {};
    this.watchers = new Map();
  }

  async loadConfig() {
    try {
      // Load the shared base configuration
      const baseConfig = await this.getConfigValue('config/base');
      // Load the service-specific configuration
      const serviceConfig = await this.getConfigValue(`config/${this.serviceName}`);
      // Merge; service-specific values take precedence (shallow merge)
      this.config = { ...baseConfig, ...serviceConfig };
      console.log(`Config loaded for service ${this.serviceName}`);
      return this.config;
    } catch (error) {
      console.error('Failed to load config:', error);
      throw error;
    }
  }

  // Read a key that holds either one JSON document or a directory with one
  // JSON value per entry (the layout setConfigValue writes: config/<service>/<key>)
  async getConfigValue(key) {
    try {
      const result = await etcdGet(key, { recursive: true });
      const node = result.node;
      if (!node.dir) {
        return JSON.parse(node.value);
      }
      const config = {};
      for (const child of node.nodes || []) {
        if (child.dir) continue; // nested directories are not handled here
        // child.key is the full path, e.g. /config/user-service/timeout
        config[child.key.slice(node.key.length + 1)] = JSON.parse(child.value);
      }
      return config;
    } catch (error) {
      if (error.errorCode === 100) { // Key not found
        return {};
      }
      throw error;
    }
  }

  async setConfigValue(key, value) {
    try {
      await etcdSet(`config/${this.serviceName}/${key}`, JSON.stringify(value));
      console.log(`Config value ${key} set for service ${this.serviceName}`);
    } catch (error) {
      console.error(`Failed to set config value ${key}:`, error);
      throw error;
    }
  }

  // Watch for configuration changes
  watchConfigChanges(callback) {
    const watchKey = `config/${this.serviceName}`;
    // Create a recursive watcher on the service's configuration directory
    const watcher = etcd.watcher(watchKey, null, { recursive: true });
    watcher.on('change', async () => {
      try {
        // Reload the full configuration, then notify the caller
        await this.loadConfig();
        callback(this.config);
      } catch (error) {
        console.error('Failed to handle config change:', error);
      }
    });
    watcher.on('error', (error) => {
      console.error('Config watcher error:', error);
    });
    // Keep a reference to the watcher so it can be stopped later
    this.watchers.set(watchKey, watcher);
    return () => {
      watcher.stop();
      this.watchers.delete(watchKey);
    };
  }

  get(key, defaultValue = null) {
    // Support dot notation for nested properties, e.g. 'database.host'
    if (key.includes('.')) {
      return this.getNestedValue(this.config, key, defaultValue);
    }
    return this.config[key] !== undefined ? this.config[key] : defaultValue;
  }

  getNestedValue(obj, path, defaultValue) {
    let current = obj;
    for (const key of path.split('.')) {
      if (current === null || current === undefined) {
        return defaultValue;
      }
      current = current[key];
    }
    return current !== undefined ? current : defaultValue;
  }
}

// Usage example
async function initConfig() {
  const configManager = new ConfigManager('user-service');
  // Load the initial configuration
  await configManager.loadConfig();
  // Read configuration values
  const dbHost = configManager.get('database.host', 'localhost');
  const dbPort = configManager.get('database.port', 5432);
  console.log(`Database configuration: ${dbHost}:${dbPort}`);
  // Watch for configuration changes
  const unwatch = configManager.watchConfigChanges((newConfig) => {
    console.log('Configuration updated:', newConfig);
    // Apply hot-reload logic here,
    // e.g. reconnect to the database or change the log level
  });
  // Stop watching when the service shuts down
  process.on('SIGTERM', () => {
    unwatch();
    process.exit(0);
  });
}
initConfig();
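`loadConfig` merges with object spread, and that merge is shallow. If both the base and the service configuration define a `database` object, the service's object replaces the base one entirely instead of overriding individual fields. If field-level overrides are wanted, a small recursive merge can do it. The following is a sketch and is not part of the original code.

```javascript
const isPlainObject = v => v !== null && typeof v === 'object' && !Array.isArray(v);

// Recursively merge `override` into a copy of `base`; arrays and scalars are replaced
function deepMerge(base, override) {
  const result = { ...base };
  for (const [key, value] of Object.entries(override)) {
    result[key] = isPlainObject(value) && isPlainObject(base[key])
      ? deepMerge(base[key], value)
      : value;
  }
  return result;
}

const base = { database: { host: 'db.internal', port: 5432 }, logLevel: 'info' };
const service = { database: { port: 6543 } };

console.log({ ...base, ...service }.database); // { port: 6543 }  (host is lost)
console.log(deepMerge(base, service).database); // { host: 'db.internal', port: 6543 }
```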
Configuration management with Consul
const Consul = require('consul');
const consul = new Consul({
host: 'localhost',
port: 8500
});
class ConsulConfigManager {
  constructor(serviceName) {
    this.serviceName = serviceName;
    this.config = {};
    this.watchers = new Map();
  }

  async loadConfig() {
    try {
      // Fetch the shared base configuration
      const baseConfig = await this.getConfigPrefix('config/base/');
      // Fetch the service-specific configuration
      const serviceConfig = await this.getConfigPrefix(`config/${this.serviceName}/`);
      // Merge; service-specific values take precedence
      this.config = { ...baseConfig, ...serviceConfig };
      console.log(`Config loaded for service ${this.serviceName}`);
      return this.config;
    } catch (error) {
      console.error('Failed to load config:', error);
      throw error;
    }
  }

  async getConfigPrefix(prefix) {
    try {
      // recurse: true returns every key stored under the prefix
      const result = await consul.kv.get({ key: prefix, recurse: true });
      if (!result || result.length === 0) {
        return {};
      }
      const config = {};
      for (const item of result) {
        // Strip the prefix: 'config/user-service/timeout' becomes 'timeout'
        const key = item.Key.slice(prefix.length);
        if (key) { // skip the entry for the prefix itself
          try {
            // Try to parse the value as JSON
            config[key] = JSON.parse(item.Value);
          } catch (e) {
            // Not JSON: keep the raw string value
            config[key] = item.Value;
          }
        }
      }
      return config;
    } catch (error) {
      console.error(`Failed to get config for prefix ${prefix}:`, error);
      return {};
    }
  }

  async setConfigValue(key, value) {
    try {
      const consulKey = `config/${this.serviceName}/${key}`;
      const consulValue = typeof value === 'string' ? value : JSON.stringify(value);
      await consul.kv.set(consulKey, consulValue);
      console.log(`Config value ${key} set for service ${this.serviceName}`);
    } catch (error) {
      console.error(`Failed to set config value ${key}:`, error);
      throw error;
    }
  }

  // Watch for configuration changes. consul.watch issues Consul blocking queries
  // and carries the index forward between requests, so unchanged data does not
  // turn into a busy polling loop, and the watch can actually be stopped
  watchConfigChanges(callback) {
    const watchKey = `config/${this.serviceName}/`;
    const watcher = consul.watch({
      method: consul.kv.get,
      options: { key: watchKey, recurse: true }
    });
    watcher.on('change', async () => {
      try {
        // Reload the configuration, then notify the caller
        await this.loadConfig();
        callback(this.config);
      } catch (error) {
        console.error('Failed to handle config change:', error);
      }
    });
    watcher.on('error', (error) => {
      console.error('Config watch error:', error);
    });
    this.watchers.set(watchKey, watcher);
    // Return a function that stops watching
    return () => {
      watcher.end();
      this.watchers.delete(watchKey);
      console.log('Config watching stopped');
    };
  }

  get(key, defaultValue = null) {
    return this.config[key] !== undefined ? this.config[key] : defaultValue;
  }
}

// Usage example
async function initConsulConfig() {
  const configManager = new ConsulConfigManager('user-service');
  // Load the initial configuration
  await configManager.loadConfig();
  // Read configuration values
  const apiKey = configManager.get('apiKey');
  const timeout = configManager.get('timeout', 5000);
  console.log(`API Key: ${apiKey ? 'set' : 'not set'}`);
  console.log(`Timeout: ${timeout}ms`);
  // Watch for configuration changes
  const unwatch = configManager.watchConfigChanges((newConfig) => {
    console.log('Configuration updated:', newConfig);
    // Apply the updated configuration here
  });
  // Stop watching when the service shuts down
  process.on('SIGTERM', () => {
    unwatch();
    process.exit(0);
  });
}
initConsulConfig();
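The way `getConfigPrefix` turns Consul's response into a config object can be checked without a Consul agent. The function below is a standalone copy of the same parsing steps. The sample entries are hand-written in the general shape that a recursive `kv.get` returns: an array of objects with `Key` and `Value`, plus other fields omitted here. They are not captured from a real agent.

```javascript
// Standalone version of the parsing in getConfigPrefix: strip the prefix,
// parse JSON where possible, and keep plain strings as-is
function kvItemsToConfig(items, prefix) {
  const config = {};
  for (const item of items || []) {
    if (!item.Key.startsWith(prefix)) continue;
    const key = item.Key.slice(prefix.length);
    if (!key || item.Value === null) continue; // skip the folder entry itself
    try {
      config[key] = JSON.parse(item.Value);
    } catch (e) {
      config[key] = item.Value;
    }
  }
  return config;
}

// Hand-written sample in the shape of a recursive KV read
const items = [
  { Key: 'config/user-service/', Value: null },
  { Key: 'config/user-service/timeout', Value: '3000' },
  { Key: 'config/user-service/apiKey', Value: 'abc123' },
  { Key: 'config/user-service/features', Value: '{"beta":true}' }
];
console.log(kvItemsToConfig(items, 'config/user-service/'));
// { timeout: 3000, apiKey: 'abc123', features: { beta: true } }
```

Because every value is first tried as JSON, a string that happens to look like a number (such as `'3000'`) comes back as a number.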
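3. Implementing a Distributed Lock
A distributed lock is an essential mechanism for mutually exclusive access to shared resources in a distributed system.
Distributed lock with Redis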
const Redis = require('ioredis');
class RedisDistributedLock {
constructor(redisOptions = {}) {
this.redis = new Redis(redisOptions);
}
async acquireLock(resourceId, lockTimeout = 30000, acquireTimeout = 10000) {
const lockKey = `lock:${resourceId}`;
const lockValue = this._generateUniqueId();
const startTime = Date.now();
// Keep trying to acquire the lock until it succeeds or acquireTimeout elapses
while (Date.now() - startTime < acquireTimeout) {
try {
// SET with NX (only set if the key does not exist) and PX (expiry in milliseconds)
const result = await this.redis.set(lockKey, lockValue, 'NX', 'PX', lockTimeout);
if (result === 'OK') {
console.log(`Lock acquired for resource ${resourceId}`);
// Return a handle whose release() frees the lock
return {
release: async () => {
await this._releaseLock(lockKey, lockValue);
},
resourceId,
lockValue
};
}
// The lock is held by someone else: sleep briefly, then retry
await this._sleep(100);
} catch (error) {
console.error(`Error acquiring lock for resource ${resourceId}:`, error);
throw error;
}
}
// Timed out waiting for the lock
throw new Error(`Failed to acquire lock for resource ${resourceId} within ${acquireTimeout}ms`);
}
async _releaseLock(lockKey, lockValue) {
try {
// Use a Lua script so the ownership check and the delete happen atomically (only our own lock is released)
const script = `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`;
const result = await this.redis.eval(script, 1, lockKey, lockValue);
if (result === 1) {
console.log(`Lock released for key ${lockKey}`);
} else {
console.log(`Failed to release lock for key ${lockKey}: lock may have expired or been acquired by another process`);
}
} catch (error) {
console.error(`Error releasing lock for key ${lockKey}:`, error);
}
}
// Generate a unique token identifying this lock holder
_generateUniqueId() {
return `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
// Sleep helper
_sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Extend the lock's expiry so it does not lapse during a long operation
async renewLock(resourceId, lockValue, lockTimeout = 30000) {
const lockKey = `lock:${resourceId}`;
try {
// Use a Lua script so that only our own lock is extended
const script = `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire', KEYS[1], ARGV[2])
else
return 0
end
`;
const result = await this.redis.eval(script, 1, lockKey, lockValue, lockTimeout);
if (result === 1) {
console.log(`Lock renewed for resource ${resourceId}`);
return true;
} else {
console.log(`Failed to renew lock for resource ${resourceId}`);
return false;
}
} catch (error) {
console.error(`Error renewing lock for resource ${resourceId}:`, error);
return false;
}
}
// Release resources
async disconnect() {
try {
// quit() waits for pending replies before closing; disconnect() would close immediately
await this.redis.quit();
} catch (error) {
console.error('Error disconnecting from Redis:', error);
}
}
}
// Usage example
async function processSharedResource(resourceId) {
const lockClient = new RedisDistributedLock({ host: 'localhost', port: 6379 });
let lock = null;
try {
// Acquire the lock
lock = await lockClient.acquireLock(resourceId);
// Renew the lock periodically, well before its 30-second expiry
const renewInterval = setInterval(async () => {
if (lock) {
await lockClient.renewLock(resourceId, lock.lockValue);
}
}, 10000); // renew every 10 seconds
try {
// Work on the shared resource
console.log(`Processing shared resource ${resourceId}...`);
// Simulate a long operation (35 s, longer than the lock TTL, so renewal is required)
await new Promise(resolve => setTimeout(resolve, 35000));
console.log(`Finished processing shared resource ${resourceId}`);
} finally {
// Stop the renewal timer
clearInterval(renewInterval);
// Release the lock
if (lock) {
await lock.release();
}
}
} catch (error) {
console.error(`Error processing resource ${resourceId}:`, error);
} finally {
// Close the Redis connection
await lockClient.disconnect();
}
}
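// Test concurrent access
async function testConcurrentAccess() {
const resourceId = 'shared-database';
// Simulate several concurrent requests. The winner holds the lock for about 35 s,
// so with the default 10 s acquireTimeout the others time out and log an error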
const promises = [];
for (let i = 0; i < 3; i++) {
promises.push(processSharedResource(resourceId));
}
await Promise.all(promises);
}
testConcurrentAccess();
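The release script checks the stored value before deleting. The reason is easiest to see in a timeline where the lock expires in the middle of an operation. The in-memory model below imitates only the two Redis behaviors the lock depends on: `SET key value NX PX ms` and the check-then-`DEL` script. It is a hypothetical simulation with a manual clock, not a Redis client.

```javascript
// Minimal model of the two Redis operations the lock uses, with a manual clock
class FakeRedis {
  constructor() {
    this.store = new Map(); // key -> { value, expiresAt }
    this.now = 0;
  }
  // Return the entry if it exists and has not expired
  _live(key) {
    const e = this.store.get(key);
    if (e && e.expiresAt <= this.now) this.store.delete(key);
    return this.store.get(key);
  }
  // SET key value NX PX ttl
  setNxPx(key, value, ttl) {
    if (this._live(key)) return null;
    this.store.set(key, { value, expiresAt: this.now + ttl });
    return 'OK';
  }
  // Same logic as the Lua release script: delete only if the value is still ours
  releaseIfOwner(key, value) {
    const e = this._live(key);
    if (e && e.value === value) {
      this.store.delete(key);
      return 1;
    }
    return 0;
  }
  // A naive release that ignores ownership
  del(key) {
    return this.store.delete(key) ? 1 : 0;
  }
}

const redis = new FakeRedis();
redis.setNxPx('lock:db', 'client-A', 30000); // A acquires with a 30 s TTL
redis.now = 31000;                            // A stalls and its lock expires
redis.setNxPx('lock:db', 'client-B', 30000); // B acquires the lock legitimately
// A finally finishes and releases: the owner check leaves B's lock intact
console.log(redis.releaseIfOwner('lock:db', 'client-A')); // 0
console.log(redis._live('lock:db').value);                 // client-B
```

A plain `del('lock:db')` at that point would have removed B's lock and let a third client in while B is still working. The timeline also shows that renewal matters: once A's lock has expired, A no longer has exclusive access, even though it is still running.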
4. Distributed Transaction Handling
Implementing the SAGA pattern
class Saga {
constructor() {
this.steps = [];
this.compensationSteps = [];
}
// Add a step together with its compensating action
addStep(stepFunction, compensationFunction) {
this.steps.push(stepFunction);
this.compensationSteps.push(compensationFunction);
return this;
}
// Run the SAGA transaction
async execute(context) {
const executedSteps = [];
try {
// Run each step in order
for (let i = 0; i < this.steps.length; i++) {
const stepFunction = this.steps[i];
console.log(`Executing step ${i + 1}/${this.steps.length}`);
// Run the step; steps store their results on the shared context
await stepFunction(context);
executedSteps.push(i);
}
console.log('SAGA transaction completed successfully');
return { success: true, context };
} catch (error) {
console.error(`SAGA transaction failed at step ${executedSteps.length + 1}:`, error);
// Compensate the completed steps (in reverse order)
await this._compensate(executedSteps, context, error);
return { success: false, error, context };
}
}
// Run the compensating actions
async _compensate(executedSteps, context, error) {
context.error = error;
// Compensate in reverse order of execution
for (let i = executedSteps.length - 1; i >= 0; i--) {
const stepIndex = executedSteps[i];
const compensationFunction = this.compensationSteps[stepIndex];
try {
console.log(`Executing compensation for step ${stepIndex + 1}`);
await compensationFunction(context);
} catch (compensationError) {
console.error(`Compensation failed for step ${stepIndex + 1}:`, compensationError);
// Log the failed compensation but keep compensating the remaining steps
}
}
}
}
// Usage example: an order-creation workflow
async function createOrderSaga(orderData) {
// Create the SAGA instance for the order
const orderSaga = new Saga();
// 1. Create the order
orderSaga.addStep(
async (context) => {
// Create the order record
const order = await createOrderRecord(orderData);
context.orderId = order.id;
context.order = order;
console.log(`Order created with ID: ${order.id}`);
},
async (context) => {
// Compensation: cancel the order
if (context.orderId) {
await cancelOrder(context.orderId);
console.log(`Order ${context.orderId} cancelled`);
}
}
);
// 2. Reserve inventory
orderSaga.addStep(
async (context) => {
// Call the inventory service to reserve stock
await reserveInventory(context.orderId, orderData.items);
console.log(`Inventory reserved for order ${context.orderId}`);
},
async (context) => {
// Compensation: release the reserved stock
if (context.orderId) {
await releaseInventory(context.orderId, orderData.items);
console.log(`Inventory released for order ${context.orderId}`);
}
}
);
// 3. Process the payment
orderSaga.addStep(
async (context) => {
// Call the payment service to charge the customer
const paymentResult = await processPayment({
orderId: context.orderId,
amount: orderData.totalAmount,
paymentMethod: orderData.paymentMethod
});
context.paymentId = paymentResult.id;
console.log(`Payment processed with ID: ${paymentResult.id}`);
},
async (context) => {
// Compensation: refund the payment
if (context.paymentId) {
await refundPayment(context.paymentId);
console.log(`Payment ${context.paymentId} refunded`);
}
}
);
// 4. Send the confirmation email
orderSaga.addStep(
async (context) => {
// Send the order confirmation email
await sendOrderConfirmationEmail(orderData.customerEmail, context.orderId);
console.log(`Order confirmation email sent for order ${context.orderId}`);
},
async (context) => {
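// Compensation: send an order-failed email. Saga never compensates the step that
// itself failed, so as the last step this only runs if later steps are added and fail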
await sendOrderFailedEmail(orderData.customerEmail, context.orderId, context.error);
console.log(`Order failed email sent for order ${context.orderId}`);
}
);
// Run the SAGA transaction
const context = {};
const result = await orderSaga.execute(context);
return result;
}
// Mock service functions
async function createOrderRecord(data) {
// Simulate a database write
return new Promise((resolve) => {
setTimeout(() => {
resolve({
id: `order-${Date.now()}`,
...data,
status: 'PENDING',
createdAt: new Date().toISOString()
});
}, 100);
});
}
async function cancelOrder(orderId) {
// Simulate cancelling the order
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Order ${orderId} cancelled in database`);
resolve();
}, 100);
});
}
async function reserveInventory(orderId, items) {
// Simulate a call to the inventory service
return new Promise((resolve, reject) => {
setTimeout(() => {
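// Fail about 10% of calls at random to exercise the compensation path
if (Math.random() < 0.1) {
// return so that resolve() below is not reached after rejecting
return reject(new Error('Inventory reservation failed: insufficient stock'));
}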
resolve();
}, 150);
});
}
async function releaseInventory(orderId, items) {
// Simulate releasing inventory
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Inventory released for order ${orderId}`);
resolve();
}, 100);
});
}
async function processPayment(paymentData) {
// Simulate a call to the payment service
return new Promise((resolve, reject) => {
setTimeout(() => {
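// Fail about 10% of calls at random to exercise the compensation path
if (Math.random() < 0.1) {
// return so that resolve() below is not reached after rejecting
return reject(new Error('Payment processing failed: insufficient funds'));
}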
resolve({
id: `payment-${Date.now()}`,
status: 'COMPLETED',
...paymentData
});
}, 200);
});
}
async function refundPayment(paymentId) {
// Simulate a refund
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Payment ${paymentId} refunded`);
resolve();
}, 150);
});
}
async function sendOrderConfirmationEmail(email, orderId) {
// Simulate sending the confirmation email
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Confirmation email sent to ${email} for order ${orderId}`);
resolve();
}, 50);
});
}
async function sendOrderFailedEmail(email, orderId, error) {
// Simulate sending the failure email
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Failed email sent to ${email} for order ${orderId}: ${error.message}`);
resolve();
}, 50);
});
}
// Test the SAGA transaction
async function testOrderSaga() {
const orderData = {
customerId: 'customer-123',
customerEmail: 'customer@example.com',
items: [
{ productId: 'product-1', quantity: 2, price: 99.99 },
{ productId: 'product-2', quantity: 1, price: 199.99 }
],
totalAmount: 399.97,
paymentMethod: 'credit_card'
};
const result = await createOrderSaga(orderData);
if (result.success) {
console.log('Order processing succeeded!');
} else {
console.log('Order processing failed:', result.error.message);
}
}
testOrderSaga();
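The mock services above fail at random, so any single run may or may not take the compensation path. A deterministic run is easier to reason about. The compact runner below follows the same rule as the `Saga` class (compensate only completed steps, newest first) so that it can run on its own. The step names are hypothetical.

```javascript
// Compact saga runner with the same rule as the Saga class above:
// on failure, compensate the steps that completed, in reverse order
async function runSaga(steps, trace) {
  const done = [];
  for (const step of steps) {
    try {
      await step.run();
      trace.push(`run ${step.name}`);
      done.push(step);
    } catch (error) {
      trace.push(`failed ${step.name}`);
      for (const s of done.reverse()) {
        await s.compensate();
        trace.push(`compensate ${s.name}`);
      }
      return false;
    }
  }
  return true;
}

const noop = async () => {};
const trace = [];
runSaga([
  { name: 'createOrder', run: noop, compensate: noop },
  { name: 'reserveInventory', run: noop, compensate: noop },
  { name: 'processPayment', run: async () => { throw new Error('card declined'); }, compensate: noop }
], trace).then(ok => console.log(ok, trace));
// false [ 'run createOrder', 'run reserveInventory', 'failed processPayment',
//         'compensate reserveInventory', 'compensate createOrder' ]
```

The trace also shows that the failing step's own compensation is never called. Each step therefore has to leave nothing behind when it fails partway through.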
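Communication in Node.js Distributed Systems
1. HTTP/REST Communication
HTTP/REST is one of the most widely used ways for services in a distributed system to communicate.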
const axios = require('axios');
class RestClient {
constructor(baseURL, options = {}) {
this.client = axios.create({
baseURL,
timeout: options.timeout || 10000,
headers: {
'Content-Type': 'application/json',
...options.headers
},
...options.axiosOptions
});
// Request interceptor
this.client.interceptors.request.use(
(config) => {
// Add authentication headers, logging, etc. here
console.log(`Request to ${config.method.toUpperCase()} ${config.url}`);
return config;
},
(error) => {
return Promise.reject(error);
}
);
// Response interceptor
this.client.interceptors.response.use(
(response) => {
// Successful response: return only the body, so callers receive response.data
console.log(`Response from ${response.config.url}: ${response.status}`);
return response.data;
},
(error) => {
// Centralized error handling
console.error(`Error response from ${error.config?.url || 'unknown'}:`, error.message);
// Handle specific HTTP status codes
if (error.response) {
switch (error.response.status) {
case 401:
console.error('Authentication failed');
break;
case 404:
console.error('Resource not found');
break;
case 500:
console.error('Server error');
break;
}
}
return Promise.reject(error);
}
);
}
// GET request
async get(url, params = {}, config = {}) {
try {
return await this.client.get(url, { params, ...config });
} catch (error) {
this._handleError(error);
throw error;
}
}
// POST request
async post(url, data = {}, config = {}) {
try {
return await this.client.post(url, data, config);
} catch (error) {
this._handleError(error);
throw error;
}
}
// PUT request
async put(url, data = {}, config = {}) {
try {
return await this.client.put(url, data, config);
} catch (error) {
this._handleError(error);
throw error;
}
}
// DELETE request
async delete(url, config = {}) {
try {
return await this.client.delete(url, config);
} catch (error) {
this._handleError(error);
throw error;
}
}
// Error-handling helper
_handleError(error) {
// Add error logging, alerting, etc. here
console.error('API request failed:', error.message);
}
// Close the client (if needed)
async close() {
// axios has no explicit close method; interceptors and similar resources could be cleaned up here
console.log('REST client closed');
}
}
// Usage example
async function example() {
// Create a REST client for the user service
const userServiceClient = new RestClient('http://user-service:3000/api', {
timeout: 5000,
headers: {
'X-Service-Name': 'order-service'
}
});
try {
// Fetch a user
const user = await userServiceClient.get('/users/123');
console.log('User data:', user);
// Create a new user
const newUser = await userServiceClient.post('/users', {
name: 'John Doe',
email: 'john@example.com',
age: 30
});
console.log('Created new user:', newUser);
// Update the user
await userServiceClient.put('/users/123', {
name: 'Jane Doe',
email: 'jane@example.com'
});
console.log('User updated');
// Delete a user
await userServiceClient.delete('/users/456');
console.log('User deleted');
} catch (error) {
console.error('Error in API calls:', error);
} finally {
// Close the client
await userServiceClient.close();
}
}
example();
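`RestClient` gives up on the first error. Transient failures between services, such as timeouts or 503s during a deploy, are common, so calls are often wrapped in a retry with exponential backoff. The following is a generic sketch that is not tied to axios. `withRetry`, its options, and the flaky function in the usage are hypothetical. Only retry requests that are safe to repeat (GET, or writes made idempotent). Otherwise a retried POST may create duplicates.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Call `fn` up to `retries + 1` times, doubling the delay cap after each failure.
// `shouldRetry` decides which errors are transient.
async function withRetry(fn, { retries = 3, baseDelayMs = 100, shouldRetry = () => true } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn(attempt);
    } catch (error) {
      if (attempt >= retries || !shouldRetry(error)) throw error;
      // Exponential backoff with full jitter: wait a random time in [0, base * 2^attempt)
      await sleep(Math.random() * baseDelayMs * 2 ** attempt);
    }
  }
}

// Usage with a hypothetical flaky call that succeeds on the third attempt
let calls = 0;
withRetry(async () => {
  calls++;
  if (calls < 3) throw new Error('503 Service Unavailable');
  return 'ok';
}, { baseDelayMs: 10 }).then(result => console.log(result, calls)); // ok 3
```

The jitter spreads retries from many clients over time, so they do not all hit a recovering service at the same moment.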
console.log(`Executing step ${i + 1}/${this.steps.length}`);
// 执行步骤并保存结果到上下文
await stepFunction(context);
executedSteps.push(i);
}
console.log('SAGA transaction completed successfully');
return { success: true, context };
} catch (error) {
console.error(`SAGA transaction failed at step ${executedSteps.length + 1}:`, error);
// 执行补偿操作(按相反顺序)
await this._compensate(executedSteps, context, error);
return { success: false, error, context };
}
}
// 执行补偿操作
async _compensate(executedSteps, context, error) {
context.error = error;
// 按相反顺序执行补偿
for (let i = executedSteps.length - 1; i >= 0; i--) {
const stepIndex = executedSteps[i];
const compensationFunction = this.compensationSteps[stepIndex];
try {
console.log(`Executing compensation for step ${stepIndex + 1}`);
await compensationFunction(context);
} catch (compensationError) {
console.error(`Compensation failed for step ${stepIndex + 1}:`, compensationError);
// 记录补偿失败,但继续尝试补偿其他步骤
}
}
}
}
// 使用示例:处理订单创建流程
async function createOrderSaga(orderData) {
// 创建订单SAGA实例
const orderSaga = new Saga();
// 1. 创建订单
orderSaga.addStep(
async (context) => {
// 创建订单记录
const order = await createOrderRecord(orderData);
context.orderId = order.id;
context.order = order;
console.log(`Order created with ID: ${order.id}`);
},
async (context) => {
// 补偿:取消订单
if (context.orderId) {
await cancelOrder(context.orderId);
console.log(`Order ${context.orderId} cancelled`);
}
}
);
// 2. 预留库存
orderSaga.addStep(
async (context) => {
// 调用库存服务预留库存
await reserveInventory(context.orderId, orderData.items);
console.log(`Inventory reserved for order ${context.orderId}`);
},
async (context) => {
// 补偿:释放预留的库存
if (context.orderId) {
await releaseInventory(context.orderId, orderData.items);
console.log(`Inventory released for order ${context.orderId}`);
}
}
);
// 3. 处理支付
orderSaga.addStep(
async (context) => {
// 调用支付服务处理支付
const paymentResult = await processPayment({
orderId: context.orderId,
amount: orderData.totalAmount,
paymentMethod: orderData.paymentMethod
});
context.paymentId = paymentResult.id;
console.log(`Payment processed with ID: ${paymentResult.id}`);
},
async (context) => {
// 补偿:退款
if (context.paymentId) {
await refundPayment(context.paymentId);
console.log(`Payment ${context.paymentId} refunded`);
}
}
);
// 4. 发送确认邮件
orderSaga.addStep(
async (context) => {
// 发送订单确认邮件
await sendOrderConfirmationEmail(orderData.customerEmail, context.orderId);
console.log(`Order confirmation email sent for order ${context.orderId}`);
},
async (context) => {
// 补偿:发送订单失败邮件
await sendOrderFailedEmail(orderData.customerEmail, context.orderId, context.error);
console.log(`Order failed email sent for order ${context.orderId}`);
}
);
// 执行SAGA事务
const context = {};
const result = await orderSaga.execute(context);
return result;
}
// 模拟服务函数
async function createOrderRecord(data) {
// 模拟数据库操作
return new Promise((resolve) => {
setTimeout(() => {
resolve({
id: `order-${Date.now()}`,
...data,
status: 'PENDING',
createdAt: new Date().toISOString()
});
}, 100);
});
}
async function cancelOrder(orderId) {
// 模拟取消订单操作
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Order ${orderId} cancelled in database`);
resolve();
}, 100);
});
}
async function reserveInventory(orderId, items) {
// 模拟库存服务调用
return new Promise((resolve, reject) => {
setTimeout(() => {
// 模拟随机失败(用于测试补偿)
if (Math.random() < 0.1) {
reject(new Error('Inventory reservation failed: insufficient stock'));
}
resolve();
}, 150);
});
}
async function releaseInventory(orderId, items) {
// 模拟释放库存操作
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Inventory released for order ${orderId}`);
resolve();
}, 100);
});
}
async function processPayment(paymentData) {
// 模拟支付服务调用
return new Promise((resolve, reject) => {
setTimeout(() => {
// 模拟随机失败(用于测试补偿)
if (Math.random() < 0.1) {
reject(new Error('Payment processing failed: insufficient funds'));
}
resolve({
id: `payment-${Date.now()}`,
status: 'COMPLETED',
...paymentData
});
}, 200);
});
}
async function refundPayment(paymentId) {
// 模拟退款操作
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Payment ${paymentId} refunded`);
resolve();
}, 150);
});
}
async function sendOrderConfirmationEmail(email, orderId) {
// 模拟发送邮件
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Confirmation email sent to ${email} for order ${orderId}`);
resolve();
}, 50);
});
}
async function sendOrderFailedEmail(email, orderId, error) {
// 模拟发送失败邮件
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Failed email sent to ${email} for order ${orderId}: ${error.message}`);
resolve();
}, 50);
});
}
// 测试SAGA事务
async function testOrderSaga() {
const orderData = {
customerId: 'customer-123',
customerEmail: 'customer@example.com',
items: [
{ productId: 'product-1', quantity: 2, price: 99.99 },
{ productId: 'product-2', quantity: 1, price: 199.99 }
],
totalAmount: 399.97,
paymentMethod: 'credit_card'
};
const result = await createOrderSaga(orderData);
if (result.success) {
console.log('Order processing succeeded!');
} else {
console.log('Order processing failed:', result.error.message);
}
}
testOrderSaga();
Node.js分布式系统通信
1. HTTP/REST通信
HTTP/REST是分布式系统中最常用的通信方式之一。
const axios = require('axios');
class RestClient {
constructor(baseURL, options = {}) {
this.client = axios.create({
baseURL,
timeout: options.timeout || 10000,
headers: {
'Content-Type': 'application/json',
...options.headers
},
...options.axiosOptions
});
// 添加请求拦截器
this.client.interceptors.request.use(
(config) => {
// 可以在这里添加认证信息、日志记录等
console.log(`Request to ${config.method.toUpperCase()} ${config.url}`);
return config;
},
(error) => {
return Promise.reject(error);
}
);
// 添加响应拦截器
this.client.interceptors.response.use(
(response) => {
// Handle a successful response by unwrapping its body
console.log(`Response from ${response.config.url}: ${response.status}`);
return response.data;
},
(error) => {
// Centralized error handling
console.error(`Error response from ${error.config?.url || 'unknown'}:`, error.message);
// Handle specific status codes
if (error.response) {
switch (error.response.status) {
case 401:
console.error('Authentication failed');
break;
case 404:
console.error('Resource not found');
break;
case 500:
console.error('Server error');
break;
}
}
return Promise.reject(error);
}
);
}
// GET request
async get(url, params = {}, config = {}) {
try {
return await this.client.get(url, { params, ...config });
} catch (error) {
this._handleError(error);
throw error;
}
}
// POST request
async post(url, data = {}, config = {}) {
try {
return await this.client.post(url, data, config);
} catch (error) {
this._handleError(error);
throw error;
}
}
// PUT request
async put(url, data = {}, config = {}) {
try {
return await this.client.put(url, data, config);
} catch (error) {
this._handleError(error);
throw error;
}
}
// DELETE request
async delete(url, config = {}) {
try {
return await this.client.delete(url, config);
} catch (error) {
this._handleError(error);
throw error;
}
}
// Error-handling helper
_handleError(error) {
// Hook for error logging, alerting, etc.
console.error('API request failed:', error.message);
}
// Close the client (if needed)
async close() {
// axios has no explicit close method; clean up interceptors or other resources here if necessary
console.log('REST client closed');
}
}
// Usage example
async function example() {
// Create a REST client for the user service
const userServiceClient = new RestClient('http://user-service:3000/api', {
timeout: 5000,
headers: {
'X-Service-Name': 'order-service'
}
});
try {
// Fetch a user
const user = await userServiceClient.get('/users/123');
console.log('User data:', user);
// Create a new user
const newUser = await userServiceClient.post('/users', {
name: 'John Doe',
email: 'john@example.com',
age: 30
});
console.log('Created new user:', newUser);
// Update a user
await userServiceClient.put('/users/123', {
name: 'Jane Doe',
email: 'jane@example.com'
});
console.log('User updated');
// Delete a user
await userServiceClient.delete('/users/456');
console.log('User deleted');
} catch (error) {
console.error('Error in API calls:', error);
} finally {
// Close the client
await userServiceClient.close();
}
}
example();
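The same ideas can also be expressed without axios: a base URL, a default timeout, unwrapping the response body, and turning non-2xx responses into errors. The sketch below assumes Node.js 18 or later, where `fetch` and `AbortSignal.timeout()` are built in. The `createJsonClient` name and its options are hypothetical and are not part of any library.

```javascript
// Dependency-free JSON client sketch. Assumes Node.js 18+ (global fetch and
// AbortSignal.timeout). createJsonClient is a hypothetical helper, not a library API.
function createJsonClient(baseURL, { timeout = 10000, headers = {} } = {}) {
  // Join base URL and path with exactly one slash, similar to how axios combines baseURL and url
  const join = (path) => `${baseURL.replace(/\/+$/, '')}/${path.replace(/^\/+/, '')}`;

  async function request(method, path, body) {
    const response = await fetch(join(path), {
      method,
      headers: { 'Content-Type': 'application/json', ...headers },
      body: body === undefined ? undefined : JSON.stringify(body),
      // Abort the request once the timeout elapses
      signal: AbortSignal.timeout(timeout)
    });
    if (!response.ok) {
      // Like the response interceptor, surface the HTTP status to the caller;
      // reading the body also releases the underlying connection
      const error = new Error(`${method} ${path} failed with status ${response.status}`);
      error.status = response.status;
      error.body = await response.text();
      throw error;
    }
    // 204 No Content has no body to parse
    return response.status === 204 ? null : response.json();
  }

  return {
    get: (path) => request('GET', path),
    post: (path, body) => request('POST', path, body),
    put: (path, body) => request('PUT', path, body),
    delete: (path) => request('DELETE', path)
  };
}
```

For example, `createJsonClient('http://user-service:3000/api', { timeout: 5000 }).get('/users/123')` would request `http://user-service:3000/api/users/123`. Interceptor-style hooks are left out to keep the sketch short.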
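2. gRPC Communication
For scenarios that need high-performance communication, gRPC is a good choice. It runs over HTTP/2 and serializes messages with Protocol Buffers, a compact binary format.
First, define the service interface with Protocol Buffers: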
// user.proto
syntax = "proto3";
package user;
// User service definition
service UserService {
// Get a user
rpc GetUser (GetUserRequest) returns (User);
// Create a user
rpc CreateUser (CreateUserRequest) returns (User);
// Update a user
rpc UpdateUser (UpdateUserRequest) returns (User);
// Delete a user
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
// List users (paginated)
rpc ListUsers (ListUsersRequest) returns (ListUsersResponse);
}
// Get-user request
message GetUserRequest {
string id = 1;
}
// Create-user request
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
// Update-user request (unset optional fields are left unchanged)
message UpdateUserRequest {
string id = 1;
optional string name = 2;
optional string email = 3;
optional int32 age = 4;
}
// Delete-user request
message DeleteUserRequest {
string id = 1;
}
// Delete-user response
message DeleteUserResponse {
bool success = 1;
string message = 2;
}
// List-users request
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
optional string sort_by = 3;
optional bool ascending = 4;
}
// List-users response
message ListUsersResponse {
repeated User users = 1;
int32 total = 2;
int32 page = 3;
int32 page_size = 4;
bool has_more = 5;
}
// User message
message User {
string id = 1;
string name = 2;
string email = 3;
int32 age = 4;
string created_at = 5;
string updated_at = 6;
}
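Part of gRPC's efficiency comes from the Protocol Buffers binary encoding. Each field is written as a varint key, `(field_number << 3) | wire_type`, followed by its value. Integers are encoded as base-128 varints, and strings as a length prefix plus UTF-8 bytes. The hand-written encoder below only illustrates that wire format for `CreateUserRequest`. It handles non-negative integers only and always writes every field. Real proto3 serializers omit default values, and production code should use the serializers from `@grpc/proto-loader` or generated code.

```javascript
// Illustrative Protocol Buffers encoder for CreateUserRequest only.
// Handles strings and non-negative int32 values; not a general-purpose encoder.
const WIRE_VARINT = 0;
const WIRE_LENGTH_DELIMITED = 2;

// Base-128 varint: 7 bits per byte, high bit set means "more bytes follow"
function encodeVarint(value) {
  const bytes = [];
  while (value > 0x7f) {
    bytes.push((value & 0x7f) | 0x80);
    value >>>= 7;
  }
  bytes.push(value);
  return Buffer.from(bytes);
}

// Field key: field number in the high bits, wire type in the low 3 bits
function encodeKey(fieldNumber, wireType) {
  return encodeVarint((fieldNumber << 3) | wireType);
}

function encodeStringField(fieldNumber, str) {
  const data = Buffer.from(str, 'utf8');
  return Buffer.concat([encodeKey(fieldNumber, WIRE_LENGTH_DELIMITED), encodeVarint(data.length), data]);
}

function encodeInt32Field(fieldNumber, value) {
  return Buffer.concat([encodeKey(fieldNumber, WIRE_VARINT), encodeVarint(value)]);
}

// CreateUserRequest: name = 1 (string), email = 2 (string), age = 3 (int32)
function encodeCreateUserRequest({ name, email, age }) {
  return Buffer.concat([
    encodeStringField(1, name),
    encodeStringField(2, email),
    encodeInt32Field(3, age)
  ]);
}

const encoded = encodeCreateUserRequest({ name: 'Jo', email: 'a@b.c', age: 300 });
console.log(encoded.toString('hex')); // 0a024a6f12056140622e6318ac02
```

The whole message takes 14 bytes. Field names never appear on the wire, only field numbers. Renumbering a field therefore breaks compatibility, and renaming one does not.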
Next, implement the gRPC server:
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
// Load the .proto file
const protoPath = path.join(__dirname, 'proto', 'user.proto');
const packageDefinition = protoLoader.loadSync(protoPath, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const userProto = grpc.loadPackageDefinition(packageDefinition).user;
// In-memory store standing in for a database
const users = new Map();
let nextId = 1;
// Service method implementations
const userService = {
// Get a user
getUser: (call, callback) => {
const userId = call.request.id;
const user = users.get(userId);
if (user) {
callback(null, user);
} else {
callback({
code: grpc.status.NOT_FOUND,
details: `User with ID ${userId} not found`
});
}
},
// Create a user
createUser: (call, callback) => {
const userData = call.request;
const newUser = {
id: nextId.toString(),
name: userData.name,
email: userData.email,
age: userData.age,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString()
};
users.set(newUser.id, newUser);
nextId++;
callback(null, newUser);
},
// Update a user
updateUser: (call, callback) => {
const userId = call.request.id;
const updateData = call.request;
const user = users.get(userId);
if (user) {
// Merge only the fields supplied in the request
const updatedUser = {
...user,
name: updateData.name !== undefined ? updateData.name : user.name,
email: updateData.email !== undefined ? updateData.email : user.email,
age: updateData.age !== undefined ? updateData.age : user.age,
updated_at: new Date().toISOString()
};
users.set(userId, updatedUser);
callback(null, updatedUser);
} else {
callback({
code: grpc.status.NOT_FOUND,
details: `User with ID ${userId} not found`
});
}
},
// Delete a user
deleteUser: (call, callback) => {
const userId = call.request.id;
const exists = users.delete(userId);
if (exists) {
callback(null, {
success: true,
message: `User with ID ${userId} deleted successfully`
});
} else {
callback({
code: grpc.status.NOT_FOUND,
details: `User with ID ${userId} not found`
});
}
},
// List users
listUsers: (call, callback) => {
const request = call.request;
const page = request.page || 1;
const pageSize = request.page_size || 10;
const sortBy = request.sort_by || 'created_at';
const ascending = request.ascending !== undefined ? request.ascending : true;
// Convert the Map into an array
let userArray = Array.from(users.values());
// Sort
userArray.sort((a, b) => {
if (a[sortBy] < b[sortBy]) return ascending ? -1 : 1;
if (a[sortBy] > b[sortBy]) return ascending ? 1 : -1;
return 0;
});
// Paginate
const startIndex = (page - 1) * pageSize;
const endIndex = startIndex + pageSize;
const paginatedUsers = userArray.slice(startIndex, endIndex);
callback(null, {
users: paginatedUsers,
total: userArray.length,
page: page,
page_size: pageSize,
has_more: endIndex < userArray.length
});
}
};
// Create the gRPC server
function startServer() {
const server = new grpc.Server();
// Register the service
server.addService(userProto.UserService.service, userService);
// Bind the port and start the server
const port = '0.0.0.0:50051';
server.bindAsync(port, grpc.ServerCredentials.createInsecure(), (err, boundPort) => {
if (err) {
console.error(`Failed to bind server: ${err}`);
return;
}
server.start();
console.log(`gRPC server running on port ${boundPort}`);
});
// Handle the process termination signal
process.on('SIGTERM', () => {
console.log('Received SIGTERM, shutting down gRPC server...');
server.forceShutdown();
process.exit(0);
});
}
// Start the server
startServer();
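The `listUsers` handler mixes transport code with sorting and paging. That logic can be pulled into a pure function and unit-tested without starting a gRPC server. In the sketch below the helper is named `paginateUsers`, a hypothetical name. It uses the handler's defaults: page 1, page size 10, sorted by `created_at`, ascending.

```javascript
// Pure sort-and-paginate helper mirroring the listUsers handler (hypothetical name).
// Returns an object with the same shape as ListUsersResponse.
function paginateUsers(allUsers, { page, page_size, sort_by, ascending } = {}) {
  const currentPage = page || 1;
  const pageSize = page_size || 10;
  const sortBy = sort_by || 'created_at';
  const asc = ascending !== undefined ? ascending : true;

  // Copy before sorting so the caller's array is not mutated
  const sorted = [...allUsers].sort((a, b) => {
    if (a[sortBy] < b[sortBy]) return asc ? -1 : 1;
    if (a[sortBy] > b[sortBy]) return asc ? 1 : -1;
    return 0;
  });

  const startIndex = (currentPage - 1) * pageSize;
  const endIndex = startIndex + pageSize;
  return {
    users: sorted.slice(startIndex, endIndex),
    total: sorted.length,
    page: currentPage,
    page_size: pageSize,
    has_more: endIndex < sorted.length
  };
}

const sample = [
  { id: '1', name: 'Ann', created_at: '2024-01-01T00:00:00.000Z' },
  { id: '2', name: 'Bob', created_at: '2024-01-02T00:00:00.000Z' },
  { id: '3', name: 'Cid', created_at: '2024-01-03T00:00:00.000Z' }
];
console.log(paginateUsers(sample, { page: 1, page_size: 2, ascending: false }));
```

The handler could then reduce to `callback(null, paginateUsers(Array.from(users.values()), call.request))`. ISO-8601 timestamps sort correctly as plain strings, which is why the default `created_at` sort works here.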
Then create the gRPC client:
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
// Load the .proto file
const protoPath = path.join(__dirname, 'proto', 'user.proto');
const packageDefinition = protoLoader.loadSync(protoPath, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const userProto = grpc.loadPackageDefinition(packageDefinition).user;
class GrpcUserClient {
constructor(serviceAddress = 'localhost:50051') {
// Create the client channel
this.client = new userProto.UserService(
serviceAddress,
grpc.credentials.createInsecure()
);
}
// Get a user
getUser(userId) {
return new Promise((resolve, reject) => {
this.client.getUser({ id: userId }, (error, user) => {
if (error) {
reject(error);
} else {
resolve(user);
}
});
});
}
// Create a user
createUser(userData) {
return new Promise((resolve, reject) => {
this.client.createUser(userData, (error, user) => {
if (error) {
reject(error);
} else {
resolve(user);
}
});
});
}
// Update a user
updateUser(userId, updateData) {
return new Promise((resolve, reject) => {
this.client.updateUser({ id: userId, ...updateData }, (error, user) => {
if (error) {
reject(error);
} else {
resolve(user);
}
});
});
}
// Delete a user
deleteUser(userId) {
return new Promise((resolve, reject) => {
this.client.deleteUser({ id: userId }, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// List users
listUsers(options = {}) {
return new Promise((resolve, reject) => {
this.client.listUsers(options, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// Close the connection
close() {
// Release the underlying channel; the client cannot make further calls afterwards
this.client.close();
console.log('gRPC client connection closed');
}
}
// Usage example
async function example() {
const userClient = new GrpcUserClient();
try {
// Create a user
const newUser = await userClient.createUser({
name: 'John Doe',
email: 'john@example.com',
age: 30
});
console.log('Created user:', newUser);
// Get the user
const user = await userClient.getUser(newUser.id);
console.log('Retrieved user:', user);
// Update the user
const updatedUser = await userClient.updateUser(newUser.id, {
name: 'Jane Doe',
age: 31
});
console.log('Updated user:', updatedUser);
// List users
const userList = await userClient.listUsers({
page: 1,
page_size: 10,
sort_by: 'created_at',
ascending: false
});
console.log('User list:', userList);
// Delete the user
const deleteResponse = await userClient.deleteUser(newUser.id);
console.log('Delete response:', deleteResponse);
} catch (error) {
console.error('Error in gRPC call:', error);
} finally {
// Close the client
userClient.close();
}
}
example();
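Every wrapper method in `GrpcUserClient` repeats the same callback-to-Promise boilerplate. grpc-js unary client methods accept a request followed by a Node-style `(error, response)` callback, so Node's built-in `util.promisify` can generate those wrappers. The sketch below uses a hypothetical `promisifyMethods` helper. A fake callback-style client stands in for the stub so the example runs without a server. With a real client you would pass the `userProto.UserService` instance instead.

```javascript
const { promisify } = require('util');

// Build Promise-returning versions of callback-style methods (hypothetical helper).
// Each wrapper is bound to the client so `this` still refers to it inside the method.
function promisifyMethods(client, methodNames) {
  const wrapped = {};
  for (const name of methodNames) {
    wrapped[name] = promisify(client[name]).bind(client);
  }
  return wrapped;
}

// Fake callback-style client standing in for a grpc-js stub in this example
const fakeClient = {
  users: new Map([['1', { id: '1', name: 'John Doe' }]]),
  getUser(request, callback) {
    const user = this.users.get(request.id);
    setImmediate(() => {
      if (user) {
        callback(null, user);
      } else {
        // 5 is the gRPC NOT_FOUND status code
        callback(Object.assign(new Error(`User ${request.id} not found`), { code: 5 }));
      }
    });
  }
};

const api = promisifyMethods(fakeClient, ['getUser']);
api.getUser({ id: '1' }).then((user) => console.log(user.name));
```

Binding matters here: without `.bind(client)`, `this.users` would be undefined inside `getUser`. The same holds for real grpc-js stubs, whose methods rely on `this`.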
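3. Message Queue Communication
Message queues are an effective way to implement asynchronous communication and decouple services. A producer publishes a message without waiting for its consumers, and consumers process messages at their own pace.
Message queue communication with Kafka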
const { Kafka } = require('kafkajs');
class KafkaClient {
constructor(config = {}) {
this.kafka = new Kafka({
clientId: config.clientId || 'node-app',
brokers: config.brokers || ['localhost:9092'],
...config.kafkaOptions
});
this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({
groupId: config.groupId || 'default-group'
});
this.connected = false;
}
// Connect to Kafka
async connect() {
if (!this.connected) {
try {
await Promise.all([
this.producer.connect(),
this.consumer.connect()
]);
this.connected = true;
console.log('Connected to Kafka');
} catch (error) {
console.error('Failed to connect to Kafka:', error);
throw error;
}
}
}
// Send messages
async sendMessage(topic, messages, options = {}) {
if (!this.connected) {
await this.connect();
}
try {
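// Normalize messages: accept plain strings, { key, value, headers } wrappers, or bare payload objects
const formattedMessages = messages.map((msg) => {
if (typeof msg === 'string') {
return { key: null, value: msg, headers: {} };
}
// Serialize only the payload, not the whole wrapper object
const payload = msg.value !== undefined ? msg.value : msg;
return {
key: msg.key || null,
value: typeof payload === 'string' ? payload : JSON.stringify(payload),
headers: msg.headers || {}
};
});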
const result = await this.producer.send({
topic,
messages: formattedMessages,
...options
});
console.log(`Sent ${messages.length} messages to topic ${topic}`);
return result;
} catch (error) {
console.error(`Failed to send messages to topic ${topic}:`, error);
throw error;
}
}
// Consume messages
async consume(topic, messageHandler, options = {}) {
if (!this.connected) {
await this.connect();
}
try {
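// Subscribe to the topic (kafkajs does not allow subscribing once run() has started, so call consume() only once per client)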
await this.consumer.subscribe({
topic,
fromBeginning: options.fromBeginning || false
});
// Start consuming
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
// Parse the message value as JSON, falling back to the raw string
let value;
try {
value = JSON.parse(message.value.toString());
} catch (e) {
value = message.value.toString();
}
// Invoke the message handler
await messageHandler({
topic,
partition,
message: {
key: message.key ? message.key.toString() : null,
value,
headers: message.headers,
offset: message.offset,
timestamp: message.timestamp
}
});
} catch (error) {
console.error(`Error processing message from topic ${topic}:`, error);
// Rethrow only when configured to; otherwise the error is logged and consumption continues
if (options.throwOnError) {
throw error;
}
}
},
...options.runOptions
});
console.log(`Started consuming messages from topic ${topic}`);
} catch (error) {
console.error(`Failed to start consuming from topic ${topic}:`, error);
throw error;
}
}
// Disconnect
async disconnect() {
if (this.connected) {
try {
await Promise.all([
this.producer.disconnect(),
this.consumer.disconnect()
]);
this.connected = false;
console.log('Disconnected from Kafka');
} catch (error) {
console.error('Failed to disconnect from Kafka:', error);
}
}
}
}
// Usage example
async function example() {
// Create the Kafka client
const kafkaClient = new KafkaClient({
clientId: 'order-service',
brokers: ['localhost:9092'],
groupId: 'order-group'
});
try {
// Connect to Kafka
await kafkaClient.connect();
// Example: send messages
await kafkaClient.sendMessage('order-events', [
{
key: 'order-123',
value: {
orderId: '123',
status: 'created',
amount: 100.50,
timestamp: new Date().toISOString()
},
headers: {
'event-type': 'order_created'
}
},
{
key: 'order-124',
value: {
orderId: '124',
status: 'created',
amount: 200.75,
timestamp: new Date().toISOString()
},
headers: {
'event-type': 'order_created'
}
}
]);
// Example: consume messages
await kafkaClient.consume('payment-events', async ({ message }) => {
console.log('Received payment event:', message.value);
// Handle the payment event
const { paymentId, orderId, status, amount } = message.value;
if (status === 'completed') {
console.log(`Payment ${paymentId} for order ${orderId} completed successfully`);
// Update the order status, send notifications, etc. here
} else if (status === 'failed') {
console.log(`Payment ${paymentId} for order ${orderId} failed`);
// Handle the payment failure here
}
}, {
fromBeginning: true
});
// Keep the process running for a while to receive messages
setTimeout(async () => {
// Disconnect
await kafkaClient.disconnect();
process.exit(0);
}, 60000); // Run for 60 seconds
} catch (error) {
console.error('Error in Kafka example:', error);
// Try to disconnect
await kafkaClient.disconnect();
process.exit(1);
}
}
example();
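The producer and consumer share an implicit serialization contract. Values are sent as JSON strings, and the consumer falls back to the raw string when parsing fails. Keeping both sides of that contract in small pure functions lets you test it without a broker. The sketch below names them `toKafkaMessage` and `decodeValue`, both hypothetical. The `{ key, value, headers }` shape is what kafkajs's `producer.send` accepts in its `messages` array. On the consumer side, kafkajs delivers `message.value` as a `Buffer`, or `null` for tombstone records.

```javascript
// Serialize an outgoing message into the { key, value, headers } shape used by kafkajs.
// Accepts a plain string, a { key, value, headers } wrapper, or a bare payload object.
function toKafkaMessage(msg) {
  if (typeof msg === 'string') {
    return { key: null, value: msg, headers: {} };
  }
  // Serialize only the payload, never the wrapper itself
  const payload = msg.value !== undefined ? msg.value : msg;
  return {
    key: msg.key || null,
    value: typeof payload === 'string' ? payload : JSON.stringify(payload),
    headers: msg.headers || {}
  };
}

// Decode a consumed value (Buffer or null): parsed JSON when possible, else the raw string.
// Note that strings which happen to be valid JSON (e.g. '42') come back parsed.
function decodeValue(buffer) {
  if (buffer === null || buffer === undefined) return null; // e.g. tombstone records
  const text = buffer.toString('utf8');
  try {
    return JSON.parse(text);
  } catch {
    return text;
  }
}

const outgoing = toKafkaMessage({ key: 'order-123', value: { orderId: '123', status: 'created' } });
console.log(outgoing.value); // {"orderId":"123","status":"created"}
console.log(decodeValue(Buffer.from(outgoing.value)));
```

Guarding against `null` matters in practice. Calling `message.value.toString()` directly on a tombstone record throws.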
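Distributed System Observability
1. Distributed Logging
In a distributed system, logs are scattered across many service instances, so they must be collected and analyzed centrally. The logger below emits structured JSON tagged with the service name, environment, and instance ID. In production it can also ship logs to Graylog.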
const winston = require('winston');
const WinstonGraylog2 = require('winston-graylog2');
class DistributedLogger {
constructor(serviceName, options = {}) {
this.serviceName = serviceName;
this.environment = options.environment || process.env.NODE_ENV || 'development';
this.instanceId = options.instanceId || `instance-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
// Create the logger
this.logger = winston.createLogger({
level: options.logLevel || 'info',
defaultMeta: {
service: this.serviceName,
environment: this.environment,
instanceId: this.instanceId
},
format: winston.format.combine(
winston.format.timestamp({
format: 'YYYY-MM-DD HH:mm:ss.SSS'
}),
winston.format.json()
),
transports: [
// Console output (for development; silenced in production)
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
),
silent: this.environment === 'production'
}),
// File output
new winston.transports.File({
filename: `${this.serviceName}-error.log`,
level: 'error',
maxsize: 5242880, // 5MB
maxFiles: 5
}),
new winston.transports.File({
filename: `${this.serviceName}-combined.log`,
maxsize: 5242880, // 5MB
maxFiles: 5
})
]
});
// Add the Graylog transport (production only)
if (options.graylog && this.environment === 'production') {
this.logger.add(new WinstonGraylog2({
name: 'graylog',
silent: false,
handleExceptions: true,
graylog: {
servers: [
{ host: options.graylog.host, port: options.graylog.port }
],
hostname: this.serviceName,
facility: this.serviceName,
bufferSize: 1400
}
}));
}
// Handle uncaught exceptions
process.on('uncaughtException', (error) => {
this.logger.error('Uncaught Exception:', {
error: error.message,
stack: error.stack
});
process.exit(1);
});
// Handle unhandled Promise rejections
process.on('unhandledRejection', (reason, promise) => {
this.logger.error('Unhandled Rejection:', {
reason: reason instanceof Error ? reason.message : reason,
stack: reason instanceof Error ? reason.stack : null
});
});
}
// Log debug information
debug(message, meta = {}) {
this.logger.debug(message, meta);
}
// Log informational messages
info(message, meta = {}) {
this.logger.info(message, meta);
}
// Log warnings
warn(message, meta = {}) {
this.logger.warn(message, meta);
}
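// Log errors
error(message, meta = {}) {
// Serialize Error objects explicitly: message and stack are non-enumerable, so JSON output would otherwise drop them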
if (meta.error instanceof Error) {
meta.error = {
message: meta.error.message,
stack: meta.error.stack,
name: meta.error.name
};
}
this.logger.error(message, meta);
}
// Express middleware that logs each HTTP request
logRequest(req, res, next) {
const start = Date.now();
const requestId = req.headers['x-request-id'] || this._generateRequestId();
// Propagate the request ID on the response
res.setHeader('x-request-id', requestId);
const logData = {
requestId,
method: req.method,
url: req.originalUrl,
headers: {
'user-agent': req.headers['user-agent'],
'content-type': req.headers['content-type']
},
ip: req.ip,
body: this._sanitizeBody(req.body)
};
this.logger.info('Request received', logData);
// Log again once the response has finished
res.on('finish', () => {
const duration = Date.now() - start;
this.logger.info('Request completed', {
...logData,
statusCode: res.statusCode,
duration: duration,
contentLength: res.getHeader('content-length')
});
});
next();
}
// Generate a request ID
_generateRequestId() {
return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 11)}`;
}
// Sanitize the request body (mask sensitive top-level fields)
_sanitizeBody(body) {
if (!body || typeof body !== 'object') {
return body;
}
const sanitizedBody = { ...body };
// Mask sensitive fields
const sensitiveFields = ['password', 'token', 'creditCard', 'cvv', 'auth', 'secret'];
for (const field of sensitiveFields) {
if (sanitizedBody[field]) {
sanitizedBody[field] = '******';
}
}
return sanitizedBody;
}
}
// Usage example
function setupLogger() {
const logger = new DistributedLogger('user-service', {
environment: process.env.NODE_ENV,
logLevel: process.env.LOG_LEVEL || 'info',
graylog: {
host: process.env.GRAYLOG_HOST,
port: parseInt(process.env.GRAYLOG_PORT, 10) || 12201
}
});
return logger;
}
// Use it in an Express application
const express = require('express');
const app = express();
const logger = setupLogger();
// Log every request (register express.json() before this middleware if request bodies should be logged)
app.use(logger.logRequest.bind(logger));