Chapter 3: MCP Protocol Specification
Haiyue
22min
Chapter 3: MCP Protocol Specification
Learning Objectives
- Understand the MCP protocol's message format in depth
- Master request-response patterns and event streams
- Learn protocol versioning and compatibility
- Understand error handling and exceptional cases
- Master the protocol's extension mechanisms
Knowledge Summary
MCP Protocol Architecture
MCP (Model Context Protocol) is built on the JSON-RPC 2.0 protocol. Communication is bidirectional, and there are two interaction patterns: request-response (a request that expects a matching response) and notification (a one-way message with no response).
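[Diagram (Mermaid, not rendered): the client and server exchange JSON-RPC 2.0 messages in both directions — requests answered by responses, and one-way notifications.]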
Message Format Specification
| Message Type | Format Requirements | Required Fields | Optional Fields |
|---|---|---|---|
| Request Message | JSON-RPC 2.0 Request | jsonrpc, method, id | params |
| Response Message | JSON-RPC 2.0 Response | jsonrpc, id, and exactly one of result or error | - |
| Notification Message | JSON-RPC 2.0 Notification | jsonrpc, method | params |
| Error Message | JSON-RPC 2.0 Error | jsonrpc, id, error | - |
Protocol Message Details
1. Basic Message Structure
# MCP protocol message structure example
import json
from typing import Dict, Any, Optional, Union
from enum import Enum
class MCPMessageType(Enum):
    """Kinds of MCP protocol message, keyed by their lowercase names."""

    # A call that carries an id.
    REQUEST = "request"
    # The reply to a request.
    RESPONSE = "response"
    # A one-way message.
    NOTIFICATION = "notification"
    # A failure report.
    ERROR = "error"
class MCPMessage:
    """Common base for MCP messages; it carries only the JSON-RPC version tag."""

    def __init__(self, jsonrpc: str = "2.0"):
        # Every message records the JSON-RPC version string it was built with.
        self.jsonrpc = jsonrpc

    def to_dict(self) -> Dict[str, Any]:
        """Return a new dict holding just the ``jsonrpc`` field."""
        envelope: Dict[str, Any] = {}
        envelope["jsonrpc"] = self.jsonrpc
        return envelope
class MCPRequest(MCPMessage):
    """A JSON-RPC request: a method call identified by ``id``."""

    def __init__(self, method: str, id: Union[str, int], params: Optional[Dict] = None):
        super().__init__()
        self.method = method
        self.id = id
        # A missing (or otherwise falsy) params value is stored as an empty dict.
        self.params = params if params else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the request as a dict: base fields, then method, id and params."""
        return {
            **super().to_dict(),
            "method": self.method,
            "id": self.id,
            "params": self.params,
        }
class MCPResponse(MCPMessage):
    """A JSON-RPC response carrying either a result or an error for ``id``."""

    def __init__(self, id: Union[str, int], result: Optional[Dict] = None, error: Optional[Dict] = None):
        super().__init__()
        self.id = id
        self.result = result
        self.error = error

    def to_dict(self) -> Dict[str, Any]:
        """Return the response as a dict.

        A truthy ``error`` takes precedence and is emitted alone; otherwise
        ``result`` is emitted, with a falsy result replaced by an empty dict.
        """
        payload = super().to_dict()
        payload["id"] = self.id
        if not self.error:
            payload["result"] = self.result if self.result else {}
            return payload
        payload["error"] = self.error
        return payload
class MCPNotification(MCPMessage):
    """A JSON-RPC notification: a method call that carries no ``id``."""

    def __init__(self, method: str, params: Optional[Dict] = None):
        super().__init__()
        self.method = method
        # A missing (or otherwise falsy) params value is stored as an empty dict.
        self.params = params if params else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the notification as a dict: base fields, then method and params."""
        return {
            **super().to_dict(),
            "method": self.method,
            "params": self.params,
        }
# Usage example: build one message of each kind and print its JSON form.
request = MCPRequest("tools/list", 1)
response = MCPResponse(1, {"tools": []})
# The MCP lifecycle names the post-initialization notification
# "notifications/initialized" (not the bare "initialized").
notification = MCPNotification("notifications/initialized")
print("Request message:", json.dumps(request.to_dict(), indent=2, ensure_ascii=False))
print("Response message:", json.dumps(response.to_dict(), indent=2, ensure_ascii=False))
print("Notification message:", json.dumps(notification.to_dict(), indent=2, ensure_ascii=False))
2. Protocol Lifecycle
# MCP protocol lifecycle management
from enum import Enum
from typing import List, Callable
import asyncio
class MCPConnectionState(Enum):
    """States an MCP connection can be in, keyed by their lowercase names."""

    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    INITIALIZING = "initializing"
    READY = "ready"
    SHUTTING_DOWN = "shutting_down"
class MCPLifecycleManager:
    """Track one MCP connection from initialization through shutdown.

    ``shutdown`` returns the manager to the same attribute values a freshly
    constructed instance has, so the manager can be initialized again.
    """

    # Protocol revision placed in both the initialize request and its response.
    PROTOCOL_VERSION = "2024-11-05"

    def __init__(self):
        self.state = MCPConnectionState.DISCONNECTED
        self.capabilities = self._default_capabilities()
        self.client_info = {}
        self.server_info = {}

    @staticmethod
    def _default_capabilities() -> Dict[str, Any]:
        """Return a fresh copy of the capability set a new manager advertises."""
        return {
            "tools": {},
            "resources": {},
            "prompts": {},
            "logging": {},
        }

    async def initialize(self, client_info: Dict[str, Any]):
        """Initialize the protocol connection and return the initialize result.

        Args:
            client_info: Description of the connecting client. It is copied,
                never stored by reference.

        Returns:
            A dict with ``protocolVersion``, ``capabilities`` and ``serverInfo``.
        """
        self.state = MCPConnectionState.INITIALIZING
        # Keep a private copy so shutdown() cannot empty the caller's dict.
        self.client_info = dict(client_info)
        # Simulate initialization process
        print("Initializing MCP connection...")
        print(f"Client info: {client_info}")
        # Set server info and capabilities
        self.server_info = {
            "name": "example-mcp-server",
            "version": "1.0.0",
            "description": "MCP protocol example server"
        }
        # Hand out copies so later changes to the manager's state do not
        # alter a response the caller is still holding.
        init_response = {
            "protocolVersion": self.PROTOCOL_VERSION,
            "capabilities": dict(self.capabilities),
            "serverInfo": dict(self.server_info)
        }
        self.state = MCPConnectionState.READY
        print("MCP connection initialization complete")
        return init_response

    def get_initialize_request(self, client_info: Dict[str, Any]) -> MCPRequest:
        """Build the ``initialize`` request a client sends to open a session."""
        return MCPRequest(
            method="initialize",
            id="init-1",
            params={
                "protocolVersion": self.PROTOCOL_VERSION,
                "capabilities": {
                    "roots": {"listChanged": True},
                    "sampling": {}
                },
                "clientInfo": client_info
            }
        )

    def get_initialized_notification(self) -> MCPNotification:
        """Build the notification a client sends once initialization is done."""
        # The MCP lifecycle names this method "notifications/initialized".
        return MCPNotification("notifications/initialized")

    async def shutdown(self):
        """Close the protocol connection and reset the manager's state."""
        self.state = MCPConnectionState.SHUTTING_DOWN
        print("Closing MCP connection...")
        # Rebind instead of calling .clear(): clearing in place would also
        # empty any dict still shared with a caller.
        self.capabilities = self._default_capabilities()
        self.client_info = {}
        self.server_info = {}
        self.state = MCPConnectionState.DISCONNECTED
        print("MCP connection closed")
# Lifecycle example
async def lifecycle_example():
    """Walk a manager through initialize and shutdown, printing each message."""

    def as_json(payload) -> str:
        # One place for the pretty-printing options used by every print below.
        return json.dumps(payload, indent=2, ensure_ascii=False)

    client_info = {"name": "example-client", "version": "1.0.0"}
    manager = MCPLifecycleManager()

    # 1. Initialize
    outgoing = manager.get_initialize_request(client_info)
    print("Send initialization request:", as_json(outgoing.to_dict()))

    # 2. Process initialization
    init_result = await manager.initialize(client_info)
    print("Initialization response:", as_json(init_result))

    # 3. Send initialization complete notification
    done_notice = manager.get_initialized_notification()
    print("Send initialization complete notification:", as_json(done_notice.to_dict()))

    # 4. Close connection
    await manager.shutdown()


# Run example
# asyncio.run(lifecycle_example())
print("Protocol lifecycle manager ready")
3. Error Handling Specification
# MCP protocol error handling
from typing import Optional
class MCPErrorCode:
    """Integer error codes used in MCP error objects."""

    # --- JSON-RPC standard error codes ---
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603

    # --- MCP specific error codes ---
    INVALID_TOOL = -32000
    TOOL_EXECUTION_ERROR = -32001
    RESOURCE_NOT_FOUND = -32002
    RESOURCE_ACCESS_DENIED = -32003
    PROMPT_NOT_FOUND = -32004
    INVALID_PROMPT_ARGUMENTS = -32005
class MCPError(Exception):
    """Exception that carries a JSON-RPC style error code, message and data."""

    def __init__(self, code: int, message: str, data: Optional[Dict] = None):
        super().__init__(message)
        self.code = code
        self.message = message
        # A missing (or otherwise falsy) data value is stored as an empty dict.
        self.data = data if data else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the error object; ``data`` is included only when non-empty."""
        payload = {"code": self.code, "message": self.message}
        if not self.data:
            return payload
        payload["data"] = self.data
        return payload
class MCPErrorHandler:
    """Static factories that build ``MCPError`` objects and error responses."""

    @staticmethod
    def create_error_response(request_id: Union[str, int], error: MCPError) -> MCPResponse:
        """Wrap ``error`` in an ``MCPResponse`` addressed to ``request_id``."""
        error_payload = error.to_dict()
        return MCPResponse(id=request_id, error=error_payload)

    @staticmethod
    def handle_parse_error() -> MCPError:
        """Build the error reported when JSON parsing fails."""
        details = {"description": "JSON parse failed"}
        return MCPError(MCPErrorCode.PARSE_ERROR, "Parse error", details)

    @staticmethod
    def handle_invalid_request(details: str = "") -> MCPError:
        """Build the error for a malformed request; ``details`` is appended."""
        info = {"description": f"Request format invalid: {details}"}
        return MCPError(MCPErrorCode.INVALID_REQUEST, "Invalid Request", info)

    @staticmethod
    def handle_method_not_found(method: str) -> MCPError:
        """Build the error for a call to the unknown method ``method``."""
        info = {
            "method": method,
            "description": f"Method '{method}' does not exist",
        }
        return MCPError(MCPErrorCode.METHOD_NOT_FOUND, "Method not found", info)

    @staticmethod
    def handle_invalid_params(method: str, details: str = "") -> MCPError:
        """Build the error for bad parameters passed to ``method``."""
        info = {
            "method": method,
            "description": f"Parameters invalid: {details}",
        }
        return MCPError(MCPErrorCode.INVALID_PARAMS, "Invalid params", info)

    @staticmethod
    def handle_tool_error(tool_name: str, error_msg: str) -> MCPError:
        """Build the error for a failure while running tool ``tool_name``."""
        info = {
            "tool": tool_name,
            "error": error_msg,
            "description": f"Tool '{tool_name}' execution failed: {error_msg}",
        }
        return MCPError(MCPErrorCode.TOOL_EXECUTION_ERROR, "Tool execution error", info)
# Error handling examples
def error_handling_examples():
    """Print three sample error responses built with ``MCPErrorHandler``."""
    handler = MCPErrorHandler()
    # (heading printed first, request id, error to wrap), in display order.
    cases = [
        # 1. Method not found error
        (
            "Method not found error response:",
            "req-1",
            handler.handle_method_not_found("unknown/method"),
        ),
        # 2. Tool execution error
        (
            "\nTool execution error response:",
            "req-2",
            handler.handle_tool_error("file_reader", "File does not exist"),
        ),
        # 3. Invalid parameters error
        (
            "\nInvalid parameters error response:",
            "req-3",
            handler.handle_invalid_params("tools/call", "Missing required parameter 'name'"),
        ),
    ]
    for heading, request_id, error in cases:
        reply = handler.create_error_response(request_id, error)
        print(heading)
        print(json.dumps(reply.to_dict(), indent=2, ensure_ascii=False))


error_handling_examples()
Protocol Extension Mechanism
Custom Capability Extension
Extension Principles
The MCP protocol can be extended in the following ways:
- Custom Methods - Add new RPC methods
- Capability Declaration - Declare custom capabilities in capabilities
- Parameter Extension - Extend parameters of existing methods
- Metadata Annotations - Add additional metadata information
# Protocol extension example
class MCPExtension:
    """A named, versioned bundle of custom capabilities and methods."""

    def __init__(self, name: str, version: str):
        self.name = name
        self.version = version
        self.custom_capabilities = {}
        self.custom_methods = {}

    def register_capability(self, capability_name: str, capability_spec: Dict):
        """Store ``capability_spec`` under ``capability_name`` and announce it."""
        self.custom_capabilities[capability_name] = capability_spec
        print(f"Registered custom capability: {capability_name}")

    def register_method(self, method_name: str, handler: Callable):
        """Store ``handler`` under ``method_name`` and announce it."""
        self.custom_methods[method_name] = handler
        print(f"Registered custom method: {method_name}")

    def get_extended_capabilities(self) -> Dict:
        """Return the base capabilities merged with this extension's additions.

        Custom capabilities are laid over the three base entries, and an
        ``extensions`` entry describing this extension is set last.
        """
        declared = {
            "tools": {},
            "resources": {},
            "prompts": {},
            **self.custom_capabilities,
        }
        summary = {
            "version": self.version,
            "methods": list(self.custom_methods),
            "capabilities": list(self.custom_capabilities),
        }
        declared["extensions"] = {self.name: summary}
        return declared
# Extension example: Add database query capability
def create_database_extension():
    """Build a ``database`` extension with one capability and two methods."""

    # Handlers return fixed sample data and do not read ``params``.
    def handle_db_query(params):
        sample_rows = [
            [1, "Alice", "alice@example.com"],
            [2, "Bob", "bob@example.com"],
        ]
        return {"columns": ["id", "name", "email"], "rows": sample_rows}

    def handle_db_schema(params):
        return {
            "tables": ["users", "posts", "comments"],
            "views": ["user_posts"],
            "procedures": ["get_user_stats"],
        }

    ext = MCPExtension("database", "1.0.0")

    # Register database query capability
    capability_spec = {
        "query": True,
        "transaction": True,
        "schema_introspection": True,
    }
    ext.register_capability("database", capability_spec)

    # Register database related methods
    handlers = (
        ("database/query", handle_db_query),
        ("database/schema", handle_db_schema),
    )
    for method_name, handler in handlers:
        ext.register_method(method_name, handler)
    return ext


# Use extension
db_ext = create_database_extension()
capabilities = db_ext.get_extended_capabilities()
print("Extended capability declaration:")
print(json.dumps(capabilities, indent=2, ensure_ascii=False))
Version Control and Compatibility
Protocol Version Management
# Version compatibility handling
from packaging import version
from typing import Set
class MCPVersionManager:
    """Negotiate MCP protocol versions and map them to feature sets.

    Protocol versions are ISO calendar dates (``YYYY-MM-DD``). They are
    compared as dates, not as PEP 440 version numbers: a string such as
    ``"2024-11-05"`` is not a valid PEP 440 version.
    """

    SUPPORTED_VERSIONS = ["2024-11-05", "2024-10-07", "2024-09-25"]
    CURRENT_VERSION = "2024-11-05"

    # Features available in every supported version.
    _BASE_FEATURES = ("tools", "resources", "prompts")
    # (version that introduced them, features), oldest first.
    _FEATURE_INTRODUCTIONS = (
        ("2024-10-07", ("logging", "progress")),
        ("2024-11-05", ("sampling", "roots")),
    )

    @staticmethod
    def _as_date(protocol_version: str):
        """Parse a ``YYYY-MM-DD`` protocol version into a ``datetime.date``.

        Raises:
            ValueError: If ``protocol_version`` is not a valid ISO date.
        """
        # Function-scope import so the class does not depend on a
        # module-level import of ``datetime``.
        from datetime import date

        return date.fromisoformat(protocol_version)

    @classmethod
    def is_version_supported(cls, protocol_version: str) -> bool:
        """Return True when ``protocol_version`` is in ``SUPPORTED_VERSIONS``."""
        return protocol_version in cls.SUPPORTED_VERSIONS

    @classmethod
    def get_compatible_features(cls, protocol_version: str) -> Set[str]:
        """Return the feature names available at ``protocol_version``.

        Raises:
            ValueError: If ``protocol_version`` is not a valid ISO date.
        """
        requested = cls._as_date(protocol_version)
        features = set(cls._BASE_FEATURES)
        for introduced_in, added in cls._FEATURE_INTRODUCTIONS:
            if requested >= cls._as_date(introduced_in):
                features.update(added)
        return features

    @classmethod
    def negotiate_version(cls, client_version: str) -> str:
        """Pick the protocol version to use with a client.

        A supported ``client_version`` is returned unchanged. Otherwise the
        newest supported version that is not newer than the client's is
        chosen; if there is none, the oldest supported version is returned.

        Raises:
            ValueError: If an unsupported ``client_version`` is not a valid
                ISO date.
        """
        if cls.is_version_supported(client_version):
            return client_version
        requested = cls._as_date(client_version)
        # Choose highest compatible version
        not_newer = [v for v in cls.SUPPORTED_VERSIONS if cls._as_date(v) <= requested]
        if not_newer:
            return max(not_newer, key=cls._as_date)
        # Use lowest supported version
        return min(cls.SUPPORTED_VERSIONS, key=cls._as_date)
# Version compatibility example
def version_compatibility_example():
    """Print the negotiated version and feature set for four sample versions."""
    vm = MCPVersionManager()
    for candidate in ("2024-11-05", "2024-10-07", "2024-12-01", "2024-08-01"):
        print(f"\nTest version: {candidate}")
        if not vm.is_version_supported(candidate):
            # Unsupported: let the manager pick a replacement.
            negotiated = vm.negotiate_version(candidate)
            print(f"Negotiated version: {candidate} -> {negotiated}")
        else:
            print(f"Directly supports version {candidate}")
            negotiated = candidate
        feature_names = sorted(vm.get_compatible_features(negotiated))
        print(f"Supported features: {', '.join(feature_names)}")


version_compatibility_example()
Message Transport and Serialization
Transport Layer Abstraction
Transport Requirements
The MCP protocol can run over multiple transports:
- Standard Input/Output - Most common, suitable for local tool integration
- HTTP - Suitable for network services and REST API integration
- WebSocket - Suitable for real-time communication and browser integration
- TCP Socket - Suitable for high-performance scenarios
# Transport layer abstraction
from abc import ABC, abstractmethod
import json
import asyncio
class MCPTransport(ABC):
    """Abstract interface every MCP transport implements.

    All three operations are coroutines; concrete transports must override
    each of them.
    """

    @abstractmethod
    async def send(self, message: Dict[str, Any]) -> None:
        """Send one message, given as a dict."""

    @abstractmethod
    async def receive(self) -> Dict[str, Any]:
        """Receive one message and return it as a dict."""

    @abstractmethod
    async def close(self) -> None:
        """Close the connection."""
class StdioTransport(MCPTransport):
    """Newline-delimited JSON transport over the process's stdin and stdout.

    ``initialize`` must be awaited before ``send`` or ``receive`` is used;
    until then ``reader`` and ``writer`` are ``None``.
    """

    def __init__(self):
        self.reader = None
        self.writer = None

    async def initialize(self):
        """Attach an asyncio stream reader to stdin and a writer to stdout."""
        # Imported here so the method does not rely on a module-level
        # ``import sys``.
        import sys

        loop = asyncio.get_running_loop()
        self.reader = asyncio.StreamReader()
        read_protocol = asyncio.StreamReaderProtocol(self.reader)
        await loop.connect_read_pipe(lambda: read_protocol, sys.stdin)
        write_transport, write_protocol = await loop.connect_write_pipe(
            lambda: asyncio.StreamReaderProtocol(asyncio.StreamReader()),
            sys.stdout
        )
        self.writer = asyncio.StreamWriter(write_transport, write_protocol, self.reader, loop)

    async def send(self, message: Dict[str, Any]) -> None:
        """Write ``message`` to stdout as one line of JSON and flush it."""
        json_str = json.dumps(message, ensure_ascii=False) + "\n"
        self.writer.write(json_str.encode())
        await self.writer.drain()

    async def receive(self) -> Dict[str, Any]:
        """Read one line from stdin and decode it as JSON.

        Raises:
            EOFError: If stdin has reached end of stream.
            json.JSONDecodeError: If the line is not valid JSON.
        """
        line = await self.reader.readline()
        if not line:
            # readline() yields b"" only at end of stream; report that
            # directly instead of as a JSON decoding failure.
            raise EOFError("stdin closed: no more MCP messages")
        return json.loads(line.decode().strip())

    async def close(self) -> None:
        """Close the stdout writer, if one was created."""
        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()
class HTTPTransport(MCPTransport):
    """Transport that POSTs each message as JSON to ``<base_url>/mcp``."""

    def __init__(self, base_url: str):
        # Strip trailing slashes so the URL built in send() has exactly one.
        self.base_url = base_url.rstrip('/')
        self.session = None

    async def send(self, message: Dict[str, Any]) -> None:
        """POST ``message`` to the server.

        A client session is created on first use, and again after ``close``.

        Raises:
            MCPError: If the server answers with a non-2xx HTTP status.
        """
        import aiohttp
        if self.session is None:
            self.session = aiohttp.ClientSession()
        async with self.session.post(
            f"{self.base_url}/mcp",
            json=message,
            headers={"Content-Type": "application/json"}
        ) as response:
            # Any 2xx status is a success (e.g. 202 Accepted for a message
            # that produces no immediate body), not only 200.
            if not 200 <= response.status < 300:
                raise MCPError(MCPErrorCode.INTERNAL_ERROR, f"HTTP error: {response.status}")

    async def receive(self) -> Dict[str, Any]:
        """Always raise: this transport has no receive channel.

        Raises:
            NotImplementedError: On every call.
        """
        # HTTP is request-response mode, usually no active reception needed
        raise NotImplementedError("HTTP transport uses request-response mode")

    async def close(self) -> None:
        """Close the HTTP session, if one is open, and forget it."""
        if self.session is not None:
            await self.session.close()
            # Drop the closed session so a later send() opens a new one
            # instead of reusing a closed one.
            self.session = None
# Transport layer usage example
async def transport_example():
    """Construct both transports and print a sample request; nothing is sent."""
    # Use stdio transport
    stdio_transport = StdioTransport()
    # await stdio_transport.initialize() # Uncomment when actually using

    # Example message
    sample_request = dict(jsonrpc="2.0", method="tools/list", id=1)
    print("Transport layer example message:")
    rendered = json.dumps(sample_request, indent=2, ensure_ascii=False)
    print(rendered)

    # HTTP transport example
    http_transport = HTTPTransport("http://localhost:3000")
    print("\nHTTP transport configured")


# Run example
# asyncio.run(transport_example())
print("Transport layer abstraction ready")
Through this chapter, we gained a deep understanding of the MCP protocol’s technical specifications, including message formats, lifecycle management, error handling, extension mechanisms, and version compatibility. This knowledge provides a solid protocol foundation for implementing specific MCP Server functions.