第11章:实战项目开发
2025/9/1 · 阅读时间大约 16 分钟
第11章:实战项目开发
学习目标
- 开发文件系统管理MCP Server
- 创建数据库操作MCP Server
- 实现API网关和代理Server
- 构建内容管理和搜索Server
- 开发自定义业务逻辑Server
11.1 项目一:文件系统管理Server
11.1.1 项目需求分析
// 文件系统MCP Server功能需求:
// 1. 文件/目录的增删改查操作
// 2. 文件内容读取和写入
// 3. 文件搜索和过滤
// 4. 文件权限管理
// 5. 文件监控和变更通知
// 6. 文件元数据管理
// src/projects/filesystem/FileSystemServer.ts
import path from 'path';
import fs from 'fs/promises';
import chokidar from 'chokidar';
export class FileSystemMCPServer extends SecureMCPServer {
private watchers = new Map<string, chokidar.FSWatcher>();
private allowedPaths: string[] = [];
/**
 * Builds the file-system server and registers its tools and resources.
 *
 * @param config - Server configuration, passed straight to the base class.
 * @param allowedPaths - Directories the server may access; stored in resolved
 *   (absolute) form so later path checks compare like with like.
 */
constructor(config: ServerConfig, allowedPaths: string[]) {
super(config);
this.allowedPaths = allowedPaths.map(p => path.resolve(p));
this.setupFileSystemTools();
this.setupFileSystemResources();
}
private setupFileSystemTools(): void {
// 文件读取工具
this.addTool({
name: 'fs_read_file',
description: '读取文件内容',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: '文件路径' },
encoding: { type: 'string', enum: ['utf8', 'base64'], default: 'utf8' }
},
required: ['path']
}
}, this.handleReadFile.bind(this));
// 文件写入工具
this.addTool({
name: 'fs_write_file',
description: '写入文件内容',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: '文件路径' },
content: { type: 'string', description: '文件内容' },
encoding: { type: 'string', enum: ['utf8', 'base64'], default: 'utf8' },
createDir: { type: 'boolean', default: false, description: '是否创建目录' }
},
required: ['path', 'content']
}
}, this.handleWriteFile.bind(this));
// 目录列出工具
this.addTool({
name: 'fs_list_directory',
description: '列出目录内容',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: '目录路径' },
recursive: { type: 'boolean', default: false },
includeStats: { type: 'boolean', default: false }
},
required: ['path']
}
}, this.handleListDirectory.bind(this));
// 文件搜索工具
this.addTool({
name: 'fs_search_files',
description: '搜索文件',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: '搜索路径' },
pattern: { type: 'string', description: '搜索模式(glob)' },
content: { type: 'string', description: '内容搜索' },
maxResults: { type: 'number', default: 100 }
},
required: ['path']
}
}, this.handleSearchFiles.bind(this));
// 文件操作工具
this.addTool({
name: 'fs_file_operation',
description: '文件操作(复制、移动、删除)',
inputSchema: {
type: 'object',
properties: {
operation: { type: 'string', enum: ['copy', 'move', 'delete', 'mkdir'] },
source: { type: 'string', description: '源路径' },
target: { type: 'string', description: '目标路径' }
},
required: ['operation', 'source']
}
}, this.handleFileOperation.bind(this));
}
/**
 * Registers the `file` and `directory` resource readers.
 *
 * Both readers strip the `file://` prefix, check the path against the
 * allow-list and then read from disk. The `file` reader returns the UTF-8
 * text plus size/time metadata; the `directory` reader returns a JSON
 * listing of the immediate children.
 */
private setupFileSystemResources(): void {
  // File content resource
  this.addResource('file', async (uri: string) => {
    const filePath = this.parseFileUri(uri);
    this.validatePath(filePath);
    const content = await fs.readFile(filePath, 'utf8');
    const stats = await fs.stat(filePath);
    return {
      uri,
      mimeType: this.getMimeType(filePath),
      text: content,
      metadata: {
        size: stats.size,
        modified: stats.mtime,
        // birthtime is the creation timestamp; ctime is the last
        // status change (chmod, rename, ...) and is not "created".
        created: stats.birthtime,
        type: stats.isDirectory() ? 'directory' : 'file'
      }
    };
  });

  // Directory listing resource
  this.addResource('directory', async (uri: string) => {
    const dirPath = this.parseFileUri(uri);
    this.validatePath(dirPath);
    const entries = await fs.readdir(dirPath, { withFileTypes: true });
    const items = entries.map(entry => ({
      name: entry.name,
      type: entry.isDirectory() ? 'directory' : 'file',
      path: path.join(dirPath, entry.name)
    }));
    return {
      uri,
      mimeType: 'application/json',
      text: JSON.stringify(items, null, 2),
      metadata: {
        type: 'directory',
        itemCount: items.length
      }
    };
  });
}
// 工具实现方法
/**
 * Tool handler for `fs_read_file`.
 *
 * @param args - `{ path, encoding? }`; `encoding` is `'utf8'` (default) or `'base64'`.
 * @returns The file content in the requested encoding plus size/mtime metadata.
 * @throws Error when the path is outside the allow-list (from `validatePath`).
 * @throws ToolExecutionException when the encoding is unsupported or the read fails.
 */
private async handleReadFile(args: any): Promise<any> {
  const { path: filePath, encoding = 'utf8' } = args;
  this.validatePath(filePath);
  try {
    // The schema only advertises these two encodings; reject anything else
    // instead of handing an arbitrary string to fs.readFile.
    if (encoding !== 'utf8' && encoding !== 'base64') {
      throw new Error(`Unsupported encoding: ${encoding}`);
    }
    const content = await fs.readFile(filePath, encoding);
    const stats = await fs.stat(filePath);
    return {
      content,
      metadata: {
        size: stats.size,
        modified: stats.mtime,
        encoding
      }
    };
  } catch (error) {
    // The caught value is not guaranteed to be an Error; narrow before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'fs_read_file',
      `Failed to read file: ${reason}`,
      { filePath }
    );
  }
}
/**
 * Tool handler for `fs_write_file`.
 *
 * @param args - `{ path, content, encoding?, createDir? }`. With
 *   `createDir` the parent directory is created recursively first.
 * @returns `{ success: true, metadata }` with the size and mtime after the write.
 * @throws Error when the path is outside the allow-list (from `validatePath`).
 * @throws ToolExecutionException when the encoding is unsupported or the write fails.
 */
private async handleWriteFile(args: any): Promise<any> {
  const { path: filePath, content, encoding = 'utf8', createDir = false } = args;
  this.validatePath(filePath);
  try {
    // Only the encodings advertised by the tool schema are accepted.
    if (encoding !== 'utf8' && encoding !== 'base64') {
      throw new Error(`Unsupported encoding: ${encoding}`);
    }
    if (createDir) {
      await fs.mkdir(path.dirname(filePath), { recursive: true });
    }
    await fs.writeFile(filePath, content, encoding);
    const stats = await fs.stat(filePath);
    return {
      success: true,
      metadata: {
        size: stats.size,
        modified: stats.mtime
      }
    };
  } catch (error) {
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'fs_write_file',
      `Failed to write file: ${reason}`,
      { filePath }
    );
  }
}
/**
 * Tool handler for `fs_list_directory`.
 *
 * @param args - `{ path, recursive?, includeStats? }`.
 * @returns `{ items }`, the entries produced by `listDirectoryRecursive`.
 * @throws Error when the path is outside the allow-list (from `validatePath`).
 * @throws ToolExecutionException when the listing fails.
 */
private async handleListDirectory(args: any): Promise<any> {
  const { path: dirPath, recursive = false, includeStats = false } = args;
  this.validatePath(dirPath);
  try {
    const items = await this.listDirectoryRecursive(dirPath, recursive, includeStats);
    return { items };
  } catch (error) {
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'fs_list_directory',
      `Failed to list directory: ${reason}`,
      { dirPath }
    );
  }
}
/**
 * Tool handler for `fs_search_files`.
 *
 * @param args - `{ path, pattern?, content?, maxResults? }`.
 * @returns `{ results }`, the matches produced by `searchFiles`.
 * @throws Error when the path is outside the allow-list (from `validatePath`).
 * @throws ToolExecutionException when `maxResults` is not a non-negative
 *   number or the search fails.
 */
private async handleSearchFiles(args: any): Promise<any> {
  const { path: searchPath, pattern, content, maxResults = 100 } = args;
  this.validatePath(searchPath);
  try {
    // A NaN or negative cap would silently change how many results come back.
    if (typeof maxResults !== 'number' || !Number.isFinite(maxResults) || maxResults < 0) {
      throw new Error(`Invalid maxResults: ${maxResults}`);
    }
    const results = await this.searchFiles(searchPath, pattern, content, maxResults);
    return { results };
  } catch (error) {
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'fs_search_files',
      `Failed to search files: ${reason}`,
      { searchPath, pattern }
    );
  }
}
/**
 * Tool handler for `fs_file_operation` (copy, move, delete, mkdir).
 *
 * @param args - `{ operation, source, target? }`; `target` is required for
 *   `copy` and `move`.
 * @returns `{ success: true, operation, source, target }`.
 * @throws Error when `source` or `target` is outside the allow-list.
 * @throws ToolExecutionException for an unknown operation, a missing
 *   target, or any file-system failure.
 */
private async handleFileOperation(args: any): Promise<any> {
  const { operation, source, target } = args;
  this.validatePath(source);
  if (target) this.validatePath(target);
  try {
    switch (operation) {
      case 'copy':
      case 'move': {
        // Fail with a clear message rather than passing undefined to fs.
        if (!target) {
          throw new Error(`Operation ${operation} requires a target path`);
        }
        if (operation === 'copy') {
          await fs.copyFile(source, target);
        } else {
          await fs.rename(source, target);
        }
        break;
      }
      case 'delete': {
        // lstat so a symlink is removed as a link, never treated as the
        // directory it points to.
        const stats = await fs.lstat(source);
        if (stats.isDirectory()) {
          // fs.rm replaces the deprecated rmdir({ recursive: true }).
          await fs.rm(source, { recursive: true });
        } else {
          await fs.unlink(source);
        }
        break;
      }
      case 'mkdir':
        await fs.mkdir(source, { recursive: true });
        break;
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
    return { success: true, operation, source, target };
  } catch (error) {
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'fs_file_operation',
      `Failed to perform operation ${operation}: ${reason}`,
      { operation, source, target }
    );
  }
}
// 辅助方法
/**
 * Rejects any path that does not resolve to an allowed root or a descendant
 * of one.
 *
 * The check is lexical: symbolic links are not resolved, so a link inside an
 * allowed root that points elsewhere is not detected here.
 *
 * @param filePath - Path supplied by the caller (absolute or relative to cwd).
 * @throws Error when the resolved path is outside every allowed root.
 */
private validatePath(filePath: string): void {
  const resolvedPath = path.resolve(filePath);
  const allowed = this.allowedPaths.some(allowedPath => {
    // A plain startsWith() would accept "/data-evil" for root "/data".
    // Compare on path-segment boundaries via the relative path instead.
    const relative = path.relative(allowedPath, resolvedPath);
    if (relative === '') return true; // the root itself
    const escapesRoot = relative === '..' || relative.startsWith('..' + path.sep);
    // An absolute result means the path is on a different drive (Windows).
    return !escapesRoot && !path.isAbsolute(relative);
  });
  if (!allowed) {
    throw new Error(`Access denied: Path ${filePath} is not in allowed paths`);
  }
}
/**
 * Strips a leading `file://` scheme from a URI.
 *
 * @param uri - A `file://` URI or a bare path.
 * @returns Everything after `file://`, or the input unchanged when the
 *   scheme is absent. No percent-decoding is performed.
 */
private parseFileUri(uri: string): string {
  const scheme = 'file://';
  return uri.startsWith(scheme) ? uri.slice(scheme.length) : uri;
}
/**
 * Maps a file extension to a MIME type.
 *
 * @param filePath - Path whose extension is inspected case-insensitively.
 * @returns The MIME type for a known extension, otherwise
 *   `application/octet-stream`.
 */
private getMimeType(filePath: string): string {
  const knownTypes = new Map<string, string>([
    ['.txt', 'text/plain'],
    ['.js', 'application/javascript'],
    ['.ts', 'application/typescript'],
    ['.json', 'application/json'],
    ['.html', 'text/html'],
    ['.css', 'text/css'],
    ['.md', 'text/markdown'],
    ['.py', 'text/x-python'],
    ['.java', 'text/x-java-source']
  ]);
  const extension = path.extname(filePath).toLowerCase();
  return knownTypes.get(extension) ?? 'application/octet-stream';
}
/**
 * Lists a directory, optionally descending into sub-directories.
 *
 * @param dirPath - Directory to read.
 * @param recursive - When true, children of each sub-directory are appended
 *   right after that sub-directory's own entry.
 * @param includeStats - When true, each item gets a `stats` object
 *   (size, modified, created, mode).
 * @returns A flat array of `{ name, path, type, stats? }` items.
 */
private async listDirectoryRecursive(
  dirPath: string,
  recursive: boolean,
  includeStats: boolean
): Promise<any[]> {
  const entries = await fs.readdir(dirPath, { withFileTypes: true });
  const result: any[] = [];
  for (const entry of entries) {
    const fullPath = path.join(dirPath, entry.name);
    const item: any = {
      name: entry.name,
      path: fullPath,
      type: entry.isDirectory() ? 'directory' : 'file'
    };
    if (includeStats) {
      const stats = await fs.stat(fullPath);
      item.stats = {
        size: stats.size,
        modified: stats.mtime,
        // birthtime is the creation time; ctime is the last status change.
        created: stats.birthtime,
        mode: stats.mode
      };
    }
    result.push(item);
    if (recursive && entry.isDirectory()) {
      const children = await this.listDirectoryRecursive(fullPath, true, includeStats);
      result.push(...children);
    }
  }
  return result;
}
/**
 * Finds files under `searchPath` by glob pattern and/or content substring.
 *
 * @param searchPath - Root directory of the search.
 * @param pattern - Glob relative to `searchPath`; defaults to every file so
 *   a content-only search still has candidates to scan.
 * @param content - When given, only files whose UTF-8 text contains this
 *   substring are returned, each with a `matches` array.
 * @param maxResults - Maximum number of results returned.
 * @returns Items of the form `{ path, name, size, modified, type, matches? }`.
 */
private async searchFiles(
  searchPath: string,
  pattern?: string,
  content?: string,
  maxResults: number = 100
): Promise<any[]> {
  const glob = require('glob');
  const results: Record<string, unknown>[] = [];
  const root = path.resolve(searchPath);
  // Glob search; without a pattern every file below the root is a candidate.
  const globPattern = path.join(searchPath, pattern || '**/*');
  const files: string[] = glob.sync(globPattern, { nodir: true });
  for (const file of files) {
    // Cap on results, not candidates, so a content filter still scans
    // beyond the first `maxResults` files.
    if (results.length >= maxResults) break;
    // A pattern containing ".." can match outside the search root; skip those hits.
    const relative = path.relative(root, path.resolve(file));
    if (relative === '..' || relative.startsWith('..' + path.sep) || path.isAbsolute(relative)) {
      continue;
    }
    const stats = await fs.stat(file);
    const item: Record<string, unknown> = {
      path: file,
      name: path.basename(file),
      size: stats.size,
      modified: stats.mtime,
      type: 'file'
    };
    if (!content) {
      results.push(item);
      continue;
    }
    try {
      const fileContent = await fs.readFile(file, 'utf8');
      if (fileContent.includes(content)) {
        item.matches = this.findContentMatches(fileContent, content);
        results.push(item);
      }
    } catch (error) {
      // Unreadable files are deliberately skipped (best-effort search).
    }
  }
  return results;
}
/**
 * Collects the lines of `content` that contain `searchTerm`.
 *
 * @param content - Full text, split on `\n`.
 * @param searchTerm - Substring to look for.
 * @returns At most ten `{ line, content, position }` records: the 1-based
 *   line number, the trimmed line text, and the index of the first
 *   occurrence within the untrimmed line.
 */
private findContentMatches(content: string, searchTerm: string): any[] {
  const hits: any[] = [];
  const rows = content.split('\n');
  for (let i = 0; i < rows.length; i++) {
    const row = rows[i];
    const position = row.indexOf(searchTerm);
    if (!row.includes(searchTerm)) continue;
    hits.push({ line: i + 1, content: row.trim(), position });
  }
  return hits.slice(0, 10); // cap the number of reported matches
}
}
11.2 项目二:数据库操作Server
11.2.1 数据库MCP Server实现
// src/projects/database/DatabaseServer.ts
import { Pool } from 'pg'; // PostgreSQL
import mysql from 'mysql2/promise'; // MySQL
import sqlite3 from 'sqlite3';
import { Database } from 'sqlite3';
/** Connection settings for the database MCP server. */
export interface DatabaseConfig {
  /** Which database engine to talk to. */
  type: 'postgresql' | 'mysql' | 'sqlite';
  /** Connection details for the selected engine. */
  connection: {
    host?: string;
    port?: number;
    database: string;
    username?: string;
    password?: string;
    /** Database file; used for SQLite. */
    filename?: string;
  };
  /** Optional pool sizing. */
  pool?: { min: number; max: number };
}
export class DatabaseMCPServer extends SecureMCPServer {
private dbConfig: DatabaseConfig;
private connection: any;
private queryCache: Map<string, { result: any; timestamp: number }> = new Map();
private cacheTimeout = 5 * 60 * 1000; // 5分钟
/**
 * Stores the database configuration and registers tools and resources.
 * No connection is opened here; that happens in `start()`.
 *
 * @param config - Server configuration, passed to the base class.
 * @param dbConfig - Engine type, connection details and pool sizing.
 */
constructor(config: ServerConfig, dbConfig: DatabaseConfig) {
super(config);
this.dbConfig = dbConfig;
this.setupDatabaseTools();
this.setupDatabaseResources();
}
/**
 * Starts the base server, then opens the database connection.
 * A connection failure rejects after the base server has already started.
 */
async start(): Promise<void> {
await super.start();
await this.connectToDatabase();
}
/**
 * Closes the database connection, then stops the base server.
 *
 * @param reason - Optional reason forwarded to the base class `stop`.
 */
async stop(reason?: string): Promise<void> {
await this.disconnectFromDatabase();
await super.stop(reason);
}
/**
 * Creates the connection object for the configured engine and stores it in
 * `this.connection`.
 *
 * For SQLite the returned promise settles only after the open callback has
 * fired, so an open failure is actually thrown from here.
 *
 * @throws Error for an unsupported engine, a missing SQLite filename, or any
 *   error raised while creating the connection (logged, then rethrown).
 */
private async connectToDatabase(): Promise<void> {
  const { connection: target, pool } = this.dbConfig;
  try {
    switch (this.dbConfig.type) {
      case 'postgresql':
        this.connection = new Pool({
          host: target.host,
          port: target.port,
          database: target.database,
          user: target.username,
          password: target.password,
          // `??` keeps an explicit minimum of 0; `||` would turn it into 1.
          min: pool?.min ?? 1,
          // `||` on purpose: a maximum of 0 falls back to the default.
          max: pool?.max || 10,
        });
        break;
      case 'mysql':
        this.connection = mysql.createPool({
          host: target.host,
          port: target.port,
          database: target.database,
          user: target.username,
          password: target.password,
          connectionLimit: pool?.max || 10,
        });
        break;
      case 'sqlite': {
        const filename = target.filename;
        if (!filename) {
          throw new Error('SQLite configuration requires connection.filename');
        }
        // The constructor reports open errors through its callback, so wrap
        // it in a promise instead of assuming the handle is usable at once.
        this.connection = await new Promise<Database>((resolve, reject) => {
          const db = new sqlite3.Database(
            filename,
            sqlite3.OPEN_READWRITE | sqlite3.OPEN_CREATE,
            (err: Error | null) => (err ? reject(err) : resolve(db))
          );
        });
        break;
      }
      default:
        throw new Error(`Unsupported database type: ${this.dbConfig.type}`);
    }
    this.logger.info('Database connected successfully', { type: this.dbConfig.type });
  } catch (error) {
    this.logger.error('Database connection failed', error);
    throw error;
  }
}
/**
 * Closes the current connection, if any, and clears the stored handle.
 *
 * Close errors are logged and swallowed so shutdown can continue. The
 * SQLite close is awaited through its callback so the handle is released
 * before this method resolves.
 */
private async disconnectFromDatabase(): Promise<void> {
  const handle = this.connection;
  if (!handle) {
    return;
  }
  try {
    switch (this.dbConfig.type) {
      case 'postgresql':
      case 'mysql':
        await handle.end();
        break;
      case 'sqlite':
        await new Promise<void>((resolve, reject) => {
          handle.close((err: Error | null) => (err ? reject(err) : resolve()));
        });
        break;
    }
    this.logger.info('Database disconnected');
  } catch (error) {
    this.logger.error('Database disconnection error', error);
  } finally {
    // Drop the handle so a repeated stop() does not close it twice.
    this.connection = undefined;
  }
}
private setupDatabaseTools(): void {
// 查询工具
this.addTool({
name: 'db_query',
description: '执行SQL查询',
inputSchema: {
type: 'object',
properties: {
sql: { type: 'string', description: 'SQL查询语句' },
params: { type: 'array', description: '查询参数', items: { type: 'string' } },
useCache: { type: 'boolean', default: true, description: '是否使用缓存' },
limit: { type: 'number', default: 100, description: '结果限制' }
},
required: ['sql']
}
}, this.handleQuery.bind(this));
// 执行工具(INSERT, UPDATE, DELETE)
this.addTool({
name: 'db_execute',
description: '执行SQL命令',
inputSchema: {
type: 'object',
properties: {
sql: { type: 'string', description: 'SQL命令' },
params: { type: 'array', description: '命令参数', items: { type: 'string' } },
transaction: { type: 'boolean', default: false, description: '是否使用事务' }
},
required: ['sql']
}
}, this.handleExecute.bind(this));
// 表信息工具
this.addTool({
name: 'db_table_info',
description: '获取表信息',
inputSchema: {
type: 'object',
properties: {
table: { type: 'string', description: '表名' },
includeColumns: { type: 'boolean', default: true },
includeIndexes: { type: 'boolean', default: false }
},
required: ['table']
}
}, this.handleTableInfo.bind(this));
// 数据库架构工具
this.addTool({
name: 'db_schema',
description: '获取数据库架构',
inputSchema: {
type: 'object',
properties: {
includeViews: { type: 'boolean', default: false },
includeIndexes: { type: 'boolean', default: false }
}
}
}, this.handleSchema.bind(this));
}
private setupDatabaseResources(): void {
// 表数据资源
// Table data resource: runs the canned select for the table named in the URI.
const readTable = async (uri: string) => {
  const tableName = this.parseTableUri(uri);
  const outcome = await this.executeQuery(this.buildSelectQuery(tableName));
  const metadata = {
    table: tableName,
    rowCount: outcome.rowCount,
    columns: outcome.columns
  };
  return {
    uri,
    mimeType: 'application/json',
    text: JSON.stringify(outcome.rows, null, 2),
    metadata
  };
};
// Query result resource: serves a previously cached result by its cache key.
const readCachedQuery = async (uri: string) => {
  const queryId = this.parseQueryUri(uri);
  const entry = this.queryCache.get(queryId);
  if (!entry) {
    throw new Error(`Query result not found: ${queryId}`);
  }
  return {
    uri,
    mimeType: 'application/json',
    text: JSON.stringify(entry.result, null, 2),
    metadata: { cached: true, timestamp: entry.timestamp }
  };
};
this.addResource('table', readTable);
this.addResource('query', readCachedQuery);
// 工具实现方法
/**
 * Tool handler for `db_query`.
 *
 * @param args - `{ sql, params?, useCache?, limit? }`.
 * @returns The query result; a cache hit additionally carries `fromCache: true`.
 * @throws ToolExecutionException when the limit clause cannot be built or
 *   the query fails.
 */
private async handleQuery(args: any): Promise<any> {
  const { sql, params = [], useCache = true, limit = 100 } = args;
  try {
    // Add a LIMIT clause (if the statement has none).
    const limitedSql = this.addLimitClause(sql, limit);
    // Key the cache on the statement actually executed, so the same SQL
    // with a different `limit` is never served a differently-sized result.
    const cacheKey = useCache ? this.generateCacheKey(limitedSql, params) : undefined;
    if (cacheKey !== undefined) {
      const cached = this.queryCache.get(cacheKey);
      if (cached && Date.now() - cached.timestamp < this.cacheTimeout) {
        this.logger.debug('Query cache hit', { sql: sql.substring(0, 100) });
        return { ...cached.result, fromCache: true };
      }
    }
    const result = await this.executeQuery(limitedSql, params);
    if (cacheKey !== undefined) {
      this.queryCache.set(cacheKey, { result, timestamp: Date.now() });
      // Evict expired entries.
      this.cleanExpiredCache();
    }
    return result;
  } catch (error) {
    // Narrow the caught value; String(sql) keeps the report safe when `sql` is not a string.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'db_query',
      `Query execution failed: ${reason}`,
      { sql: String(sql).substring(0, 200), params }
    );
  }
}
/**
 * Tool handler for `db_execute` (INSERT / UPDATE / DELETE and similar).
 *
 * @param args - `{ sql, params?, transaction? }`; with `transaction` the
 *   command runs inside `executeTransaction`.
 * @returns The result of `executeCommand`.
 * @throws ToolExecutionException when the command fails.
 */
private async handleExecute(args: any): Promise<any> {
  const { sql, params = [], transaction = false } = args;
  try {
    const result = transaction
      ? await this.executeTransaction((client) => this.executeCommand(sql, params, client))
      : await this.executeCommand(sql, params);
    // Any write may have changed data that cached queries depend on.
    this.invalidateQueryCache();
    return result;
  } catch (error) {
    // Narrow the caught value; String(sql) keeps the report safe when `sql` is not a string.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'db_execute',
      `Command execution failed: ${reason}`,
      { sql: String(sql).substring(0, 200), params }
    );
  }
}
/**
 * Tool handler for `db_table_info`.
 *
 * @param args - `{ table, includeColumns?, includeIndexes? }`.
 * @returns `{ name, columns?, indexes?, rowCount }`.
 * @throws ToolExecutionException when any lookup fails.
 */
private async handleTableInfo(args: any): Promise<any> {
  const { table, includeColumns = true, includeIndexes = false } = args;
  try {
    const info: any = { name: table };
    if (includeColumns) {
      info.columns = await this.getTableColumns(table);
    }
    if (includeIndexes) {
      info.indexes = await this.getTableIndexes(table);
    }
    info.rowCount = await this.getTableRowCount(table);
    return info;
  } catch (error) {
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'db_table_info',
      `Failed to get table info: ${reason}`,
      { table }
    );
  }
}
/**
 * Tool handler for `db_schema`.
 *
 * @param args - `{ includeViews?, includeIndexes? }`; may be omitted
 *   entirely because the tool has no required properties.
 * @returns `{ tables, views?, indexes? }`.
 * @throws ToolExecutionException when any lookup fails.
 */
private async handleSchema(args: any): Promise<any> {
  // Tolerate a missing argument object instead of failing on destructuring.
  const { includeViews = false, includeIndexes = false } = args ?? {};
  try {
    const schema: any = {
      tables: await this.getAllTables()
    };
    if (includeViews) {
      schema.views = await this.getAllViews();
    }
    if (includeIndexes) {
      schema.indexes = await this.getAllIndexes();
    }
    return schema;
  } catch (error) {
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'db_schema',
      `Failed to get schema: ${reason}`,
      args
    );
  }
}
// 数据库操作方法
/**
 * Runs a row-returning statement on the configured engine.
 *
 * @param sql - Statement text with engine-specific placeholders.
 * @param params - Positional parameter values.
 * @returns `{ rows, rowCount, columns }`. For SQLite the column names are
 *   taken from the first row, so an empty result has no columns.
 * @throws Error for an unsupported engine or a driver error.
 */
private async executeQuery(sql: string, params: any[] = []): Promise<any> {
  switch (this.dbConfig.type) {
    case 'postgresql': {
      const pgResult = await this.connection.query(sql, params);
      return {
        rows: pgResult.rows,
        rowCount: pgResult.rowCount,
        columns: pgResult.fields?.map((f: any) => f.name) || []
      };
    }
    case 'mysql': {
      const [mysqlRows, mysqlFields] = await this.connection.execute(sql, params);
      return {
        rows: mysqlRows,
        rowCount: Array.isArray(mysqlRows) ? mysqlRows.length : 0,
        columns: mysqlFields?.map((f: any) => f.name) || []
      };
    }
    case 'sqlite':
      return new Promise((resolve, reject) => {
        // Database#all prepares, runs and finalizes in one call. A manually
        // prepared statement was never finalized and leaked per query.
        this.connection.all(sql, params, (err: Error | null, rows: any[]) => {
          if (err) {
            reject(err);
            return;
          }
          resolve({
            rows,
            rowCount: rows.length,
            columns: rows.length > 0 ? Object.keys(rows[0]) : []
          });
        });
      });
    default:
      throw new Error(`Unsupported database type: ${this.dbConfig.type}`);
  }
}
/**
 * Runs a data-modifying statement.
 *
 * @param sql - Statement text with engine-specific placeholders.
 * @param params - Positional parameter values.
 * @param client - Optional transaction-scoped connection; defaults to the
 *   shared connection.
 * @returns `{ success: true, affectedRows, insertId }`.
 * @throws Error for an unsupported engine or a driver error.
 */
private async executeCommand(sql: string, params: any[] = [], client?: any): Promise<any> {
  const connection = client || this.connection;
  switch (this.dbConfig.type) {
    case 'postgresql': {
      const pgResult = await connection.query(sql, params);
      return {
        success: true,
        affectedRows: pgResult.rowCount,
        // NOTE(review): node-postgres results are not known to carry an
        // `insertId`; this is presumably always undefined. Use a RETURNING
        // clause to obtain generated keys. Confirm against the pg docs.
        insertId: pgResult.insertId
      };
    }
    case 'mysql': {
      const [mysqlResult] = await connection.execute(sql, params);
      return {
        success: true,
        affectedRows: (mysqlResult as any).affectedRows,
        insertId: (mysqlResult as any).insertId
      };
    }
    case 'sqlite':
      return new Promise((resolve, reject) => {
        // A `function` (not an arrow) is required: the driver binds `this`
        // to the statement, which carries `changes` and `lastID`.
        connection.run(
          sql,
          params,
          function (this: { changes: number; lastID: number }, err: Error | null) {
            if (err) {
              reject(err);
              return;
            }
            resolve({
              success: true,
              affectedRows: this.changes,
              insertId: this.lastID
            });
          }
        );
      });
    default:
      throw new Error(`Unsupported database type: ${this.dbConfig.type}`);
  }
}
/**
 * Runs `callback` inside a transaction on the configured engine.
 *
 * The transaction is committed when the callback resolves and rolled back
 * when it (or the commit) fails. A failing rollback is logged and never
 * replaces the error that caused it.
 *
 * @param callback - Receives the transaction-scoped client/connection.
 * @returns Whatever the callback resolves to.
 * @throws The original error from the callback or the commit.
 */
private async executeTransaction(callback: (client: any) => Promise<any>): Promise<any> {
  switch (this.dbConfig.type) {
    case 'postgresql': {
      const client = await this.connection.connect();
      try {
        await client.query('BEGIN');
        const result = await callback(client);
        await client.query('COMMIT');
        return result;
      } catch (error) {
        try {
          await client.query('ROLLBACK');
        } catch (rollbackError) {
          this.logger.error('Transaction rollback failed', rollbackError);
        }
        throw error;
      } finally {
        client.release();
      }
    }
    case 'mysql': {
      const connection = await this.connection.getConnection();
      try {
        await connection.beginTransaction();
        const result = await callback(connection);
        await connection.commit();
        return result;
      } catch (error) {
        try {
          await connection.rollback();
        } catch (rollbackError) {
          this.logger.error('Transaction rollback failed', rollbackError);
        }
        throw error;
      } finally {
        connection.release();
      }
    }
    case 'sqlite': {
      // SQLite transaction handling. Every control statement gets a
      // callback so a failing BEGIN/COMMIT rejects instead of going unobserved.
      const db = this.connection;
      const run = (statement: string): Promise<void> =>
        new Promise<void>((resolve, reject) => {
          db.run(statement, (err: Error | null) => (err ? reject(err) : resolve()));
        });
      await run('BEGIN TRANSACTION');
      try {
        const result = await callback(db);
        await run('COMMIT');
        return result;
      } catch (error) {
        try {
          await run('ROLLBACK');
        } catch (rollbackError) {
          this.logger.error('Transaction rollback failed', rollbackError);
        }
        throw error;
      }
    }
    default:
      throw new Error(`Unsupported database type: ${this.dbConfig.type}`);
  }
}
// 辅助方法
/**
 * Extracts the table name from a `table://` URI by removing the first
 * occurrence of that scheme text.
 */
private parseTableUri(uri: string): string {
  const scheme = 'table://';
  return uri.replace(scheme, '');
}
/**
 * Extracts the query id from a `query://` URI by removing the first
 * occurrence of that scheme text.
 */
private parseQueryUri(uri: string): string {
  const scheme = 'query://';
  return uri.replace(scheme, '');
}
/**
 * Builds the canned "first 100 rows" query for a table.
 *
 * The name comes from a resource URI and is interpolated into SQL, so it is
 * restricted to a plain (optionally schema-qualified) identifier.
 *
 * @param tableName - Table name such as `users` or `public.users`.
 * @returns `SELECT * FROM <table> LIMIT 100`.
 * @throws Error when the name is not a plain identifier.
 */
private buildSelectQuery(tableName: string): string {
  const identifier = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$/;
  if (typeof tableName !== 'string' || !identifier.test(tableName)) {
    throw new Error(`Invalid table name: ${tableName}`);
  }
  return `SELECT * FROM ${tableName} LIMIT 100`;
}
/**
 * Appends `LIMIT <limit>` to a SELECT/WITH statement that has no LIMIT.
 *
 * - Statements other than SELECT/WITH are returned unchanged, because
 *   appending LIMIT to e.g. `SHOW TABLES` or `PRAGMA` is invalid.
 * - `limit` is matched as a whole word, so identifiers such as
 *   `rate_limit` no longer suppress the clause.
 * - A trailing `;` is removed first so the clause is not appended after it.
 *
 * @param sql - Statement text.
 * @param limit - Maximum row count; must be a non-negative finite number.
 * @returns The statement, with a LIMIT clause when one was added.
 * @throws Error when `limit` is not a non-negative finite number.
 */
private addLimitClause(sql: string, limit: number): string {
  const rowLimit = Math.floor(Number(limit));
  // `limit` ends up in the SQL text, so only a real number may pass.
  if (!Number.isFinite(rowLimit) || rowLimit < 0) {
    throw new Error(`Invalid limit: ${limit}`);
  }
  const statement = sql.trim().replace(/;+\s*$/, '');
  const isRowQuery = /^(select|with)\b/i.test(statement);
  if (!isRowQuery || /\blimit\b/i.test(statement)) {
    return sql;
  }
  return `${statement} LIMIT ${rowLimit}`;
}
/**
 * Derives a cache key from a statement and its parameters.
 *
 * @returns Hex SHA-256 digest of the SQL text followed by the JSON form of `params`.
 */
private generateCacheKey(sql: string, params: any[]): string {
  const { createHash } = require('crypto');
  const hasher = createHash('sha256');
  hasher.update(sql + JSON.stringify(params));
  return hasher.digest('hex');
}
/** Removes every cache entry older than `cacheTimeout` milliseconds. */
private cleanExpiredCache(): void {
  const cutoff = Date.now() - this.cacheTimeout;
  this.queryCache.forEach((entry, key) => {
    if (entry.timestamp < cutoff) {
      this.queryCache.delete(key);
    }
  });
}
/** Drops every cached query result and logs the invalidation at debug level. */
private invalidateQueryCache(): void {
this.queryCache.clear();
this.logger.debug('Query cache invalidated');
}
/**
 * Lists table names using the catalog query of the configured engine.
 *
 * @returns The first column of every row returned by the catalog query.
 * @throws Error for an unsupported engine.
 */
private async getAllTables(): Promise<string[]> {
  const catalogQueries = new Map<string, string>([
    ['postgresql', `SELECT tablename FROM pg_tables WHERE schemaname = 'public'`],
    ['mysql', `SHOW TABLES`],
    ['sqlite', `SELECT name FROM sqlite_master WHERE type='table'`]
  ]);
  const sql = catalogQueries.get(this.dbConfig.type);
  if (sql === undefined) {
    throw new Error(`Unsupported database type: ${this.dbConfig.type}`);
  }
  const { rows } = await this.executeQuery(sql);
  return rows.map((row: any) => Object.values(row)[0]);
}
/**
 * Returns column metadata for a table.
 *
 * PostgreSQL passes the name as a bound parameter. MySQL (`DESCRIBE`) and
 * SQLite (`PRAGMA table_info`) interpolate the name into the statement, so
 * it must be a plain identifier there.
 *
 * @param tableName - Table to describe.
 * @returns The rows of the engine-specific metadata query.
 * @throws Error for an unsupported engine or an unsafe table name.
 */
private async getTableColumns(tableName: string): Promise<any[]> {
  // Guard used only where the name is spliced into SQL text.
  const assertPlainIdentifier = (): void => {
    if (typeof tableName !== 'string' || !/^[A-Za-z_][A-Za-z0-9_]*$/.test(tableName)) {
      throw new Error(`Invalid table name: ${tableName}`);
    }
  };
  switch (this.dbConfig.type) {
    case 'postgresql': {
      const sql = `SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = $1`;
      return (await this.executeQuery(sql, [tableName])).rows;
    }
    case 'mysql':
      assertPlainIdentifier();
      return (await this.executeQuery(`DESCRIBE ${tableName}`)).rows;
    case 'sqlite':
      assertPlainIdentifier();
      return (await this.executeQuery(`PRAGMA table_info(${tableName})`)).rows;
    default:
      throw new Error(`Unsupported database type: ${this.dbConfig.type}`);
  }
}
/**
 * Counts the rows of a table.
 *
 * @param tableName - Plain (optionally schema-qualified) identifier; it is
 *   interpolated into the statement, so anything else is rejected.
 * @returns The row count as a number. Drivers may report COUNT(*) as a
 *   string, so the value is converted explicitly.
 * @throws Error when the table name is not a plain identifier.
 */
private async getTableRowCount(tableName: string): Promise<number> {
  const identifier = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$/;
  if (typeof tableName !== 'string' || !identifier.test(tableName)) {
    throw new Error(`Invalid table name: ${tableName}`);
  }
  const sql = `SELECT COUNT(*) as count FROM ${tableName}`;
  const result = await this.executeQuery(sql);
  return Number(result.rows[0].count);
}
/**
 * Placeholder for per-table index lookup.
 *
 * @param tableName - Currently unused.
 * @returns Always an empty array.
 */
private async getTableIndexes(tableName: string): Promise<any[]> {
// TODO: implement the index lookup for a table.
return [];
}
/**
 * Placeholder for listing database views.
 *
 * @returns Always an empty array.
 */
private async getAllViews(): Promise<string[]> {
// TODO: implement the lookup of all views.
return [];
}
/**
 * Placeholder for listing every index in the database.
 *
 * @returns Always an empty array.
 */
private async getAllIndexes(): Promise<any[]> {
// TODO: implement the lookup of all indexes.
return [];
}
}
11.3 项目三:API网关和代理Server
11.3.1 API网关实现
// src/projects/gateway/ApiGatewayServer.ts
/** Settings for one upstream service behind the gateway. */
export interface ServiceConfig {
  /** Name under which the service is registered. */
  name: string;
  /** Base URL that target paths are resolved against. */
  baseUrl: string;
  /** Request timeout value (used as milliseconds by the proxy's abort timer). */
  timeout: number;
  /** Configured retry count. */
  retries: number;
  /** Circuit-breaker settings for this service. */
  circuitBreaker: { enabled: boolean; threshold: number; timeout: number };
}
/** One gateway route: which method and path map to which service. */
export interface RouteConfig {
  /** Public path; may contain `:name` parameter segments. */
  path: string;
  /** HTTP method of the route. */
  method: string;
  /** Name of the registered service that handles the route. */
  service: string;
  /** Optional path to call on the service instead of `path`. */
  targetPath?: string;
  /** Whether the route is flagged as requiring authentication. */
  auth: boolean;
  /** Optional rate-limit settings. */
  rateLimit?: { requests: number; window: number };
}
export class ApiGatewayMCPServer extends SecureMCPServer {
private services = new Map<string, ServiceConfig>();
private routes = new Map<string, RouteConfig>();
private circuitBreakers = new Map<string, CircuitBreaker>();
/**
 * Builds the gateway server and registers its tools and resources.
 *
 * @param config - Server configuration, passed to the base class.
 */
constructor(config: ServerConfig) {
super(config);
this.setupGatewayTools();
this.setupGatewayResources();
}
private setupGatewayTools(): void {
// API代理工具
this.addTool({
name: 'gateway_proxy',
description: 'API代理请求',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: '请求路径' },
method: { type: 'string', enum: ['GET', 'POST', 'PUT', 'DELETE'] },
headers: { type: 'object', description: '请求头' },
body: { type: 'string', description: '请求体' },
params: { type: 'object', description: '查询参数' }
},
required: ['path', 'method']
}
}, this.handleProxy.bind(this));
// 服务注册工具
this.addTool({
name: 'gateway_register_service',
description: '注册服务',
inputSchema: {
type: 'object',
properties: {
name: { type: 'string' },
baseUrl: { type: 'string' },
timeout: { type: 'number', default: 30000 },
retries: { type: 'number', default: 3 }
},
required: ['name', 'baseUrl']
}
}, this.handleRegisterService.bind(this));
// 路由配置工具
this.addTool({
name: 'gateway_add_route',
description: '添加路由',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string' },
method: { type: 'string' },
service: { type: 'string' },
targetPath: { type: 'string' },
auth: { type: 'boolean', default: true }
},
required: ['path', 'method', 'service']
}
}, this.handleAddRoute.bind(this));
}
/**
 * Registers the `services` and `routes` resources, each a JSON snapshot of
 * the gateway's current registrations.
 */
private setupGatewayResources(): void {
  const asJson = (value: unknown): string => JSON.stringify(value, null, 2);

  // Service status resource
  this.addResource('services', async () => {
    const snapshot: any[] = [];
    for (const [name, config] of this.services.entries()) {
      snapshot.push({ name, ...config, status: this.getServiceStatus(name) });
    }
    return { uri: 'services://', mimeType: 'application/json', text: asJson(snapshot) };
  });

  // Route configuration resource
  this.addResource('routes', async () => {
    const registered = [...this.routes.values()];
    return { uri: 'routes://', mimeType: 'application/json', text: asJson(registered) };
  });
}
/**
 * Tool handler for `gateway_proxy`: forwards a request to the service that
 * owns the matching route.
 *
 * @param args - `{ path, method, headers?, body?, params? }`.
 * @returns `{ status, headers, body, service, targetUrl }`, where `headers`
 *   is a plain object and `body` is the response text.
 * @throws Error when no route or service matches, or the breaker is open.
 * @throws ToolExecutionException when the upstream request fails.
 */
private async handleProxy(args: any): Promise<any> {
  const { path, method, headers = {}, body, params = {} } = args;
  // Find the matching route.
  const route = this.findMatchingRoute(path, method);
  if (!route) {
    throw new Error(`No route found for ${method} ${path}`);
  }
  // Look up the service configuration.
  const service = this.services.get(route.service);
  if (!service) {
    throw new Error(`Service not found: ${route.service}`);
  }
  // Consult the circuit breaker.
  const circuitBreaker = this.getCircuitBreaker(route.service);
  if (!circuitBreaker.allowRequest()) {
    throw new Error(`Circuit breaker open for service: ${route.service}`);
  }
  try {
    // Build the target URL and append the query parameters.
    const targetPath = route.targetPath || path;
    const url = new URL(targetPath, service.baseUrl);
    for (const [key, value] of Object.entries(params)) {
      url.searchParams.set(key, String(value));
    }
    // The schema declares `body` as a string: send it verbatim. Running it
    // through JSON.stringify would wrap an already-serialised payload in quotes.
    let payload: string | undefined;
    if (body) {
      payload = typeof body === 'string' ? body : JSON.stringify(body);
    }
    const response = await this.makeRequest(url.toString(), {
      method,
      headers,
      body: payload,
      timeout: service.timeout,
    });
    const responseBody = await response.text();
    // A Headers instance serialises to `{}`; copy it into a plain object.
    const responseHeaders: Record<string, string> = {};
    response.headers.forEach((value, key) => {
      responseHeaders[key] = value;
    });
    // A 5xx reply means the upstream is unhealthy even though the HTTP
    // exchange itself succeeded, so it counts against the breaker.
    if (response.status >= 500) {
      circuitBreaker.recordFailure();
    } else {
      circuitBreaker.recordSuccess();
    }
    return {
      status: response.status,
      headers: responseHeaders,
      body: responseBody,
      service: route.service,
      targetUrl: url.toString()
    };
  } catch (error) {
    circuitBreaker.recordFailure();
    // Narrow the caught value before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new ToolExecutionException(
      'gateway_proxy',
      `Proxy request failed: ${reason}`,
      { path, method, service: route.service }
    );
  }
}
/**
 * Tool handler for `gateway_register_service`.
 *
 * Stores the service and gives it a fresh circuit breaker (enabled,
 * threshold 5, timeout 60000). Re-registering a name replaces both.
 *
 * @param args - `{ name, baseUrl, timeout?, retries? }`.
 * @returns `{ success: true, service }`.
 */
private async handleRegisterService(args: any): Promise<any> {
  const { name, baseUrl, timeout = 30000, retries = 3 } = args;
  const breakerSettings = { enabled: true, threshold: 5, timeout: 60000 };
  const registration: ServiceConfig = {
    name,
    baseUrl,
    timeout,
    retries,
    circuitBreaker: breakerSettings
  };
  this.services.set(name, registration);
  this.circuitBreakers.set(name, new CircuitBreaker(registration.circuitBreaker));
  this.logger.info('Service registered', { name, baseUrl });
  return { success: true, service: name };
}
/**
 * Tool handler for `gateway_add_route`.
 *
 * Routes are keyed by `METHOD:path`, so adding the same pair again
 * replaces the earlier route.
 *
 * @param args - `{ path, method, service, targetPath?, auth? }`.
 * @returns `{ success: true, route }` with the route key.
 * @throws Error when the service has not been registered.
 */
private async handleAddRoute(args: any): Promise<any> {
  const { path, method, service, targetPath, auth = true } = args;
  // The target service must already be registered.
  const serviceKnown = this.services.has(service);
  if (!serviceKnown) {
    throw new Error(`Service not found: ${service}`);
  }
  const verb = method.toUpperCase();
  const routeKey = `${verb}:${path}`;
  const entry: RouteConfig = { path, method: verb, service, targetPath, auth };
  this.routes.set(routeKey, entry);
  this.logger.info('Route added', { path, method, service });
  return { success: true, route: routeKey };
}
/**
 * Finds the route for a request.
 *
 * An exact `METHOD:path` key wins; otherwise routes are tried in insertion
 * order against their parameterised pattern.
 *
 * @returns The matching route, or `null` when none matches.
 */
private findMatchingRoute(path: string, method: string): RouteConfig | null {
  const direct = this.routes.get(`${method.toUpperCase()}:${path}`);
  if (direct) {
    return direct;
  }
  // Path-parameter matching (simplified).
  for (const candidate of this.routes.values()) {
    if (this.matchesRoute(candidate, path, method)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Tests whether a request matches a route's method and path pattern.
 *
 * `:name` segments match any non-empty run of characters other than `/`.
 * All other route text is matched literally, so characters such as `.` or
 * `+` in a route path are not interpreted as regular-expression syntax.
 *
 * @returns `true` when both the method and the whole path match.
 */
private matchesRoute(route: RouteConfig, path: string, method: string): boolean {
  if (route.method !== method.toUpperCase()) {
    return false;
  }
  // Splitting with a capture group keeps the `:param` tokens in the array.
  const pattern = route.path
    .split(/(:\w+)/)
    .map(part =>
      /^:\w+$/.test(part)
        ? '([^/]+)'
        : part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    )
    .join('');
  return new RegExp(`^${pattern}$`).test(path);
}
/**
 * Returns the circuit breaker for a service, creating and caching one on
 * first use. A service without its own settings gets the defaults
 * (enabled, threshold 5, timeout 60000).
 */
private getCircuitBreaker(serviceName: string): CircuitBreaker {
  const existing = this.circuitBreakers.get(serviceName);
  if (existing) {
    return existing;
  }
  const defaults = { enabled: true, threshold: 5, timeout: 60000 };
  const settings = this.services.get(serviceName)?.circuitBreaker || defaults;
  const created = new CircuitBreaker(settings);
  this.circuitBreakers.set(serviceName, created);
  return created;
}
/**
 * Performs a `fetch` that is aborted after `options.timeout` milliseconds
 * (30000 when the option is missing or falsy).
 *
 * @param url - Absolute request URL.
 * @param options - Passed to `fetch` together with the abort signal.
 * @returns The response as soon as `fetch` resolves.
 */
private async makeRequest(url: string, options: any): Promise<Response> {
  const aborter = new AbortController();
  const timer = setTimeout(() => aborter.abort(), options.timeout || 30000);
  try {
    return await fetch(url, { ...options, signal: aborter.signal });
  } finally {
    // Always cancel the timer, whether the request succeeded or threw.
    clearTimeout(timer);
  }
}
/**
 * Reports a service's health as seen by its circuit breaker.
 *
 * @returns `'unknown'` when the service has no breaker, otherwise
 *   `'circuit_open'`, `'half_open'` or `'healthy'`.
 */
private getServiceStatus(serviceName: string): string {
  const breaker = this.circuitBreakers.get(serviceName);
  if (!breaker) return 'unknown';
  if (breaker.isOpen()) return 'circuit_open';
  return breaker.isHalfOpen() ? 'half_open' : 'healthy';
}
}
// 熔断器实现
/**
 * Minimal circuit breaker with three states.
 *
 * - `closed`: requests pass; failures are counted.
 * - `open`: requests are rejected until `config.timeout` ms have passed
 *   since the last failure.
 * - `half_open`: requests pass again; a success closes the breaker and a
 *   failure re-opens it.
 *
 * A disabled breaker (`config.enabled === false`) always allows requests
 * and never leaves `closed`, so it is never reported as open.
 */
class CircuitBreaker {
  private failures = 0;
  private lastFailTime = 0;
  private state: 'closed' | 'open' | 'half_open' = 'closed';

  constructor(private config: { enabled: boolean; threshold: number; timeout: number }) {}

  /** Returns whether a request may be sent now; may move `open` to `half_open`. */
  allowRequest(): boolean {
    if (!this.config.enabled || this.state !== 'open') {
      return true;
    }
    const coolDownElapsed = Date.now() - this.lastFailTime > this.config.timeout;
    if (coolDownElapsed) {
      this.state = 'half_open';
      return true;
    }
    return false;
  }

  /** Resets the failure count and closes the breaker. */
  recordSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  /** Counts a failure and opens the breaker once the threshold is reached. */
  recordFailure(): void {
    this.failures++;
    this.lastFailTime = Date.now();
    // A disabled breaker keeps counting but must not trip.
    if (!this.config.enabled) {
      return;
    }
    if (this.state === 'half_open' || this.failures >= this.config.threshold) {
      this.state = 'open';
    }
  }

  /** True while the breaker is rejecting requests. */
  isOpen(): boolean {
    return this.state === 'open';
  }

  /** True while the breaker is letting trial requests through. */
  isHalfOpen(): boolean {
    return this.state === 'half_open';
  }
}
11.4 项目四:内容管理和搜索Server
11.4.1 内容管理系统实现
// src/projects/cms/ContentManagementServer.ts
/** A piece of managed content as stored by the CMS server. */
export interface ContentItem {
  /** Unique content id. */
  id: string;
  title: string;
  /** Body text. */
  content: string;
  /** Kind of content. */
  type: 'article' | 'page' | 'document' | 'media';
  /** Publication state. */
  status: 'draft' | 'published' | 'archived';
  author: string;
  tags: string[];
  /** Free-form extra attributes. */
  metadata: Record<string, any>;
  createdAt: Date;
  updatedAt: Date;
}
/** Search-index entry kept for one content item. */
export interface SearchIndex {
  /** Id of the indexed content item. */
  id: string;
  title: string;
  content: string;
  /** Tokens stored for the item. */
  tokens: string[];
  type: string;
  tags: string[];
  /** Optional score attached to the entry. */
  score?: number;
}
export class ContentManagementMCPServer extends SecureMCPServer {
private contents = new Map<string, ContentItem>();
private searchIndex = new Map<string, SearchIndex>();
private contentStorage: string; // 内容存储目录
/**
 * Builds the CMS server and registers its tools and resources.
 * Stored content is not loaded here; that happens in `start()`.
 *
 * @param config - Server configuration, passed to the base class.
 * @param contentStorage - Content storage directory.
 */
constructor(config: ServerConfig, contentStorage: string) {
super(config);
this.contentStorage = contentStorage;
this.setupCMSTools();
this.setupCMSResources();
}
/**
 * Starts the base server, loads stored content, then builds the search
 * index from what was loaded.
 */
async start(): Promise<void> {
await super.start();
await this.loadContentFromStorage();
this.buildSearchIndex();
}
private setupCMSTools(): void {
// 创建内容工具
this.addTool({
name: 'cms_create_content',
description: '创建内容',
inputSchema: {
type: 'object',
properties: {
title: { type: 'string' },
content: { type: 'string' },
type: { type: 'string', enum: ['article', 'page', 'document', 'media'] },
tags: { type: 'array', items: { type: 'string' } },
metadata: { type: 'object' }
},
required: ['title', 'content', 'type']
}
}, this.handleCreateContent.bind(this));
// 更新内容工具
this.addTool({
name: 'cms_update_content',
description: '更新内容',
inputSchema: {
type: 'object',
properties: {
id: { type: 'string' },
title: { type: 'string' },
content: { type: 'string' },
tags: { type: 'array', items: { type: 'string' } },
status: { type: 'string', enum: ['draft', 'published', 'archived'] },
metadata: { type: 'object' }
},
required: ['id']
}
}, this.handleUpdateContent.bind(this));
// 删除内容工具
this.addTool({
name: 'cms_delete_content',
description: '删除内容',
inputSchema: {
type: 'object',
properties: {
id: { type: 'string' }
},
required: ['id']
}
}, this.handleDeleteContent.bind(this));
// 搜索内容工具
this.addTool({
name: 'cms_search_content',
description: '搜索内容',
inputSchema: {
type: 'object',
properties: {
query: { type: 'string' },
type: { type: 'string' },
tags: { type: 'array', items: { type: 'string' } },
status: { type: 'string' },
limit: { type: 'number', default: 20 }
},
required: ['query']
}
}, this.handleSearchContent.bind(this));
// 获取内容列表工具
this.addTool({
name: 'cms_list_content',
description: '获取内容列表',
inputSchema: {
type: 'object',
properties: {
type: { type: 'string' },
status: { type: 'string' },
author: { type: 'string' },
limit: { type: 'number', default: 20 },
offset: { type: 'number', default: 0 }
}
}
}, this.handleListContent.bind(this));
}
/**
 * Registers the `content` resource (a single item by id) and the
 * `content-list` resource (items filtered by the parameters in the URI).
 */
private setupCMSResources(): void {
  const asJson = (value: unknown): string => JSON.stringify(value, null, 2);

  // Single content resource
  this.addResource('content', async (uri: string) => {
    const contentId = this.parseContentUri(uri);
    const found = this.contents.get(contentId);
    if (!found) {
      throw new Error(`Content not found: ${contentId}`);
    }
    const { id, type, status, updatedAt } = found;
    return {
      uri,
      mimeType: 'application/json',
      text: asJson(found),
      metadata: { id, type, status, lastModified: updatedAt }
    };
  });

  // Content list resource
  this.addResource('content-list', async (uri: string) => {
    const filters = this.parseListUri(uri);
    const matching = this.filterContents(filters);
    return {
      uri,
      mimeType: 'application/json',
      text: asJson(matching),
      metadata: { total: matching.length, filters }
    };
  });
}
// 工具实现方法
/**
 * Tool handler for `cms_create_content`.
 *
 * Builds a new item in `draft` status, stores it in memory, persists it,
 * adds it to the search index and logs the creation.
 *
 * @param args Tool arguments: `title`, `content`, `type`, and optional
 *   `tags` (defaults to `[]`) and `metadata` (defaults to `{}`).
 * @returns `{ success: true, id, content }` describing the new item.
 */
private async handleCreateContent(args: any): Promise<any> {
  const { title, content: body, type, tags = [], metadata = {} } = args;
  const id = this.generateContentId();

  const created: ContentItem = {
    id,
    title,
    content: body,
    type,
    status: 'draft',
    author: 'system', // placeholder; should come from the authentication info
    tags,
    metadata,
    createdAt: new Date(),
    updatedAt: new Date()
  };

  this.contents.set(id, created);
  await this.saveContentToStorage(created);
  this.addToSearchIndex(created);
  this.logger.info('Content created', { id, title, type });

  return { success: true, id, content: created };
}
/**
 * Tool handler for `cms_update_content`.
 *
 * Applies only the fields the tool's input schema declares as editable
 * (`title`, `content`, `tags`, `status`, `metadata`). Any other key in
 * `args` is ignored, so a caller cannot overwrite fields such as
 * `createdAt` or `author`. Fields passed as `undefined` are skipped
 * so they do not wipe the stored value.
 *
 * @param args Tool arguments; `id` is required, the rest are optional.
 * @returns `{ success: true, content }` with the updated item.
 * @throws Error when no item with the given `id` exists.
 */
private async handleUpdateContent(args: any): Promise<any> {
  const { id } = args;
  const existingContent = this.contents.get(id);
  if (!existingContent) {
    throw new Error(`Content not found: ${id}`);
  }

  // Whitelist mirrors the properties of the cms_update_content schema.
  const editableFields = ['title', 'content', 'tags', 'status', 'metadata'] as const;
  const updates: Record<string, unknown> = {};
  for (const field of editableFields) {
    if (args[field] !== undefined) {
      updates[field] = args[field];
    }
  }

  const updatedContent: ContentItem = {
    ...existingContent,
    ...updates,
    updatedAt: new Date()
  };

  this.contents.set(id, updatedContent);
  await this.saveContentToStorage(updatedContent);
  this.updateSearchIndex(updatedContent);
  this.logger.info('Content updated', { id, updates: Object.keys(updates) });

  return {
    success: true,
    content: updatedContent
  };
}
/**
 * Tool handler for `cms_delete_content`.
 *
 * Removes the item from memory, from storage and from the search index, in
 * that order, then logs the deletion.
 *
 * @param args Tool arguments containing the `id` to delete.
 * @returns `{ success: true, deletedContent }` with the removed item.
 * @throws Error when no item with the given `id` exists.
 */
private async handleDeleteContent(args: any): Promise<any> {
  const id = args.id;
  const removed = this.contents.get(id);

  if (!removed) {
    throw new Error(`Content not found: ${id}`);
  }

  this.contents.delete(id);
  await this.deleteContentFromStorage(id);
  this.removeFromSearchIndex(id);

  this.logger.info('Content deleted', { id, title: removed.title });

  return { success: true, deletedContent: removed };
}
/**
 * Tool handler for `cms_search_content`.
 *
 * Forwards the query and optional filters to `searchContents`.
 *
 * @param args Tool arguments: `query`, optional `type`, `tags`, `status`
 *   and `limit` (defaults to 20).
 * @returns `{ results, query, total }`, where `total` is the number of
 *   returned results.
 */
private async handleSearchContent(args: any): Promise<any> {
  const { query, limit = 20 } = args;

  const matches = this.searchContents({
    query,
    type: args.type,
    tags: args.tags,
    status: args.status,
    limit
  });

  return { results: matches, query, total: matches.length };
}
/**
 * Tool handler for `cms_list_content`.
 *
 * Filters the stored items by `type`, `status` and `author`, then returns
 * one page of the result.
 *
 * The tool declares no required arguments, so a missing `args` object is
 * treated as "no filters". `offset` and `limit` are normalised to
 * non-negative integers; missing or invalid values fall back to the schema
 * defaults (0 and 20). This keeps the `slice` window well defined.
 *
 * @param args Optional tool arguments: `type`, `status`, `author`,
 *   `limit`, `offset`.
 * @returns `{ contents, total, offset, limit }`, where `total` is the
 *   number of items matching the filters before paging and
 *   `offset`/`limit` are the values actually applied.
 */
private async handleListContent(args: any): Promise<any> {
  const { type, status, author, limit, offset } = args == null ? ({} as any) : args;

  // Coerce a paging value to a non-negative integer or use the fallback.
  const toCount = (value: unknown, fallback: number): number => {
    if (value == null) {
      return fallback;
    }
    const n = Math.trunc(Number(value));
    return Number.isFinite(n) && n >= 0 ? n : fallback;
  };

  const pageOffset = toCount(offset, 0);
  const pageLimit = toCount(limit, 20);

  const contents = this.filterContents({ type, status, author });
  const paginatedContents = contents.slice(pageOffset, pageOffset + pageLimit);

  return {
    contents: paginatedContents,
    total: contents.length,
    offset: pageOffset,
    limit: pageLimit
  };
}
// 搜索相关方法
/**
 * Adds every item currently held in `contents` to the search index and
 * logs the resulting index size.
 */
private buildSearchIndex(): void {
  this.contents.forEach(item => {
    this.addToSearchIndex(item);
  });

  const totalItems = this.searchIndex.size;
  this.logger.info('Search index built', { totalItems });
}
/**
 * Creates the search index entry for a content item and stores it under
 * the item's id.
 *
 * The entry holds the tokens of title plus body, and the first 500
 * characters of the body as an excerpt.
 *
 * @param content Item to index.
 */
private addToSearchIndex(content: ContentItem): void {
  const { id, title, type, tags } = content;
  const body = content.content;

  const tokens = this.tokenize(`${title} ${body}`);
  const excerpt = body.slice(0, 500); // excerpt only, not the full body

  const entry: SearchIndex = { id, title, content: excerpt, tokens, type, tags };
  this.searchIndex.set(id, entry);
}
/**
 * Refreshes the search index entry of an updated content item.
 *
 * Delegates to `addToSearchIndex`, which stores the entry under
 * `content.id`.
 *
 * @param content Item whose index entry should be refreshed.
 */
private updateSearchIndex(content: ContentItem): void {
this.addToSearchIndex(content);
}
/**
 * Removes the search index entry stored under the given content id.
 *
 * @param contentId Id of the content item to drop from the index.
 */
private removeFromSearchIndex(contentId: string): void {
this.searchIndex.delete(contentId);
}
/**
 * Finds content items matching the given criteria.
 *
 * Items are first narrowed by `type`, `status` and `tags` (an item passes
 * the tag filter when it carries at least one requested tag). When a
 * `query` is given, the remaining items are scored by `performTextSearch`
 * and ordered by descending score. At most `limit` items are returned.
 *
 * @param criteria Object with optional `query`, `type`, `tags`, `status`
 *   and `limit`.
 * @returns The matching items, truncated to `limit`.
 */
private searchContents(criteria: any): ContentItem[] {
  const { query, type, tags, status, limit } = criteria;
  const hasTagFilter = Boolean(tags && tags.length > 0);

  // Structured filters, combined into a single pass.
  const passesFilters = (item: ContentItem): boolean =>
    (!type || item.type === type) &&
    (!status || item.status === status) &&
    (!hasTagFilter || tags.some((tag: string) => item.tags.includes(tag)));

  let pool: ContentItem[] = Array.from(this.contents.values()).filter(passesFilters);

  // Full-text scoring, only when a query was supplied.
  if (query) {
    const terms = this.tokenize(query.toLowerCase());
    const scored = this.performTextSearch(terms, pool);
    scored.sort((left, right) => (right.score || 0) - (left.score || 0));
    pool = scored;
  }

  return pool.slice(0, limit);
}
/**
 * Scores candidates against the query tokens and keeps those with a
 * positive score.
 *
 * Per query token: +10 when it is one of the title's tokens, +1 when it
 * is one of the indexed tokens, +5 when any tag contains it
 * (case-insensitive substring). Candidates without an index entry score 0
 * and are therefore dropped.
 *
 * @param queryTokens Tokens of the search query.
 * @param candidates Items to score.
 * @returns Copies of the matching items with an added `score` field, in
 *   candidate order.
 */
private performTextSearch(queryTokens: string[], candidates: ContentItem[]): any[] {
  const hits: any[] = [];

  for (const item of candidates) {
    const entry = this.searchIndex.get(item.id);
    if (!entry) {
      continue; // not indexed: score would be 0
    }

    const titleTerms = this.tokenize(item.title.toLowerCase());

    const score = queryTokens.reduce((sum, term) => {
      const titlePoints = titleTerms.includes(term) ? 10 : 0; // title hits weigh most
      const bodyPoints = entry.tokens.includes(term) ? 1 : 0;
      const tagPoints = item.tags.some(tag => tag.toLowerCase().includes(term)) ? 5 : 0;
      return sum + titlePoints + bodyPoints + tagPoints;
    }, 0);

    if (score > 0) {
      hits.push({ ...item, score });
    }
  }

  return hits;
}
/**
 * Splits text into lower-case search tokens.
 *
 * Every character that is not a letter, combining mark, digit, underscore
 * or whitespace is replaced by a space; the text is then split on
 * whitespace and tokens of two characters or fewer are dropped.
 *
 * Letters and digits are matched with Unicode property escapes, so
 * non-ASCII text (for example Chinese or accented Latin) is kept. An
 * ASCII-only `\w` class would turn such text into spaces and make it
 * unsearchable. Pure-ASCII input tokenizes exactly as it would with `\w`.
 *
 * @param text Text to tokenize.
 * @returns Tokens in order of appearance.
 */
private tokenize(text: string): string[] {
  return text
    .toLowerCase()
    .replace(/[^\p{L}\p{M}\p{N}_\s]/gu, ' ')
    .split(/\s+/)
    .filter(token => token.length > 2);
}
// 存储相关方法
/**
 * Loads every `*.json` file from `<contentStorage>/contents` into the
 * in-memory `contents` map.
 *
 * Failures are handled per file: an unreadable, malformed or id-less file
 * is logged and skipped instead of aborting the remaining files. If the
 * directory itself cannot be read, a warning is logged and nothing is
 * loaded.
 *
 * `createdAt` / `updatedAt` are serialised by `JSON.stringify` as strings,
 * so string values are converted back to `Date` here to match items
 * created at runtime (which hold `Date` instances).
 */
private async loadContentFromStorage(): Promise<void> {
  const contentDir = path.join(this.contentStorage, 'contents');

  let files: string[];
  try {
    files = await fs.readdir(contentDir);
  } catch (error) {
    this.logger.warn('Failed to load contents from storage', { error });
    return;
  }

  for (const file of files) {
    if (!file.endsWith('.json')) {
      continue;
    }
    try {
      const data = await fs.readFile(path.join(contentDir, file), 'utf8');
      const parsed = JSON.parse(data);
      if (parsed === null || typeof parsed !== 'object' || typeof parsed.id !== 'string') {
        throw new Error(`Invalid content file: ${file}`);
      }
      // Revive timestamps that JSON stored as strings.
      for (const field of ['createdAt', 'updatedAt']) {
        if (typeof parsed[field] === 'string') {
          parsed[field] = new Date(parsed[field]);
        }
      }
      const content: ContentItem = parsed;
      this.contents.set(content.id, content);
    } catch (error) {
      // Skip only this file; keep loading the rest.
      this.logger.warn('Failed to load contents from storage', { error, file });
    }
  }

  this.logger.info('Contents loaded from storage', { count: this.contents.size });
}
/**
 * Writes a content item to `<contentStorage>/contents/<id>.json` as
 * pretty-printed JSON, creating the directory when needed.
 *
 * Best effort: any failure is logged as an error and not rethrown.
 *
 * @param content Item to persist.
 */
private async saveContentToStorage(content: ContentItem): Promise<void> {
  const persist = async (): Promise<void> => {
    const dir = path.join(this.contentStorage, 'contents');
    await fs.mkdir(dir, { recursive: true });
    const target = path.join(dir, `${content.id}.json`);
    await fs.writeFile(target, JSON.stringify(content, null, 2));
  };

  await persist().catch(error => {
    this.logger.error('Failed to save content to storage', { error, contentId: content.id });
  });
}
/**
 * Deletes `<contentStorage>/contents/<contentId>.json`.
 *
 * Best effort: any failure is logged as a warning and not rethrown.
 *
 * @param contentId Id of the item whose file should be removed.
 */
private async deleteContentFromStorage(contentId: string): Promise<void> {
  const remove = async (): Promise<void> => {
    const target = path.join(this.contentStorage, 'contents', `${contentId}.json`);
    await fs.unlink(target);
  };

  await remove().catch(error => {
    this.logger.warn('Failed to delete content from storage', { error, contentId });
  });
}
// 辅助方法
/**
 * Generates a content id of the form `content_<timestamp>_<random>`.
 *
 * The random part is up to nine base-36 characters taken from
 * `Math.random()`. `slice(2, 11)` selects the same characters as the
 * deprecated `substr(2, 9)`.
 *
 * @returns The new id.
 */
private generateContentId(): string {
  const randomPart = Math.random().toString(36).slice(2, 11);
  return `content_${Date.now()}_${randomPart}`;
}
/**
 * Derives the content id from a URI by removing the first occurrence of
 * `content://`; a URI without it is returned unchanged.
 *
 * @param uri Resource URI.
 * @returns The URI with the first `content://` removed.
 */
private parseContentUri(uri: string): string {
  const scheme = 'content://';
  const at = uri.indexOf(scheme);
  if (at === -1) {
    return uri;
  }
  return uri.slice(0, at) + uri.slice(at + scheme.length);
}
/**
 * Extracts filter parameters from a `content-list://` URI.
 *
 * The first `content-list://` is removed. If the remainder contains a `?`,
 * only the text after the first `?` is parsed as the query string, so a
 * path segment such as `content-list://all?type=article` does not get
 * merged into the first parameter name. Without a `?`, the whole remainder
 * is parsed as the query string.
 *
 * @param uri Resource URI.
 * @returns Plain object of parameter names to string values; for repeated
 *   names the last value wins.
 */
private parseListUri(uri: string): any {
  const remainder = uri.replace('content-list://', '');
  const queryStart = remainder.indexOf('?');
  const query = queryStart === -1 ? remainder : remainder.slice(queryStart + 1);
  const params = new URLSearchParams(query);
  return Object.fromEntries(params.entries());
}
/**
 * Returns the stored items that match every supplied filter.
 *
 * A filter that is missing or falsy is not applied.
 *
 * @param criteria Object with optional `type`, `status` and `author`.
 * @returns Matching items in the map's iteration order.
 */
private filterContents(criteria: any): ContentItem[] {
  const { type, status, author } = criteria;
  const matches: ContentItem[] = [];

  for (const item of this.contents.values()) {
    const typeOk = !type || item.type === type;
    const statusOk = !status || item.status === status;
    const authorOk = !author || item.author === author;
    if (typeOk && statusOk && authorOk) {
      matches.push(item);
    }
  }

  return matches;
}
}
本章总结
第11章通过四个完整的实战项目,展示了MCP Server在不同领域的应用:
实战项目总结
- 文件系统管理Server:实现了完整的文件操作、搜索和监控功能
- 数据库操作Server:支持多种数据库的查询、事务和缓存机制
- API网关代理Server:提供了服务注册、路由配置和熔断保护
- 内容管理搜索Server:建立了内容管理和全文搜索系统
核心实践要点
- 合理设计工具和资源的接口
- 实现完善的错误处理和日志记录
- 提供灵活的配置和扩展机制
- 注重安全性和权限控制
- 优化性能和用户体验
通过这些实战项目的学习,读者可以掌握MCP Server在实际业务场景中的开发技巧和最佳实践,为开发复杂的生产级应用奠定坚实基础。