Chapter 14: Advanced Features and Extension Development

Haiyue
45min

Chapter 14: Advanced Features and Extension Development

Learning Objectives

  1. Master advanced features and extension mechanisms of the MCP protocol
  2. Develop custom protocol extensions and plugins
  3. Implement multi-language MCP Server support
  4. Learn integration with other AI frameworks
  5. Explore MCP practices in enterprise-level applications

14.1 Advanced Protocol Features

14.1.1 Custom Protocol Extensions

// src/extensions/CustomProtocolExtension.ts
export interface ProtocolExtension {
  name: string;
  version: string;
  methods: Map<string, MethodHandler>;
  notifications: Map<string, NotificationHandler>;
  capabilities: ExtensionCapabilities;
}

export interface ExtensionCapabilities {
  streaming?: {
    supported: boolean;
    maxChunkSize?: number;
    compressionMethods?: string[];
  };
  
  batch?: {
    supported: boolean;
    maxBatchSize?: number;
  };
  
  subscription?: {
    supported: boolean;
    subscriptionTypes?: string[];
  };
  
  customTypes?: {
    [typeName: string]: JSONSchema;
  };
}

// Streaming extension
export class StreamingExtension implements ProtocolExtension {
  name = 'streaming';
  version = '1.0.0';
  methods = new Map<string, MethodHandler>();
  notifications = new Map<string, NotificationHandler>();
  
  capabilities: ExtensionCapabilities = {
    streaming: {
      supported: true,
      maxChunkSize: 64 * 1024, // 64KB
      compressionMethods: ['gzip', 'deflate']
    }
  };
  
  constructor() {
    this.setupStreamingMethods();
  }
  
  private setupStreamingMethods(): void {
    // Streaming tool call
    this.methods.set('tools/call-stream', async (params, context) => {
      const { name, arguments: args } = params;
      const tool = context.server.getTool(name);
      
      if (!tool || !tool.supportsStreaming) {
        throw new Error(`Tool ${name} does not support streaming`);
      }
      
      // Create streaming response channel
      const streamId = this.generateStreamId();
      context.response.setHeader('X-Stream-ID', streamId);
      
      // Execute tool and stream results
      const stream = tool.executeStream(args);
      
      for await (const chunk of stream) {
        await context.sendNotification('tools/stream-chunk', {
          streamId,
          chunk: await this.compressChunk(chunk),
          metadata: {
            timestamp: Date.now(),
            sequence: chunk.sequence
          }
        });
      }
      
      // Stream end notification
      await context.sendNotification('tools/stream-end', {
        streamId,
        totalChunks: stream.totalChunks,
        finalSize: stream.totalSize
      });
      
      return { streamId, status: 'streaming' };
    });
    
    // Streaming resource reading
    this.methods.set('resources/read-stream', async (params, context) => {
      const { uri, range } = params;
      const resource = await context.server.getResource(uri);
      
      if (!resource.supportsStreaming) {
        throw new Error(`Resource ${uri} does not support streaming`);
      }
      
      const streamId = this.generateStreamId();
      const stream = resource.createReadStream(range);
      
      for await (const chunk of stream) {
        await context.sendNotification('resources/stream-chunk', {
          streamId,
          data: chunk.toString('base64'),
          position: chunk.position,
          size: chunk.length
        });
      }
      
      return { streamId, totalSize: stream.totalSize };
    });
  }
  
  private async compressChunk(chunk: any): Promise<string> {
    // Compress data chunk using gzip
    const zlib = await import('zlib');
    const compressed = zlib.gzipSync(JSON.stringify(chunk));
    return compressed.toString('base64');
  }
  
  private generateStreamId(): string {
    return `stream_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }
}

// Batch processing extension
export class BatchExtension implements ProtocolExtension {
  name = 'batch';
  version = '1.0.0';
  methods = new Map<string, MethodHandler>();
  notifications = new Map<string, NotificationHandler>();
  
  capabilities: ExtensionCapabilities = {
    batch: {
      supported: true,
      maxBatchSize: 100
    }
  };
  
  constructor() {
    this.setupBatchMethods();
  }
  
  private setupBatchMethods(): void {
    // Batch tool call
    this.methods.set('tools/call-batch', async (params, context) => {
      const { operations } = params;
      
      if (operations.length > this.capabilities.batch!.maxBatchSize!) {
        throw new Error(`Batch size exceeds maximum allowed: ${this.capabilities.batch!.maxBatchSize}`);
      }
      
      const results = await Promise.allSettled(
        operations.map(async (op: any, index: number) => {
          try {
            const tool = context.server.getTool(op.name);
            if (!tool) {
              throw new Error(`Tool not found: ${op.name}`);
            }
            
            const result = await tool.execute(op.arguments);
            return {
              index,
              status: 'fulfilled',
              result
            };
          } catch (error) {
            return {
              index,
              status: 'rejected',
              error: error.message
            };
          }
        })
      );
      
      return {
        results,
        summary: {
          total: operations.length,
          successful: results.filter(r => r.status === 'fulfilled').length,
          failed: results.filter(r => r.status === 'rejected').length
        }
      };
    });
    
    // Batch resource reading
    this.methods.set('resources/read-batch', async (params, context) => {
      const { uris } = params;
      
      const results = await Promise.allSettled(
        uris.map(async (uri: string) => {
          const resource = await context.server.getResource(uri);
          return {
            uri,
            content: await resource.read(),
            metadata: resource.getMetadata()
          };
        })
      );
      
      return { results };
    });
  }
}

14.1.2 Subscription and Event System

// src/extensions/SubscriptionExtension.ts
export interface Subscription {
  id: string;
  type: string;
  filter?: SubscriptionFilter;
  clientId: string;
  createdAt: Date;
  isActive: boolean;
}

export interface SubscriptionFilter {
  patterns?: string[];
  attributes?: Record<string, any>;
  conditions?: FilterCondition[];
}

export interface FilterCondition {
  field: string;
  operator: 'eq' | 'ne' | 'gt' | 'lt' | 'contains' | 'matches';
  value: any;
}

export class SubscriptionExtension implements ProtocolExtension {
  name = 'subscription';
  version = '1.0.0';
  methods = new Map<string, MethodHandler>();
  notifications = new Map<string, NotificationHandler>();
  
  private subscriptions = new Map<string, Subscription>();
  private eventEmitter = new EventEmitter();
  
  capabilities: ExtensionCapabilities = {
    subscription: {
      supported: true,
      subscriptionTypes: [
        'resource-changes',
        'tool-executions',
        'server-events',
        'custom-events'
      ]
    }
  };
  
  constructor() {
    this.setupSubscriptionMethods();
    this.setupEventHandling();
  }
  
  private setupSubscriptionMethods(): void {
    // Create subscription
    this.methods.set('subscription/create', async (params, context) => {
      const { type, filter } = params;
      
      const subscription: Subscription = {
        id: this.generateSubscriptionId(),
        type,
        filter,
        clientId: context.clientId,
        createdAt: new Date(),
        isActive: true
      };
      
      this.subscriptions.set(subscription.id, subscription);
      
      // Register event listener
      this.registerEventListener(subscription);
      
      return {
        subscriptionId: subscription.id,
        status: 'active'
      };
    });
    
    // Cancel subscription
    this.methods.set('subscription/cancel', async (params, context) => {
      const { subscriptionId } = params;
      const subscription = this.subscriptions.get(subscriptionId);
      
      if (!subscription || subscription.clientId !== context.clientId) {
        throw new Error('Subscription not found or access denied');
      }
      
      subscription.isActive = false;
      this.unregisterEventListener(subscription);
      
      return { status: 'cancelled' };
    });
    
    // List subscriptions
    this.methods.set('subscription/list', async (params, context) => {
      const clientSubscriptions = Array.from(this.subscriptions.values())
        .filter(sub => sub.clientId === context.clientId && sub.isActive);
      
      return {
        subscriptions: clientSubscriptions.map(sub => ({
          id: sub.id,
          type: sub.type,
          createdAt: sub.createdAt,
          isActive: sub.isActive
        }))
      };
    });
  }
  
  private setupEventHandling(): void {
    // Resource change events
    this.eventEmitter.on('resource-changed', async (event) => {
      const relevantSubs = this.getRelevantSubscriptions('resource-changes', event);
      
      for (const subscription of relevantSubs) {
        await this.sendNotification(subscription, 'resource-changed', {
          uri: event.uri,
          changeType: event.changeType,
          timestamp: event.timestamp,
          metadata: event.metadata
        });
      }
    });
    
    // Tool execution events
    this.eventEmitter.on('tool-executed', async (event) => {
      const relevantSubs = this.getRelevantSubscriptions('tool-executions', event);
      
      for (const subscription of relevantSubs) {
        await this.sendNotification(subscription, 'tool-executed', {
          toolName: event.toolName,
          success: event.success,
          duration: event.duration,
          result: event.result,
          error: event.error
        });
      }
    });
  }
  
  private registerEventListener(subscription: Subscription): void {
    // Register appropriate event listeners based on subscription type
    switch (subscription.type) {
      case 'resource-changes':
        // Watch file system changes
        this.watchResourceChanges(subscription);
        break;
        
      case 'tool-executions':
        // Watch tool executions
        this.watchToolExecutions(subscription);
        break;
        
      case 'server-events':
        // Watch server events
        this.watchServerEvents(subscription);
        break;
        
      default:
        throw new Error(`Unsupported subscription type: ${subscription.type}`);
    }
  }
  
  private async watchResourceChanges(subscription: Subscription): Promise<void> {
    if (!subscription.filter?.patterns) return;
    
    const chokidar = await import('chokidar');
    
    for (const pattern of subscription.filter.patterns) {
      const watcher = chokidar.watch(pattern);
      
      watcher.on('change', (path) => {
        this.eventEmitter.emit('resource-changed', {
          uri: `file://${path}`,
          changeType: 'modified',
          timestamp: new Date(),
          subscriptionId: subscription.id
        });
      });
      
      watcher.on('add', (path) => {
        this.eventEmitter.emit('resource-changed', {
          uri: `file://${path}`,
          changeType: 'created',
          timestamp: new Date(),
          subscriptionId: subscription.id
        });
      });
      
      watcher.on('unlink', (path) => {
        this.eventEmitter.emit('resource-changed', {
          uri: `file://${path}`,
          changeType: 'deleted',
          timestamp: new Date(),
          subscriptionId: subscription.id
        });
      });
    }
  }
  
  private getRelevantSubscriptions(type: string, event: any): Subscription[] {
    return Array.from(this.subscriptions.values())
      .filter(sub => 
        sub.isActive && 
        sub.type === type && 
        this.matchesFilter(sub.filter, event)
      );
  }
  
  private matchesFilter(filter?: SubscriptionFilter, event?: any): boolean {
    if (!filter) return true;
    
    // Check pattern matching
    if (filter.patterns && event.uri) {
      const matches = filter.patterns.some(pattern => {
        const regex = new RegExp(pattern.replace(/\*/g, '.*'));
        return regex.test(event.uri);
      });
      if (!matches) return false;
    }
    
    // Check attribute matching
    if (filter.attributes) {
      for (const [key, value] of Object.entries(filter.attributes)) {
        if (event[key] !== value) return false;
      }
    }
    
    // Check condition matching
    if (filter.conditions) {
      for (const condition of filter.conditions) {
        if (!this.evaluateCondition(condition, event)) {
          return false;
        }
      }
    }
    
    return true;
  }
  
  private evaluateCondition(condition: FilterCondition, event: any): boolean {
    const fieldValue = this.getNestedValue(event, condition.field);
    
    switch (condition.operator) {
      case 'eq':
        return fieldValue === condition.value;
      case 'ne':
        return fieldValue !== condition.value;
      case 'gt':
        return fieldValue > condition.value;
      case 'lt':
        return fieldValue < condition.value;
      case 'contains':
        return String(fieldValue).includes(condition.value);
      case 'matches':
        return new RegExp(condition.value).test(String(fieldValue));
      default:
        return false;
    }
  }
  
  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((current, key) => current?.[key], obj);
  }
  
  private async sendNotification(subscription: Subscription, method: string, params: any): Promise<void> {
    // Actual implementation for sending notifications
    // This should call the client connection manager to send notifications
  }
  
  private generateSubscriptionId(): string {
    return `sub_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }
}

14.2 Plugin System Design

14.2.1 Plugin Architecture

// src/plugins/PluginSystem.ts
export interface Plugin {
  name: string;
  version: string;
  description: string;
  dependencies?: string[];
  
  // Lifecycle methods
  initialize(context: PluginContext): Promise<void>;
  activate(context: PluginContext): Promise<void>;
  deactivate(): Promise<void>;
  dispose(): Promise<void>;
  
  // Extension points
  tools?: ToolContribution[];
  resources?: ResourceContribution[];
  prompts?: PromptContribution[];
  commands?: CommandContribution[];
}

export interface PluginContext {
  server: MCPServer;
  logger: Logger;
  storage: PluginStorage;
  eventBus: EventBus;
  extensionPoints: ExtensionPointRegistry;
}

export interface ToolContribution {
  name: string;
  description: string;
  inputSchema: JSONSchema;
  handler: ToolHandler;
  metadata?: ToolMetadata;
}

export interface ResourceContribution {
  scheme: string;
  handler: ResourceHandler;
  subscriptionSupport?: boolean;
}

export interface PromptContribution {
  name: string;
  description: string;
  template: PromptTemplate;
  parameters?: ParameterDefinition[];
}

export class PluginManager {
  private plugins = new Map<string, PluginInstance>();
  private registry: PluginRegistry;
  private loader: PluginLoader;
  private dependencyResolver: DependencyResolver;
  
  constructor(private context: PluginContext) {
    this.registry = new PluginRegistry();
    this.loader = new PluginLoader();
    this.dependencyResolver = new DependencyResolver();
  }
  
  async loadPlugin(pluginPath: string): Promise<void> {
    try {
      // Load plugin module
      const pluginModule = await this.loader.load(pluginPath);
      const plugin = pluginModule.default || pluginModule;
      
      // Validate plugin interface
      this.validatePlugin(plugin);
      
      // Check dependencies
      await this.resolveDependencies(plugin);
      
      // Create plugin instance
      const instance = new PluginInstance(plugin, this.context);
      
      // Initialize plugin
      await plugin.initialize(this.context);
      
      // Register plugin
      this.plugins.set(plugin.name, instance);
      this.registry.register(plugin);
      
      this.context.logger.info(`Plugin loaded successfully: ${plugin.name}@${plugin.version}`);
    } catch (error) {
      this.context.logger.error(`Failed to load plugin: ${pluginPath}`, error);
      throw error;
    }
  }
  
  async activatePlugin(pluginName: string): Promise<void> {
    const instance = this.plugins.get(pluginName);
    if (!instance) {
      throw new Error(`Plugin not found: ${pluginName}`);
    }
    
    if (instance.isActive) {
      return;
    }
    
    try {
      // Activate dependencies
      if (instance.plugin.dependencies) {
        for (const dep of instance.plugin.dependencies) {
          await this.activatePlugin(dep);
        }
      }
      
      // Activate plugin
      await instance.plugin.activate(this.context);
      
      // Register contributions
      await this.registerContributions(instance);
      
      instance.isActive = true;
      this.context.logger.info(`Plugin activated: ${pluginName}`);
    } catch (error) {
      this.context.logger.error(`Failed to activate plugin: ${pluginName}`, error);
      throw error;
    }
  }
  
  async deactivatePlugin(pluginName: string): Promise<void> {
    const instance = this.plugins.get(pluginName);
    if (!instance || !instance.isActive) {
      return;
    }
    
    try {
      // Unregister contributions
      await this.unregisterContributions(instance);
      
      // Deactivate plugin
      await instance.plugin.deactivate();
      
      instance.isActive = false;
      this.context.logger.info(`Plugin deactivated: ${pluginName}`);
    } catch (error) {
      this.context.logger.error(`Failed to deactivate plugin: ${pluginName}`, error);
      throw error;
    }
  }
  
  private async registerContributions(instance: PluginInstance): Promise<void> {
    const { plugin } = instance;
    
    // Register tools
    if (plugin.tools) {
      for (const tool of plugin.tools) {
        this.context.server.addTool({
          name: tool.name,
          description: tool.description,
          inputSchema: tool.inputSchema
        }, tool.handler);
        
        instance.contributions.tools.push(tool.name);
      }
    }
    
    // Register resources
    if (plugin.resources) {
      for (const resource of plugin.resources) {
        this.context.server.addResourceProvider(resource.scheme, resource.handler);
        instance.contributions.resources.push(resource.scheme);
      }
    }
    
    // Register prompts
    if (plugin.prompts) {
      for (const prompt of plugin.prompts) {
        this.context.server.addPrompt({
          name: prompt.name,
          description: prompt.description,
          arguments: prompt.parameters || []
        }, async (args) => {
          return prompt.template.render(args);
        });
        
        instance.contributions.prompts.push(prompt.name);
      }
    }
  }
  
  private async unregisterContributions(instance: PluginInstance): Promise<void> {
    // Unregister tools
    for (const toolName of instance.contributions.tools) {
      this.context.server.removeTool(toolName);
    }
    
    // Unregister resources
    for (const scheme of instance.contributions.resources) {
      this.context.server.removeResourceProvider(scheme);
    }
    
    // Unregister prompts
    for (const promptName of instance.contributions.prompts) {
      this.context.server.removePrompt(promptName);
    }
    
    // Clear contributions record
    instance.contributions = {
      tools: [],
      resources: [],
      prompts: []
    };
  }
  
  private validatePlugin(plugin: Plugin): void {
    if (!plugin.name || !plugin.version) {
      throw new Error('Plugin must have name and version');
    }
    
    if (typeof plugin.initialize !== 'function') {
      throw new Error('Plugin must implement initialize method');
    }
    
    if (typeof plugin.activate !== 'function') {
      throw new Error('Plugin must implement activate method');
    }
  }
  
  private async resolveDependencies(plugin: Plugin): Promise<void> {
    if (!plugin.dependencies) return;
    
    for (const dep of plugin.dependencies) {
      if (!this.plugins.has(dep)) {
        throw new Error(`Missing dependency: ${dep}`);
      }
    }
  }
  
  getPlugin(name: string): PluginInstance | undefined {
    return this.plugins.get(name);
  }
  
  getActivePlugins(): PluginInstance[] {
    return Array.from(this.plugins.values()).filter(p => p.isActive);
  }
  
  getAllPlugins(): PluginInstance[] {
    return Array.from(this.plugins.values());
  }
}

class PluginInstance {
  public isActive = false;
  public contributions = {
    tools: [] as string[],
    resources: [] as string[],
    prompts: [] as string[]
  };
  
  constructor(
    public plugin: Plugin,
    public context: PluginContext
  ) {}
}

// Example plugin implementation
export class DatabasePlugin implements Plugin {
  name = 'database';
  version = '1.0.0';
  description = 'Database integration plugin';
  
  private dbPool?: any;
  
  async initialize(context: PluginContext): Promise<void> {
    // Initialize database connection pool
    this.dbPool = new DatabasePool({
      host: process.env.DB_HOST,
      database: process.env.DB_NAME,
      username: process.env.DB_USER,
      password: process.env.DB_PASSWORD
    });
    
    context.logger.info('Database plugin initialized');
  }
  
  async activate(context: PluginContext): Promise<void> {
    context.logger.info('Database plugin activated');
  }
  
  async deactivate(): Promise<void> {
    if (this.dbPool) {
      await this.dbPool.close();
    }
  }
  
  async dispose(): Promise<void> {
    await this.deactivate();
  }
  
  get tools(): ToolContribution[] {
    return [
      {
        name: 'db_query',
        description: 'Execute database query',
        inputSchema: {
          type: 'object',
          properties: {
            sql: { type: 'string' },
            params: { type: 'array', items: { type: 'string' } }
          },
          required: ['sql']
        },
        handler: this.executeQuery.bind(this)
      }
    ];
  }
  
  get resources(): ResourceContribution[] {
    return [
      {
        scheme: 'db',
        handler: this.handleDatabaseResource.bind(this),
        subscriptionSupport: true
      }
    ];
  }
  
  private async executeQuery(args: any): Promise<any> {
    const { sql, params = [] } = args;
    const result = await this.dbPool.query(sql, params);
    return {
      rows: result.rows,
      rowCount: result.rowCount
    };
  }
  
  private async handleDatabaseResource(uri: string): Promise<any> {
    // Parse database URI
    const url = new URL(uri);
    const tableName = url.pathname.slice(1);
    
    const result = await this.dbPool.query(`SELECT * FROM ${tableName} LIMIT 100`);
    
    return {
      uri,
      mimeType: 'application/json',
      text: JSON.stringify(result.rows, null, 2),
      metadata: {
        table: tableName,
        rowCount: result.rowCount
      }
    };
  }
}

14.3 Multi-language Support

14.3.1 Python Implementation Example

# python/mcp_server/base.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
import json
import asyncio
import sys
from dataclasses import dataclass

@dataclass
class Tool:
    name: str
    description: str
    input_schema: Dict[str, Any]
    handler: Callable

@dataclass 
class Resource:
    uri: str
    name: Optional[str] = None
    description: Optional[str] = None
    mime_type: Optional[str] = None

@dataclass
class Prompt:
    name: str
    description: str
    arguments: List[Dict[str, Any]]
    handler: Callable

class MCPServer:
    def __init__(self, server_info: Dict[str, Any], capabilities: Dict[str, Any]):
        self.server_info = server_info
        self.capabilities = capabilities
        self.tools: Dict[str, Tool] = {}
        self.resources: Dict[str, Resource] = {}
        self.prompts: Dict[str, Prompt] = {}
        
    def add_tool(self, tool: Tool):
        """Add a tool"""
        self.tools[tool.name] = tool
        
    def add_resource(self, scheme: str, handler: Callable):
        """Add a resource handler"""
        # Implement resource registration logic
        pass
        
    def add_prompt(self, prompt: Prompt):
        """Add a prompt template"""
        self.prompts[prompt.name] = prompt
    
    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle JSON-RPC request"""
        method = request.get('method')
        params = request.get('params', {})
        request_id = request.get('id')
        
        try:
            if method == 'initialize':
                return await self._handle_initialize(params, request_id)
            elif method == 'tools/list':
                return await self._handle_tools_list(params, request_id)
            elif method == 'tools/call':
                return await self._handle_tools_call(params, request_id)
            elif method == 'resources/list':
                return await self._handle_resources_list(params, request_id)
            elif method == 'resources/read':
                return await self._handle_resources_read(params, request_id)
            elif method == 'prompts/list':
                return await self._handle_prompts_list(params, request_id)
            elif method == 'prompts/get':
                return await self._handle_prompts_get(params, request_id)
            else:
                return self._create_error_response(-32601, f'Method not found: {method}', request_id)
                
        except Exception as e:
            return self._create_error_response(-32603, f'Internal error: {str(e)}', request_id)
    
    async def _handle_initialize(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        return {
            'jsonrpc': '2.0',
            'id': request_id,
            'result': {
                'protocolVersion': '2024-11-05',
                'serverInfo': self.server_info,
                'capabilities': self.capabilities
            }
        }
    
    async def _handle_tools_list(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        tools_list = []
        for tool in self.tools.values():
            tools_list.append({
                'name': tool.name,
                'description': tool.description,
                'input_schema': tool.input_schema
            })
            
        return {
            'jsonrpc': '2.0',
            'id': request_id,
            'result': {
                'tools': tools_list
            }
        }
    
    async def _handle_tools_call(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        tool_name = params.get('name')
        arguments = params.get('arguments', {})
        
        if tool_name not in self.tools:
            return self._create_error_response(-32002, f'Tool not found: {tool_name}', request_id)
        
        tool = self.tools[tool_name]
        
        try:
            result = await tool.handler(arguments)
            return {
                'jsonrpc': '2.0',
                'id': request_id,
                'result': {
                    'content': [
                        {
                            'type': 'text',
                            'text': str(result) if not isinstance(result, dict) else json.dumps(result, indent=2)
                        }
                    ]
                }
            }
        except Exception as e:
            return self._create_error_response(-32004, f'Tool execution failed: {str(e)}', request_id)
    
    def _create_error_response(self, code: int, message: str, request_id: Any) -> Dict[str, Any]:
        return {
            'jsonrpc': '2.0',
            'id': request_id,
            'error': {
                'code': code,
                'message': message
            }
        }
    
    async def run_stdio(self):
        """Run STDIO transport mode"""
        while True:
            try:
                line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
                if not line:
                    break
                    
                request = json.loads(line.strip())
                response = await self.handle_request(request)
                
                print(json.dumps(response), flush=True)
                
            except json.JSONDecodeError:
                error_response = self._create_error_response(-32700, 'Parse error', None)
                print(json.dumps(error_response), flush=True)
            except Exception as e:
                error_response = self._create_error_response(-32603, f'Internal error: {str(e)}', None)
                print(json.dumps(error_response), flush=True)

# Example Python MCP Server implementation
class FileSystemServer(MCPServer):
    def __init__(self, allowed_paths: List[str]):
        super().__init__(
            server_info={
                'name': 'filesystem-server',
                'version': '1.0.0'
            },
            capabilities={
                'tools': {},
                'resources': {},
                'prompts': {}
            }
        )
        self.allowed_paths = [os.path.abspath(path) for path in allowed_paths]
        self._setup_tools()
    
    def _setup_tools(self):
        # File read tool
        self.add_tool(Tool(
            name='read_file',
            description='Read file contents',
            input_schema={
                'type': 'object',
                'properties': {
                    'path': {'type': 'string', 'description': 'File path to read'}
                },
                'required': ['path']
            },
            handler=self._read_file
        ))
        
        # File write tool
        self.add_tool(Tool(
            name='write_file',
            description='Write file contents',
            input_schema={
                'type': 'object',
                'properties': {
                    'path': {'type': 'string', 'description': 'File path to write'},
                    'content': {'type': 'string', 'description': 'Content to write'}
                },
                'required': ['path', 'content']
            },
            handler=self._write_file
        ))
    
    def _validate_path(self, file_path: str) -> str:
        """Validate file path security"""
        abs_path = os.path.abspath(file_path)
        
        # Check if path is within allowed directories
        for allowed_path in self.allowed_paths:
            if abs_path.startswith(allowed_path):
                return abs_path
                
        raise ValueError(f"Access denied: {file_path}")
    
    async def _read_file(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Read file content"""
        file_path = self._validate_path(args['path'])
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            return {
                'path': file_path,
                'content': content,
                'size': len(content)
            }
        except Exception as e:
            raise Exception(f"Failed to read file: {str(e)}")
    
    async def _write_file(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Write file content"""
        file_path = self._validate_path(args['path'])
        content = args['content']
        
        try:
            # Ensure directory exists
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            
            return {
                'path': file_path,
                'size': len(content),
                'success': True
            }
        except Exception as e:
            raise Exception(f"Failed to write file: {str(e)}")

# Startup script
import os
import asyncio

async def main():
    allowed_paths = os.environ.get('ALLOWED_PATHS', '/tmp').split(':')
    server = FileSystemServer(allowed_paths)
    await server.run_stdio()

if __name__ == '__main__':
    asyncio.run(main())

14.3.2 Go Implementation Example

// go/mcp/server.go
package mcp

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"
    "context"
    "log"
)

type Server struct {
    ServerInfo   ServerInfo            `json:"serverInfo"`
    Capabilities ServerCapabilities    `json:"capabilities"`
    Tools        map[string]Tool       `json:"-"`
    Resources    map[string]Resource   `json:"-"`
    Prompts      map[string]Prompt     `json:"-"`
}

type ServerInfo struct {
    Name    string `json:"name"`
    Version string `json:"version"`
}

type ServerCapabilities struct {
    Tools     map[string]interface{} `json:"tools"`
    Resources map[string]interface{} `json:"resources"`
    Prompts   map[string]interface{} `json:"prompts"`
}

type Tool struct {
    Name        string      `json:"name"`
    Description string      `json:"description"`
    InputSchema interface{} `json:"inputSchema"`
    Handler     ToolHandler `json:"-"`
}

type ToolHandler func(ctx context.Context, args map[string]interface{}) (interface{}, error)

type Request struct {
    JsonRPC string      `json:"jsonrpc"`
    ID      interface{} `json:"id"`
    Method  string      `json:"method"`
    Params  interface{} `json:"params"`
}

type Response struct {
    JsonRPC string      `json:"jsonrpc"`
    ID      interface{} `json:"id"`
    Result  interface{} `json:"result,omitempty"`
    Error   *Error      `json:"error,omitempty"`
}

type Error struct {
    Code    int         `json:"code"`
    Message string      `json:"message"`
    Data    interface{} `json:"data,omitempty"`
}

func NewServer(info ServerInfo, capabilities ServerCapabilities) *Server {
    return &Server{
        ServerInfo:   info,
        Capabilities: capabilities,
        Tools:        make(map[string]Tool),
        Resources:    make(map[string]Resource),
        Prompts:      make(map[string]Prompt),
    }
}

func (s *Server) AddTool(tool Tool) {
    s.Tools[tool.Name] = tool
}

func (s *Server) HandleRequest(ctx context.Context, req Request) Response {
    switch req.Method {
    case "initialize":
        return s.handleInitialize(req)
    case "tools/list":
        return s.handleToolsList(req)
    case "tools/call":
        return s.handleToolsCall(ctx, req)
    default:
        return Response{
            JsonRPC: "2.0",
            ID:      req.ID,
            Error: &Error{
                Code:    -32601,
                Message: fmt.Sprintf("Method not found: %s", req.Method),
            },
        }
    }
}

func (s *Server) handleInitialize(req Request) Response {
    return Response{
        JsonRPC: "2.0",
        ID:      req.ID,
        Result: map[string]interface{}{
            "protocolVersion": "2024-11-05",
            "serverInfo":      s.ServerInfo,
            "capabilities":    s.Capabilities,
        },
    }
}

func (s *Server) handleToolsList(req Request) Response {
    tools := make([]map[string]interface{}, 0, len(s.Tools))
    
    for _, tool := range s.Tools {
        tools = append(tools, map[string]interface{}{
            "name":        tool.Name,
            "description": tool.Description,
            "inputSchema": tool.InputSchema,
        })
    }
    
    return Response{
        JsonRPC: "2.0",
        ID:      req.ID,
        Result: map[string]interface{}{
            "tools": tools,
        },
    }
}

func (s *Server) handleToolsCall(ctx context.Context, req Request) Response {
    params, ok := req.Params.(map[string]interface{})
    if !ok {
        return Response{
            JsonRPC: "2.0",
            ID:      req.ID,
            Error: &Error{
                Code:    -32602,
                Message: "Invalid params",
            },
        }
    }
    
    toolName, ok := params["name"].(string)
    if !ok {
        return Response{
            JsonRPC: "2.0",
            ID:      req.ID,
            Error: &Error{
                Code:    -32602,
                Message: "Tool name is required",
            },
        }
    }
    
    tool, exists := s.Tools[toolName]
    if !exists {
        return Response{
            JsonRPC: "2.0",
            ID:      req.ID,
            Error: &Error{
                Code:    -32002,
                Message: fmt.Sprintf("Tool not found: %s", toolName),
            },
        }
    }
    
    args, _ := params["arguments"].(map[string]interface{})
    if args == nil {
        args = make(map[string]interface{})
    }
    
    result, err := tool.Handler(ctx, args)
    if err != nil {
        return Response{
            JsonRPC: "2.0",
            ID:      req.ID,
            Error: &Error{
                Code:    -32004,
                Message: fmt.Sprintf("Tool execution failed: %s", err.Error()),
            },
        }
    }
    
    return Response{
        JsonRPC: "2.0",
        ID:      req.ID,
        Result: map[string]interface{}{
            "content": []map[string]interface{}{
                {
                    "type": "text",
                    "text": fmt.Sprintf("%v", result),
                },
            },
        },
    }
}

func (s *Server) RunStdio(ctx context.Context) error {
    scanner := bufio.NewScanner(os.Stdin)
    encoder := json.NewEncoder(os.Stdout)
    
    for scanner.Scan() {
        line := scanner.Text()
        
        var req Request
        if err := json.Unmarshal([]byte(line), &req); err != nil {
            errorResp := Response{
                JsonRPC: "2.0",
                ID:      nil,
                Error: &Error{
                    Code:    -32700,
                    Message: "Parse error",
                },
            }
            encoder.Encode(errorResp)
            continue
        }
        
        resp := s.HandleRequest(ctx, req)
        if err := encoder.Encode(resp); err != nil {
            log.Printf("Failed to encode response: %v", err)
        }
    }
    
    return scanner.Err()
}

// Example Go MCP Server implementation
package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "strings"
)

type FileSystemServer struct {
    *mcp.Server
    allowedPaths []string
}

func NewFileSystemServer(allowedPaths []string) *FileSystemServer {
    server := mcp.NewServer(
        mcp.ServerInfo{
            Name:    "filesystem-server",
            Version: "1.0.0",
        },
        mcp.ServerCapabilities{
            Tools:     make(map[string]interface{}),
            Resources: make(map[string]interface{}),
            Prompts:   make(map[string]interface{}),
        },
    )
    
    fs := &FileSystemServer{
        Server:       server,
        allowedPaths: allowedPaths,
    }
    
    fs.setupTools()
    return fs
}

func (fs *FileSystemServer) setupTools() {
    // File read tool
    fs.AddTool(mcp.Tool{
        Name:        "read_file",
        Description: "Read file contents",
        InputSchema: map[string]interface{}{
            "type": "object",
            "properties": map[string]interface{}{
                "path": map[string]interface{}{
                    "type":        "string",
                    "description": "File path to read",
                },
            },
            "required": []string{"path"},
        },
        Handler: fs.readFile,
    })
    
    // File write tool
    fs.AddTool(mcp.Tool{
        Name:        "write_file",
        Description: "Write file contents",
        InputSchema: map[string]interface{}{
            "type": "object",
            "properties": map[string]interface{}{
                "path": map[string]interface{}{
                    "type":        "string",
                    "description": "File path to write",
                },
                "content": map[string]interface{}{
                    "type":        "string",
                    "description": "Content to write",
                },
            },
            "required": []string{"path", "content"},
        },
        Handler: fs.writeFile,
    })
}

func (fs *FileSystemServer) validatePath(path string) (string, error) {
    absPath, err := filepath.Abs(path)
    if err != nil {
        return "", err
    }
    
    for _, allowedPath := range fs.allowedPaths {
        if strings.HasPrefix(absPath, allowedPath) {
            return absPath, nil
        }
    }
    
    return "", fmt.Errorf("access denied: %s", path)
}

func (fs *FileSystemServer) readFile(ctx context.Context, args map[string]interface{}) (interface{}, error) {
    pathArg, ok := args["path"].(string)
    if !ok {
        return nil, fmt.Errorf("path is required and must be a string")
    }
    
    validPath, err := fs.validatePath(pathArg)
    if err != nil {
        return nil, err
    }
    
    content, err := ioutil.ReadFile(validPath)
    if err != nil {
        return nil, fmt.Errorf("failed to read file: %v", err)
    }
    
    return map[string]interface{}{
        "path":    validPath,
        "content": string(content),
        "size":    len(content),
    }, nil
}

func (fs *FileSystemServer) writeFile(ctx context.Context, args map[string]interface{}) (interface{}, error) {
    pathArg, ok := args["path"].(string)
    if !ok {
        return nil, fmt.Errorf("path is required and must be a string")
    }
    
    contentArg, ok := args["content"].(string)
    if !ok {
        return nil, fmt.Errorf("content is required and must be a string")
    }
    
    validPath, err := fs.validatePath(pathArg)
    if err != nil {
        return nil, err
    }
    
    // Ensure directory exists
    if err := os.MkdirAll(filepath.Dir(validPath), 0755); err != nil {
        return nil, fmt.Errorf("failed to create directory: %v", err)
    }
    
    if err := ioutil.WriteFile(validPath, []byte(contentArg), 0644); err != nil {
        return nil, fmt.Errorf("failed to write file: %v", err)
    }
    
    return map[string]interface{}{
        "path":    validPath,
        "size":    len(contentArg),
        "success": true,
    }, nil
}

func main() {
    allowedPaths := []string{"/tmp"}
    if paths := os.Getenv("ALLOWED_PATHS"); paths != "" {
        allowedPaths = strings.Split(paths, ":")
    }
    
    server := NewFileSystemServer(allowedPaths)
    
    ctx := context.Background()
    if err := server.RunStdio(ctx); err != nil {
        fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
        os.Exit(1)
    }
}

Chapter Summary

Chapter 14 deeply explores the advanced features and extension development of the MCP protocol:

Core Knowledge Points

  1. Protocol Extension Mechanisms: Learned the implementation of advanced features such as streaming, batch processing, and subscriptions
  2. Plugin System Design: Built a complete plugin loading, management, and contribution mechanism
  3. Multi-language Implementation: Provided Python and Go complete MCP Server implementation examples
  4. Enterprise-level Applications: Explored MCP application patterns in large-scale, complex environments
  5. Future Extensions: Showcased the extension potential and development directions of the MCP protocol

Practical Points

  • Design flexible protocol extension mechanisms to support future requirements
  • Establish a comprehensive plugin system to support ecosystem development
  • Provide multi-language SDKs to lower development barriers
  • Consider the complex requirements and constraints of enterprise-level applications
  • Stay aligned with the community to drive protocol evolution

Through the complete Chapter 14, a comprehensive knowledge system for MCP Server development has been mastered, from basic concepts to advanced features, from simple examples to complex projects, and from single-language to multi-language support, laying a solid foundation for becoming an MCP expert.