通信格式:JSON-RPC
概述
什么是JSON-RPC
JSON-RPC(JSON Remote Procedure Call)是一种轻量级的远程过程调用协议,使用JSON作为数据格式。它允许客户端向服务器发送请求,服务器执行相应的过程并返回结果。在MCP协议中,JSON-RPC被用作标准的数据交换格式,实现AI应用与外部工具之间的通信。
JSON-RPC在MCP中的作用
- 标准化通信:提供统一的请求响应格式
- 跨语言支持:支持多种编程语言和平台
- 简单易用:基于JSON,易于理解和实现
- 扩展性强:支持通知、批量请求等高级特性
适用场景
- 客户端-服务器通信
- 微服务间通信
- API接口设计
- 分布式系统集成
- 工具和插件通信
JSON-RPC协议规范
协议版本
JSON-RPC 2.0是目前最广泛使用的版本,相比1.0版本有以下改进:
- 支持通知(无响应请求)
- 支持批量请求
- 改进的错误处理
- 更清晰的协议规范
基本概念
- 请求(Request):客户端向服务器发送的调用请求
- 响应(Response):服务器返回的调用结果
- 通知(Notification):不需要响应的请求
- 错误(Error):请求处理失败时的错误信息
传输方式
JSON-RPC本身不指定传输方式,可以用于:
- HTTP/HTTPS
- WebSocket
- TCP/UDP
- 标准输入输出流(如MCP中的stdio)
请求格式详解
请求对象结构
{
"jsonrpc": "2.0",
"id": "unique-identifier",
"method": "method-name",
"params": {
"param1": "value1",
"param2": "value2"
}
}
字段说明
- jsonrpc:协议版本,固定为"2.0"
- id:请求标识符,用于匹配请求和响应
- method:要调用的方法名称
- params:方法参数(可选)
请求类型
1. 标准请求
{
"jsonrpc": "2.0",
"id": 1,
"method": "subtract",
"params": {
"minuend": 42,
"subtrahend": 23
}
}
2. 通知请求(无ID)
{
"jsonrpc": "2.0",
"method": "update",
"params": {
"status": "completed"
}
}
3. 批量请求
[
{
"jsonrpc": "2.0",
"id": 1,
"method": "sum",
"params": [1, 2, 4]
},
{
"jsonrpc": "2.0",
"id": 2,
"method": "notify_hello",
"params": [7]
},
{
"jsonrpc": "2.0",
"id": 3,
"method": "get_data"
}
]
响应格式详解
成功响应
{
"jsonrpc": "2.0",
"id": 1,
"result": 19
}
错误响应
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32600,
"message": "Invalid Request"
}
}
字段说明
- jsonrpc:协议版本,固定为"2.0"
- id:对应请求的标识符
- result:成功时的返回结果
- error:失败时的错误信息
错误代码规范
| 代码 | 含义 | 说明 |
|---|---|---|
| -32700 | Parse error | JSON解析错误 |
| -32600 | Invalid Request | 无效的请求 |
| -32601 | Method not found | 方法不存在 |
| -32602 | Invalid params | 无效的参数 |
| -32603 | Internal error | 内部错误 |
| -32000 to -32099 | Server error | 服务器错误(预留) |
协议实现方法
基础JSON-RPC客户端
class JSONRPCClient {
constructor(transport) {
this.transport = transport;
this.requestId = 0;
this.pendingRequests = new Map();
}
// 生成唯一请求ID
generateId() {
return ++this.requestId;
}
// 发送请求
async call(method, params = {}) {
const id = this.generateId();
const request = {
jsonrpc: '2.0',
id,
method,
params
};
return new Promise((resolve, reject) => {
// 保存待处理的请求
this.pendingRequests.set(id, { resolve, reject });
// 发送请求
this.transport.send(request);
// 设置超时
setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id);
reject(new Error('Request timeout'));
}
}, 30000); // 30秒超时
});
}
// 发送通知(无响应)
notify(method, params = {}) {
const request = {
jsonrpc: '2.0',
method,
params
};
this.transport.send(request);
}
// 处理响应
handleResponse(response) {
const { id, result, error } = response;
if (id && this.pendingRequests.has(id)) {
const { resolve, reject } = this.pendingRequests.get(id);
this.pendingRequests.delete(id);
if (error) {
reject(new JSONRPCError(error.code, error.message, error.data));
} else {
resolve(result);
}
}
}
// 处理批量响应
handleBatchResponse(responses) {
if (Array.isArray(responses)) {
responses.forEach(response => this.handleResponse(response));
} else {
this.handleResponse(responses);
}
}
}
// JSON-RPC错误类
class JSONRPCError extends Error {
constructor(code, message, data = null) {
super(message);
this.name = 'JSONRPCError';
this.code = code;
this.data = data;
}
}
基础JSON-RPC服务器
class JSONRPCServer {
constructor() {
this.methods = new Map();
this.middleware = [];
}
// 注册方法
registerMethod(name, handler) {
this.methods.set(name, handler);
}
// 添加中间件
use(middleware) {
this.middleware.push(middleware);
}
// 处理请求
async handleRequest(request) {
try {
// 验证请求格式
if (!this.validateRequest(request)) {
return this.createErrorResponse(request.id, -32600, 'Invalid Request');
}
// 执行中间件
for (const mw of this.middleware) {
const result = await mw(request);
if (result) {
return result;
}
}
// 处理通知(无ID的请求)
if (!request.id) {
await this.handleNotification(request);
return null; // 通知不需要响应
}
// 查找方法处理器
const handler = this.methods.get(request.method);
if (!handler) {
return this.createErrorResponse(request.id, -32601, 'Method not found');
}
// 执行方法
const result = await handler(request.params);
return this.createSuccessResponse(request.id, result);
} catch (error) {
return this.createErrorResponse(request.id, -32603, 'Internal error', error.message);
}
}
// 处理通知
async handleNotification(request) {
const handler = this.methods.get(request.method);
if (handler) {
try {
await handler(request.params);
} catch (error) {
console.error('Notification handler error:', error);
}
}
}
// 处理批量请求
async handleBatchRequest(requests) {
if (!Array.isArray(requests)) {
return this.createErrorResponse(null, -32600, 'Invalid Request');
}
const responses = [];
for (const request of requests) {
const response = await this.handleRequest(request);
if (response) {
responses.push(response);
}
}
return responses.length > 0 ? responses : null;
}
// 验证请求
validateRequest(request) {
return request.jsonrpc === '2.0' &&
request.method &&
typeof request.method === 'string';
}
// 创建成功响应
createSuccessResponse(id, result) {
return {
jsonrpc: '2.0',
id,
result
};
}
// 创建错误响应
createErrorResponse(id, code, message, data = null) {
const response = {
jsonrpc: '2.0',
id,
error: { code, message }
};
if (data) {
response.error.data = data;
}
return response;
}
}
高级特性实现
中间件系统
// 日志中间件
function loggingMiddleware(request, next) {
console.log(`[${new Date().toISOString()}] ${request.method} called`);
const startTime = Date.now();
const result = await next(request);
const duration = Date.now() - startTime;
console.log(`[${new Date().toISOString()}] ${request.method} completed in ${duration}ms`);
return result;
}
// 认证中间件
function authMiddleware(request, next) {
const token = request.params?.token || request.headers?.authorization;
if (!token || !isValidToken(token)) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32001,
message: 'Unauthorized'
}
};
}
return next(request);
}
// 限流中间件
function rateLimitMiddleware(request, next) {
const clientId = getClientId(request);
if (isRateLimited(clientId)) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32002,
message: 'Rate limit exceeded'
}
};
}
return next(request);
}
批量请求优化
// 批量请求处理器
class BatchRequestProcessor {
constructor(server) {
this.server = server;
this.batchSize = 100;
this.batchTimeout = 1000;
this.pendingBatch = [];
this.batchTimer = null;
}
// 添加请求到批次
addToBatch(request) {
this.pendingBatch.push(request);
if (this.pendingBatch.length >= this.batchSize) {
this.processBatch();
} else if (this.pendingBatch.length === 1) {
this.startBatchTimer();
}
}
// 启动批次定时器
startBatchTimer() {
this.batchTimer = setTimeout(() => {
this.processBatch();
}, this.batchTimeout);
}
// 处理批次
async processBatch() {
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}
if (this.pendingBatch.length > 0) {
const batch = [...this.pendingBatch];
this.pendingBatch = [];
try {
const results = await Promise.all(
batch.map(request => this.server.handleRequest(request))
);
// 过滤掉null结果(通知)
const responses = results.filter(result => result !== null);
if (responses.length > 0) {
this.sendResponses(responses);
}
} catch (error) {
console.error('Batch processing error:', error);
}
}
}
// 发送响应
sendResponses(responses) {
if (responses.length === 1) {
this.transport.send(responses[0]);
} else {
this.transport.send(responses);
}
}
}
错误处理机制
错误分类和处理
// 错误处理器
class JSONRPCErrorHandler {
// 处理解析错误
static handleParseError(error) {
return {
jsonrpc: '2.0',
id: null,
error: {
code: -32700,
message: 'Parse error',
data: error.message
}
};
}
// 处理无效请求
static handleInvalidRequest(request) {
return {
jsonrpc: '2.0',
id: request.id || null,
error: {
code: -32600,
message: 'Invalid Request',
data: 'Request does not conform to JSON-RPC 2.0 specification'
}
};
}
// 处理方法不存在
static handleMethodNotFound(request) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32601,
message: 'Method not found',
data: `Method '${request.method}' not found`
}
};
}
// 处理无效参数
static handleInvalidParams(request, validationErrors) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32602,
message: 'Invalid params',
data: validationErrors
}
};
}
// 处理内部错误
static handleInternalError(request, error) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32603,
message: 'Internal error',
data: process.env.NODE_ENV === 'development' ? error.stack : 'Internal server error'
}
};
}
// 处理自定义错误
static handleCustomError(request, code, message, data = null) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code, message, data }
};
}
}
错误恢复策略
// 错误恢复管理器
class ErrorRecoveryManager {
constructor(client) {
this.client = client;
this.retryStrategies = new Map();
this.maxRetries = 3;
}
// 注册重试策略
registerRetryStrategy(errorCode, strategy) {
this.retryStrategies.set(errorCode, strategy);
}
// 处理错误并尝试恢复
async handleError(request, error) {
const retryStrategy = this.retryStrategies.get(error.code);
if (retryStrategy && request.retryCount < this.maxRetries) {
request.retryCount = (request.retryCount || 0) + 1;
try {
await retryStrategy(request, error);
return await this.client.call(request.method, request.params);
} catch (retryError) {
console.error('Retry failed:', retryError);
}
}
throw error;
}
// 指数退避重试策略
static exponentialBackoff(request, error) {
const delay = Math.pow(2, request.retryCount) * 1000;
return new Promise(resolve => setTimeout(resolve, delay));
}
// 固定延迟重试策略
static fixedDelay(request, error) {
return new Promise(resolve => setTimeout(resolve, 1000));
}
}
性能优化策略
连接池管理
// JSON-RPC连接池
class JSONRPCConnectionPool {
constructor(factory, maxConnections = 10) {
this.factory = factory;
this.maxConnections = maxConnections;
this.connections = new Map();
this.availableConnections = [];
this.waitingRequests = [];
}
// 获取连接
async getConnection() {
if (this.availableConnections.length > 0) {
return this.availableConnections.pop();
}
if (this.connections.size < this.maxConnections) {
const connection = await this.factory();
this.connections.set(connection.id, connection);
return connection;
}
return new Promise(resolve => {
this.waitingRequests.push(resolve);
});
}
// 释放连接
releaseConnection(connection) {
if (this.waitingRequests.length > 0) {
const resolve = this.waitingRequests.shift();
resolve(connection);
} else {
this.availableConnections.push(connection);
}
}
// 关闭所有连接
closeAll() {
for (const connection of this.connections.values()) {
connection.close();
}
this.connections.clear();
this.availableConnections.length = 0;
}
}
请求缓存
// JSON-RPC请求缓存
class JSONRPCCache {
constructor(maxSize = 1000, ttl = 60000) {
this.maxSize = maxSize;
this.ttl = ttl;
this.cache = new Map();
}
// 生成缓存键
generateKey(method, params) {
return `${method}:${JSON.stringify(params)}`;
}
// 获取缓存
get(method, params) {
const key = this.generateKey(method, params);
const entry = this.cache.get(key);
if (entry && Date.now() - entry.timestamp < this.ttl) {
return entry.result;
}
if (entry) {
this.cache.delete(key);
}
return null;
}
// 设置缓存
set(method, params, result) {
const key = this.generateKey(method, params);
if (this.cache.size >= this.maxSize) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}
this.cache.set(key, {
result,
timestamp: Date.now()
});
}
// 清除缓存
clear() {
this.cache.clear();
}
}
实际应用示例
MCP工具集成示例
// MCP工具包装器
class MCPToolWrapper {
constructor(command, args = []) {
this.client = new JSONRPCClient(new StdioTransport(command, args));
this.cache = new JSONRPCCache();
}
// 调用工具方法
async call(method, params = {}) {
// 检查缓存
const cached = this.cache.get(method, params);
if (cached) {
return cached;
}
// 调用远程方法
const result = await this.client.call(method, params);
// 缓存结果
this.cache.set(method, params, result);
return result;
}
// 发送通知
notify(method, params = {}) {
this.client.notify(method, params);
}
// 关闭连接
close() {
this.client.close();
}
}
// 使用示例
const fileTool = new MCPToolWrapper('python', ['file_tool.py']);
// 读取文件
const content = await fileTool.call('read_file', { path: '/path/to/file.txt' });
// 写入文件
await fileTool.call('write_file', {
path: '/path/to/file.txt',
content: 'Hello World'
});
// 监听文件变化
fileTool.notify('watch_file', { path: '/path/to/file.txt' });
最佳实践
设计原则
- 单一职责:每个方法只负责一个特定功能
- 参数验证:严格验证输入参数的有效性
- 错误处理:提供清晰的错误信息和错误代码
- 性能优化:合理使用缓存和连接池
安全考虑
- 验证所有输入数据
- 实现适当的访问控制
- 记录操作日志
- 限制资源使用
性能优化建议
- 批量处理:将多个小请求合并为批次
- 连接复用:使用连接池管理连接
- 请求缓存:缓存频繁调用的结果
- 异步处理:使用异步操作提高并发性能
总结
JSON-RPC是一种强大而灵活的远程过程调用协议,为MCP协议提供了标准化的通信格式。通过本文的学习,你应该能够:
- 理解协议规范:掌握JSON-RPC 2.0的完整规范
- 实现通信协议:能够构建JSON-RPC客户端和服务器
- 处理错误情况:掌握各种错误处理策略和恢复机制
- 优化性能:了解JSON-RPC的性能优化方法
- 集成MCP工具:将JSON-RPC集成到MCP应用中
JSON-RPC为MCP协议提供了可靠的数据交换基础,掌握它将为你的MCP开发工作提供强大的支持。