第 3 章:MCP协议规范详解
2025/9/1 · 大约 10 分钟
第 3 章:MCP协议规范详解
学习目标
- 深入理解MCP协议的消息格式
- 掌握请求-响应模式和事件流
- 学习协议的版本控制和兼容性
- 理解错误处理和异常情况
- 掌握协议扩展机制
3.1 MCP协议基础
协议架构概述
/**
 * Summary of the protocol characteristics described in this chapter.
 * MCP is layered on top of JSON-RPC 2.0.
 *
 * NOTE(review): confirm the transport list below against the MCP
 * specification revision named in `version`.
 */
interface MCPProtocolFeatures {
  /** Message format used on the wire. */
  messageFormat: 'JSON-RPC 2.0';
  /** Protocol version. */
  version: '2024-11-05';
  /** Communication is bidirectional. */
  bidirectional: true;
  /** Whether streaming transfer is supported. */
  streaming: boolean;
  /** Transport layer options. */
  transport: {
    stdio: '标准输入输出';
    websocket: 'WebSocket连接';
    http: 'HTTP长轮询';
  };
}
// Protocol version definition: the MCP protocol revision string used by this chapter.
const MCP_PROTOCOL_VERSION = '2024-11-05';
// Supported transport kinds. Each member's value is its lowercase string tag.
enum TransportType {
STDIO = 'stdio',
WEBSOCKET = 'websocket',
HTTP = 'http'
}
JSON-RPC 2.0基础
/** Base structure shared by every JSON-RPC 2.0 message. */
interface JSONRPCMessage {
  /** Message ID; notification messages may omit it. */
  id?: string | number | null;
  /** Protocol version; must be '2.0'. */
  jsonrpc: '2.0';
}
/** Request message: a method invocation that carries a mandatory ID. */
interface JSONRPCRequest extends JSONRPCMessage {
  /** Request ID; must be present. */
  id: string | number;
  /** Method name. */
  method: string;
  /** Parameters; may be an object or an array. */
  params?: any;
}
/** Response message: carries either a result or an error object. */
interface JSONRPCResponse extends JSONRPCMessage {
  /** ID of the request this response corresponds to. */
  id: string | number | null;
  /** Error information. */
  error?: {
    code: number;
    message: string;
    data?: any;
  };
  /** Result on success. */
  result?: any;
}
/**
 * Notification message: a one-way method invocation.
 *
 * Notifications have no `id` field. The base message type declares `id` as
 * optional, so it is re-declared here as `never` to make the compiler reject
 * a notification literal that carries an id.
 */
interface JSONRPCNotification extends JSONRPCMessage {
  /** Method name. */
  method: string;
  /** Parameters. */
  params?: any;
  /** Must be absent on notifications. */
  id?: never;
}
3.2 MCP消息类型和格式
初始化流程
/** Step 1 of initialization: the initialize request (client -> server). */
interface InitializeRequest {
  jsonrpc: '2.0';
  method: 'initialize';
  id: string | number;
  params: {
    /** MCP protocol version. */
    protocolVersion: string;
    clientInfo: {
      /** Client name. */
      name: string;
      /** Client version. */
      version: string;
    };
    capabilities: {
      tools?: {
        /** Supports tool list change notifications. */
        listChanged?: boolean;
      };
      resources?: {
        /** Supports resource subscriptions. */
        subscribe?: boolean;
        /** Supports resource list change notifications. */
        listChanged?: boolean;
      };
      prompts?: {
        /** Supports prompt template list change notifications. */
        listChanged?: boolean;
      };
      /** Logging support. */
      logging?: {};
      /** Experimental features. */
      experimental?: Record<string, any>;
    };
  };
}
/** Step 2 of initialization: the initialize response (server -> client). */
interface InitializeResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    /** Protocol version supported by the server. */
    protocolVersion: string;
    serverInfo: {
      /** Server name. */
      name: string;
      /** Server version. */
      version: string;
    };
    /** Optional usage instructions for the server. */
    instructions?: string;
    capabilities: {
      tools?: {
        listChanged?: boolean;
      };
      resources?: {
        subscribe?: boolean;
        listChanged?: boolean;
      };
      prompts?: {
        listChanged?: boolean;
      };
      logging?: {};
      experimental?: Record<string, any>;
    };
  };
}
/** Step 3 of initialization: the "initialized" notification (client -> server). */
interface InitializedNotification {
  method: 'notifications/initialized';
  jsonrpc: '2.0';
  params?: {};
}
/**
 * Initialization flow example.
 *
 * Builds the three handshake messages and prints each one with console.log.
 * Nothing is sent over a transport; the "server response" is a hard-coded
 * sample object.
 */
class MCPInitializationFlow {
  /**
   * Logs, in order: the initialize request, the initialize response, and the
   * initialized notification, each pretty-printed as JSON.
   */
  async performHandshake(): Promise<void> {
    const print = (label: string, message: unknown): void => {
      console.log(label, JSON.stringify(message, null, 2));
    };

    // Step 1: the client sends the initialize request.
    print('发送初始化请求:', this.buildInitializeRequest());
    // Step 2: the server responds.
    print('收到初始化响应:', this.buildInitializeResponse());
    // Step 3: the client sends the initialized notification.
    print('发送初始化完成通知:', this.buildInitializedNotification());
  }

  /** Sample initialize request issued by the client. */
  private buildInitializeRequest(): InitializeRequest {
    return {
      jsonrpc: '2.0',
      id: 'init-1',
      method: 'initialize',
      params: {
        protocolVersion: '2024-11-05',
        capabilities: {
          tools: { listChanged: true },
          resources: { subscribe: true, listChanged: true },
          prompts: { listChanged: true },
          logging: {}
        },
        clientInfo: { name: 'example-client', version: '1.0.0' }
      }
    };
  }

  /** Sample initialize response returned by the server. */
  private buildInitializeResponse(): InitializeResponse {
    return {
      jsonrpc: '2.0',
      id: 'init-1',
      result: {
        protocolVersion: '2024-11-05',
        capabilities: {
          tools: { listChanged: true },
          resources: { subscribe: false, listChanged: true },
          prompts: { listChanged: false },
          logging: {}
        },
        serverInfo: { name: 'example-server', version: '1.0.0' },
        instructions: '这是一个示例MCP服务器,提供文件操作和计算功能。'
      }
    };
  }

  /** Sample initialized notification issued by the client. */
  private buildInitializedNotification(): InitializedNotification {
    return {
      jsonrpc: '2.0',
      method: 'notifications/initialized',
      params: {}
    };
  }
}
工具相关消息
/** List-tools request. */
interface ListToolsRequest {
  method: 'tools/list';
  jsonrpc: '2.0';
  id: string | number;
  params?: {
    /** Pagination cursor. */
    cursor?: string;
  };
}
/** List-tools response. */
interface ListToolsResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    /** Cursor for the next page. */
    nextCursor?: string;
    tools: {
      name: string;
      description?: string;
      inputSchema: {
        type: 'object';
        properties?: Record<string, any>;
        required?: string[];
      };
    }[];
  };
}
/** Call-tool request. */
interface CallToolRequest {
  method: 'tools/call';
  jsonrpc: '2.0';
  id: string | number;
  params: {
    arguments?: Record<string, any>;
    name: string;
  };
}
/** Call-tool response. */
interface CallToolResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    /** Indicates whether the tool execution had an error. */
    isError?: boolean;
    content: {
      type: 'text' | 'image' | 'resource';
      text?: string;
      /** Base64-encoded data. */
      data?: string;
      mimeType?: string;
      /** Resource URI. */
      uri?: string;
    }[];
  };
}
/** Tool list changed notification. */
interface ToolsListChangedNotification {
  method: 'notifications/tools/list_changed';
  jsonrpc: '2.0';
  params?: {};
}
/**
 * Tool message handling example: answers `tools/list` and `tools/call`
 * requests from an in-memory tool table.
 */
class MCPToolsHandler {
  /**
   * Tools keyed by name. Entries are read for `name`, `description`,
   * `inputSchema` and `handler`. Nothing in this class adds entries.
   */
  private tools: Map<string, any> = new Map();

  /**
   * Handles a list-tools request.
   *
   * @param request The incoming request; only its `id` is used.
   * @returns A response listing every tool in the table (no pagination).
   */
  async handleListTools(request: ListToolsRequest): Promise<ListToolsResponse> {
    const toolsList = Array.from(this.tools.values()).map(tool => ({
      name: tool.name,
      description: tool.description,
      inputSchema: tool.inputSchema
    }));
    return {
      jsonrpc: '2.0',
      id: request.id,
      result: {
        tools: toolsList
      }
    };
  }

  /**
   * Handles a call-tool request.
   *
   * A failure inside the tool handler is reported in-band as a text content
   * item with `isError: true`, not as a rejected promise.
   *
   * @param request The incoming request; `params.arguments` defaults to `{}`.
   * @returns The tool output (or error description) as a single text item.
   * @throws Error if no tool named `params.name` exists in the table.
   */
  async handleCallTool(request: CallToolRequest): Promise<CallToolResponse> {
    const { name, arguments: args = {} } = request.params;
    const tool = this.tools.get(name);
    if (!tool) {
      throw new Error(`Unknown tool: ${name}`);
    }
    try {
      const result = await tool.handler(args);
      return {
        jsonrpc: '2.0',
        id: request.id,
        result: {
          content: [
            {
              type: 'text',
              // Serialised inside the try so a stringify failure is also reported in-band.
              text: MCPToolsHandler.toText(result)
            }
          ]
        }
      };
    } catch (error: unknown) {
      return {
        jsonrpc: '2.0',
        id: request.id,
        result: {
          content: [
            {
              type: 'text',
              text: `Error: ${MCPToolsHandler.describeError(error)}`
            }
          ],
          isError: true
        }
      };
    }
  }

  /** Builds the tool list changed notification. */
  notifyToolsListChanged(): ToolsListChangedNotification {
    return {
      jsonrpc: '2.0',
      method: 'notifications/tools/list_changed',
      params: {}
    };
  }

  /** Renders a tool result as text; strings pass through, everything else is pretty-printed JSON. */
  private static toText(result: unknown): string {
    if (typeof result === 'string') {
      return result;
    }
    // JSON.stringify yields undefined for undefined/functions/symbols; fall back to String().
    const json: string | undefined = JSON.stringify(result, null, 2);
    return json ?? String(result);
  }

  /** Extracts a message from a thrown value, which need not be an Error. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
资源相关消息
/** List-resources request. */
interface ListResourcesRequest {
  method: 'resources/list';
  jsonrpc: '2.0';
  id: string | number;
  params?: {
    cursor?: string;
  };
}
/** List-resources response. */
interface ListResourcesResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    nextCursor?: string;
    resources: {
      uri: string;
      name: string;
      description?: string;
      mimeType?: string;
    }[];
  };
}
/** Read-resource request. */
interface ReadResourceRequest {
  method: 'resources/read';
  jsonrpc: '2.0';
  id: string | number;
  params: {
    uri: string;
  };
}
/** Read-resource response. */
interface ReadResourceResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    contents: {
      uri: string;
      mimeType?: string;
      text?: string;
      /** Base64-encoded binary data. */
      blob?: string;
    }[];
  };
}
/** Subscribe-to-resource request. */
interface SubscribeResourceRequest {
  method: 'resources/subscribe';
  jsonrpc: '2.0';
  id: string | number;
  params: {
    uri: string;
  };
}
/** Unsubscribe-from-resource request. */
interface UnsubscribeResourceRequest {
  method: 'resources/unsubscribe';
  jsonrpc: '2.0';
  id: string | number;
  params: {
    uri: string;
  };
}
/** Resource updated notification. */
interface ResourceUpdatedNotification {
  method: 'notifications/resources/updated';
  jsonrpc: '2.0';
  params: {
    uri: string;
  };
}
/** Resource list changed notification. */
interface ResourcesListChangedNotification {
  method: 'notifications/resources/list_changed';
  jsonrpc: '2.0';
  params?: {};
}
/**
 * Resource handling example: records subscriptions per resource URI and
 * builds update notifications for them.
 */
class MCPResourcesHandler {
  /** Known resources keyed by URI. Nothing in this class adds entries. */
  private resources: Map<string, any> = new Map();
  /** For each URI, the stringified request IDs that subscribed to it. */
  private subscribers: Map<string, Set<string>> = new Map();

  /**
   * Handles a resource subscription.
   *
   * @param request The subscribe request; its `id` is recorded as the subscriber key.
   * @returns A success response with an empty result.
   * @throws Error if `params.uri` is not in the resource table.
   */
  async handleSubscribe(request: SubscribeResourceRequest): Promise<any> {
    const targetUri = request.params.uri;
    if (!this.resources.has(targetUri)) {
      throw new Error(`Resource not found: ${targetUri}`);
    }

    // Record the subscription, creating the per-URI set on first use.
    let ids = this.subscribers.get(targetUri);
    if (ids === undefined) {
      ids = new Set();
      this.subscribers.set(targetUri, ids);
    }
    ids.add(request.id.toString());

    return { jsonrpc: '2.0', id: request.id, result: {} };
  }

  /**
   * Builds resource-updated notifications.
   *
   * @param uri The resource that changed.
   * @returns One identical notification per recorded subscriber; empty when there are none.
   */
  notifyResourceUpdate(uri: string): ResourceUpdatedNotification[] {
    const notifications: ResourceUpdatedNotification[] = [];
    const ids = this.subscribers.get(uri);
    if (ids) {
      ids.forEach(() => {
        notifications.push({
          jsonrpc: '2.0',
          method: 'notifications/resources/updated',
          params: { uri }
        });
      });
    }
    return notifications;
  }
}
提示模板相关消息
/** List-prompt-templates request. */
interface ListPromptsRequest {
  method: 'prompts/list';
  jsonrpc: '2.0';
  id: string | number;
  params?: {
    cursor?: string;
  };
}
/** List-prompt-templates response. */
interface ListPromptsResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    nextCursor?: string;
    prompts: {
      name: string;
      description?: string;
      arguments?: {
        name: string;
        description: string;
        required?: boolean;
      }[];
    }[];
  };
}
/** Get-prompt-template request. */
interface GetPromptRequest {
  method: 'prompts/get';
  jsonrpc: '2.0';
  id: string | number;
  params: {
    arguments?: Record<string, any>;
    name: string;
  };
}
/** Get-prompt-template response. */
interface GetPromptResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    messages: {
      role: 'user' | 'assistant' | 'system';
      content: {
        type: 'text' | 'image';
        text?: string;
        data?: string;
        mimeType?: string;
      };
    }[];
    description?: string;
  };
}
/** Prompt template list changed notification. */
interface PromptsListChangedNotification {
  method: 'notifications/prompts/list_changed';
  jsonrpc: '2.0';
  params?: {};
}
3.3 错误处理和状态码
MCP错误代码
// Error codes used by the examples in this chapter.
// The numeric values are what appears in the `code` field of an error response.
enum MCPErrorCode {
// JSON-RPC standard errors
PARSE_ERROR = -32700,
INVALID_REQUEST = -32600,
METHOD_NOT_FOUND = -32601,
INVALID_PARAMS = -32602,
INTERNAL_ERROR = -32603,
// MCP-specific errors, as labelled by this chapter. These values sit in the
// -32000..-32099 range that JSON-RPC 2.0 reserves for implementation-defined
// server errors.
// NOTE(review): confirm which of these codes the MCP specification itself defines.
INVALID_TOOL = -32000,
TOOL_EXECUTION_ERROR = -32001,
RESOURCE_NOT_FOUND = -32002,
RESOURCE_ACCESS_DENIED = -32003,
PROMPT_NOT_FOUND = -32004,
INVALID_PROMPT_ARGS = -32005
}
/** Error response format. */
interface MCPErrorResponse {
  jsonrpc: '2.0';
  id: string | number | null;
  error: {
    message: string;
    code: MCPErrorCode;
    data?: {
      type?: string;
      description?: string;
      details?: any;
    };
  };
}
/** Error handling utility class: static factories for error responses. */
class MCPErrorHandler {
  /**
   * Creates a standard error response.
   *
   * @param id Request ID to echo, or null.
   * @param code Error code.
   * @param message Error message.
   * @param data Optional extra payload; the `data` key is always present on the result.
   */
  static createErrorResponse(
    id: string | number | null,
    code: MCPErrorCode,
    message: string,
    data?: any
  ): MCPErrorResponse {
    const error = { code, message, data };
    return { jsonrpc: '2.0', id, error };
  }

  /**
   * Creates a tool error (code TOOL_EXECUTION_ERROR).
   *
   * The payload carries the error's message as `description` and its stack
   * trace as `details.originalError`.
   */
  static createToolError(
    id: string | number,
    toolName: string,
    error: Error
  ): MCPErrorResponse {
    const details = { toolName, originalError: error.stack };
    const payload = {
      type: 'tool_error',
      description: error.message,
      details
    };
    const summary = `Tool execution failed: ${toolName}`;
    return this.createErrorResponse(id, MCPErrorCode.TOOL_EXECUTION_ERROR, summary, payload);
  }

  /** Creates a resource error (code RESOURCE_NOT_FOUND) tagged with the offending URI. */
  static createResourceError(
    id: string | number,
    uri: string,
    message: string
  ): MCPErrorResponse {
    const payload = { type: 'resource_error', uri };
    return this.createErrorResponse(id, MCPErrorCode.RESOURCE_NOT_FOUND, message, payload);
  }
}
/**
 * Error handling example: validates an incoming message, routes it by
 * method, and converts failures into error responses.
 */
class MCPServerErrorHandling {
  /**
   * Validates and dispatches one request.
   *
   * @param request The decoded message; may be any value, including null.
   * @returns The handler's response, or an error response. Does not reject.
   */
  async handleRequest(request: any): Promise<any> {
    // `??` instead of `||`: 0 and '' are legitimate request IDs and must be echoed back.
    // Optional chaining keeps a null/primitive request from throwing here.
    const id: string | number | null = request?.id ?? null;
    try {
      // Validate the request format.
      if (request?.jsonrpc !== '2.0') {
        return MCPErrorHandler.createErrorResponse(
          id,
          MCPErrorCode.INVALID_REQUEST,
          'Invalid JSON-RPC version'
        );
      }
      if (!request.method) {
        return MCPErrorHandler.createErrorResponse(
          id,
          MCPErrorCode.INVALID_REQUEST,
          'Missing method'
        );
      }
      // Route to the specific handler.
      switch (request.method) {
        case 'tools/call':
          return await this.handleToolCall(request);
        case 'resources/read':
          return await this.handleResourceRead(request);
        default:
          return MCPErrorHandler.createErrorResponse(
            id,
            MCPErrorCode.METHOD_NOT_FOUND,
            `Method not found: ${request.method}`
          );
      }
    } catch (error: unknown) {
      return MCPErrorHandler.createErrorResponse(
        id,
        MCPErrorCode.INTERNAL_ERROR,
        'Internal server error',
        {
          type: 'internal_error',
          // A thrown value is not necessarily an Error.
          description: error instanceof Error ? error.message : String(error)
        }
      );
    }
  }

  /** Placeholder tool-call handler; failures become tool error responses. */
  private async handleToolCall(request: any): Promise<any> {
    try {
      // Tool invocation logic.
      return { /* success response */ };
    } catch (error: unknown) {
      return MCPErrorHandler.createToolError(
        request.id,
        request.params?.name || 'unknown',
        error instanceof Error ? error : new Error(String(error))
      );
    }
  }

  /** Placeholder resource-read handler; rejects requests without a `uri` parameter. */
  private async handleResourceRead(request: any): Promise<any> {
    const uri = request.params?.uri;
    if (!uri) {
      return MCPErrorHandler.createErrorResponse(
        request.id ?? null,
        MCPErrorCode.INVALID_PARAMS,
        'Missing required parameter: uri'
      );
    }
    // Resource reading logic.
    return { /* success response */ };
  }
}
3.4 协议版本控制和兼容性
版本协商
/**
 * Version compatibility checks and version negotiation for the initialize
 * handshake.
 */
class MCPVersionManager {
  /** Supported versions; the first entry is used as the fallback. */
  private readonly supportedVersions = ['2024-11-05', '2024-10-07'];
  private readonly currentVersion = '2024-11-05';

  /**
   * Feature names per protocol version. Held in a Map so that a version
   * string such as 'constructor' cannot resolve to an inherited Object member.
   */
  private static readonly FEATURES_BY_VERSION: ReadonlyMap<string, readonly string[]> =
    new Map<string, readonly string[]>([
      [
        '2024-11-05',
        [
          'streaming_support',
          'resource_subscriptions',
          'tool_list_changes',
          'enhanced_error_handling'
        ]
      ],
      [
        '2024-10-07',
        [
          'basic_tools',
          'basic_resources',
          'basic_prompts'
        ]
      ]
    ]);

  /** Checks whether `clientVersion` is one of the supported versions. */
  isVersionSupported(clientVersion: string): boolean {
    return this.supportedVersions.includes(clientVersion);
  }

  /**
   * Picks the version to use for a client.
   *
   * @returns `clientVersion` when it is supported, otherwise the first
   *   supported version; null only if the supported list is empty.
   */
  getNegotiatedVersion(clientVersion: string): string | null {
    if (this.isVersionSupported(clientVersion)) {
      return clientVersion;
    }
    // Fall back to the highest supported version.
    return this.supportedVersions[0] ?? null;
  }

  /**
   * Checks whether `version` lists `feature`.
   *
   * @returns false for unknown versions and unknown features; never throws.
   */
  hasFeature(version: string, feature: string): boolean {
    return MCPVersionManager.FEATURES_BY_VERSION.get(version)?.includes(feature) ?? false;
  }

  /**
   * Version negotiation during initialization.
   *
   * @param initRequest The client's initialize request.
   * @returns The agreed version, the capabilities for that version, and
   *   any messages produced while negotiating (empty when the client's
   *   version was accepted as-is).
   */
  negotiateVersion(initRequest: InitializeRequest): {
    version: string;
    capabilities: any;
    errors?: string[];
  } {
    const clientVersion = initRequest.params.protocolVersion;
    const negotiatedVersion = this.getNegotiatedVersion(clientVersion);
    const errors: string[] = [];
    if (!negotiatedVersion) {
      errors.push(`Unsupported protocol version: ${clientVersion}`);
      return { version: this.currentVersion, capabilities: {}, errors };
    }
    if (negotiatedVersion !== clientVersion) {
      errors.push(`Version downgraded from ${clientVersion} to ${negotiatedVersion}`);
    }
    // Adjust the capabilities to the negotiated version.
    const capabilities = this.getCapabilitiesForVersion(
      negotiatedVersion,
      initRequest.params.capabilities
    );
    return { version: negotiatedVersion, capabilities, errors };
  }

  /**
   * Builds the capability object for `version` from its feature list.
   * The client's capabilities are accepted but not consulted.
   */
  private getCapabilitiesForVersion(version: string, _clientCapabilities: any): any {
    // Declared explicitly so that `experimental` can be added below.
    const capabilities: {
      tools: { listChanged?: boolean };
      resources: { subscribe?: boolean; listChanged?: boolean };
      prompts: { listChanged?: boolean };
      experimental?: Record<string, any>;
    } = {
      tools: {},
      resources: {},
      prompts: {}
    };
    if (this.hasFeature(version, 'tool_list_changes')) {
      capabilities.tools = { listChanged: true };
    }
    if (this.hasFeature(version, 'resource_subscriptions')) {
      capabilities.resources = { subscribe: true, listChanged: true };
    }
    if (this.hasFeature(version, 'streaming_support')) {
      capabilities.experimental = {
        streaming: true
      };
    }
    return capabilities;
  }
}
向后兼容性处理
/**
 * Compatibility adapter: converts tool-call result payloads between the
 * '2024-10-07' shape (`result.text`) and the '2024-11-05' shape
 * (`result.content[]`).
 *
 * The message is modified in place and the same object is returned.
 */
class MCPCompatibilityAdapter {
  /**
   * Adapts a message from one protocol version's format to another's.
   *
   * @param message The message to adapt; mutated when a conversion applies.
   * @param fromVersion Version the message is currently in.
   * @param toVersion Version to convert to.
   * @returns `message` itself. It is returned unchanged when the versions
   *   are equal or the pair is not one of the two known conversions.
   */
  adaptMessage(message: any, fromVersion: string, toVersion: string): any {
    if (fromVersion === toVersion) {
      return message;
    }
    // Adapt from the old version to the new version.
    if (fromVersion === '2024-10-07' && toVersion === '2024-11-05') {
      return this.adaptFrom20241007To20241105(message);
    }
    // Adapt from the new version to the old version.
    if (fromVersion === '2024-11-05' && toVersion === '2024-10-07') {
      return this.adaptFrom20241105To20241007(message);
    }
    return message;
  }

  /** Wraps `result.text` into a single text content item when `result.content` is absent. */
  private adaptFrom20241007To20241105(message: any): any {
    // Add the fields the new version supports.
    if (message?.method === 'tools/call' && message.result) {
      if (!message.result.content) {
        message.result.content = [
          {
            type: 'text',
            // `??` rather than `||`: an empty string is a valid tool output and must be kept.
            text: message.result.text ?? JSON.stringify(message.result)
          }
        ];
        delete message.result.text;
      }
    }
    return message;
  }

  /** Collapses `result.content` into `result.text` when the first item is a text item. */
  private adaptFrom20241105To20241007(message: any): any {
    // Remove the fields the old version does not support.
    if (message?.method === 'tools/call' && message.result?.content) {
      const firstContent = message.result.content[0];
      if (firstContent?.type === 'text') {
        message.result.text = firstContent.text;
        delete message.result.content;
      }
    }
    return message;
  }
}
3.5 流式传输和批量操作
流式传输支持
/** Streaming response: the reply that announces a stream and its ID. */
interface StreamingResponse {
  jsonrpc: '2.0';
  id: string | number;
  error?: any;
  result?: {
    streamId: string;
    stream: true;
  };
}
/** Stream data chunk, delivered as a notification. */
interface StreamChunk {
  method: 'notifications/stream/chunk';
  jsonrpc: '2.0';
  params: {
    streamId: string;
    chunk: {
      type: 'text' | 'image' | 'data';
      content: any;
      /** Whether this is the last chunk. */
      final?: boolean;
    };
  };
}
/**
 * Streaming transfer manager: tracks open streams and builds the chunk
 * notifications for them. Chunks are recorded and returned to the caller;
 * this class does not transmit anything itself.
 */
class MCPStreamManager {
  /** Open streams keyed by stream ID; an entry is removed when its final chunk is built. */
  private streams: Map<string, any> = new Map();

  /**
   * Creates a streaming response and registers the stream.
   *
   * @param requestId ID of the request being answered.
   * @returns The response to send and the newly generated stream ID.
   */
  createStream(requestId: string | number): { response: StreamingResponse; streamId: string } {
    // slice(2, 11) takes the same (up to) nine characters the deprecated substr(2, 9) did.
    const streamId = `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    this.streams.set(streamId, {
      requestId,
      startTime: Date.now(),
      chunks: []
    });
    return {
      response: {
        jsonrpc: '2.0',
        id: requestId,
        result: {
          stream: true,
          streamId
        }
      },
      streamId
    };
  }

  /**
   * Builds a stream data chunk and records it on the stream.
   *
   * @param streamId Target stream.
   * @param content Chunk payload.
   * @param type Payload kind; defaults to 'text'.
   * @param final When true the stream is closed (removed) after this chunk.
   * @throws Error if `streamId` does not refer to an open stream.
   */
  sendChunk(streamId: string, content: any, type: 'text' | 'image' | 'data' = 'text', final: boolean = false): StreamChunk {
    const stream = this.streams.get(streamId);
    if (!stream) {
      throw new Error(`Stream not found: ${streamId}`);
    }
    const chunk: StreamChunk = {
      jsonrpc: '2.0',
      method: 'notifications/stream/chunk',
      params: {
        streamId,
        chunk: {
          type,
          content,
          final
        }
      }
    };
    stream.chunks.push(chunk);
    if (final) {
      this.streams.delete(streamId);
    }
    return chunk;
  }

  /**
   * Streaming tool call example: returns the stream announcement at once
   * while the tool runs in the background.
   */
  async handleStreamingToolCall(request: CallToolRequest): Promise<StreamingResponse> {
    const { response, streamId } = this.createStream(request.id);
    // Deliberately not awaited. The catch is a last resort so the background
    // task can never surface as an unhandled rejection or leave the stream open.
    void this.executeToolWithStreaming(request.params.name, request.params.arguments || {}, streamId)
      .catch(() => {
        this.streams.delete(streamId);
      });
    return response;
  }

  /** Runs the tool and emits its results chunk by chunk, always ending with a final chunk. */
  private async executeToolWithStreaming(toolName: string, args: any, streamId: string): Promise<void> {
    try {
      // Simulate a long-running tool.
      const results = await this.simulateLongRunningTool(toolName, args);
      if (results.length === 0) {
        // Nothing to stream: still emit a final chunk so the stream is closed.
        this.sendChunk(streamId, '', 'text', true);
        return;
      }
      for (let i = 0; i < results.length; i++) {
        const isLast = i === results.length - 1;
        this.sendChunk(streamId, results[i], 'text', isLast);
        if (!isLast) {
          // Simulate processing delay between chunks.
          await new Promise(resolve => setTimeout(resolve, 100));
        }
      }
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      // The stream may already be gone, and sendChunk would throw for it.
      if (this.streams.has(streamId)) {
        this.sendChunk(streamId, { error: message }, 'text', true);
      }
    }
  }

  /** Simulates tool execution by returning a fixed list of progress lines. */
  private async simulateLongRunningTool(toolName: string, args: any): Promise<string[]> {
    return [
      'Step 1: Initializing...',
      'Step 2: Processing data...',
      'Step 3: Computing results...',
      'Step 4: Finalizing...',
      'Completed successfully!'
    ];
  }
}
批量操作支持
/** Batch request: several sub-requests wrapped in one `batch` call. */
interface BatchRequest {
  method: 'batch';
  jsonrpc: '2.0';
  id: string | number;
  params: {
    requests: {
      id: string | number;
      method: string;
      params?: any;
    }[];
  };
}
/** Batch response: one entry per sub-request. */
interface BatchResponse {
  jsonrpc: '2.0';
  id: string | number;
  result: {
    responses: {
      id: string | number;
      error?: any;
      result?: any;
    }[];
  };
}
/** Batch operation handler: runs the sub-requests of a batch one after another. */
class MCPBatchHandler {
  /**
   * Handles a batch request.
   *
   * Sub-requests are processed sequentially in the order given. A
   * sub-request that throws produces an INTERNAL_ERROR entry and does not
   * stop the rest of the batch.
   *
   * @param request The batch request.
   * @returns A response with one entry per sub-request, in request order.
   */
  async handleBatch(request: BatchRequest): Promise<BatchResponse> {
    const responses: Array<{ id: string | number; result?: any; error?: any }> = [];
    for (const subRequest of request.params.requests) {
      try {
        const response = await this.handleSingleRequest({
          jsonrpc: '2.0',
          id: subRequest.id,
          method: subRequest.method,
          params: subRequest.params
        });
        responses.push({
          id: subRequest.id,
          result: response.result,
          error: response.error
        });
      } catch (error: unknown) {
        responses.push({
          id: subRequest.id,
          error: {
            code: MCPErrorCode.INTERNAL_ERROR,
            // A thrown value is not necessarily an Error.
            message: error instanceof Error ? error.message : String(error)
          }
        });
      }
    }
    return {
      jsonrpc: '2.0',
      id: request.id,
      result: {
        responses
      }
    };
  }

  /** Placeholder for single-request handling; always reports success. */
  private async handleSingleRequest(request: any): Promise<any> {
    return { result: { success: true } };
  }
}
3.6 协议扩展机制
实验性功能支持
/** Experimental feature definitions exchanged under the `experimental` capability. */
interface ExperimentalFeatures {
  batch?: {
    supported: boolean;
    maxBatchSize?: number;
  };
  streaming?: {
    supported: boolean;
    maxStreams?: number;
  };
  advancedAuth?: {
    supported: boolean;
    methods: string[];
  };
  customTransports?: {
    supported: boolean;
    transports: string[];
  };
}
/**
 * Extension capability negotiation: keeps a registry of extensions and
 * dispatches `experimental/*` messages.
 */
class MCPExtensionManager {
  /** Registered extensions keyed by name. */
  private supportedExtensions: Map<string, any> = new Map();

  /** Registers (or replaces) an extension definition under `name`. */
  registerExtension(name: string, definition: any): void {
    this.supportedExtensions.set(name, definition);
  }

  /**
   * Negotiates extension features during initialization.
   *
   * A feature is enabled only when the client requested it under
   * `experimental` and an extension of the same name is registered here.
   *
   * @param clientCapabilities The client's capabilities; null or undefined
   *   is treated as "no experimental features requested".
   * @returns The enabled experimental features (possibly empty).
   */
  negotiateExtensions(clientCapabilities: any): ExperimentalFeatures {
    const experimental: ExperimentalFeatures = {};
    // Optional chaining on the root object: capabilities come from the client and may be absent.
    const requested = clientCapabilities?.experimental;
    // Streaming extension.
    if (requested?.streaming && this.supportedExtensions.has('streaming')) {
      experimental.streaming = {
        supported: true,
        maxStreams: 10
      };
    }
    // Batch operation extension.
    if (requested?.batch && this.supportedExtensions.has('batch')) {
      experimental.batch = {
        supported: true,
        maxBatchSize: 100
      };
    }
    return experimental;
  }

  /**
   * Handles an extension message.
   *
   * @param method Method name in `namespace/action` form; only the first two
   *   `/`-separated segments are used.
   * @param params Message parameters.
   * @throws Error if the namespace is not `experimental` or the action is unknown.
   */
  async handleExtensionMessage(method: string, params: any): Promise<any> {
    const [namespace, action] = method.split('/');
    if (namespace === 'experimental') {
      return await this.handleExperimentalMessage(action, params);
    }
    throw new Error(`Unknown extension namespace: ${namespace}`);
  }

  /** Dispatches an `experimental/*` action to its handler. */
  private async handleExperimentalMessage(action: string, params: any): Promise<any> {
    switch (action) {
      case 'stream_start':
        return await this.handleStreamStart(params);
      case 'batch_execute':
        return await this.handleBatchExecute(params);
      default:
        throw new Error(`Unknown experimental action: ${action}`);
    }
  }

  /** Placeholder for stream start; returns a fixed sample result. */
  private async handleStreamStart(params: any): Promise<any> {
    return { streamId: 'stream_123', started: true };
  }

  /** Placeholder for batch execution; returns a fixed sample result. */
  private async handleBatchExecute(params: any): Promise<any> {
    return { batchId: 'batch_456', status: 'processing' };
  }
}
小结
通过本章的学习,我们深入了解了MCP协议的技术规范:
- 协议基础:基于JSON-RPC 2.0的双向通信协议
- 消息类型:初始化、工具、资源、提示模板等消息格式
- 错误处理:标准化的错误代码和处理机制
- 版本控制:版本协商和向后兼容性
- 高级特性:流式传输、批量操作、协议扩展
这些规范为开发健壮、可靠的MCP Server奠定了坚实的基础。接下来我们将学习如何开发具体的工具功能。