第14章:高级特性和扩展开发
2025/9/1 · 阅读时间约 13 分钟
第14章:高级特性和扩展开发
学习目标
- 掌握MCP协议的高级特性和扩展机制
- 开发自定义协议扩展和插件
- 实现多语言MCP Server支持
- 学习与其他AI框架的集成
- 探索MCP在企业级应用中的实践
14.1 协议高级特性
14.1.1 自定义协议扩展
// src/extensions/CustomProtocolExtension.ts
/**
 * Contract shared by the protocol extensions defined in this file
 * (streaming, batch, subscription).
 */
export interface ProtocolExtension {
  /** Extension identifier, e.g. 'streaming' or 'batch'. */
  name: string;
  /** Extension version string; the implementations in this file use '1.0.0'. */
  version: string;
  /** Request handlers keyed by method name, e.g. 'tools/call-stream'. */
  methods: Map<string, MethodHandler>;
  /** Notification handlers keyed by notification name. */
  notifications: Map<string, NotificationHandler>;
  /** Optional features this extension advertises. */
  capabilities: ExtensionCapabilities;
}
/**
 * Feature flags and limits an extension can advertise. Every group is
 * optional; an extension fills in only the groups it implements.
 */
export interface ExtensionCapabilities {
  /** Chunked delivery of results. */
  streaming?: {
    supported: boolean;
    /** Chunk size limit; StreamingExtension advertises 64 * 1024. */
    maxChunkSize?: number;
    /** Compression method names, e.g. 'gzip', 'deflate'. */
    compressionMethods?: string[];
  };
  /** Several operations in one request. */
  batch?: {
    supported: boolean;
    /** Maximum number of operations accepted in one batch request. */
    maxBatchSize?: number;
  };
  /** Event subscriptions. */
  subscription?: {
    supported: boolean;
    /** Subscription type names, e.g. 'resource-changes'. */
    subscriptionTypes?: string[];
  };
  /** Custom type definitions: type name -> JSON Schema. */
  customTypes?: {
    [typeName: string]: JSONSchema;
  };
}
// 流式处理扩展
/**
 * Protocol extension that delivers tool results and resource contents to the
 * client as a sequence of notifications.
 *
 * Registered methods:
 * - `tools/call-stream`: runs a streaming-capable tool, forwards every chunk as
 *   a `tools/stream-chunk` notification and finishes with `tools/stream-end`.
 * - `resources/read-stream`: reads a streaming-capable resource and forwards
 *   every chunk as a `resources/stream-chunk` notification.
 */
export class StreamingExtension implements ProtocolExtension {
  name = 'streaming';
  version = '1.0.0';
  methods = new Map<string, MethodHandler>();
  notifications = new Map<string, NotificationHandler>();
  capabilities: ExtensionCapabilities = {
    streaming: {
      supported: true,
      maxChunkSize: 64 * 1024, // 64KB
      compressionMethods: ['gzip', 'deflate']
    }
  };

  constructor() {
    this.setupStreamingMethods();
  }

  /** Registers the two streaming request handlers in `methods`. */
  private setupStreamingMethods(): void {
    // Streaming tool call
    this.methods.set('tools/call-stream', async (params, context) => {
      const { name, arguments: args } = params;
      const tool = context.server.getTool(name);
      if (!tool || !tool.supportsStreaming) {
        throw new Error(`Tool ${name} does not support streaming`);
      }

      // Identify the stream so the client can correlate the notifications.
      const streamId = this.generateStreamId();
      context.response.setHeader('X-Stream-ID', streamId);

      // Run the tool and forward each chunk as it is produced.
      const stream = tool.executeStream(args);
      for await (const chunk of stream) {
        await context.sendNotification('tools/stream-chunk', {
          streamId,
          chunk: await this.compressChunk(chunk),
          metadata: {
            timestamp: Date.now(),
            sequence: chunk.sequence
          }
        });
      }

      // Tell the client that no further chunks will follow.
      await context.sendNotification('tools/stream-end', {
        streamId,
        totalChunks: stream.totalChunks,
        finalSize: stream.totalSize
      });
      return { streamId, status: 'streaming' };
    });

    // Streaming resource read
    this.methods.set('resources/read-stream', async (params, context) => {
      const { uri, range } = params;
      const resource = await context.server.getResource(uri);
      // Guard the lookup result: dereferencing a missing resource used to
      // surface as an opaque TypeError instead of a protocol-level error.
      if (!resource) {
        throw new Error(`Resource not found: ${uri}`);
      }
      if (!resource.supportsStreaming) {
        throw new Error(`Resource ${uri} does not support streaming`);
      }

      const streamId = this.generateStreamId();
      const stream = resource.createReadStream(range);
      for await (const chunk of stream) {
        await context.sendNotification('resources/stream-chunk', {
          streamId,
          data: chunk.toString('base64'),
          position: chunk.position,
          size: chunk.length
        });
      }
      return { streamId, totalSize: stream.totalSize };
    });
  }

  /**
   * Serializes a chunk to JSON, gzips it and returns the result as base64.
   *
   * Uses the callback-based `gzip` so compression does not block the event
   * loop between notifications.
   */
  private async compressChunk(chunk: any): Promise<string> {
    const { gzip } = await import('zlib');
    const compressed = await new Promise<Buffer>((resolve, reject) => {
      gzip(JSON.stringify(chunk), (err, buffer) => (err ? reject(err) : resolve(buffer)));
    });
    return compressed.toString('base64');
  }

  /**
   * Builds an id of the form `stream_<epoch ms>_<random base36>`.
   * The random part is not cryptographically secure.
   */
  private generateStreamId(): string {
    // slice(2, 11) replaces the deprecated substr(2, 9) with the same result.
    return `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
// 批处理扩展
/**
 * Protocol extension that executes several tool calls or resource reads in a
 * single request.
 *
 * Registered methods:
 * - `tools/call-batch`: runs every operation concurrently and reports a
 *   per-operation outcome plus a summary.
 * - `resources/read-batch`: reads every URI concurrently.
 */
export class BatchExtension implements ProtocolExtension {
  name = 'batch';
  version = '1.0.0';
  methods = new Map<string, MethodHandler>();
  notifications = new Map<string, NotificationHandler>();
  capabilities: ExtensionCapabilities = {
    batch: {
      supported: true,
      maxBatchSize: 100
    }
  };

  constructor() {
    this.setupBatchMethods();
  }

  /**
   * Validates a batch payload.
   *
   * @throws Error when `items` is not an array or holds more entries than the
   *   advertised `maxBatchSize`.
   */
  private assertBatchSize(items: unknown, field: string): void {
    if (!Array.isArray(items)) {
      throw new Error(`${field} must be an array`);
    }
    const maxBatchSize = this.capabilities.batch?.maxBatchSize ?? Number.POSITIVE_INFINITY;
    if (items.length > maxBatchSize) {
      throw new Error(`Batch size exceeds maximum allowed: ${maxBatchSize}`);
    }
  }

  /** Registers the two batch request handlers in `methods`. */
  private setupBatchMethods(): void {
    // Batch tool call
    this.methods.set('tools/call-batch', async (params, context) => {
      const { operations } = params;
      this.assertBatchSize(operations, 'operations');

      // Each mapper catches its own failure, so every settled entry is
      // `fulfilled` and the real outcome lives in `entry.value.status`.
      const results = await Promise.allSettled(
        operations.map(async (op: any, index: number) => {
          try {
            const tool = context.server.getTool(op.name);
            if (!tool) {
              throw new Error(`Tool not found: ${op.name}`);
            }
            const result = await tool.execute(op.arguments);
            return {
              index,
              status: 'fulfilled',
              result
            };
          } catch (error: unknown) {
            return {
              index,
              status: 'rejected',
              // Thrown values are not guaranteed to be Error instances.
              error: error instanceof Error ? error.message : String(error)
            };
          }
        })
      );

      // Count by the inner status. Counting the allSettled wrapper status
      // reported every operation as successful.
      const successful = results.filter(
        (entry) => entry.status === 'fulfilled' && entry.value.status === 'fulfilled'
      ).length;

      return {
        results,
        summary: {
          total: operations.length,
          successful,
          failed: results.length - successful
        }
      };
    });

    // Batch resource read
    this.methods.set('resources/read-batch', async (params, context) => {
      const { uris } = params;
      // Apply the same advertised limit as tools/call-batch.
      this.assertBatchSize(uris, 'uris');

      const results = await Promise.allSettled(
        uris.map(async (uri: string) => {
          const resource = await context.server.getResource(uri);
          return {
            uri,
            content: await resource.read(),
            metadata: resource.getMetadata()
          };
        })
      );
      return { results };
    });
  }
}
14.1.2 订阅和事件系统
// src/extensions/SubscriptionExtension.ts
/** One client's registration for a class of events. */
export interface Subscription {
  /** Identifier produced by SubscriptionExtension.generateSubscriptionId. */
  id: string;
  /** Subscription type, e.g. 'resource-changes' or 'tool-executions'. */
  type: string;
  /** Optional filter; when absent every event of the type matches. */
  filter?: SubscriptionFilter;
  /** Owning client; cancel and list only act on the caller's own entries. */
  clientId: string;
  createdAt: Date;
  /** Set to false when the subscription is cancelled. */
  isActive: boolean;
}
/** Criteria an event must satisfy to be delivered; all present parts must match. */
export interface SubscriptionFilter {
  /** Patterns with `*` wildcards, tested against `event.uri`; also used as file-watch targets. */
  patterns?: string[];
  /** Top-level event properties that must be strictly equal to the given values. */
  attributes?: Record<string, any>;
  /** Field-level conditions; every one must hold. */
  conditions?: FilterCondition[];
}
/** A single comparison applied to one (possibly nested) event field. */
export interface FilterCondition {
  /** Dot-separated path into the event object, e.g. 'metadata.size'. */
  field: string;
  /** Comparison to apply; 'matches' treats `value` as a regular expression source. */
  operator: 'eq' | 'ne' | 'gt' | 'lt' | 'contains' | 'matches';
  /** Right-hand operand of the comparison. */
  value: any;
}
/**
 * Protocol extension that lets clients subscribe to server-side events.
 *
 * Registered methods: `subscription/create`, `subscription/cancel`,
 * `subscription/list`. Events travel through a private EventEmitter and are
 * delivered to matching subscriptions via `sendNotification`.
 *
 * NOTE(review): `sendNotification` is still a stub and the listeners on the
 * emitter are async; once delivery can fail, rejections must be handled there.
 */
export class SubscriptionExtension implements ProtocolExtension {
  name = 'subscription';
  version = '1.0.0';
  methods = new Map<string, MethodHandler>();
  notifications = new Map<string, NotificationHandler>();
  private subscriptions = new Map<string, Subscription>();
  private eventEmitter = new EventEmitter();
  /** File watchers opened per subscription id, kept so cancel can close them. */
  private watchers = new Map<string, Array<{ close(): unknown }>>();
  capabilities: ExtensionCapabilities = {
    subscription: {
      supported: true,
      subscriptionTypes: [
        'resource-changes',
        'tool-executions',
        'server-events',
        'custom-events'
      ]
    }
  };

  constructor() {
    this.setupSubscriptionMethods();
    this.setupEventHandling();
  }

  /** Registers the create / cancel / list request handlers. */
  private setupSubscriptionMethods(): void {
    // Create a subscription
    this.methods.set('subscription/create', async (params, context) => {
      const { type, filter } = params;
      const subscription: Subscription = {
        id: this.generateSubscriptionId(),
        type,
        filter,
        clientId: context.clientId,
        createdAt: new Date(),
        isActive: true
      };
      // Register before storing: an unsupported type throws here, so a
      // rejected request no longer leaves an active entry behind.
      this.registerEventListener(subscription);
      this.subscriptions.set(subscription.id, subscription);
      return {
        subscriptionId: subscription.id,
        status: 'active'
      };
    });

    // Cancel a subscription
    this.methods.set('subscription/cancel', async (params, context) => {
      const { subscriptionId } = params;
      const subscription = this.subscriptions.get(subscriptionId);
      if (!subscription || subscription.clientId !== context.clientId) {
        throw new Error('Subscription not found or access denied');
      }
      subscription.isActive = false;
      await this.unregisterEventListener(subscription);
      // Drop the entry so cancelled subscriptions do not accumulate.
      this.subscriptions.delete(subscriptionId);
      return { status: 'cancelled' };
    });

    // List the caller's active subscriptions
    this.methods.set('subscription/list', async (params, context) => {
      const clientSubscriptions = Array.from(this.subscriptions.values())
        .filter(sub => sub.clientId === context.clientId && sub.isActive);
      return {
        subscriptions: clientSubscriptions.map(sub => ({
          id: sub.id,
          type: sub.type,
          createdAt: sub.createdAt,
          isActive: sub.isActive
        }))
      };
    });
  }

  /** Routes emitter events to the subscriptions that match them. */
  private setupEventHandling(): void {
    // Resource change events
    this.eventEmitter.on('resource-changed', async (event) => {
      const relevantSubs = this.getRelevantSubscriptions('resource-changes', event);
      for (const subscription of relevantSubs) {
        await this.sendNotification(subscription, 'resource-changed', {
          uri: event.uri,
          changeType: event.changeType,
          timestamp: event.timestamp,
          metadata: event.metadata
        });
      }
    });

    // Tool execution events
    this.eventEmitter.on('tool-executed', async (event) => {
      const relevantSubs = this.getRelevantSubscriptions('tool-executions', event);
      for (const subscription of relevantSubs) {
        await this.sendNotification(subscription, 'tool-executed', {
          toolName: event.toolName,
          success: event.success,
          duration: event.duration,
          result: event.result,
          error: event.error
        });
      }
    });
  }

  /**
   * Attaches the event source a subscription needs.
   *
   * @throws Error for subscription types without a branch below.
   */
  private registerEventListener(subscription: Subscription): void {
    switch (subscription.type) {
      case 'resource-changes':
        // Watcher setup is asynchronous; on failure deactivate the
        // subscription so it is not listed as active with nothing behind it.
        void this.watchResourceChanges(subscription).catch((error: unknown) => {
          subscription.isActive = false;
          console.error(`Failed to watch resources for ${subscription.id}`, error);
        });
        break;
      case 'tool-executions':
        // Nothing to attach per subscription: delivery is driven by
        // 'tool-executed' events on the shared emitter.
        break;
      case 'server-events':
        // No event source for this type is wired up in this class yet.
        break;
      default:
        throw new Error(`Unsupported subscription type: ${subscription.type}`);
    }
  }

  /** Closes and forgets every file watcher opened for the subscription. */
  private async unregisterEventListener(subscription: Subscription): Promise<void> {
    const opened = this.watchers.get(subscription.id) ?? [];
    this.watchers.delete(subscription.id);
    await Promise.all(opened.map((watcher) => watcher.close()));
  }

  /**
   * Starts one file watcher per filter pattern and turns change / add /
   * unlink events into 'resource-changed' events tagged with the
   * subscription id. Does nothing when the filter has no patterns.
   */
  private async watchResourceChanges(subscription: Subscription): Promise<void> {
    if (!subscription.filter?.patterns) return;
    const chokidar = await import('chokidar');
    // The subscription may have been cancelled while the import was pending.
    if (!subscription.isActive) return;

    const opened: Array<{ close(): unknown }> = [];
    this.watchers.set(subscription.id, opened);

    const changeTypes = { change: 'modified', add: 'created', unlink: 'deleted' } as const;
    for (const pattern of subscription.filter.patterns) {
      const watcher = chokidar.watch(pattern);
      opened.push(watcher);
      for (const [fsEvent, changeType] of Object.entries(changeTypes)) {
        watcher.on(fsEvent, (path: string) => {
          this.eventEmitter.emit('resource-changed', {
            uri: `file://${path}`,
            changeType,
            timestamp: new Date(),
            subscriptionId: subscription.id
          });
        });
      }
    }
  }

  /**
   * Returns the active subscriptions of `type` whose filter accepts `event`.
   * An event carrying `subscriptionId` (as watcher events do) is delivered
   * only to that subscription, not to every subscription of the same type.
   */
  private getRelevantSubscriptions(type: string, event: any): Subscription[] {
    const targetId = event?.subscriptionId;
    return Array.from(this.subscriptions.values())
      .filter(sub =>
        sub.isActive &&
        sub.type === type &&
        (targetId === undefined || targetId === sub.id) &&
        this.matchesFilter(sub.filter, event)
      );
  }

  /** True when the event satisfies every present part of the filter. */
  private matchesFilter(filter?: SubscriptionFilter, event?: any): boolean {
    if (!filter) return true;
    const candidate = event ?? {};

    // Pattern match against the event URI
    if (filter.patterns && candidate.uri) {
      const matches = filter.patterns.some(pattern =>
        this.patternToRegExp(pattern).test(candidate.uri)
      );
      if (!matches) return false;
    }

    // Attribute equality
    if (filter.attributes) {
      for (const [key, value] of Object.entries(filter.attributes)) {
        if (candidate[key] !== value) return false;
      }
    }

    // Field conditions
    if (filter.conditions) {
      for (const condition of filter.conditions) {
        if (!this.evaluateCondition(condition, candidate)) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Converts a `*` wildcard pattern to an unanchored RegExp. Every other
   * regex metacharacter is escaped, so `*.ts` no longer matches `bats`.
   */
  private patternToRegExp(pattern: string): RegExp {
    const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
    return new RegExp(escaped.replace(/\*/g, '.*'));
  }

  /** Applies one filter condition to the event. */
  private evaluateCondition(condition: FilterCondition, event: any): boolean {
    const fieldValue = this.getNestedValue(event, condition.field);
    switch (condition.operator) {
      case 'eq':
        return fieldValue === condition.value;
      case 'ne':
        return fieldValue !== condition.value;
      case 'gt':
        return fieldValue > condition.value;
      case 'lt':
        return fieldValue < condition.value;
      case 'contains':
        return String(fieldValue).includes(String(condition.value));
      case 'matches':
        // The pattern is client-supplied: an invalid one is a non-match, not
        // a crash. NOTE(review): it is still compiled as-is, so a pathological
        // pattern can be slow; consider limiting length or complexity.
        try {
          return new RegExp(condition.value).test(String(fieldValue));
        } catch {
          return false;
        }
      default:
        return false;
    }
  }

  /** Resolves a dot-separated path; yields undefined when a segment is missing. */
  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((current, key) => current?.[key], obj);
  }

  /**
   * Placeholder for the actual delivery, which should go through the client
   * connection manager. Currently does nothing.
   */
  private async sendNotification(subscription: Subscription, method: string, params: any): Promise<void> {
    // Not implemented yet.
  }

  /**
   * Builds an id of the form `sub_<epoch ms>_<random base36>`.
   * The random part is not cryptographically secure.
   */
  private generateSubscriptionId(): string {
    // slice(2, 11) replaces the deprecated substr(2, 9) with the same result.
    return `sub_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
14.2 插件系统设计
14.2.1 插件架构
// src/plugins/PluginSystem.ts
/**
 * Contract a plugin module must satisfy to be loaded by PluginManager.
 */
export interface Plugin {
  /** Unique plugin name; used as the key in the manager's plugin map. */
  name: string;
  version: string;
  description: string;
  /** Names of plugins that must already be loaded before this one. */
  dependencies?: string[];

  // Lifecycle methods
  /** Called once by PluginManager.loadPlugin before the plugin is registered. */
  initialize(context: PluginContext): Promise<void>;
  /** Called by PluginManager.activatePlugin after the dependencies are active. */
  activate(context: PluginContext): Promise<void>;
  /** Called by PluginManager.deactivatePlugin after contributions are removed. */
  deactivate(): Promise<void>;
  /** Final cleanup hook; PluginManager in this file never calls it. */
  dispose(): Promise<void>;

  // Extension points
  /** Tools added to the server on activation. */
  tools?: ToolContribution[];
  /** Resource providers added to the server on activation. */
  resources?: ResourceContribution[];
  /** Prompts added to the server on activation. */
  prompts?: PromptContribution[];
  /** Declared but not consumed by PluginManager in this file. */
  commands?: CommandContribution[];
}
/** Services handed to a plugin in `initialize` and `activate`. */
export interface PluginContext {
  /** Server that receives the plugin's tools, resource providers and prompts. */
  server: MCPServer;
  /** Logger used by the manager and by plugins (`info`, `error`). */
  logger: Logger;
  storage: PluginStorage;
  eventBus: EventBus;
  extensionPoints: ExtensionPointRegistry;
}
/** A tool contributed by a plugin. */
export interface ToolContribution {
  /** Tool name; also recorded by the manager so the tool can be removed later. */
  name: string;
  description: string;
  /** JSON Schema describing the tool's arguments. */
  inputSchema: JSONSchema;
  /** Passed to `server.addTool` as the tool's handler. */
  handler: ToolHandler;
  /** Optional extra data; not forwarded to the server by PluginManager. */
  metadata?: ToolMetadata;
}
/** A resource provider contributed by a plugin. */
export interface ResourceContribution {
  /** URI scheme the provider is registered under, e.g. 'db'. */
  scheme: string;
  /** Passed to `server.addResourceProvider` together with the scheme. */
  handler: ResourceHandler;
  /** Declared by plugins; not read by PluginManager in this file. */
  subscriptionSupport?: boolean;
}
/** A prompt contributed by a plugin. */
export interface PromptContribution {
  name: string;
  description: string;
  /** Template whose `render(args)` produces the prompt result. */
  template: PromptTemplate;
  /** Forwarded to `server.addPrompt` as `arguments`; defaults to an empty list. */
  parameters?: ParameterDefinition[];
}
/**
 * Loads, activates and deactivates plugins and mirrors their contributions
 * (tools, resource providers, prompts) onto the server in the plugin context.
 */
export class PluginManager {
  private plugins = new Map<string, PluginInstance>();
  private registry: PluginRegistry;
  private loader: PluginLoader;
  private dependencyResolver: DependencyResolver;

  constructor(private context: PluginContext) {
    this.registry = new PluginRegistry();
    this.loader = new PluginLoader();
    this.dependencyResolver = new DependencyResolver();
  }

  /**
   * Loads a plugin module, validates and initializes it, then registers it
   * under its name. The plugin is not activated.
   *
   * @throws Error when the module is invalid, a dependency is not loaded, a
   *   plugin with the same name is already loaded, or `initialize` fails.
   */
  async loadPlugin(pluginPath: string): Promise<void> {
    try {
      // Load the plugin module
      const pluginModule = await this.loader.load(pluginPath);
      const plugin = pluginModule.default || pluginModule;

      // Validate the plugin shape
      this.validatePlugin(plugin);

      // Replacing a loaded plugin would orphan an active instance together
      // with the contributions it registered, so refuse duplicates.
      if (this.plugins.has(plugin.name)) {
        throw new Error(`Plugin already loaded: ${plugin.name}`);
      }

      // Check dependencies
      await this.resolveDependencies(plugin);

      // Create and initialize the instance
      const instance = new PluginInstance(plugin, this.context);
      await plugin.initialize(this.context);

      // Register the plugin
      this.plugins.set(plugin.name, instance);
      this.registry.register(plugin);
      this.context.logger.info(`Plugin loaded successfully: ${plugin.name}@${plugin.version}`);
    } catch (error) {
      this.context.logger.error(`Failed to load plugin: ${pluginPath}`, error);
      throw error;
    }
  }

  /**
   * Activates a loaded plugin after activating its dependencies, then
   * registers its contributions. A no-op when the plugin is already active.
   *
   * @throws Error when the plugin is not loaded or any activation step fails.
   */
  async activatePlugin(pluginName: string): Promise<void> {
    const instance = this.plugins.get(pluginName);
    if (!instance) {
      throw new Error(`Plugin not found: ${pluginName}`);
    }
    if (instance.isActive) {
      return;
    }
    try {
      // Activate dependencies first
      for (const dep of instance.plugin.dependencies ?? []) {
        await this.activatePlugin(dep);
      }

      // Activate the plugin itself
      await instance.plugin.activate(this.context);

      // Register contributions; undo a partial registration on failure so an
      // inactive plugin never leaves tools or prompts on the server.
      try {
        await this.registerContributions(instance);
      } catch (registrationError) {
        try {
          await this.unregisterContributions(instance);
        } catch (rollbackError) {
          this.context.logger.error(
            `Failed to roll back contributions of plugin: ${pluginName}`,
            rollbackError
          );
        }
        throw registrationError;
      }

      instance.isActive = true;
      this.context.logger.info(`Plugin activated: ${pluginName}`);
    } catch (error) {
      this.context.logger.error(`Failed to activate plugin: ${pluginName}`, error);
      throw error;
    }
  }

  /**
   * Removes a plugin's contributions and calls its `deactivate` hook.
   * A no-op when the plugin is unknown or not active.
   */
  async deactivatePlugin(pluginName: string): Promise<void> {
    const instance = this.plugins.get(pluginName);
    if (!instance || !instance.isActive) {
      return;
    }
    try {
      // Remove contributions
      await this.unregisterContributions(instance);

      // Deactivate the plugin
      await instance.plugin.deactivate();
      instance.isActive = false;
      this.context.logger.info(`Plugin deactivated: ${pluginName}`);
    } catch (error) {
      this.context.logger.error(`Failed to deactivate plugin: ${pluginName}`, error);
      throw error;
    }
  }

  /**
   * Adds the plugin's tools, resource providers and prompts to the server
   * and records each one on the instance so it can be removed later.
   */
  private async registerContributions(instance: PluginInstance): Promise<void> {
    const { plugin } = instance;

    // Tools
    for (const tool of plugin.tools ?? []) {
      this.context.server.addTool({
        name: tool.name,
        description: tool.description,
        inputSchema: tool.inputSchema
      }, tool.handler);
      instance.contributions.tools.push(tool.name);
    }

    // Resource providers
    for (const resource of plugin.resources ?? []) {
      this.context.server.addResourceProvider(resource.scheme, resource.handler);
      instance.contributions.resources.push(resource.scheme);
    }

    // Prompts
    for (const prompt of plugin.prompts ?? []) {
      this.context.server.addPrompt({
        name: prompt.name,
        description: prompt.description,
        arguments: prompt.parameters || []
      }, async (args) => {
        return prompt.template.render(args);
      });
      instance.contributions.prompts.push(prompt.name);
    }
  }

  /** Removes every recorded contribution from the server and clears the record. */
  private async unregisterContributions(instance: PluginInstance): Promise<void> {
    // Tools
    for (const toolName of instance.contributions.tools) {
      this.context.server.removeTool(toolName);
    }
    // Resource providers
    for (const scheme of instance.contributions.resources) {
      this.context.server.removeResourceProvider(scheme);
    }
    // Prompts
    for (const promptName of instance.contributions.prompts) {
      this.context.server.removePrompt(promptName);
    }
    // Clear the record
    instance.contributions = {
      tools: [],
      resources: [],
      prompts: []
    };
  }

  /**
   * Checks that the plugin has a name, a version and the lifecycle methods
   * this manager calls.
   *
   * @throws Error describing the first missing member.
   */
  private validatePlugin(plugin: Plugin): void {
    if (!plugin.name || !plugin.version) {
      throw new Error('Plugin must have name and version');
    }
    if (typeof plugin.initialize !== 'function') {
      throw new Error('Plugin must implement initialize method');
    }
    if (typeof plugin.activate !== 'function') {
      throw new Error('Plugin must implement activate method');
    }
    // deactivatePlugin calls this hook unconditionally.
    if (typeof plugin.deactivate !== 'function') {
      throw new Error('Plugin must implement deactivate method');
    }
  }

  /**
   * Verifies that every declared dependency is already loaded.
   *
   * @throws Error naming the first missing dependency.
   */
  private async resolveDependencies(plugin: Plugin): Promise<void> {
    if (!plugin.dependencies) return;
    for (const dep of plugin.dependencies) {
      if (!this.plugins.has(dep)) {
        throw new Error(`Missing dependency: ${dep}`);
      }
    }
  }

  /** Returns the loaded instance with the given name, if any. */
  getPlugin(name: string): PluginInstance | undefined {
    return this.plugins.get(name);
  }

  /** Returns the instances that are currently active. */
  getActivePlugins(): PluginInstance[] {
    return Array.from(this.plugins.values()).filter(p => p.isActive);
  }

  /** Returns every loaded instance, active or not. */
  getAllPlugins(): PluginInstance[] {
    return Array.from(this.plugins.values());
  }
}
/**
 * Runtime record for one loaded plugin: the plugin object, the context it was
 * created with, its activation flag and the names of what it registered.
 */
class PluginInstance {
  public plugin: Plugin;
  public context: PluginContext;
  /** True while the plugin is activated. */
  public isActive: boolean;
  /** Names / schemes registered on the server on behalf of this plugin. */
  public contributions: { tools: string[]; resources: string[]; prompts: string[] };

  constructor(plugin: Plugin, context: PluginContext) {
    this.plugin = plugin;
    this.context = context;
    this.isActive = false;
    this.contributions = { tools: [], resources: [], prompts: [] };
  }
}
// 示例插件实现
/**
 * Example plugin that exposes a database through one tool (`db_query`) and
 * one resource scheme (`db`).
 *
 * SECURITY NOTE(review): `db_query` executes whatever SQL the caller sends.
 * Only enable it for trusted clients or with a restricted database role.
 */
export class DatabasePlugin implements Plugin {
  name = 'database';
  version = '1.0.0';
  description = 'Database integration plugin';
  private dbPool?: any;

  /** Creates the connection pool from the DB_* environment variables. */
  async initialize(context: PluginContext): Promise<void> {
    this.dbPool = this.createPool();
    context.logger.info('Database plugin initialized');
  }

  /**
   * Makes sure a pool exists. `deactivate` closes the pool, so activating
   * again must not keep using the closed one.
   */
  async activate(context: PluginContext): Promise<void> {
    if (!this.dbPool) {
      this.dbPool = this.createPool();
    }
    context.logger.info('Database plugin activated');
  }

  /** Closes the pool, if any, and forgets it so it cannot be reused. */
  async deactivate(): Promise<void> {
    if (this.dbPool) {
      const pool = this.dbPool;
      this.dbPool = undefined;
      await pool.close();
    }
  }

  async dispose(): Promise<void> {
    await this.deactivate();
  }

  get tools(): ToolContribution[] {
    return [
      {
        name: 'db_query',
        description: 'Execute database query',
        inputSchema: {
          type: 'object',
          properties: {
            sql: { type: 'string' },
            params: { type: 'array', items: { type: 'string' } }
          },
          required: ['sql']
        },
        handler: this.executeQuery.bind(this)
      }
    ];
  }

  get resources(): ResourceContribution[] {
    return [
      {
        scheme: 'db',
        handler: this.handleDatabaseResource.bind(this),
        subscriptionSupport: true
      }
    ];
  }

  /** Builds a pool from DB_HOST, DB_NAME, DB_USER and DB_PASSWORD. */
  private createPool(): any {
    return new DatabasePool({
      host: process.env.DB_HOST,
      database: process.env.DB_NAME,
      username: process.env.DB_USER,
      password: process.env.DB_PASSWORD
    });
  }

  /**
   * Returns the live pool.
   *
   * @throws Error when the plugin has not been initialized or was deactivated.
   */
  private requirePool(): any {
    if (!this.dbPool) {
      throw new Error('Database plugin is not initialized');
    }
    return this.dbPool;
  }

  /**
   * Runs the given SQL with positional parameters.
   *
   * @returns the rows and row count reported by the pool.
   */
  private async executeQuery(args: any): Promise<any> {
    const { sql, params = [] } = args;
    const result = await this.requirePool().query(sql, params);
    return {
      rows: result.rows,
      rowCount: result.rowCount
    };
  }

  /**
   * Serves `db://<host>/<table>` by returning up to 100 rows of the table as
   * JSON text.
   *
   * @throws Error when the table segment is not a plain identifier.
   */
  private async handleDatabaseResource(uri: string): Promise<any> {
    // Parse the database URI
    const url = new URL(uri);
    const tableName = url.pathname.slice(1);

    // The table name comes from the client and cannot be bound as a query
    // parameter, so accept only a plain identifier before interpolating it.
    if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(tableName)) {
      throw new Error(`Invalid table name in resource URI: ${uri}`);
    }

    const result = await this.requirePool().query(`SELECT * FROM ${tableName} LIMIT 100`);
    return {
      uri,
      mimeType: 'application/json',
      text: JSON.stringify(result.rows, null, 2),
      metadata: {
        table: tableName,
        rowCount: result.rowCount
      }
    };
  }
}
14.3 多语言支持
14.3.1 Python实现示例
# python/mcp_server/base.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
import json
import asyncio
import sys
from dataclasses import dataclass
@dataclass
class Tool:
    """A callable tool exposed by the server."""
    # Key under which MCPServer.add_tool stores the tool.
    name: str
    description: str
    # Reported to clients as 'inputSchema' by tools/list.
    input_schema: Dict[str, Any]
    # Awaited with the arguments dict by MCPServer._handle_tools_call.
    handler: Callable
@dataclass
class Resource:
    """Descriptor of a resource; only the URI is required."""
    uri: str
    name: Optional[str] = None
    description: Optional[str] = None
    mime_type: Optional[str] = None
@dataclass
class Prompt:
    """A prompt template exposed by the server."""
    # Key under which MCPServer.add_prompt stores the prompt.
    name: str
    description: str
    # Argument descriptors as plain dicts.
    arguments: List[Dict[str, Any]]
    # NOTE(review): presumably produces the prompts/get result - confirm.
    handler: Callable
class MCPServer:
    """Minimal MCP server speaking JSON-RPC 2.0, one JSON object per line.

    Tools, resource handlers and prompts are registered on the instance and
    served through ``handle_request``. ``run_stdio`` connects the server to
    stdin/stdout.
    """

    def __init__(self, server_info: Dict[str, Any], capabilities: Dict[str, Any]):
        self.server_info = server_info
        self.capabilities = capabilities
        self.tools: Dict[str, Tool] = {}
        self.resources: Dict[str, Resource] = {}
        self.prompts: Dict[str, Prompt] = {}
        # URI scheme -> async handler, filled by add_resource.
        self.resource_handlers: Dict[str, Callable] = {}

    def add_tool(self, tool: "Tool") -> None:
        """Register a tool under its name, replacing any previous one."""
        self.tools[tool.name] = tool

    def add_resource(self, scheme: str, handler: Callable) -> None:
        """Register the handler that serves resources/read for a URI scheme.

        The handler is awaited with the requested URI and must return one
        content dict or a list of content dicts.
        """
        self.resource_handlers[scheme] = handler

    def add_prompt(self, prompt: "Prompt") -> None:
        """Register a prompt under its name, replacing any previous one."""
        self.prompts[prompt.name] = prompt

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one JSON-RPC request and return the response object.

        Unknown methods produce -32601; an exception escaping a handler
        produces -32603.
        """
        method = request.get('method')
        # An explicit "params": null must behave like missing params.
        params = request.get('params') or {}
        request_id = request.get('id')

        handlers = {
            'initialize': self._handle_initialize,
            'tools/list': self._handle_tools_list,
            'tools/call': self._handle_tools_call,
            'resources/list': self._handle_resources_list,
            'resources/read': self._handle_resources_read,
            'prompts/list': self._handle_prompts_list,
            'prompts/get': self._handle_prompts_get,
        }
        handler = handlers.get(method)
        if handler is None:
            return self._create_error_response(-32601, f'Method not found: {method}', request_id)
        try:
            return await handler(params, request_id)
        except Exception as e:
            return self._create_error_response(-32603, f'Internal error: {str(e)}', request_id)

    def _create_result_response(self, result: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """Wrap a result payload in a JSON-RPC success envelope."""
        return {'jsonrpc': '2.0', 'id': request_id, 'result': result}

    async def _handle_initialize(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        return self._create_result_response({
            'protocolVersion': '2024-11-05',
            'serverInfo': self.server_info,
            'capabilities': self.capabilities
        }, request_id)

    async def _handle_tools_list(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        tools_list = [
            {
                'name': tool.name,
                'description': tool.description,
                'inputSchema': tool.input_schema
            }
            for tool in self.tools.values()
        ]
        return self._create_result_response({'tools': tools_list}, request_id)

    async def _handle_tools_call(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        tool_name = params.get('name')
        arguments = params.get('arguments') or {}
        if tool_name not in self.tools:
            return self._create_error_response(-32002, f'Tool not found: {tool_name}', request_id)
        tool = self.tools[tool_name]
        try:
            result = await tool.handler(arguments)
        except Exception as e:
            return self._create_error_response(-32004, f'Tool execution failed: {str(e)}', request_id)
        # Dict results are rendered as JSON, everything else via str().
        text = json.dumps(result, indent=2) if isinstance(result, dict) else str(result)
        return self._create_result_response({
            'content': [{'type': 'text', 'text': text}]
        }, request_id)

    async def _handle_resources_list(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """List the registered Resource descriptors, omitting unset fields."""
        resources_list = []
        for resource in self.resources.values():
            entry: Dict[str, Any] = {'uri': resource.uri}
            if resource.name is not None:
                entry['name'] = resource.name
            if resource.description is not None:
                entry['description'] = resource.description
            if resource.mime_type is not None:
                entry['mimeType'] = resource.mime_type
            resources_list.append(entry)
        return self._create_result_response({'resources': resources_list}, request_id)

    async def _handle_resources_read(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """Serve resources/read through the handler registered for the URI scheme."""
        uri = params.get('uri')
        if not isinstance(uri, str) or ':' not in uri:
            return self._create_error_response(-32602, 'Invalid params: uri is required', request_id)
        scheme = uri.split(':', 1)[0]
        handler = self.resource_handlers.get(scheme)
        if handler is None:
            return self._create_error_response(-32002, f'Resource not found: {uri}', request_id)
        contents = await handler(uri)
        if not isinstance(contents, list):
            contents = [contents]
        return self._create_result_response({'contents': contents}, request_id)

    async def _handle_prompts_list(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        prompts_list = [
            {
                'name': prompt.name,
                'description': prompt.description,
                'arguments': prompt.arguments
            }
            for prompt in self.prompts.values()
        ]
        return self._create_result_response({'prompts': prompts_list}, request_id)

    async def _handle_prompts_get(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """Serve prompts/get by awaiting the prompt's handler with the arguments.

        A dict returned by the handler is used as the result as-is; any other
        value is wrapped as a single user text message.
        """
        prompt_name = params.get('name')
        if prompt_name not in self.prompts:
            return self._create_error_response(-32602, f'Prompt not found: {prompt_name}', request_id)
        prompt = self.prompts[prompt_name]
        rendered = await prompt.handler(params.get('arguments') or {})
        if not isinstance(rendered, dict):
            rendered = {
                'description': prompt.description,
                'messages': [
                    {'role': 'user', 'content': {'type': 'text', 'text': str(rendered)}}
                ]
            }
        return self._create_result_response(rendered, request_id)

    def _create_error_response(self, code: int, message: str, request_id: Any) -> Dict[str, Any]:
        """Build a JSON-RPC error envelope."""
        return {
            'jsonrpc': '2.0',
            'id': request_id,
            'error': {
                'code': code,
                'message': message
            }
        }

    async def run_stdio(self):
        """Serve newline-delimited JSON-RPC over stdin/stdout until EOF.

        Blank lines are skipped, unparsable lines get -32700, non-object
        payloads get -32600, and requests without an ``id`` (notifications)
        are processed without sending a response.
        """
        loop = asyncio.get_running_loop()
        while True:
            # readline blocks, so run it in the default executor.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                break  # EOF
            line = line.strip()
            if not line:
                continue

            try:
                request = json.loads(line)
            except json.JSONDecodeError:
                error_response = self._create_error_response(-32700, 'Parse error', None)
                print(json.dumps(error_response), flush=True)
                continue

            if not isinstance(request, dict):
                error_response = self._create_error_response(-32600, 'Invalid Request', None)
                print(json.dumps(error_response), flush=True)
                continue

            try:
                response = await self.handle_request(request)
            except Exception as e:
                response = self._create_error_response(
                    -32603, f'Internal error: {str(e)}', request.get('id'))

            # JSON-RPC notifications carry no id and must not be answered.
            if 'id' not in request:
                continue
            print(json.dumps(response), flush=True)
# 示例Python MCP Server实现
class FileSystemServer(MCPServer):
    """MCP server exposing read_file / write_file inside a set of allowed directories."""

    def __init__(self, allowed_paths: List[str]):
        super().__init__(
            server_info={
                'name': 'filesystem-server',
                'version': '1.0.0'
            },
            capabilities={
                'tools': {},
                'resources': {},
                'prompts': {}
            }
        )
        # Resolve symlinks once so containment checks compare real locations.
        # Empty entries are dropped: '' would otherwise resolve to the current
        # working directory and silently widen access.
        self.allowed_paths = [os.path.realpath(path) for path in allowed_paths if path]
        self._setup_tools()

    def _setup_tools(self):
        """Register the read_file and write_file tools."""
        # File read tool
        self.add_tool(Tool(
            name='read_file',
            description='Read file contents',
            input_schema={
                'type': 'object',
                'properties': {
                    'path': {'type': 'string', 'description': 'File path to read'}
                },
                'required': ['path']
            },
            handler=self._read_file
        ))
        # File write tool
        self.add_tool(Tool(
            name='write_file',
            description='Write file contents',
            input_schema={
                'type': 'object',
                'properties': {
                    'path': {'type': 'string', 'description': 'File path to write'},
                    'content': {'type': 'string', 'description': 'Content to write'}
                },
                'required': ['path', 'content']
            },
            handler=self._write_file
        ))

    def _validate_path(self, file_path: str) -> str:
        """Return the resolved path if it lies inside an allowed directory.

        Containment is decided on whole path components after resolving
        symlinks, so '/tmpevil/x' is not accepted for the allowed root '/tmp'
        and a symlink inside an allowed root cannot point outside of it.

        Raises:
            ValueError: when the path is outside every allowed directory.
        """
        resolved = os.path.realpath(file_path)
        for allowed_path in self.allowed_paths:
            try:
                if os.path.commonpath([resolved, allowed_path]) == allowed_path:
                    return resolved
            except ValueError:
                # Paths without a common base (e.g. different drives).
                continue
        raise ValueError(f"Access denied: {file_path}")

    async def _read_file(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Read a UTF-8 text file; 'size' is the number of characters read."""
        file_path = self._validate_path(args['path'])
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return {
                'path': file_path,
                'content': content,
                'size': len(content)
            }
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise Exception(f"Failed to read file: {str(e)}") from e

    async def _write_file(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Write UTF-8 text to a file, creating parent directories as needed."""
        file_path = self._validate_path(args['path'])
        content = args['content']
        try:
            # Make sure the parent directory exists
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return {
                'path': file_path,
                'size': len(content),
                'success': True
            }
        except Exception as e:
            raise Exception(f"Failed to write file: {str(e)}") from e
# 启动脚本
import os
import asyncio
async def main():
    """Start a FileSystemServer on stdio.

    Allowed directories come from the ALLOWED_PATHS environment variable, a
    list separated by os.pathsep (':' on POSIX). Empty entries are ignored;
    when nothing usable remains the server falls back to '/tmp'.
    """
    raw_paths = os.environ.get('ALLOWED_PATHS', '')
    # An empty entry would resolve to the current working directory.
    allowed_paths = [path for path in raw_paths.split(os.pathsep) if path] or ['/tmp']
    server = FileSystemServer(allowed_paths)
    await server.run_stdio()


if __name__ == '__main__':
    asyncio.run(main())
14.3.2 Go实现示例
// go/mcp/server.go
package mcp
import (
"bufio"
"encoding/json"
"fmt"
"os"
"context"
"log"
)
// Server holds the server identity, its advertised capabilities and the
// registered tools, resources and prompts. The three registries are keyed by
// name and excluded from JSON encoding.
type Server struct {
	ServerInfo   ServerInfo          `json:"serverInfo"`
	Capabilities ServerCapabilities  `json:"capabilities"`
	Tools        map[string]Tool     `json:"-"`
	Resources    map[string]Resource `json:"-"`
	Prompts      map[string]Prompt   `json:"-"`
}
// ServerInfo identifies the server; it is returned as "serverInfo" by the
// initialize handler.
type ServerInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
}
// ServerCapabilities is returned as "capabilities" by the initialize handler.
// Each group is a free-form JSON object.
type ServerCapabilities struct {
	Tools     map[string]interface{} `json:"tools"`
	Resources map[string]interface{} `json:"resources"`
	Prompts   map[string]interface{} `json:"prompts"`
}
// Tool describes one callable tool. Handler is excluded from JSON encoding.
type Tool struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	InputSchema interface{} `json:"inputSchema"` // JSON Schema for the arguments
	Handler     ToolHandler `json:"-"`           // invoked by handleToolsCall
}
// ToolHandler executes a tool with the decoded "arguments" object and returns
// its result or an error.
type ToolHandler func(ctx context.Context, args map[string]interface{}) (interface{}, error)
// Request is a decoded JSON-RPC request. ID and Params stay untyped; an
// absent "id" and an explicit null both decode to a nil ID.
type Request struct {
	JsonRPC string      `json:"jsonrpc"`
	ID      interface{} `json:"id"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params"` // handleToolsCall expects a JSON object here
}
// Response is a JSON-RPC response. Result and Error are omitted from the
// encoded output when unset.
type Response struct {
	JsonRPC string      `json:"jsonrpc"`
	ID      interface{} `json:"id"`
	Result  interface{} `json:"result,omitempty"`
	Error   *Error      `json:"error,omitempty"`
}
// Error is the JSON-RPC error object carried by a failed Response.
type Error struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}
// NewServer returns a Server carrying the given identity and capabilities,
// with empty tool, resource and prompt registries ready for use.
func NewServer(info ServerInfo, capabilities ServerCapabilities) *Server {
	srv := new(Server)
	srv.ServerInfo = info
	srv.Capabilities = capabilities
	srv.Tools = map[string]Tool{}
	srv.Resources = map[string]Resource{}
	srv.Prompts = map[string]Prompt{}
	return srv
}
// AddTool registers a tool under its name, replacing any tool previously
// registered with the same name.
func (s *Server) AddTool(tool Tool) {
	// A Server built as a struct literal rather than via NewServer has a nil
	// map; writing to it would panic.
	if s.Tools == nil {
		s.Tools = make(map[string]Tool)
	}
	s.Tools[tool.Name] = tool
}
// HandleRequest routes a request to the handler for its method. Methods other
// than "initialize", "tools/list" and "tools/call" yield a -32601 error.
func (s *Server) HandleRequest(ctx context.Context, req Request) Response {
	if req.Method == "initialize" {
		return s.handleInitialize(req)
	}
	if req.Method == "tools/list" {
		return s.handleToolsList(req)
	}
	if req.Method == "tools/call" {
		return s.handleToolsCall(ctx, req)
	}

	notFound := &Error{
		Code:    -32601,
		Message: fmt.Sprintf("Method not found: %s", req.Method),
	}
	return Response{JsonRPC: "2.0", ID: req.ID, Error: notFound}
}
// handleInitialize answers "initialize" with the protocol version, the server
// identity and the advertised capabilities.
func (s *Server) handleInitialize(req Request) Response {
	payload := map[string]interface{}{}
	payload["protocolVersion"] = "2024-11-05"
	payload["serverInfo"] = s.ServerInfo
	payload["capabilities"] = s.Capabilities

	return Response{JsonRPC: "2.0", ID: req.ID, Result: payload}
}
// handleToolsList answers "tools/list" with the name, description and input
// schema of every registered tool. Entries follow Go map iteration order,
// which is unspecified.
func (s *Server) handleToolsList(req Request) Response {
	listing := make([]map[string]interface{}, 0, len(s.Tools))
	for name := range s.Tools {
		registered := s.Tools[name]
		entry := map[string]interface{}{
			"name":        registered.Name,
			"description": registered.Description,
			"inputSchema": registered.InputSchema,
		}
		listing = append(listing, entry)
	}

	result := map[string]interface{}{"tools": listing}
	return Response{JsonRPC: "2.0", ID: req.ID, Result: result}
}
// handleToolsCall answers "tools/call": it looks up the named tool, runs its
// handler with the supplied arguments and wraps the result as one text
// content item.
//
// Error codes: -32602 for malformed params, -32002 for an unknown tool and
// -32004 when the tool has no handler or the handler returns an error.
func (s *Server) handleToolsCall(ctx context.Context, req Request) Response {
	fail := func(code int, message string) Response {
		return Response{
			JsonRPC: "2.0",
			ID:      req.ID,
			Error:   &Error{Code: code, Message: message},
		}
	}

	params, ok := req.Params.(map[string]interface{})
	if !ok {
		return fail(-32602, "Invalid params")
	}
	toolName, ok := params["name"].(string)
	if !ok {
		return fail(-32602, "Tool name is required")
	}
	tool, exists := s.Tools[toolName]
	if !exists {
		return fail(-32002, fmt.Sprintf("Tool not found: %s", toolName))
	}
	// A Tool registered without a Handler would panic on the call below.
	if tool.Handler == nil {
		return fail(-32004, fmt.Sprintf("Tool execution failed: %s", "tool has no handler"))
	}

	// Missing or non-object arguments are treated as an empty object.
	args, _ := params["arguments"].(map[string]interface{})
	if args == nil {
		args = make(map[string]interface{})
	}

	result, err := tool.Handler(ctx, args)
	if err != nil {
		return fail(-32004, fmt.Sprintf("Tool execution failed: %s", err.Error()))
	}

	return Response{
		JsonRPC: "2.0",
		ID:      req.ID,
		Result: map[string]interface{}{
			"content": []map[string]interface{}{
				{
					"type": "text",
					"text": toolResultText(result),
				},
			},
		},
	}
}

// toolResultText renders a tool result as text: strings pass through
// unchanged, every other value is encoded as indented JSON. Formatting with
// %v is only the fallback for values JSON cannot encode, because %v prints
// maps in Go syntax that clients cannot parse.
func toolResultText(result interface{}) string {
	if text, ok := result.(string); ok {
		return text
	}
	encoded, err := json.MarshalIndent(result, "", "  ")
	if err != nil {
		return fmt.Sprintf("%v", result)
	}
	return string(encoded)
}
// RunStdio serves newline-delimited JSON-RPC on stdin/stdout until stdin is
// closed, a read error occurs or ctx is cancelled.
//
// Empty lines are skipped, unparsable lines are answered with -32700, and
// requests whose id is absent or null are treated as notifications: they are
// processed but not answered. Cancellation is observed between lines only,
// because the read from stdin blocks.
func (s *Server) RunStdio(ctx context.Context) error {
	// bufio.Scanner rejects lines above 64 KiB by default, which would end
	// the server on a large write_file request.
	const maxRequestBytes = 16 * 1024 * 1024

	scanner := bufio.NewScanner(os.Stdin)
	scanner.Buffer(make([]byte, 0, 64*1024), maxRequestBytes)
	encoder := json.NewEncoder(os.Stdout)

	for scanner.Scan() {
		if err := ctx.Err(); err != nil {
			return err
		}
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		var req Request
		if err := json.Unmarshal(line, &req); err != nil {
			errorResp := Response{
				JsonRPC: "2.0",
				ID:      nil,
				Error: &Error{
					Code:    -32700,
					Message: "Parse error",
				},
			}
			if encErr := encoder.Encode(errorResp); encErr != nil {
				log.Printf("Failed to encode response: %v", encErr)
			}
			continue
		}

		resp := s.HandleRequest(ctx, req)
		// JSON-RPC notifications must not receive a response.
		if req.ID == nil {
			continue
		}
		if err := encoder.Encode(resp); err != nil {
			log.Printf("Failed to encode response: %v", err)
		}
	}
	return scanner.Err()
}
// 示例Go MCP Server实现
package main
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
)
// FileSystemServer is an MCP server exposing file tools. It embeds
// *mcp.Server, so the server's methods are available on it directly.
type FileSystemServer struct {
	*mcp.Server
	allowedPaths []string // directories validatePath accepts paths under
}
// NewFileSystemServer builds a filesystem server limited to the given
// directories and registers its tools.
//
// Allowed paths are stored as absolute, cleaned paths so the containment
// check compares like with like. Empty entries are dropped, because an empty
// prefix would match every path. Entries that cannot be made absolute are
// dropped too, which denies rather than widens access.
func NewFileSystemServer(allowedPaths []string) *FileSystemServer {
	server := mcp.NewServer(
		mcp.ServerInfo{
			Name:    "filesystem-server",
			Version: "1.0.0",
		},
		mcp.ServerCapabilities{
			Tools:     make(map[string]interface{}),
			Resources: make(map[string]interface{}),
			Prompts:   make(map[string]interface{}),
		},
	)

	normalized := make([]string, 0, len(allowedPaths))
	for _, allowed := range allowedPaths {
		if allowed == "" {
			continue
		}
		abs, err := filepath.Abs(allowed)
		if err != nil {
			continue
		}
		normalized = append(normalized, abs)
	}

	fs := &FileSystemServer{
		Server:       server,
		allowedPaths: normalized,
	}
	fs.setupTools()
	return fs
}
// setupTools registers the read_file and write_file tools together with
// their JSON Schemas.
func (fs *FileSystemServer) setupTools() {
	// stringProp builds the schema of one string-typed property.
	stringProp := func(description string) map[string]interface{} {
		return map[string]interface{}{
			"type":        "string",
			"description": description,
		}
	}

	// File read tool
	readSchema := map[string]interface{}{
		"type": "object",
		"properties": map[string]interface{}{
			"path": stringProp("File path to read"),
		},
		"required": []string{"path"},
	}
	fs.AddTool(mcp.Tool{
		Name:        "read_file",
		Description: "Read file contents",
		InputSchema: readSchema,
		Handler:     fs.readFile,
	})

	// File write tool
	writeSchema := map[string]interface{}{
		"type": "object",
		"properties": map[string]interface{}{
			"path":    stringProp("File path to write"),
			"content": stringProp("Content to write"),
		},
		"required": []string{"path", "content"},
	}
	fs.AddTool(mcp.Tool{
		Name:        "write_file",
		Description: "Write file contents",
		InputSchema: writeSchema,
		Handler:     fs.writeFile,
	})
}
// validatePath returns the absolute form of path if it lies inside one of the
// allowed directories, and an "access denied" error otherwise.
//
// Containment is decided on whole path components: a plain prefix test would
// accept "/tmpevil/x" for the allowed root "/tmp". Empty allowed entries are
// ignored because they would match everything.
//
// NOTE(review): symlinks are not resolved, so a link inside an allowed
// directory can still point outside of it.
func (fs *FileSystemServer) validatePath(path string) (string, error) {
	absPath, err := filepath.Abs(path)
	if err != nil {
		return "", err
	}
	for _, allowedPath := range fs.allowedPaths {
		if allowedPath == "" {
			continue
		}
		root, err := filepath.Abs(allowedPath)
		if err != nil {
			continue
		}
		rel, err := filepath.Rel(root, absPath)
		if err != nil {
			continue
		}
		// A relative path that climbs out of root starts with "..".
		if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
			continue
		}
		return absPath, nil
	}
	return "", fmt.Errorf("access denied: %s", path)
}
// readFile implements the read_file tool: it validates the "path" argument
// and returns the file's path, content and size in bytes.
func (fs *FileSystemServer) readFile(ctx context.Context, args map[string]interface{}) (interface{}, error) {
	requested, isString := args["path"].(string)
	if !isString {
		return nil, fmt.Errorf("path is required and must be a string")
	}

	target, err := fs.validatePath(requested)
	if err != nil {
		return nil, err
	}

	data, readErr := ioutil.ReadFile(target)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read file: %v", readErr)
	}

	payload := map[string]interface{}{}
	payload["path"] = target
	payload["content"] = string(data)
	payload["size"] = len(data)
	return payload, nil
}
// writeFile implements the write_file tool: it validates "path" and
// "content", creates missing parent directories (mode 0755) and writes the
// content with mode 0644.
func (fs *FileSystemServer) writeFile(ctx context.Context, args map[string]interface{}) (interface{}, error) {
	requested, isString := args["path"].(string)
	if !isString {
		return nil, fmt.Errorf("path is required and must be a string")
	}
	body, isString := args["content"].(string)
	if !isString {
		return nil, fmt.Errorf("content is required and must be a string")
	}

	target, err := fs.validatePath(requested)
	if err != nil {
		return nil, err
	}

	// Make sure the parent directory exists
	mkdirErr := os.MkdirAll(filepath.Dir(target), 0755)
	if mkdirErr != nil {
		return nil, fmt.Errorf("failed to create directory: %v", mkdirErr)
	}

	writeErr := ioutil.WriteFile(target, []byte(body), 0644)
	if writeErr != nil {
		return nil, fmt.Errorf("failed to write file: %v", writeErr)
	}

	payload := map[string]interface{}{}
	payload["path"] = target
	payload["size"] = len(body)
	payload["success"] = true
	return payload, nil
}
func main() {
allowedPaths := []string{"/tmp"}
if paths := os.Getenv("ALLOWED_PATHS"); paths != "" {
allowedPaths = strings.Split(paths, ":")
}
server := NewFileSystemServer(allowedPaths)
ctx := context.Background()
if err := server.RunStdio(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
os.Exit(1)
}
}
本章总结
第14章深入探讨了MCP协议的高级特性和扩展开发:
核心知识点
- 协议扩展机制:学会了流式处理、批处理、订阅等高级特性的实现
- 插件系统设计:构建了完整的插件加载、管理和贡献机制
- 多语言实现:提供了Python和Go的完整MCP Server实现示例
- 企业级应用:探索了MCP在大规模、复杂环境中的应用模式
- 未来扩展:展示了MCP协议的扩展潜力和发展方向
实践要点
- 设计灵活的协议扩展机制支持未来需求
- 建立完善的插件系统支持生态发展
- 提供多语言SDK降低开发门槛
- 考虑企业级应用的复杂需求和约束
- 保持与社区同步推进协议演进
通过这14章的系统学习,你已经建立起MCP Server开发的完整知识体系:从基础概念到高级特性,从简单示例到复杂项目,从单一语言到多语言支持,为成为MCP专家奠定了坚实基础。