跳到主要内容

通信格式:JSON-RPC

概述

什么是JSON-RPC

JSON-RPC(JSON Remote Procedure Call)是一种轻量级的远程过程调用协议,使用JSON作为数据格式。它允许客户端向服务器发送请求,服务器执行相应的过程并返回结果。在MCP协议中,JSON-RPC被用作标准的数据交换格式,实现AI应用与外部工具之间的通信。

JSON-RPC在MCP中的作用

  • 标准化通信:提供统一的请求响应格式
  • 跨语言支持:支持多种编程语言和平台
  • 简单易用:基于JSON,易于理解和实现
  • 扩展性强:支持通知、批量请求等高级特性

适用场景

  • 客户端-服务器通信
  • 微服务间通信
  • API接口设计
  • 分布式系统集成
  • 工具和插件通信

JSON-RPC协议规范

协议版本

JSON-RPC 2.0是目前最广泛使用的版本,相比1.0版本有以下改进:

  • 支持通知(无响应请求)
  • 支持批量请求
  • 改进的错误处理
  • 更清晰的协议规范

基本概念

  • 请求(Request):客户端向服务器发送的调用请求
  • 响应(Response):服务器返回的调用结果
  • 通知(Notification):不需要响应的请求
  • 错误(Error):请求处理失败时的错误信息

传输方式

JSON-RPC本身不指定传输方式,可以用于:

  • HTTP/HTTPS
  • WebSocket
  • TCP/UDP
  • 标准输入输出流(如MCP中的stdio)

请求格式详解

请求对象结构

{
"jsonrpc": "2.0",
"id": "unique-identifier",
"method": "method-name",
"params": {
"param1": "value1",
"param2": "value2"
}
}

字段说明

  • jsonrpc:协议版本,固定为"2.0"
  • id:请求标识符,用于匹配请求和响应
  • method:要调用的方法名称
  • params:方法参数(可选)

请求类型

1. 标准请求

{
"jsonrpc": "2.0",
"id": 1,
"method": "subtract",
"params": {
"minuend": 42,
"subtrahend": 23
}
}

2. 通知请求(无ID)

{
"jsonrpc": "2.0",
"method": "update",
"params": {
"status": "completed"
}
}

3. 批量请求

[
{
"jsonrpc": "2.0",
"id": 1,
"method": "sum",
"params": [1, 2, 4]
},
{
"jsonrpc": "2.0",
"id": 2,
"method": "notify_hello",
"params": [7]
},
{
"jsonrpc": "2.0",
"id": 3,
"method": "get_data"
}
]

响应格式详解

成功响应

{
"jsonrpc": "2.0",
"id": 1,
"result": 19
}

错误响应

{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32600,
"message": "Invalid Request"
}
}

字段说明

  • jsonrpc:协议版本,固定为"2.0"
  • id:对应请求的标识符
  • result:成功时的返回结果
  • error:失败时的错误信息

错误代码规范

代码含义说明
-32700Parse errorJSON解析错误
-32600Invalid Request无效的请求
-32601Method not found方法不存在
-32602Invalid params无效的参数
-32603Internal error内部错误
-32000 to -32099Server error服务器错误(预留)

协议实现方法

基础JSON-RPC客户端

class JSONRPCClient {
constructor(transport) {
this.transport = transport;
this.requestId = 0;
this.pendingRequests = new Map();
}

// 生成唯一请求ID
generateId() {
return ++this.requestId;
}

// 发送请求
async call(method, params = {}) {
const id = this.generateId();

const request = {
jsonrpc: '2.0',
id,
method,
params
};

return new Promise((resolve, reject) => {
// 保存待处理的请求
this.pendingRequests.set(id, { resolve, reject });

// 发送请求
this.transport.send(request);

// 设置超时
setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id);
reject(new Error('Request timeout'));
}
}, 30000); // 30秒超时
});
}

// 发送通知(无响应)
notify(method, params = {}) {
const request = {
jsonrpc: '2.0',
method,
params
};

this.transport.send(request);
}

// 处理响应
handleResponse(response) {
const { id, result, error } = response;

if (id && this.pendingRequests.has(id)) {
const { resolve, reject } = this.pendingRequests.get(id);
this.pendingRequests.delete(id);

if (error) {
reject(new JSONRPCError(error.code, error.message, error.data));
} else {
resolve(result);
}
}
}

// 处理批量响应
handleBatchResponse(responses) {
if (Array.isArray(responses)) {
responses.forEach(response => this.handleResponse(response));
} else {
this.handleResponse(responses);
}
}
}

// JSON-RPC错误类
class JSONRPCError extends Error {
constructor(code, message, data = null) {
super(message);
this.name = 'JSONRPCError';
this.code = code;
this.data = data;
}
}

基础JSON-RPC服务器

class JSONRPCServer {
constructor() {
this.methods = new Map();
this.middleware = [];
}

// 注册方法
registerMethod(name, handler) {
this.methods.set(name, handler);
}

// 添加中间件
use(middleware) {
this.middleware.push(middleware);
}

// 处理请求
async handleRequest(request) {
try {
// 验证请求格式
if (!this.validateRequest(request)) {
return this.createErrorResponse(request.id, -32600, 'Invalid Request');
}

// 执行中间件
for (const mw of this.middleware) {
const result = await mw(request);
if (result) {
return result;
}
}

// 处理通知(无ID的请求)
if (!request.id) {
await this.handleNotification(request);
return null; // 通知不需要响应
}

// 查找方法处理器
const handler = this.methods.get(request.method);
if (!handler) {
return this.createErrorResponse(request.id, -32601, 'Method not found');
}

// 执行方法
const result = await handler(request.params);
return this.createSuccessResponse(request.id, result);

} catch (error) {
return this.createErrorResponse(request.id, -32603, 'Internal error', error.message);
}
}

// 处理通知
async handleNotification(request) {
const handler = this.methods.get(request.method);
if (handler) {
try {
await handler(request.params);
} catch (error) {
console.error('Notification handler error:', error);
}
}
}

// 处理批量请求
async handleBatchRequest(requests) {
if (!Array.isArray(requests)) {
return this.createErrorResponse(null, -32600, 'Invalid Request');
}

const responses = [];
for (const request of requests) {
const response = await this.handleRequest(request);
if (response) {
responses.push(response);
}
}

return responses.length > 0 ? responses : null;
}

// 验证请求
validateRequest(request) {
return request.jsonrpc === '2.0' &&
request.method &&
typeof request.method === 'string';
}

// 创建成功响应
createSuccessResponse(id, result) {
return {
jsonrpc: '2.0',
id,
result
};
}

// 创建错误响应
createErrorResponse(id, code, message, data = null) {
const response = {
jsonrpc: '2.0',
id,
error: { code, message }
};

if (data) {
response.error.data = data;
}

return response;
}
}

高级特性实现

中间件系统

// 日志中间件
function loggingMiddleware(request, next) {
console.log(`[${new Date().toISOString()}] ${request.method} called`);

const startTime = Date.now();
const result = await next(request);

const duration = Date.now() - startTime;
console.log(`[${new Date().toISOString()}] ${request.method} completed in ${duration}ms`);

return result;
}

// 认证中间件
function authMiddleware(request, next) {
const token = request.params?.token || request.headers?.authorization;

if (!token || !isValidToken(token)) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32001,
message: 'Unauthorized'
}
};
}

return next(request);
}

// 限流中间件
function rateLimitMiddleware(request, next) {
const clientId = getClientId(request);

if (isRateLimited(clientId)) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32002,
message: 'Rate limit exceeded'
}
};
}

return next(request);
}

批量请求优化

// 批量请求处理器
class BatchRequestProcessor {
constructor(server) {
this.server = server;
this.batchSize = 100;
this.batchTimeout = 1000;
this.pendingBatch = [];
this.batchTimer = null;
}

// 添加请求到批次
addToBatch(request) {
this.pendingBatch.push(request);

if (this.pendingBatch.length >= this.batchSize) {
this.processBatch();
} else if (this.pendingBatch.length === 1) {
this.startBatchTimer();
}
}

// 启动批次定时器
startBatchTimer() {
this.batchTimer = setTimeout(() => {
this.processBatch();
}, this.batchTimeout);
}

// 处理批次
async processBatch() {
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}

if (this.pendingBatch.length > 0) {
const batch = [...this.pendingBatch];
this.pendingBatch = [];

try {
const results = await Promise.all(
batch.map(request => this.server.handleRequest(request))
);

// 过滤掉null结果(通知)
const responses = results.filter(result => result !== null);

if (responses.length > 0) {
this.sendResponses(responses);
}
} catch (error) {
console.error('Batch processing error:', error);
}
}
}

// 发送响应
sendResponses(responses) {
if (responses.length === 1) {
this.transport.send(responses[0]);
} else {
this.transport.send(responses);
}
}
}

错误处理机制

错误分类和处理

// 错误处理器
class JSONRPCErrorHandler {
// 处理解析错误
static handleParseError(error) {
return {
jsonrpc: '2.0',
id: null,
error: {
code: -32700,
message: 'Parse error',
data: error.message
}
};
}

// 处理无效请求
static handleInvalidRequest(request) {
return {
jsonrpc: '2.0',
id: request.id || null,
error: {
code: -32600,
message: 'Invalid Request',
data: 'Request does not conform to JSON-RPC 2.0 specification'
}
};
}

// 处理方法不存在
static handleMethodNotFound(request) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32601,
message: 'Method not found',
data: `Method '${request.method}' not found`
}
};
}

// 处理无效参数
static handleInvalidParams(request, validationErrors) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32602,
message: 'Invalid params',
data: validationErrors
}
};
}

// 处理内部错误
static handleInternalError(request, error) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: -32603,
message: 'Internal error',
data: process.env.NODE_ENV === 'development' ? error.stack : 'Internal server error'
}
};
}

// 处理自定义错误
static handleCustomError(request, code, message, data = null) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code, message, data }
};
}
}

错误恢复策略

// 错误恢复管理器
class ErrorRecoveryManager {
constructor(client) {
this.client = client;
this.retryStrategies = new Map();
this.maxRetries = 3;
}

// 注册重试策略
registerRetryStrategy(errorCode, strategy) {
this.retryStrategies.set(errorCode, strategy);
}

// 处理错误并尝试恢复
async handleError(request, error) {
const retryStrategy = this.retryStrategies.get(error.code);

if (retryStrategy && request.retryCount < this.maxRetries) {
request.retryCount = (request.retryCount || 0) + 1;

try {
await retryStrategy(request, error);
return await this.client.call(request.method, request.params);
} catch (retryError) {
console.error('Retry failed:', retryError);
}
}

throw error;
}

// 指数退避重试策略
static exponentialBackoff(request, error) {
const delay = Math.pow(2, request.retryCount) * 1000;
return new Promise(resolve => setTimeout(resolve, delay));
}

// 固定延迟重试策略
static fixedDelay(request, error) {
return new Promise(resolve => setTimeout(resolve, 1000));
}
}

性能优化策略

连接池管理

// JSON-RPC连接池
class JSONRPCConnectionPool {
constructor(factory, maxConnections = 10) {
this.factory = factory;
this.maxConnections = maxConnections;
this.connections = new Map();
this.availableConnections = [];
this.waitingRequests = [];
}

// 获取连接
async getConnection() {
if (this.availableConnections.length > 0) {
return this.availableConnections.pop();
}

if (this.connections.size < this.maxConnections) {
const connection = await this.factory();
this.connections.set(connection.id, connection);
return connection;
}

return new Promise(resolve => {
this.waitingRequests.push(resolve);
});
}

// 释放连接
releaseConnection(connection) {
if (this.waitingRequests.length > 0) {
const resolve = this.waitingRequests.shift();
resolve(connection);
} else {
this.availableConnections.push(connection);
}
}

// 关闭所有连接
closeAll() {
for (const connection of this.connections.values()) {
connection.close();
}
this.connections.clear();
this.availableConnections.length = 0;
}
}

请求缓存

// JSON-RPC请求缓存
class JSONRPCCache {
constructor(maxSize = 1000, ttl = 60000) {
this.maxSize = maxSize;
this.ttl = ttl;
this.cache = new Map();
}

// 生成缓存键
generateKey(method, params) {
return `${method}:${JSON.stringify(params)}`;
}

// 获取缓存
get(method, params) {
const key = this.generateKey(method, params);
const entry = this.cache.get(key);

if (entry && Date.now() - entry.timestamp < this.ttl) {
return entry.result;
}

if (entry) {
this.cache.delete(key);
}

return null;
}

// 设置缓存
set(method, params, result) {
const key = this.generateKey(method, params);

if (this.cache.size >= this.maxSize) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}

this.cache.set(key, {
result,
timestamp: Date.now()
});
}

// 清除缓存
clear() {
this.cache.clear();
}
}

实际应用示例

MCP工具集成示例

// MCP工具包装器
class MCPToolWrapper {
constructor(command, args = []) {
this.client = new JSONRPCClient(new StdioTransport(command, args));
this.cache = new JSONRPCCache();
}

// 调用工具方法
async call(method, params = {}) {
// 检查缓存
const cached = this.cache.get(method, params);
if (cached) {
return cached;
}

// 调用远程方法
const result = await this.client.call(method, params);

// 缓存结果
this.cache.set(method, params, result);

return result;
}

// 发送通知
notify(method, params = {}) {
this.client.notify(method, params);
}

// 关闭连接
close() {
this.client.close();
}
}

// 使用示例
const fileTool = new MCPToolWrapper('python', ['file_tool.py']);

// 读取文件
const content = await fileTool.call('read_file', { path: '/path/to/file.txt' });

// 写入文件
await fileTool.call('write_file', {
path: '/path/to/file.txt',
content: 'Hello World'
});

// 监听文件变化
fileTool.notify('watch_file', { path: '/path/to/file.txt' });

最佳实践

设计原则

  1. 单一职责:每个方法只负责一个特定功能
  2. 参数验证:严格验证输入参数的有效性
  3. 错误处理:提供清晰的错误信息和错误代码
  4. 性能优化:合理使用缓存和连接池

安全考虑

  • 验证所有输入数据
  • 实现适当的访问控制
  • 记录操作日志
  • 限制资源使用

性能优化建议

  1. 批量处理:将多个小请求合并为批次
  2. 连接复用:使用连接池管理连接
  3. 请求缓存:缓存频繁调用的结果
  4. 异步处理:使用异步操作提高并发性能

总结

JSON-RPC是一种强大而灵活的远程过程调用协议,为MCP协议提供了标准化的通信格式。通过本文的学习,你应该能够:

  1. 理解协议规范:掌握JSON-RPC 2.0的完整规范
  2. 实现通信协议:能够构建JSON-RPC客户端和服务器
  3. 处理错误情况:掌握各种错误处理策略和恢复机制
  4. 优化性能:了解JSON-RPC的性能优化方法
  5. 集成MCP工具:将JSON-RPC集成到MCP应用中

JSON-RPC为MCP协议提供了可靠的数据交换基础,掌握它将为你的MCP开发工作提供强大的支持。