第10章:性能优化和扩展性
2025/9/1 · 大约 22 分钟
第10章:性能优化和扩展性
学习目标
- 分析和优化Server性能瓶颈
- 实现缓存和资源池管理
- 掌握并发处理和异步编程
- 学习内存管理和垃圾回收优化
- 设计可扩展的架构模式
10.1 性能分析和监控
10.1.1 性能指标体系
// src/performance/MetricsCollector.ts
/**
 * Shape of one performance snapshot, split into four groups: request
 * handling, system resources, business counters and connections.
 */
export interface PerformanceMetrics {
// Request handling performance
requestMetrics: {
totalRequests: number;
requestsPerSecond: number;
averageResponseTime: number;
p50ResponseTime: number;
p95ResponseTime: number;
p99ResponseTime: number;
errorRate: number;
};
// System resource usage
systemMetrics: {
cpuUsage: number;
memoryUsage: {
heapUsed: number;
heapTotal: number;
rss: number;
external: number;
};
eventLoopDelay: number;
eventLoopUtilization: number;
};
// Business metrics
businessMetrics: {
toolExecutions: number;
resourceAccesses: number;
promptGenerations: number;
cacheHitRate: number;
};
// Connection metrics
connectionMetrics: {
activeConnections: number;
totalConnections: number;
connectionRate: number;
avgConnectionDuration: number;
};
}
/**
 * Collects request, business and connection counters in memory and turns
 * them into a `PerformanceMetrics` snapshot on demand.
 *
 * Response-time and connection-duration samples are kept in bounded arrays
 * (`maxSampleSize` entries, oldest dropped first). All other counters are
 * cumulative since construction or the last `reset()`.
 */
export class MetricsCollector {
  private requestTimes: number[] = [];
  private requestCount = 0;
  private errorCount = 0;
  private startTime = Date.now();
  private toolExecutionCount = 0;
  private resourceAccessCount = 0;
  private promptGenerationCount = 0;
  private cacheHits = 0;
  private cacheTotal = 0;
  private connectionCount = 0;
  private connectionDurations: number[] = [];
  private maxSampleSize = 10000; // upper bound for each sample array
  private logger: Logger;

  /**
   * @param logger Parent logger; a child logger named 'metrics' is derived from it.
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'metrics');
  }

  /**
   * Records one finished request.
   *
   * @param duration Response time sample (same unit as the reported response times).
   * @param success  `false` counts the request towards the error rate.
   */
  recordRequest(duration: number, success: boolean): void {
    this.requestCount++;
    if (!success) this.errorCount++;
    this.requestTimes.push(duration);
    // Keep only the most recent samples.
    if (this.requestTimes.length > this.maxSampleSize) {
      this.requestTimes.shift();
    }
  }

  /** Counts one tool execution. */
  recordToolExecution(): void {
    this.toolExecutionCount++;
  }

  /** Counts one resource access. */
  recordResourceAccess(): void {
    this.resourceAccessCount++;
  }

  /** Counts one prompt generation. */
  recordPromptGeneration(): void {
    this.promptGenerationCount++;
  }

  /**
   * Counts one cache lookup.
   *
   * @param hit Whether the lookup was served from the cache.
   */
  recordCacheAccess(hit: boolean): void {
    this.cacheTotal++;
    if (hit) this.cacheHits++;
  }

  /**
   * Counts one connection and optionally records how long it lasted.
   *
   * @param duration Connection duration sample; omitted when not known.
   */
  recordConnection(duration?: number): void {
    this.connectionCount++;
    if (duration !== undefined) {
      this.connectionDurations.push(duration);
      // Keep only the most recent samples.
      if (this.connectionDurations.length > this.maxSampleSize) {
        this.connectionDurations.shift();
      }
    }
  }

  /**
   * Builds a synchronous snapshot of all metrics.
   *
   * Rates (`requestsPerSecond`, `connectionRate`) are averages over the time
   * since construction / the last `reset()`. `eventLoopDelay` and
   * `eventLoopUtilization` are always 0 here; `getDetailedMetrics()` fills in
   * the delay.
   *
   * @returns The current metrics snapshot.
   */
  getMetrics(): PerformanceMetrics {
    const now = Date.now();
    const totalTime = (now - this.startTime) / 1000; // seconds

    // Response-time statistics over the retained samples.
    const sortedTimes = [...this.requestTimes].sort((a, b) => a - b);
    const p50 = this.getPercentile(sortedTimes, 0.5);
    const p95 = this.getPercentile(sortedTimes, 0.95);
    const p99 = this.getPercentile(sortedTimes, 0.99);
    const avgResponseTime = sortedTimes.length > 0
      ? sortedTimes.reduce((sum, time) => sum + time, 0) / sortedTimes.length
      : 0;

    const memUsage = process.memoryUsage();
    const cpuUsage = process.cpuUsage();

    const connectionSamples = this.connectionDurations.length;

    return {
      requestMetrics: {
        totalRequests: this.requestCount,
        // Guard on the same value that is used as the divisor; the previous
        // guard looked at the time since the last snapshot and therefore
        // reported 0 for two snapshots taken within the same millisecond.
        requestsPerSecond: totalTime > 0 ? this.requestCount / totalTime : 0,
        averageResponseTime: avgResponseTime,
        p50ResponseTime: p50,
        p95ResponseTime: p95,
        p99ResponseTime: p99,
        errorRate: this.requestCount > 0 ? this.errorCount / this.requestCount : 0,
      },
      systemMetrics: {
        // process.cpuUsage() reports microseconds, so dividing by 1e6 yields
        // seconds of CPU time (user + system) consumed by the process.
        cpuUsage: (cpuUsage.user + cpuUsage.system) / 1000000,
        memoryUsage: {
          heapUsed: memUsage.heapUsed,
          heapTotal: memUsage.heapTotal,
          rss: memUsage.rss,
          external: memUsage.external,
        },
        eventLoopDelay: 0, // filled in asynchronously by getDetailedMetrics()
        eventLoopUtilization: 0, // not measured by this class
      },
      businessMetrics: {
        toolExecutions: this.toolExecutionCount,
        resourceAccesses: this.resourceAccessCount,
        promptGenerations: this.promptGenerationCount,
        cacheHitRate: this.cacheTotal > 0 ? this.cacheHits / this.cacheTotal : 0,
      },
      connectionMetrics: {
        // NOTE(review): this is the cumulative connection count, not the number
        // of currently open connections; the collector has no "closed" event.
        activeConnections: this.connectionCount,
        totalConnections: this.connectionCount,
        connectionRate: totalTime > 0 ? this.connectionCount / totalTime : 0,
        avgConnectionDuration: connectionSamples > 0
          ? this.connectionDurations.reduce((sum, dur) => sum + dur, 0) / connectionSamples
          : 0,
      },
    };
  }

  /**
   * Like `getMetrics()`, but additionally measures the event-loop delay.
   * A failed measurement is logged and leaves `eventLoopDelay` at 0.
   *
   * @returns The metrics snapshot with `systemMetrics.eventLoopDelay` populated.
   */
  async getDetailedMetrics(): Promise<PerformanceMetrics> {
    const metrics = this.getMetrics();
    try {
      metrics.systemMetrics.eventLoopDelay = await this.measureEventLoopDelay();
    } catch (error) {
      this.logger.warn('Failed to measure event loop delay', { error });
    }
    return metrics;
  }

  /**
   * Nearest-rank percentile of an ascending array.
   *
   * @param sortedArray Samples sorted ascending.
   * @param percentile  Fraction in the range 0..1.
   * @returns The selected sample, or 0 for an empty array.
   */
  private getPercentile(sortedArray: number[], percentile: number): number {
    if (sortedArray.length === 0) return 0;
    const index = Math.ceil(sortedArray.length * percentile) - 1;
    return sortedArray[Math.max(0, index)];
  }

  /**
   * Measures how long it takes for a `setImmediate` callback to run.
   *
   * @returns The elapsed time in milliseconds.
   */
  private measureEventLoopDelay(): Promise<number> {
    const start = process.hrtime.bigint();
    return new Promise(resolve => {
      setImmediate(() => {
        const end = process.hrtime.bigint();
        resolve(Number(end - start) / 1000000); // nanoseconds -> milliseconds
      });
    });
  }

  /** Clears all samples and counters and restarts the rate window. */
  reset(): void {
    this.requestTimes.length = 0;
    this.requestCount = 0;
    this.errorCount = 0;
    this.toolExecutionCount = 0;
    this.resourceAccessCount = 0;
    this.promptGenerationCount = 0;
    this.cacheHits = 0;
    this.cacheTotal = 0;
    this.connectionCount = 0;
    this.connectionDurations.length = 0;
    this.startTime = Date.now();
  }
}
10.1.2 性能分析器
// src/performance/Profiler.ts
/**
 * Outcome of one completed profiling session: its name, elapsed time,
 * heap-usage difference, and any nested results or caller metadata.
 */
export interface ProfileResult {
name: string;
duration: number;
memoryDelta: number;
children?: ProfileResult[];
metadata?: Record<string, any>;
}
/**
 * Lightweight in-process profiler.
 *
 * `startProfile()` opens a session that captures `performance.now()` and the
 * current heap usage; `endProfile()` closes it and stores a `ProfileResult`.
 * At most `maxProfiles` completed results are retained (oldest dropped first).
 */
export class Profiler {
  private activeProfiles = new Map<string, ProfileSession>();
  private completedProfiles: ProfileResult[] = [];
  private maxProfiles = 1000;
  private logger: Logger;

  /**
   * @param logger Parent logger; a child logger named 'profiler' is derived from it.
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'profiler');
  }

  /**
   * Opens a profiling session.
   *
   * @param name     Label stored on the resulting profile.
   * @param metadata Arbitrary data copied onto the resulting profile.
   * @returns The session id to pass to `endProfile()`.
   */
  startProfile(name: string, metadata?: Record<string, any>): string {
    // slice(2, 11) keeps the same nine random characters the former substr(2, 9) produced.
    const sessionId = `${name}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    const session: ProfileSession = {
      id: sessionId,
      name,
      startTime: performance.now(),
      startMemory: process.memoryUsage().heapUsed,
      metadata,
      children: [],
    };
    this.activeProfiles.set(sessionId, session);
    return sessionId;
  }

  /**
   * Closes a session and records its result.
   *
   * @param sessionId Id returned by `startProfile()`.
   * @returns The recorded result, or `null` (with a warning logged) when the
   *          session id is unknown.
   */
  endProfile(sessionId: string): ProfileResult | null {
    const session = this.activeProfiles.get(sessionId);
    if (!session) {
      this.logger.warn('Profile session not found', { sessionId });
      return null;
    }

    const endTime = performance.now();
    const endMemory = process.memoryUsage().heapUsed;
    const result: ProfileResult = {
      name: session.name,
      duration: endTime - session.startTime,
      memoryDelta: endMemory - session.startMemory,
      children: session.children,
      metadata: session.metadata,
    };

    this.activeProfiles.delete(sessionId);
    this.completedProfiles.push(result);
    // Bound the history.
    if (this.completedProfiles.length > this.maxProfiles) {
      this.completedProfiles.shift();
    }

    this.logger.debug('Profile completed', {
      name: session.name,
      duration: result.duration,
      memoryDelta: result.memoryDelta,
    });
    return result;
  }

  /**
   * Returns completed profiles, newest first.
   *
   * @param limit Maximum number of results; omitted means "all". A limit of 0
   *              (or a negative value) yields an empty array.
   */
  getProfiles(limit?: number): ProfileResult[] {
    return this.takeFirst([...this.completedProfiles].reverse(), limit);
  }

  /**
   * Returns completed profiles with the given name, newest first.
   *
   * @param name  Profile name to match exactly.
   * @param limit Maximum number of results; omitted means "all".
   */
  getProfilesByName(name: string, limit?: number): ProfileResult[] {
    const matching = this.completedProfiles.filter(p => p.name === name).reverse();
    return this.takeFirst(matching, limit);
  }

  /**
   * Returns profiles whose duration exceeds the threshold, slowest first.
   *
   * @param thresholdMs Minimum duration (exclusive); defaults to 1000.
   */
  getSlowProfiles(thresholdMs: number = 1000): ProfileResult[] {
    return this.completedProfiles
      .filter(p => p.duration > thresholdMs)
      .sort((a, b) => b.duration - a.duration);
  }

  /**
   * Returns profiles whose absolute heap delta exceeds the threshold,
   * largest change first.
   *
   * @param thresholdBytes Minimum absolute delta (exclusive); defaults to 1 MiB.
   */
  getMemoryIntensiveProfiles(thresholdBytes: number = 1024 * 1024): ProfileResult[] {
    return this.completedProfiles
      .filter(p => Math.abs(p.memoryDelta) > thresholdBytes)
      .sort((a, b) => Math.abs(b.memoryDelta) - Math.abs(a.memoryDelta));
  }

  /** Discards all completed results and all still-open sessions. */
  clear(): void {
    this.completedProfiles.length = 0;
    this.activeProfiles.clear();
  }

  /**
   * Applies an optional result limit. The former truthiness check treated
   * `limit = 0` as "no limit" and a negative limit silently dropped the tail.
   */
  private takeFirst(items: ProfileResult[], limit?: number): ProfileResult[] {
    if (limit === undefined) return items;
    return items.slice(0, Math.max(0, limit));
  }
}
/**
 * Bookkeeping for a profiling session that has been started but not yet
 * ended: the start time and heap usage captured when it was opened.
 */
interface ProfileSession {
id: string;
name: string;
startTime: number;
startMemory: number;
metadata?: Record<string, any>;
children: ProfileResult[];
}
/**
 * Method decorator that wraps the decorated method in a profiling session.
 *
 * The wrapper looks for a `profiler` property on the instance at call time.
 * When one is present, a session is started before the call and ended after
 * it settles (successfully or not); when absent, the method is simply
 * invoked. The wrapper is always `async`, so it returns a promise.
 *
 * @param name Optional profile name; defaults to `<ClassName>.<method>`.
 */
export function profile(name?: string) {
  return function (target: any, propertyKey: string, descriptor: PropertyDescriptor) {
    const wrapped = descriptor.value;
    const label = name || `${target.constructor.name}.${propertyKey}`;

    descriptor.value = async function (...args: any[]) {
      const activeProfiler = (this as any).profiler as Profiler;

      // No profiler on the instance: behave like the undecorated method.
      if (!activeProfiler) {
        return wrapped.apply(this, args);
      }

      const sessionId = activeProfiler.startProfile(label, {
        args: args.length,
        className: target.constructor.name,
        methodName: propertyKey,
      });

      try {
        return await wrapped.apply(this, args);
      } finally {
        // Close the session whether the call resolved or threw.
        activeProfiler.endProfile(sessionId);
      }
    };

    return descriptor;
  };
}
10.2 缓存系统
10.2.1 多层缓存架构
// src/cache/CacheManager.ts
/**
 * A cached value together with the bookkeeping fields the cache stores
 * alongside it (creation time, TTL, access count, last access time and an
 * optional size estimate).
 */
export interface CacheEntry<T> {
value: T;
timestamp: number;
ttl: number;
accessCount: number;
lastAccess: number;
size?: number;
}
/**
 * Pluggable eviction policy: decides whether a single entry should be
 * dropped and how entries rank against each other when space is needed.
 */
export interface CacheStrategy {
name: string;
shouldEvict(entry: CacheEntry<any>, now: number): boolean;
getPriority(entry: CacheEntry<any>, now: number): number;
}
/** Construction options for a cache. */
export interface CacheConfig {
maxSize: number; // maximum number of entries
maxMemory?: number; // maximum memory usage (bytes)
defaultTTL: number; // default TTL (milliseconds)
strategy: CacheStrategy;
enableStatistics: boolean;
}
/** Counters and gauges a cache reports about itself. */
export interface CacheStatistics {
hits: number;
misses: number;
hitRate: number;
size: number;
memoryUsage: number;
evictions: number;
}
/**
 * Least-recently-used strategy.
 *
 * An entry is expired once its age exceeds its TTL; among live entries the
 * one that has gone longest without being accessed gets the highest
 * priority value (i.e. is the first candidate for removal).
 */
export class LRUCacheStrategy implements CacheStrategy {
  name = 'LRU';

  /** True when the entry has been stored for longer than its TTL. */
  shouldEvict(entry: CacheEntry<any>, now: number): boolean {
    const age = now - entry.timestamp;
    return age > entry.ttl;
  }

  /** Time elapsed since the last access; larger means "evict sooner". */
  getPriority(entry: CacheEntry<any>, now: number): number {
    const idleFor = now - entry.lastAccess;
    return idleFor;
  }
}
/**
 * Least-frequently-used strategy.
 *
 * An entry is expired once its age exceeds its TTL; among live entries the
 * one with the fewest recorded accesses gets the highest priority value
 * (i.e. is the first candidate for removal).
 */
export class LFUCacheStrategy implements CacheStrategy {
  name = 'LFU';

  /** True when the entry has been stored for longer than its TTL. */
  shouldEvict(entry: CacheEntry<any>, now: number): boolean {
    const expiresAfter = entry.ttl;
    return now - entry.timestamp > expiresAfter;
  }

  /** Negated access count, so rarely used entries rank highest. */
  getPriority(entry: CacheEntry<any>, now: number): number {
    const uses = entry.accessCount;
    return -uses;
  }
}
/**
 * In-memory key/value cache with TTL expiry, an entry-count limit, an
 * optional memory limit and a pluggable eviction strategy.
 *
 * Expired entries are removed lazily on `get()`/`has()` and by a sweep that
 * runs every 60 seconds. The sweep timer is unref'd so it never keeps the
 * process alive; call `dispose()` to stop it explicitly.
 */
export class CacheManager<T = any> {
  private cache = new Map<string, CacheEntry<T>>();
  private config: CacheConfig;
  private statistics: CacheStatistics;
  private logger: Logger;
  private cleanupTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param config Limits, default TTL and eviction strategy.
   * @param logger Parent logger; a child logger named 'cache' is derived from it.
   */
  constructor(config: CacheConfig, logger: Logger) {
    this.config = config;
    this.logger = logger.child({}, 'cache');
    this.statistics = {
      hits: 0,
      misses: 0,
      hitRate: 0,
      size: 0,
      memoryUsage: 0,
      evictions: 0,
    };

    // Sweep expired entries once a minute.
    const timer = setInterval(() => {
      this.cleanup();
    }, 60000);
    // In Node the handle is an object with unref(); without it the interval
    // would keep the process (and this cache) alive forever.
    const handle: { unref?: () => void } | number = timer;
    if (typeof handle === 'object') {
      handle.unref?.();
    }
    this.cleanupTimer = timer;
  }

  /**
   * Looks up a key.
   *
   * @returns The cached value, or `null` when the key is absent or expired
   *          (an expired entry is removed and counted as miss + eviction).
   */
  async get(key: string): Promise<T | null> {
    const entry = this.cache.get(key);
    if (!entry) {
      this.statistics.misses++;
      this.updateHitRate();
      return null;
    }

    const now = Date.now();
    if (this.config.strategy.shouldEvict(entry, now)) {
      this.cache.delete(key);
      this.statistics.misses++;
      this.statistics.evictions++;
      this.updateHitRate();
      return null;
    }

    entry.accessCount++;
    entry.lastAccess = now;
    this.statistics.hits++;
    this.updateHitRate();
    return entry.value;
  }

  /**
   * Stores a value, evicting other entries if a limit would be exceeded.
   *
   * @param key   Cache key.
   * @param value Value to store.
   * @param ttl   Time to live in milliseconds; a missing or 0 value falls
   *              back to `config.defaultTTL`.
   */
  async set(key: string, value: T, ttl?: number): Promise<void> {
    const now = Date.now();
    const entryTTL = ttl || this.config.defaultTTL;
    const size = this.estimateSize(value);
    const entry: CacheEntry<T> = {
      value,
      timestamp: now,
      ttl: entryTTL,
      accessCount: 1,
      lastAccess: now,
      size,
    };

    // When overwriting, drop the old entry first so it is not counted against
    // the limits; otherwise updating a key in a full cache would needlessly
    // evict some other entry.
    if (this.cache.delete(key)) {
      this.updateStatistics();
    }

    await this.ensureCapacity(size);
    this.cache.set(key, entry);
    this.updateStatistics();
    this.logger.debug('Cache entry set', { key, ttl: entryTTL, size });
  }

  /**
   * Removes a key.
   *
   * @returns `true` when an entry was removed.
   */
  async delete(key: string): Promise<boolean> {
    const deleted = this.cache.delete(key);
    if (deleted) {
      this.updateStatistics();
      this.logger.debug('Cache entry deleted', { key });
    }
    return deleted;
  }

  /** Removes every entry; hit/miss/eviction counters are kept. */
  async clear(): Promise<void> {
    this.cache.clear();
    this.statistics.size = 0;
    this.statistics.memoryUsage = 0;
    this.logger.info('Cache cleared');
  }

  /**
   * Checks for a live entry without touching hit/miss counters or access
   * information. An expired entry is removed and counted as an eviction.
   */
  async has(key: string): Promise<boolean> {
    const entry = this.cache.get(key);
    if (!entry) return false;

    const now = Date.now();
    if (this.config.strategy.shouldEvict(entry, now)) {
      this.cache.delete(key);
      this.statistics.evictions++;
      return false;
    }
    return true;
  }

  /** Returns a copy of the current statistics. */
  getStatistics(): CacheStatistics {
    return { ...this.statistics };
  }

  /** Stops the periodic sweep. The cache itself stays usable. */
  dispose(): void {
    if (this.cleanupTimer !== null) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
  }

  /** Evicts entries until one more entry of `newEntrySize` fits the limits. */
  private async ensureCapacity(newEntrySize: number): Promise<void> {
    // Entry-count limit.
    if (this.cache.size >= this.config.maxSize) {
      await this.evictEntries(1);
    }

    // Memory limit.
    if (this.config.maxMemory) {
      while (this.statistics.memoryUsage + newEntrySize > this.config.maxMemory) {
        const evicted = await this.evictEntries(1);
        if (evicted === 0) break; // nothing left to free
      }
    }
  }

  /**
   * Removes up to `count` entries, highest strategy priority first.
   *
   * @returns The number of entries actually removed.
   */
  private async evictEntries(count: number): Promise<number> {
    if (this.cache.size === 0) return 0;

    const now = Date.now();
    const entries = Array.from(this.cache.entries());
    entries.sort(([, a], [, b]) => {
      return this.config.strategy.getPriority(b, now) - this.config.strategy.getPriority(a, now);
    });

    let evicted = 0;
    for (let i = 0; i < Math.min(count, entries.length); i++) {
      const [key] = entries[i];
      this.cache.delete(key);
      evicted++;
      this.statistics.evictions++;
    }

    this.updateStatistics();
    if (evicted > 0) {
      this.logger.debug('Cache entries evicted', { count: evicted, strategy: this.config.strategy.name });
    }
    return evicted;
  }

  /** Periodic sweep: removes every entry the strategy reports as expired. */
  private cleanup(): void {
    const now = Date.now();
    let cleaned = 0;
    for (const [key, entry] of this.cache) {
      if (this.config.strategy.shouldEvict(entry, now)) {
        this.cache.delete(key);
        cleaned++;
        this.statistics.evictions++;
      }
    }
    if (cleaned > 0) {
      this.updateStatistics();
      this.logger.debug('Cache cleanup completed', { cleaned });
    }
  }

  /** Refreshes the size and memory gauges from the live map. */
  private updateStatistics(): void {
    this.statistics.size = this.cache.size;
    this.statistics.memoryUsage = this.calculateMemoryUsage();
  }

  /** Recomputes hits / (hits + misses). */
  private updateHitRate(): void {
    const total = this.statistics.hits + this.statistics.misses;
    this.statistics.hitRate = total > 0 ? this.statistics.hits / total : 0;
  }

  /** Sums the recorded size estimates of all entries. */
  private calculateMemoryUsage(): number {
    let total = 0;
    for (const entry of this.cache.values()) {
      total += entry.size || 0;
    }
    return total;
  }

  /**
   * Rough size estimate in bytes.
   *
   * Binary values (Buffer, typed arrays, DataView, ArrayBuffer) report their
   * exact byte length; serialising them to JSON would be both very slow and a
   * several-fold overestimate. Everything else is estimated from its JSON
   * text at two bytes per character, with 1024 as the fallback when the value
   * cannot be serialised.
   */
  private estimateSize(value: T): number {
    if (ArrayBuffer.isView(value)) {
      return value.byteLength;
    }
    if (value instanceof ArrayBuffer) {
      return value.byteLength;
    }
    try {
      return JSON.stringify(value).length * 2; // UTF-16 estimate
    } catch {
      return 1024; // default size
    }
  }
}
/**
 * Two-level cache: a mandatory L1 `CacheManager` in front of an optional,
 * typically larger, L2 `CacheManager`.
 *
 * Reads check L1 first, then L2; an L2 hit is copied into L1. Writes and
 * deletes go to both levels.
 */
export class TieredCacheManager {
  private l1Cache: CacheManager; // first-level in-memory cache
  private l2Cache?: CacheManager; // optional larger in-memory cache
  private logger: Logger;

  /**
   * @param l1Config Configuration of the first level.
   * @param l2Config Configuration of the second level; omit for a single-level cache.
   * @param logger   Logger shared by both levels; a new `Logger('cache')` is
   *                 created when omitted.
   */
  constructor(
    l1Config: CacheConfig,
    l2Config?: CacheConfig,
    logger?: Logger
  ) {
    this.logger = logger || new Logger('cache');
    this.l1Cache = new CacheManager(l1Config, this.logger);
    if (l2Config) {
      this.l2Cache = new CacheManager(l2Config, this.logger);
    }
  }

  /**
   * Looks up a key in L1, then L2.
   *
   * `CacheManager.get` is not itself generic, so the former
   * `this.l1Cache.get<T>(key)` calls did not type-check; the untyped result
   * is annotated here instead.
   *
   * @returns The cached value, or `null` when neither level holds it.
   */
  async get<T>(key: string): Promise<T | null> {
    const fromL1: T | null = await this.l1Cache.get(key);
    if (fromL1 !== null) {
      return fromL1;
    }

    if (!this.l2Cache) {
      return null;
    }

    const fromL2: T | null = await this.l2Cache.get(key);
    if (fromL2 === null) {
      return null;
    }

    // Promote the value into L1 (stored with L1's default TTL).
    await this.l1Cache.set(key, fromL2);
    return fromL2;
  }

  /**
   * Stores a value in both levels.
   *
   * @param ttl Optional TTL passed unchanged to each level.
   */
  async set<T>(key: string, value: T, ttl?: number): Promise<void> {
    await this.l1Cache.set(key, value, ttl);
    if (this.l2Cache) {
      await this.l2Cache.set(key, value, ttl);
    }
  }

  /** Removes a key from both levels. */
  async delete(key: string): Promise<void> {
    await this.l1Cache.delete(key);
    if (this.l2Cache) {
      await this.l2Cache.delete(key);
    }
  }

  /** Empties both levels. */
  async clear(): Promise<void> {
    await this.l1Cache.clear();
    if (this.l2Cache) {
      await this.l2Cache.clear();
    }
  }

  /** Returns per-level statistics; `l2` is `undefined` without a second level. */
  getStatistics(): { l1: CacheStatistics; l2?: CacheStatistics } {
    return {
      l1: this.l1Cache.getStatistics(),
      l2: this.l2Cache?.getStatistics(),
    };
  }
}
10.2.2 应用级缓存实现
// src/cache/ApplicationCache.ts
/**
 * Caches tool results keyed by tool name and call arguments, backed by a
 * two-level `TieredCacheManager` (small LRU L1, larger LFU L2).
 */
export class ToolResultCache {
  private cache: TieredCacheManager;
  private logger: Logger;

  /**
   * @param logger Parent logger; a child logger named 'tool-cache' is derived from it.
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'tool-cache');

    // L1: small cache for recently used results.
    const l1Config: CacheConfig = {
      maxSize: 100,
      defaultTTL: 5 * 60 * 1000, // 5 minutes
      strategy: new LRUCacheStrategy(),
      enableStatistics: true,
    };

    // L2: larger cache holding more results for longer.
    const l2Config: CacheConfig = {
      maxSize: 1000,
      maxMemory: 50 * 1024 * 1024, // 50MB
      defaultTTL: 30 * 60 * 1000, // 30 minutes
      strategy: new LFUCacheStrategy(),
      enableStatistics: true,
    };

    this.cache = new TieredCacheManager(l1Config, l2Config, this.logger);
  }

  /**
   * Returns the cached result for a tool call.
   *
   * @param toolName Tool identifier.
   * @param args     Call arguments; property order does not matter.
   * @returns The cached result, or `null` on a miss.
   */
  async getCachedResult(toolName: string, args: any): Promise<any> {
    const key = this.generateCacheKey(toolName, args);
    const result = await this.cache.get(key);
    // Compare against null so falsy cached results (0, '', false) are logged as hits too.
    if (result !== null) {
      this.logger.debug('Tool result cache hit', { toolName, key: key.substring(0, 32) });
    }
    return result;
  }

  /**
   * Stores the result of a tool call.
   *
   * @param ttl Optional TTL passed to the underlying cache.
   */
  async cacheResult(toolName: string, args: any, result: any, ttl?: number): Promise<void> {
    const key = this.generateCacheKey(toolName, args);
    await this.cache.set(key, result, ttl);
    this.logger.debug('Tool result cached', { toolName, key: key.substring(0, 32) });
  }

  /**
   * Invalidates cached results for a tool.
   *
   * The cache keys are hashes, so there is no way to select one tool's
   * entries; this currently clears the whole cache. A real implementation
   * should maintain a tool -> keys mapping.
   */
  async invalidateByTool(toolName: string): Promise<void> {
    await this.cache.clear();
    this.logger.info('Tool cache invalidated', { toolName });
  }

  /**
   * Derives the cache key: SHA-256 (hex) of `<toolName>:<canonical args JSON>`.
   */
  private generateCacheKey(toolName: string, args: any): string {
    const crypto = require('crypto');
    const argsString = this.serializeArgs(args);
    return crypto
      .createHash('sha256')
      .update(`${toolName}:${argsString}`)
      .digest('hex');
  }

  /**
   * Serialises arguments to JSON with object keys sorted at every depth.
   *
   * The previous implementation passed the sorted top-level keys as an array
   * replacer. An array replacer is a property whitelist applied to every
   * nested object as well, so nested fields whose names did not also occur at
   * the top level were silently dropped and different calls produced the same
   * key. For flat argument objects the output is unchanged.
   */
  private serializeArgs(args: any): string {
    return JSON.stringify(args || {}, (_key, value) => {
      if (value === null || typeof value !== 'object' || Array.isArray(value)) {
        return value;
      }
      const ordered: Record<string, unknown> = {};
      for (const name of Object.keys(value).sort()) {
        ordered[name] = value[name];
      }
      return ordered;
    });
  }

  /** Returns the statistics of both cache levels. */
  getStatistics() {
    return this.cache.getStatistics();
  }
}
/**
 * Caches resource payloads and resource metadata in two separate
 * `CacheManager` instances, both keyed by resource URI.
 */
export class ResourceCache {
  private contentCache: CacheManager<Buffer>;
  private metadataCache: CacheManager<any>;
  private logger: Logger;

  /**
   * @param logger Parent logger; a child logger named 'resource-cache' is derived from it.
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'resource-cache');

    // Payload cache: up to 200 entries / 100MB, 10 minute TTL, LRU.
    const contentConfig: CacheConfig = {
      maxSize: 200,
      maxMemory: 100 * 1024 * 1024,
      defaultTTL: 10 * 60 * 1000,
      strategy: new LRUCacheStrategy(),
      enableStatistics: true,
    };
    this.contentCache = new CacheManager(contentConfig, this.logger);

    // Metadata cache: up to 1000 entries, 1 hour TTL, LRU.
    const metadataConfig: CacheConfig = {
      maxSize: 1000,
      defaultTTL: 60 * 60 * 1000,
      strategy: new LRUCacheStrategy(),
      enableStatistics: true,
    };
    this.metadataCache = new CacheManager(metadataConfig, this.logger);
  }

  /** Returns the cached payload for a URI, or `null` when not cached. */
  async getCachedContent(uri: string): Promise<Buffer | null> {
    const cachedContent = await this.contentCache.get(uri);
    return cachedContent;
  }

  /**
   * Stores a payload.
   *
   * @param ttl Optional TTL passed to the payload cache.
   */
  async cacheContent(uri: string, content: Buffer, ttl?: number): Promise<void> {
    await this.contentCache.set(uri, content, ttl);
    const byteCount = content.length;
    this.logger.debug('Resource content cached', { uri, size: byteCount });
  }

  /** Returns the cached metadata for a URI, or `null` when not cached. */
  async getCachedMetadata(uri: string): Promise<any> {
    const cachedMetadata = await this.metadataCache.get(uri);
    return cachedMetadata;
  }

  /**
   * Stores metadata.
   *
   * @param ttl Optional TTL passed to the metadata cache.
   */
  async cacheMetadata(uri: string, metadata: any, ttl?: number): Promise<void> {
    await this.metadataCache.set(uri, metadata, ttl);
    this.logger.debug('Resource metadata cached', { uri });
  }

  /** Drops both the payload and the metadata of a URI. */
  async invalidateResource(uri: string): Promise<void> {
    // Payload first, then metadata.
    for (const store of [this.contentCache, this.metadataCache]) {
      await store.delete(uri);
    }
    this.logger.debug('Resource cache invalidated', { uri });
  }

  /** Returns the statistics of both underlying caches. */
  getStatistics() {
    const content = this.contentCache.getStatistics();
    const metadata = this.metadataCache.getStatistics();
    return { content, metadata };
  }
}
10.3 并发处理和异步优化
10.3.1 请求队列和限流
// src/concurrency/RequestQueue.ts
/**
 * One request held by the request queue, together with the callbacks used to
 * settle the promise that was handed to the caller.
 */
export interface QueuedRequest {
id: string;
request: any;
priority: number;
timestamp: number;
resolve: (result: any) => void;
reject: (error: any) => void;
}
/** Limits and timing options for the request queue. */
export interface QueueConfig {
maxConcurrent: number;
maxQueue: number;
timeout: number; // request timeout
priorityLevels: number;
}
/** A queued request plus the processor that was supplied when it was enqueued. */
interface PendingRequest extends QueuedRequest {
  processor: (request: any) => Promise<unknown>;
}

/**
 * Priority queue that runs at most `maxConcurrent` requests at a time.
 *
 * Requests with a higher `priority` run first; requests older than
 * `config.timeout` milliseconds are rejected by a sweep that runs every five
 * seconds. The sweep timer is unref'd so it never keeps the process alive;
 * call `dispose()` to stop it explicitly.
 */
export class RequestQueue {
  private queue: PendingRequest[] = [];
  private processing = new Map<string, PendingRequest>();
  private config: QueueConfig;
  private logger: Logger;
  private processingCount = 0;
  private cleanupTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param config Concurrency, capacity and timeout limits.
   * @param logger Parent logger; a child logger named 'request-queue' is derived from it.
   */
  constructor(config: QueueConfig, logger: Logger) {
    this.config = config;
    this.logger = logger.child({}, 'request-queue');

    // Periodically reject requests that have waited or run too long.
    const timer = setInterval(() => {
      this.cleanupTimeouts();
    }, 5000);
    const handle: { unref?: () => void } | number = timer;
    if (typeof handle === 'object') {
      handle.unref?.();
    }
    this.cleanupTimer = timer;
  }

  /**
   * Queues a request and resolves with the processor's result.
   *
   * @param request   Payload handed to `processor`.
   * @param processor Function that handles this particular request.
   * @param priority  Higher values run first; defaults to 0.
   * @returns The processor's result.
   * @throws Error('Request queue is full') when `maxQueue` requests are already waiting.
   * @throws Error('Request timeout') when the request exceeds `config.timeout`.
   */
  async enqueue<T>(
    request: any,
    processor: (request: any) => Promise<T>,
    priority: number = 0
  ): Promise<T> {
    if (this.queue.length >= this.config.maxQueue) {
      throw new Error('Request queue is full');
    }

    return new Promise<T>((resolve, reject) => {
      const queuedRequest: PendingRequest = {
        id: this.generateRequestId(),
        request,
        priority,
        timestamp: Date.now(),
        resolve,
        reject,
        // Remember the processor with the request: the request may be started
        // later by a different enqueue()/completion than the one that added it.
        processor,
      };

      // Insert and keep the queue ordered by descending priority.
      this.queue.push(queuedRequest);
      this.queue.sort((a, b) => b.priority - a.priority);
      this.logger.debug('Request enqueued', {
        requestId: queuedRequest.id,
        priority,
        queueSize: this.queue.length,
      });

      void this.processNext();
    });
  }

  /** Stops the periodic timeout sweep. Queued requests are left untouched. */
  dispose(): void {
    if (this.cleanupTimer !== null) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
  }

  /** Starts the next queued request if a concurrency slot is free. */
  private async processNext(): Promise<void> {
    if (this.processingCount >= this.config.maxConcurrent) {
      return;
    }

    const next = this.queue.shift();
    if (!next) {
      return;
    }

    this.processingCount++;
    this.processing.set(next.id, next);
    this.logger.debug('Request processing started', {
      requestId: next.id,
      processingCount: this.processingCount,
    });

    try {
      const result = await next.processor(next.request);
      next.resolve(result);
      this.logger.debug('Request completed successfully', {
        requestId: next.id,
        duration: Date.now() - next.timestamp,
      });
    } catch (error) {
      next.reject(error);
      this.logger.warn('Request processing failed', {
        requestId: next.id,
        duration: Date.now() - next.timestamp,
        error: error instanceof Error ? error.message : String(error),
      });
    } finally {
      // cleanupTimeouts() may already have released this slot; only give it
      // back once, otherwise the counter would drift below zero.
      if (this.processing.delete(next.id)) {
        this.processingCount--;
      }
      setImmediate(() => void this.processNext());
    }
  }

  /** Rejects queued and in-flight requests older than `config.timeout`. */
  private cleanupTimeouts(): void {
    const now = Date.now();
    let timeoutCount = 0;

    // Waiting requests.
    this.queue = this.queue.filter(req => {
      if (now - req.timestamp > this.config.timeout) {
        req.reject(new Error('Request timeout'));
        timeoutCount++;
        return false;
      }
      return true;
    });

    // In-flight requests: the caller is rejected and the slot is released, but
    // the processor itself cannot be cancelled and keeps running.
    let releasedSlots = 0;
    for (const [id, req] of this.processing) {
      if (now - req.timestamp > this.config.timeout) {
        req.reject(new Error('Request timeout'));
        this.processing.delete(id);
        this.processingCount--;
        releasedSlots++;
        timeoutCount++;
      }
    }

    if (timeoutCount > 0) {
      this.logger.warn('Cleaned up timed out requests', { count: timeoutCount });
    }

    // Use the released slots for requests that are still waiting.
    for (let i = 0; i < releasedSlots; i++) {
      void this.processNext();
    }
  }

  /** Returns the current queue length, in-flight count and slot utilisation. */
  getStatistics() {
    return {
      queueSize: this.queue.length,
      processingCount: this.processingCount,
      maxConcurrent: this.config.maxConcurrent,
      utilization: this.processingCount / this.config.maxConcurrent,
    };
  }

  /** Builds a request id from the current time and nine random base-36 characters. */
  private generateRequestId(): string {
    return `req_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
10.3.2 工作池和任务调度
// src/concurrency/WorkerPool.ts
/**
 * A unit of work for the worker pool. `type` selects the registered
 * processor and `data` is handed to it.
 */
export interface WorkerTask {
id: string;
type: string;
data: any;
priority: number;
timeout?: number;
}
/**
 * Outcome of one executed task: either `result` (success) or `error`
 * (failure message), plus how long the execution took.
 */
export interface WorkerResult {
taskId: string;
success: boolean;
result?: any;
error?: string;
duration: number;
}
/**
 * Bookkeeping record for one worker slot of the pool: whether it is busy,
 * what it is running, and running totals of its task outcomes.
 */
export interface Worker {
id: string;
busy: boolean;
currentTask?: WorkerTask;
totalTasks: number;
successfulTasks: number;
failedTasks: number;
averageTaskTime: number;
lastTaskTime?: number;
}
/** A task waiting for a free worker, with the callbacks that settle its promise. */
interface QueuedTask {
  task: WorkerTask;
  resolve: (result: WorkerResult) => void;
  reject: (error: unknown) => void;
}

/**
 * Fixed-size pool of logical workers that run registered async task
 * processors with bounded concurrency.
 *
 * A task is run immediately when a worker is free; otherwise it waits in a
 * priority-ordered queue and is started as soon as a worker finishes. Each
 * task is executed exactly once.
 */
export class WorkerPool {
  private workers: Map<string, Worker> = new Map();
  private taskQueue: QueuedTask[] = [];
  private taskProcessors = new Map<string, (data: any) => Promise<any>>();
  private maxWorkers: number;
  private logger: Logger;
  private taskTimeout = 30000; // default timeout: 30 seconds

  /**
   * @param maxWorkers Number of worker slots to create.
   * @param logger     Parent logger; a child logger named 'worker-pool' is derived from it.
   */
  constructor(maxWorkers: number, logger: Logger) {
    this.maxWorkers = maxWorkers;
    this.logger = logger.child({}, 'worker-pool');
    for (let i = 0; i < maxWorkers; i++) {
      this.createWorker();
    }
  }

  /**
   * Registers the function that handles tasks of the given type.
   * A later registration for the same type replaces the earlier one.
   */
  registerTaskProcessor(taskType: string, processor: (data: any) => Promise<any>): void {
    this.taskProcessors.set(taskType, processor);
    this.logger.debug('Task processor registered', { taskType });
  }

  /**
   * Submits a task and resolves with its result.
   *
   * A processor failure does not reject; it resolves with
   * `success: false` and the error message.
   *
   * @param task Task description; the id is generated here.
   * @returns The task outcome.
   * @throws Error when no processor is registered for `task.type`.
   * @throws Error(`Task timeout after <n>ms`) when the task has not finished
   *         within `task.timeout` (default 30000 ms). A task that is still
   *         queued at that point is removed and never run; a task that is
   *         already running cannot be cancelled and its result is discarded.
   */
  async submitTask(task: Omit<WorkerTask, 'id'>): Promise<WorkerResult> {
    const fullTask: WorkerTask = {
      ...task,
      id: this.generateTaskId(),
    };
    const timeout = task.timeout || this.taskTimeout;

    return new Promise<WorkerResult>((resolve, reject) => {
      const timeoutHandle = setTimeout(() => {
        this.dropQueuedTask(fullTask.id);
        reject(new Error(`Task timeout after ${timeout}ms`));
      }, timeout);

      this.processTask(fullTask)
        .then(resolve, reject)
        .finally(() => clearTimeout(timeoutHandle));
    });
  }

  /** Runs the task on a free worker, or queues it until one becomes free. */
  private async processTask(task: WorkerTask): Promise<WorkerResult> {
    const processor = this.taskProcessors.get(task.type);
    if (!processor) {
      throw new Error(`No processor registered for task type: ${task.type}`);
    }

    const worker = this.getAvailableWorker();
    if (worker) {
      return this.executeTaskOnWorker(worker, task, processor);
    }

    // No free worker: park the task. processQueuedTasks() is the only place
    // that starts it, which guarantees a single execution. (Previously a
    // polling loop and the queue drain could both run the same task.)
    return new Promise<WorkerResult>((resolve, reject) => {
      this.taskQueue.push({ task, resolve, reject });
      this.taskQueue.sort((a, b) => b.task.priority - a.task.priority);
      this.logger.debug('Task queued', {
        taskId: task.id,
        taskType: task.type,
        queueSize: this.taskQueue.length,
      });
    });
  }

  /**
   * Runs one task on the given worker and updates the worker's statistics.
   * Never rejects: processor errors are reported as `success: false`.
   */
  private async executeTaskOnWorker(
    worker: Worker,
    task: WorkerTask,
    processor: (data: any) => Promise<any>
  ): Promise<WorkerResult> {
    const startTime = Date.now();

    // Mark the worker busy synchronously so no other task can claim it.
    worker.busy = true;
    worker.currentTask = task;
    this.logger.debug('Task execution started', {
      workerId: worker.id,
      taskId: task.id,
      taskType: task.type,
    });

    try {
      const result = await processor(task.data);
      const duration = Date.now() - startTime;

      worker.successfulTasks++;
      this.recordTaskDuration(worker, duration);

      this.logger.debug('Task completed successfully', {
        workerId: worker.id,
        taskId: task.id,
        duration,
      });
      return { taskId: task.id, success: true, result, duration };
    } catch (error) {
      const duration = Date.now() - startTime;
      const message = error instanceof Error ? error.message : String(error);

      worker.failedTasks++;
      this.recordTaskDuration(worker, duration);

      this.logger.warn('Task execution failed', {
        workerId: worker.id,
        taskId: task.id,
        duration,
        error: message,
      });
      return { taskId: task.id, success: false, error: message, duration };
    } finally {
      // Release the worker, then let queued tasks use it.
      worker.busy = false;
      worker.currentTask = undefined;
      setImmediate(() => this.processQueuedTasks());
    }
  }

  /** Starts queued tasks for as long as free workers are available. */
  private processQueuedTasks(): void {
    while (this.taskQueue.length > 0) {
      const worker = this.getAvailableWorker();
      if (!worker) break;

      const queued = this.taskQueue.shift();
      if (!queued) break;

      const processor = this.taskProcessors.get(queued.task.type);
      if (!processor) {
        queued.reject(new Error(`No processor registered for task type: ${queued.task.type}`));
        continue;
      }

      this.executeTaskOnWorker(worker, queued.task, processor).then(queued.resolve, queued.reject);
    }
  }

  /** Removes a still-queued task (used when its submitter has timed out). */
  private dropQueuedTask(taskId: string): void {
    const index = this.taskQueue.findIndex(queued => queued.task.id === taskId);
    if (index !== -1) {
      this.taskQueue.splice(index, 1);
    }
  }

  /** Returns the first idle worker, or `null` when all are busy. */
  private getAvailableWorker(): Worker | null {
    for (const worker of this.workers.values()) {
      if (!worker.busy) {
        return worker;
      }
    }
    return null;
  }

  /** Creates and registers one idle worker record. */
  private createWorker(): Worker {
    const worker: Worker = {
      id: this.generateWorkerId(),
      busy: false,
      totalTasks: 0,
      successfulTasks: 0,
      failedTasks: 0,
      averageTaskTime: 0,
    };
    this.workers.set(worker.id, worker);
    this.logger.debug('Worker created', { workerId: worker.id });
    return worker;
  }

  /** Updates a worker's task count, running average and last task time. */
  private recordTaskDuration(worker: Worker, duration: number): void {
    worker.totalTasks++;
    worker.averageTaskTime = this.updateAverageTime(worker.averageTaskTime, duration, worker.totalTasks);
    worker.lastTaskTime = duration;
  }

  /** Incremental mean: folds `newTime` into an average over `totalTasks` samples. */
  private updateAverageTime(currentAverage: number, newTime: number, totalTasks: number): number {
    if (totalTasks === 1) return newTime;
    return (currentAverage * (totalTasks - 1) + newTime) / totalTasks;
  }

  /** Returns aggregate worker, queue and task-outcome figures. */
  getStatistics() {
    const workers = Array.from(this.workers.values());
    const busyWorkers = workers.filter(w => w.busy).length;
    const totalTasks = workers.reduce((sum, w) => sum + w.totalTasks, 0);
    const successfulTasks = workers.reduce((sum, w) => sum + w.successfulTasks, 0);
    const failedTasks = workers.reduce((sum, w) => sum + w.failedTasks, 0);

    return {
      totalWorkers: workers.length,
      busyWorkers,
      availableWorkers: workers.length - busyWorkers,
      // Avoid NaN for an empty pool.
      utilization: workers.length > 0 ? busyWorkers / workers.length : 0,
      queueSize: this.taskQueue.length,
      totalTasks,
      successfulTasks,
      failedTasks,
      successRate: totalTasks > 0 ? successfulTasks / totalTasks : 0,
    };
  }

  /** Builds a task id from the current time and nine random base-36 characters. */
  private generateTaskId(): string {
    return `task_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Builds a worker id from the current time and nine random base-36 characters. */
  private generateWorkerId(): string {
    return `worker_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
10.4 内存管理和优化
10.4.1 内存池和对象复用
// src/memory/ObjectPool.ts
/**
 * Options for an object pool: how many objects to pre-create, the upper
 * bound, how to create an object, and optional hooks to reset and validate
 * objects.
 */
export interface PoolConfig {
initialSize: number;
maxSize: number;
factory: () => any;
reset?: (obj: any) => void;
validate?: (obj: any) => boolean;
}
/**
 * Generic object pool.
 *
 * `acquire()` hands out an idle object (creating one while fewer than
 * `maxSize` exist); `release()` resets and validates the object and makes it
 * available again. `created` counts the objects currently owned by the pool
 * (idle + in use).
 */
export class ObjectPool<T> {
  private available: T[] = [];
  private inUse = new Set<T>();
  private config: PoolConfig;
  private created = 0;
  private logger: Logger;

  /**
   * @param config Pool sizing and the factory/reset/validate hooks.
   * @param logger Parent logger; a child logger named 'object-pool' is derived from it.
   */
  constructor(config: PoolConfig, logger: Logger) {
    this.config = config;
    this.logger = logger.child({}, 'object-pool');

    // Pre-create the initial idle objects.
    for (let i = 0; i < config.initialSize; i++) {
      this.available.push(this.createObject());
    }

    this.logger.debug('Object pool initialized', {
      initialSize: config.initialSize,
      maxSize: config.maxSize,
    });
  }

  /**
   * Takes an object out of the pool.
   *
   * @returns An object that is now marked as in use.
   * @throws Error('Object pool exhausted') when no object is idle and
   *         `maxSize` objects already exist.
   */
  acquire(): T {
    let obj: T;
    if (this.available.length > 0) {
      obj = this.available.pop() as T;
      this.logger.debug('Object acquired from pool', { available: this.available.length });
    } else if (this.created < this.config.maxSize) {
      // The new object goes straight to the caller; it must not also be put
      // on the idle list, or the next acquire() would hand it out again.
      obj = this.createObject();
      this.logger.debug('New object created', { created: this.created });
    } else {
      throw new Error('Object pool exhausted');
    }

    if (this.config.validate && !this.config.validate(obj)) {
      this.logger.warn('Invalid object detected, creating new one');
      // One-for-one replacement of the discarded object, so `created` is unchanged.
      obj = this.config.factory();
    }

    this.inUse.add(obj);
    return obj;
  }

  /**
   * Returns an object to the pool.
   *
   * Objects that were not acquired from this pool are ignored with a
   * warning. An object whose reset hook throws, or that fails validation
   * after the reset, is discarded instead of being reused.
   */
  release(obj: T): void {
    if (!this.inUse.has(obj)) {
      this.logger.warn('Attempting to release object not from this pool');
      return;
    }
    this.inUse.delete(obj);

    if (this.config.reset) {
      try {
        this.config.reset(obj);
      } catch (error) {
        this.logger.warn('Object reset failed, discarding object', { error });
        this.created--;
        return;
      }
    }

    if (this.config.validate && !this.config.validate(obj)) {
      this.logger.warn('Object validation failed after reset, discarding');
      this.created--;
      return;
    }

    this.available.push(obj);
    this.logger.debug('Object returned to pool', { available: this.available.length });
  }

  /** Forgets every object (idle and in use) and resets the creation count. */
  clear(): void {
    this.available.length = 0;
    this.inUse.clear();
    this.created = 0;
    this.logger.info('Object pool cleared');
  }

  /** Returns idle/in-use/created counts and the in-use fraction. */
  getStatistics() {
    return {
      available: this.available.length,
      inUse: this.inUse.size,
      created: this.created,
      maxSize: this.config.maxSize,
      // Avoid NaN while the pool owns no objects.
      utilization: this.created > 0 ? this.inUse.size / this.created : 0,
    };
  }

  /** Creates one object via the factory and counts it; the caller decides where it goes. */
  private createObject(): T {
    const obj = this.config.factory();
    this.created++;
    return obj;
  }
}
/**
 * Pool of reusable `Buffer` objects, organised as one `ObjectPool` per
 * common buffer size.
 *
 * A request is served from the smallest pool whose size is at least the
 * requested size. When the request is smaller, the caller receives a view
 * onto the pooled buffer; the pool remembers which pooled buffer backs each
 * handed-out buffer so that `release()` can return the right object.
 */
export class BufferPool {
  private pools = new Map<number, ObjectPool<Buffer>>();
  private commonSizes = [1024, 4096, 16384, 65536, 262144]; // pooled buffer sizes
  // Handed-out buffer (or view) -> the pool and pooled buffer behind it.
  private leases = new WeakMap<Buffer, { pool: ObjectPool<Buffer>; backing: Buffer }>();
  private logger: Logger;

  /**
   * @param logger Parent logger; a child logger named 'buffer-pool' is derived from it.
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'buffer-pool');
    for (const size of this.commonSizes) {
      this.createPoolForSize(size);
    }
  }

  /**
   * Returns a buffer of exactly `size` bytes.
   *
   * The buffer comes from a pool when a suitable one exists and has capacity;
   * otherwise a fresh zero-filled buffer is allocated.
   */
  acquire(size: number): Buffer {
    const poolSize = this.findBestPoolSize(size);
    const pool = this.pools.get(poolSize);
    if (pool) {
      try {
        const backing = pool.acquire();
        // A smaller request gets a view that shares memory with the pooled buffer.
        const handedOut = size < poolSize ? backing.subarray(0, size) : backing;
        this.leases.set(handedOut, { pool, backing });
        return handedOut;
      } catch (error) {
        this.logger.debug('Pool exhausted, creating new buffer', { size, poolSize });
      }
    }
    // No matching pool, or the pool has no capacity left.
    return Buffer.alloc(size);
  }

  /**
   * Gives a buffer obtained from `acquire()` back to its pool.
   *
   * The pooled buffer behind the lease is released (its pool zero-fills it on
   * reset). Previously a view handed out for a smaller request could never be
   * matched to its pooled buffer, so that buffer stayed "in use" forever and
   * the pool gradually ran dry. Buffers that were not leased from a pool
   * (fallback allocations, foreign buffers, double releases) are left to the
   * garbage collector.
   */
  release(buffer: Buffer): void {
    const lease = this.leases.get(buffer);
    if (!lease) {
      return;
    }
    this.leases.delete(buffer);
    lease.pool.release(lease.backing);
  }

  /** Smallest pooled size that fits the request, or the request itself when none fits. */
  private findBestPoolSize(size: number): number {
    return this.commonSizes.find(s => s >= size) || size;
  }

  /** Creates the pool for one buffer size (10 pre-allocated, at most 50). */
  private createPoolForSize(size: number): void {
    const pool = new ObjectPool<Buffer>({
      initialSize: 10,
      maxSize: 50,
      factory: () => Buffer.alloc(size),
      reset: (buffer) => buffer.fill(0),
      validate: (buffer) => buffer.length === size,
    }, this.logger);
    this.pools.set(size, pool);
  }

  /** Returns the statistics of every size pool, keyed by buffer size. */
  getStatistics() {
    const stats: Record<number, any> = {};
    for (const [size, pool] of this.pools) {
      stats[size] = pool.getStatistics();
    }
    return stats;
  }
}
10.4.2 内存监控和泄漏检测
// src/memory/MemoryMonitor.ts
/**
 * Point-in-time copy of the figures reported by `process.memoryUsage()`,
 * stamped with the moment it was taken. All sizes are in bytes.
 */
export interface MemorySnapshot {
// Moment the snapshot was captured.
timestamp: Date;
// `heapUsed` as reported by process.memoryUsage().
heapUsed: number;
// `heapTotal` as reported by process.memoryUsage().
heapTotal: number;
// `external` as reported by process.memoryUsage().
external: number;
// `rss` as reported by process.memoryUsage().
rss: number;
// `arrayBuffers` as reported by process.memoryUsage().
arrayBuffers: number;
}
/**
 * Report describing one suspected memory leak found by the memory monitor.
 */
export interface MemoryLeak {
// Identifier generated by the monitor for this report.
id: string;
// Moment the suspicious growth was detected.
detectedAt: Date;
// Which measurement triggered the report.
// NOTE(review): 'handle-leak' is declared but nothing in this section produces it.
type: 'heap-growth' | 'external-growth' | 'handle-leak';
// How far the observed growth exceeded its threshold.
severity: 'low' | 'medium' | 'high' | 'critical';
// Free-form numbers behind the decision (growth, averages, threshold).
details: Record<string, any>;
// The snapshots that were compared to reach the verdict.
snapshots: MemorySnapshot[];
}
/**
 * Periodically samples `process.memoryUsage()`, keeps a bounded history of
 * snapshots and flags sustained growth of heap or external memory as a
 * suspected leak.
 *
 * Growth is judged by comparing the average of the 10 most recent snapshots
 * with the average of the 10 before them, normalised to bytes per minute so
 * that the configured thresholds mean the same thing for every sampling
 * interval.
 */
export class MemoryMonitor {
  private snapshots: MemorySnapshot[] = [];
  private maxSnapshots = 1000;
  // Cap on retained leak reports; without it a long-lived leak would make the
  // detector itself accumulate reports (each holding 20 snapshots) forever.
  private maxDetectedLeaks = 100;
  private monitorInterval: NodeJS.Timeout | null = null;
  private baselineSnapshot: MemorySnapshot;
  private detectedLeaks: MemoryLeak[] = [];
  private logger: Logger;

  // Threshold configuration
  private thresholds = {
    heapGrowthRate: 10 * 1024 * 1024, // 10MB per minute
    externalGrowthRate: 5 * 1024 * 1024, // 5MB per minute
    maxHeapSize: 500 * 1024 * 1024, // 500MB
    maxExternalSize: 100 * 1024 * 1024, // 100MB
  };

  /**
   * Takes the baseline snapshot and immediately starts periodic sampling.
   *
   * @param logger parent logger; a child logger named 'memory-monitor' is derived from it
   * @param intervalMs sampling interval in milliseconds (default: one minute)
   */
  constructor(logger: Logger, intervalMs: number = 60000) {
    this.logger = logger.child({}, 'memory-monitor');
    this.baselineSnapshot = this.takeSnapshot();
    this.start(intervalMs);
  }

  /**
   * (Re)starts periodic sampling, replacing any timer that is already running.
   *
   * @param intervalMs sampling interval in milliseconds
   */
  start(intervalMs: number): void {
    if (this.monitorInterval) {
      clearInterval(this.monitorInterval);
    }
    this.monitorInterval = setInterval(() => {
      this.collectMemoryData();
    }, intervalMs);
    this.logger.info('Memory monitoring started', { intervalMs });
  }

  /** Stops periodic sampling; collected snapshots and leak reports are kept. */
  stop(): void {
    if (this.monitorInterval) {
      clearInterval(this.monitorInterval);
      this.monitorInterval = null;
    }
    this.logger.info('Memory monitoring stopped');
  }

  /** One sampling tick: record a snapshot, trim history, run leak detection. */
  private collectMemoryData(): void {
    const snapshot = this.takeSnapshot();
    this.snapshots.push(snapshot);
    // Keep the history bounded.
    if (this.snapshots.length > this.maxSnapshots) {
      this.snapshots.shift();
    }
    this.detectMemoryLeaks();
    this.logger.debug('Memory snapshot taken', {
      heapUsed: Math.round(snapshot.heapUsed / 1024 / 1024),
      heapTotal: Math.round(snapshot.heapTotal / 1024 / 1024),
      rss: Math.round(snapshot.rss / 1024 / 1024),
    });
  }

  /** Reads the current process memory figures. */
  private takeSnapshot(): MemorySnapshot {
    const memUsage = process.memoryUsage();
    return {
      timestamp: new Date(),
      heapUsed: memUsage.heapUsed,
      heapTotal: memUsage.heapTotal,
      external: memUsage.external,
      rss: memUsage.rss,
      arrayBuffers: memUsage.arrayBuffers,
    };
  }

  /** Runs all leak checks once at least two full 10-snapshot windows exist. */
  private detectMemoryLeaks(): void {
    const windowSize = 10;
    if (this.snapshots.length < windowSize * 2) {
      return; // not enough data points yet
    }
    const recent = this.snapshots.slice(-windowSize);
    const old = this.snapshots.slice(-2 * windowSize, -windowSize);

    this.checkHeapGrowth(old, recent);
    this.checkExternalGrowth(old, recent);

    const latest = recent[recent.length - 1];
    if (latest) {
      this.checkAbsoluteThresholds(latest);
    }
  }

  /** Flags heap growth above `thresholds.heapGrowthRate`. */
  private checkHeapGrowth(oldSnapshots: MemorySnapshot[], recentSnapshots: MemorySnapshot[]): void {
    this.checkGrowth(
      'heap-growth',
      'heapUsed',
      this.thresholds.heapGrowthRate,
      'Heap memory leak detected',
      oldSnapshots,
      recentSnapshots
    );
  }

  /** Flags external-memory growth above `thresholds.externalGrowthRate`. */
  private checkExternalGrowth(oldSnapshots: MemorySnapshot[], recentSnapshots: MemorySnapshot[]): void {
    this.checkGrowth(
      'external-growth',
      'external',
      this.thresholds.externalGrowthRate,
      'External memory leak detected',
      oldSnapshots,
      recentSnapshots
    );
  }

  /**
   * Compares the average of `field` between two snapshot windows and records
   * a leak when the growth, expressed in bytes per minute, exceeds `threshold`.
   */
  private checkGrowth(
    type: 'heap-growth' | 'external-growth',
    field: 'heapUsed' | 'external',
    threshold: number,
    message: string,
    oldSnapshots: MemorySnapshot[],
    recentSnapshots: MemorySnapshot[]
  ): void {
    const first = oldSnapshots[0];
    if (!first) return;

    // Time between the centres of the two windows. Offsets are measured from
    // the first old snapshot so the averages stay small and exact.
    const origin = first.timestamp.getTime();
    const meanOffset = (window: MemorySnapshot[]): number =>
      window.reduce((sum, s) => sum + (s.timestamp.getTime() - origin), 0) / window.length;
    const elapsedMinutes = (meanOffset(recentSnapshots) - meanOffset(oldSnapshots)) / 60000;
    if (!(elapsedMinutes > 0)) return; // no usable time base, cannot derive a rate

    const oldAvg = this.calculateAverage(oldSnapshots, field);
    const recentAvg = this.calculateAverage(recentSnapshots, field);
    const growth = recentAvg - oldAvg;
    const growthPerMinute = growth / elapsedMinutes;
    if (growthPerMinute <= threshold) return;

    const leak: MemoryLeak = {
      id: this.generateLeakId(),
      detectedAt: new Date(),
      type,
      severity: this.calculateSeverity(growthPerMinute, threshold),
      details: {
        growthBytes: growth,
        growthMB: Math.round(growth / 1024 / 1024),
        growthBytesPerMinute: growthPerMinute,
        oldAverage: oldAvg,
        recentAverage: recentAvg,
        threshold,
      },
      snapshots: [...oldSnapshots, ...recentSnapshots],
    };

    this.detectedLeaks.push(leak);
    // Drop the oldest report once the cap is exceeded.
    if (this.detectedLeaks.length > this.maxDetectedLeaks) {
      this.detectedLeaks.shift();
    }

    this.logger.warn(message, {
      leakId: leak.id,
      growth: Math.round(growth / 1024 / 1024) + 'MB',
      severity: leak.severity,
    });
  }

  /** Logs an error when the latest snapshot exceeds an absolute size limit. */
  private checkAbsoluteThresholds(snapshot: MemorySnapshot): void {
    if (snapshot.heapUsed > this.thresholds.maxHeapSize) {
      this.logger.error('Heap size exceeded threshold', {
        current: Math.round(snapshot.heapUsed / 1024 / 1024) + 'MB',
        threshold: Math.round(this.thresholds.maxHeapSize / 1024 / 1024) + 'MB',
      });
    }
    if (snapshot.external > this.thresholds.maxExternalSize) {
      this.logger.error('External memory size exceeded threshold', {
        current: Math.round(snapshot.external / 1024 / 1024) + 'MB',
        threshold: Math.round(this.thresholds.maxExternalSize / 1024 / 1024) + 'MB',
      });
    }
  }

  /** Arithmetic mean of one numeric snapshot field; 0 for an empty list. */
  private calculateAverage(
    snapshots: MemorySnapshot[],
    field: Exclude<keyof MemorySnapshot, 'timestamp'>
  ): number {
    if (snapshots.length === 0) return 0;
    return snapshots.reduce((sum, s) => sum + s[field], 0) / snapshots.length;
  }

  /** Maps how many times `value` exceeds `threshold` onto a severity level. */
  private calculateSeverity(value: number, threshold: number): 'low' | 'medium' | 'high' | 'critical' {
    const ratio = value / threshold;
    if (ratio >= 5) return 'critical';
    if (ratio >= 3) return 'high';
    if (ratio >= 2) return 'medium';
    return 'low';
  }

  /** Takes and returns a fresh snapshot without adding it to the history. */
  getCurrentMemoryUsage(): MemorySnapshot {
    return this.takeSnapshot();
  }

  /**
   * Fits a least-squares line through `heapUsed` over the given time window.
   *
   * @param windowMinutes how far back to look (default 30 minutes)
   * @returns `trend` ('stable' when the slope is below 1MB/min in either
   * direction), `changeRate` in bytes per minute and `confidence`, the
   * absolute correlation coefficient in [0, 1]. With fewer than three
   * snapshots in the window, or no spread in their timestamps, the result is
   * stable / 0 / 0; a perfectly flat heap yields confidence 0.
   */
  getMemoryTrend(windowMinutes: number = 30): {
    trend: 'increasing' | 'decreasing' | 'stable';
    changeRate: number; // bytes per minute
    confidence: number; // 0-1
  } {
    const windowMs = windowMinutes * 60 * 1000;
    const cutoff = Date.now() - windowMs;
    const relevantSnapshots = this.snapshots.filter(s => s.timestamp.getTime() > cutoff);
    if (relevantSnapshots.length < 3) {
      return { trend: 'stable', changeRate: 0, confidence: 0 };
    }

    const n = relevantSnapshots.length;
    const avgX = relevantSnapshots.reduce((sum, s) => sum + (s.timestamp.getTime() - cutoff), 0) / n;
    const avgY = relevantSnapshots.reduce((sum, s) => sum + s.heapUsed, 0) / n;

    // Centred sums: numerically stabler than the raw-sum regression formula,
    // whose intermediate products exceed what a double represents exactly.
    let sxx = 0;
    let syy = 0;
    let sxy = 0;
    for (const s of relevantSnapshots) {
      const dx = s.timestamp.getTime() - cutoff - avgX;
      const dy = s.heapUsed - avgY;
      sxx += dx * dx;
      syy += dy * dy;
      sxy += dx * dy;
    }

    if (sxx === 0) {
      // All snapshots share one timestamp: no slope can be derived.
      return { trend: 'stable', changeRate: 0, confidence: 0 };
    }

    const changeRatePerMinute = (sxy / sxx) * 60 * 1000;
    // Correlation coefficient as confidence; undefined (0 / 0) for a flat heap.
    const correlation = syy === 0 ? 0 : sxy / (Math.sqrt(sxx) * Math.sqrt(syy));
    const confidence = Math.min(1, Math.abs(correlation));

    let trend: 'increasing' | 'decreasing' | 'stable';
    if (Math.abs(changeRatePerMinute) < 1024 * 1024) { // < 1MB/min
      trend = 'stable';
    } else if (changeRatePerMinute > 0) {
      trend = 'increasing';
    } else {
      trend = 'decreasing';
    }

    return { trend, changeRate: changeRatePerMinute, confidence };
  }

  /** Returns a copy of the retained leak reports (oldest first). */
  getDetectedLeaks(): MemoryLeak[] {
    return [...this.detectedLeaks];
  }

  /**
   * Assembles a full report: current usage, the baseline taken at
   * construction, the heap trend, retained leak reports and summary
   * statistics over the snapshot history. All statistics are 0 while no
   * snapshot has been collected yet.
   */
  getMemoryReport(): {
    current: MemorySnapshot;
    baseline: MemorySnapshot;
    trend: ReturnType<MemoryMonitor['getMemoryTrend']>;
    leaks: MemoryLeak[];
    statistics: {
      uptimeHours: number;
      snapshotCount: number;
      avgHeapUsage: number;
      maxHeapUsage: number;
      avgRss: number;
      maxRss: number;
    };
  } {
    const current = this.getCurrentMemoryUsage();
    const trend = this.getMemoryTrend();
    const leaks = this.getDetectedLeaks();

    // Memory sizes are never negative, so 0 is a safe floor for the maximum
    // (Math.max() of an empty list would be -Infinity).
    const largest = (values: number[]): number =>
      values.reduce((max, value) => (value > max ? value : max), 0);

    const statistics = {
      uptimeHours: process.uptime() / 3600,
      snapshotCount: this.snapshots.length,
      avgHeapUsage: this.calculateAverage(this.snapshots, 'heapUsed'),
      maxHeapUsage: largest(this.snapshots.map(s => s.heapUsed)),
      avgRss: this.calculateAverage(this.snapshots, 'rss'),
      maxRss: largest(this.snapshots.map(s => s.rss)),
    };

    return {
      current,
      baseline: this.baselineSnapshot,
      trend,
      leaks,
      statistics,
    };
  }

  /** Builds a report id from the current time plus a random base-36 suffix. */
  private generateLeakId(): string {
    return `leak_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
10.5 可扩展架构设计
10.5.1 微服务架构模式
// src/architecture/MicroserviceServer.ts
/**
 * Static description of one service instance.
 */
export interface ServiceConfig {
// Service name; used as the key for registration and discovery.
name: string;
// Version string reported alongside the name.
version: string;
// HTTP port for the health/metrics/info endpoints. When omitted,
// MicroserviceServer neither starts an HTTP server nor registers itself.
port?: number;
// Names of other services this one wants to have discovered at start-up.
dependencies?: string[];
// Declared resource figures; only echoed back by the /info endpoint here.
// NOTE(review): units for cpu/memory are not defined in this section - confirm.
resources?: {
cpu?: number;
memory?: number;
};
}
/**
 * Contract for a service registry: register and unregister endpoints under a
 * service name, look them up, and probe a single endpoint's health.
 */
export interface ServiceDiscovery {
// Records `endpoint` as an instance of `service`.
register(service: ServiceConfig, endpoint: string): Promise<void>;
// Resolves to the endpoints known for `serviceName` (empty when there are none).
discover(serviceName: string): Promise<string[]>;
// Removes `endpoint` from the instances of `serviceName`.
unregister(serviceName: string, endpoint: string): Promise<void>;
// Resolves to whether the given endpoint is currently considered healthy.
healthCheck(serviceName: string, endpoint: string): Promise<boolean>;
}
/**
 * Simple service registry that keeps everything in process memory: a map
 * from service name to the set of endpoint URLs registered under it.
 */
export class InMemoryServiceDiscovery implements ServiceDiscovery {
  private services = new Map<string, Set<string>>();
  // Upper bound for one health probe so an unresponsive endpoint cannot hang the caller.
  private healthCheckTimeoutMs = 5000;
  private logger: Logger;

  /**
   * @param logger parent logger; a child logger named 'service-discovery' is derived from it
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'service-discovery');
  }

  /**
   * Adds `endpoint` to the set registered for `service.name`. Registering the
   * same endpoint twice has no additional effect on the registry.
   */
  async register(service: ServiceConfig, endpoint: string): Promise<void> {
    let endpoints = this.services.get(service.name);
    if (!endpoints) {
      endpoints = new Set();
      this.services.set(service.name, endpoints);
    }
    endpoints.add(endpoint);
    this.logger.info('Service registered', {
      serviceName: service.name,
      endpoint,
      version: service.version,
    });
  }

  /**
   * @returns the endpoints registered for `serviceName`, in registration
   * order, or an empty array when the service is unknown
   */
  async discover(serviceName: string): Promise<string[]> {
    const endpoints = this.services.get(serviceName);
    return endpoints ? Array.from(endpoints) : [];
  }

  /**
   * Removes `endpoint` from `serviceName`; the service entry itself is
   * dropped once its last endpoint is gone. Does nothing (and logs nothing)
   * when the endpoint was not registered.
   */
  async unregister(serviceName: string, endpoint: string): Promise<void> {
    const endpoints = this.services.get(serviceName);
    if (!endpoints || !endpoints.delete(endpoint)) {
      return;
    }
    if (endpoints.size === 0) {
      this.services.delete(serviceName);
    }
    this.logger.info('Service unregistered', { serviceName, endpoint });
  }

  /**
   * Probes `${endpoint}/health` over HTTP.
   *
   * @param serviceName not used by this implementation; part of the interface
   * @param endpoint base URL of the instance to probe
   * @returns true when the endpoint answers with a 2xx status within the
   * timeout; false on any other status, network error or timeout
   */
  async healthCheck(serviceName: string, endpoint: string): Promise<boolean> {
    try {
      const response = await fetch(`${endpoint}/health`, {
        signal: AbortSignal.timeout(this.healthCheckTimeoutMs),
      });
      return response.ok;
    } catch {
      return false;
    }
  }
}
/**
 * MCP server that additionally behaves as a discoverable microservice: it
 * exposes HTTP health/metrics/info endpoints, registers itself with a
 * service registry and resolves the endpoints of the services it depends on.
 *
 * NOTE(review): `this.logger`, `getHealth()` and `getMetrics()` are assumed
 * to be provided by the base class, which is not shown in this section.
 */
export class MicroserviceServer extends SecureMCPServer {
  private serviceConfig: ServiceConfig;
  private serviceDiscovery: ServiceDiscovery;
  // Untyped because the HTTP server comes from a dynamically required module.
  private httpServer?: any;
  private serviceEndpoints = new Map<string, string>();

  /**
   * @param config configuration forwarded unchanged to the base server
   * @param serviceConfig identity, port and dependencies of this service
   * @param serviceDiscovery registry used for registration and lookups
   */
  constructor(
    config: ServerConfig & { security: SecurityConfig },
    serviceConfig: ServiceConfig,
    serviceDiscovery: ServiceDiscovery
  ) {
    super(config);
    this.serviceConfig = serviceConfig;
    this.serviceDiscovery = serviceDiscovery;
  }

  /**
   * Starts the base server, then (when a port is configured) the HTTP
   * endpoints, registers with the registry and resolves dependencies.
   * Rejects when the HTTP port cannot be bound.
   */
  async start(): Promise<void> {
    await super.start();

    if (this.serviceConfig.port) {
      await this.startHTTPServer();
    }

    await this.registerService();
    await this.discoverDependencies();

    this.logger.info('Microservice started', {
      serviceName: this.serviceConfig.name,
      port: this.serviceConfig.port,
    });
  }

  /**
   * Unregisters from the registry, closes the HTTP server and stops the base
   * server. Every step is attempted even when an earlier one fails; the
   * failure is still propagated to the caller.
   *
   * @param reason optional reason forwarded to the base server's `stop`
   */
  async stop(reason?: string): Promise<void> {
    try {
      await this.unregisterService();
    } finally {
      try {
        await this.closeHTTPServer();
      } finally {
        await super.stop(reason);
      }
    }
  }

  /**
   * Creates the Express app with /health, /metrics and /info routes and
   * resolves once the port is actually bound (rejects on a listen error such
   * as the port being in use).
   */
  private async startHTTPServer(): Promise<void> {
    const express = require('express');
    const app = express();

    // Health endpoint: 200 when healthy, 503 otherwise (including when the
    // health probe itself throws).
    app.get('/health', async (req: any, res: any) => {
      try {
        const health = await this.getHealth();
        const statusCode = health.status === 'healthy' ? 200 : 503;
        res.status(statusCode).json(health);
      } catch (error) {
        res.status(503).json({
          status: 'unhealthy',
          error: error instanceof Error ? error.message : String(error),
        });
      }
    });

    // Metrics endpoint.
    app.get('/metrics', async (req: any, res: any) => {
      try {
        const metrics = await this.getMetrics();
        res.json(metrics);
      } catch (error) {
        res.status(500).json({
          error: error instanceof Error ? error.message : String(error),
        });
      }
    });

    // Service information endpoint.
    app.get('/info', (req: any, res: any) => {
      res.json({
        name: this.serviceConfig.name,
        version: this.serviceConfig.version,
        dependencies: this.serviceConfig.dependencies || [],
        resources: this.serviceConfig.resources || {},
        endpoints: Object.fromEntries(this.serviceEndpoints),
      });
    });

    await new Promise<void>((resolve, reject) => {
      const server = app.listen(this.serviceConfig.port);
      server.once('error', reject);
      server.once('listening', () => {
        server.removeListener('error', reject);
        // Only remember the server once it is really listening, so stop()
        // never tries to close a server that failed to bind.
        this.httpServer = server;
        this.logger.info('HTTP server started', { port: this.serviceConfig.port });
        resolve();
      });
    });
  }

  /** Closes the HTTP server, if one is running, and waits for it to finish. */
  private async closeHTTPServer(): Promise<void> {
    const server = this.httpServer;
    if (!server) return;
    this.httpServer = undefined;

    await new Promise<void>((resolve, reject) => {
      server.close((error?: Error) => (error ? reject(error) : resolve()));
      // Idle keep-alive sockets would otherwise delay the close callback on
      // Node versions where close() does not drop them itself.
      if (typeof server.closeIdleConnections === 'function') {
        server.closeIdleConnections();
      }
    });
  }

  /** URL under which this instance is registered, or undefined without a port. */
  private getOwnEndpoint(): string | undefined {
    return this.serviceConfig.port
      ? `http://localhost:${this.serviceConfig.port}`
      : undefined;
  }

  /** Registers this instance with the registry when a port is configured. */
  private async registerService(): Promise<void> {
    const endpoint = this.getOwnEndpoint();
    if (endpoint) {
      await this.serviceDiscovery.register(this.serviceConfig, endpoint);
    }
  }

  /** Removes this instance from the registry when a port is configured. */
  private async unregisterService(): Promise<void> {
    const endpoint = this.getOwnEndpoint();
    if (endpoint) {
      await this.serviceDiscovery.unregister(this.serviceConfig.name, endpoint);
    }
  }

  /**
   * Looks up every declared dependency and remembers its first endpoint.
   * Missing dependencies are logged as warnings and do not fail start-up.
   */
  private async discoverDependencies(): Promise<void> {
    if (!this.serviceConfig.dependencies) return;

    for (const dependency of this.serviceConfig.dependencies) {
      const endpoints = await this.serviceDiscovery.discover(dependency);
      const [primary] = endpoints;
      if (primary !== undefined) {
        // Take the first endpoint (a real deployment may want load balancing).
        this.serviceEndpoints.set(dependency, primary);
        this.logger.info('Dependency discovered', {
          dependency,
          endpoint: primary,
          totalEndpoints: endpoints.length,
        });
      } else {
        this.logger.warn('Dependency not found', { dependency });
      }
    }
  }

  /**
   * Calls another service by POSTing JSON to `${endpoint}/api/${method}`.
   * This is an example transport; adjust it to the actual protocol in use.
   *
   * @param serviceName dependency to call; must have been discovered at start-up
   * @param method path segment appended after `/api/`
   * @param params JSON-serialisable request body (defaults to `{}`)
   * @returns the parsed JSON response body
   * @throws Error when the service has no known endpoint or answers with a non-2xx status
   */
  protected async callService(serviceName: string, method: string, params?: any): Promise<any> {
    const endpoint = this.serviceEndpoints.get(serviceName);
    if (!endpoint) {
      throw new Error(`Service not available: ${serviceName}`);
    }

    this.logger.debug('Calling service', { serviceName, method, endpoint });

    const response = await fetch(`${endpoint}/api/${method}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(params || {}),
    });
    if (!response.ok) {
      throw new Error(`Service call failed: ${response.statusText}`);
    }
    return await response.json();
  }
}
10.5.2 负载均衡和高可用
// src/architecture/LoadBalancer.ts
/**
 * Pluggable policy that decides which endpoint serves the next request and
 * is told how each request turned out.
 */
export interface LoadBalancingStrategy {
// Unique name under which the strategy is registered and looked up.
name: string;
// Picks the endpoint for the next request, or null when none is usable.
selectEndpoint(endpoints: ServiceEndpoint[]): ServiceEndpoint | null;
// Feedback hook called after a request finished; responseTime is in milliseconds.
recordResult(endpoint: ServiceEndpoint, success: boolean, responseTime: number): void;
}
/**
 * One callable instance of a service together with its health state and the
 * request statistics that the strategies and health checks maintain on it.
 */
export interface ServiceEndpoint {
// Identifier used to tell endpoints apart (e.g. when removing one).
id: string;
// Base URL; the health check requests `${url}/health`.
url: string;
// Relative share of traffic for weighted selection.
weight: number;
// Whether the endpoint may currently be selected.
healthy: boolean;
// Time of the most recent health check.
lastCheck: Date;
// Duration in milliseconds of the most recent request or health check.
responseTime: number;
// successfulRequests / totalRequests.
successRate: number;
// Number of requests recorded for this endpoint.
totalRequests: number;
// Number of recorded requests that succeeded.
successfulRequests: number;
// Number of recorded requests that failed.
failedRequests: number;
}
/**
 * Round-robin strategy: cycles through the healthy endpoints in list order.
 */
export class RoundRobinStrategy implements LoadBalancingStrategy {
  name = 'round-robin';
  private currentIndex = 0;

  /**
   * @returns the next healthy endpoint in rotation, or null when no endpoint is healthy
   */
  selectEndpoint(endpoints: ServiceEndpoint[]): ServiceEndpoint | null {
    const candidates = endpoints.filter(ep => ep.healthy);
    const count = candidates.length;
    if (count === 0) {
      return null;
    }
    // Wrap the cursor in case the healthy set shrank since the last call.
    const position = this.currentIndex % count;
    this.currentIndex = (this.currentIndex + 1) % count;
    return candidates[position];
  }

  /** Updates the endpoint's request counters, last response time and success rate. */
  recordResult(endpoint: ServiceEndpoint, success: boolean, responseTime: number): void {
    endpoint.totalRequests += 1;
    endpoint.responseTime = responseTime;
    if (success) {
      endpoint.successfulRequests += 1;
    } else {
      endpoint.failedRequests += 1;
    }
    endpoint.successRate = endpoint.successfulRequests / endpoint.totalRequests;
  }
}
/**
 * Weighted round-robin strategy.
 *
 * Within one cycle each healthy endpoint is selected until its per-cycle
 * counter reaches its weight, in list order; when every endpoint has used up
 * its share the counters are reset and a new cycle starts. Endpoints with a
 * weight of 0 receive no traffic unless every healthy endpoint has weight 0.
 */
export class WeightedRoundRobinStrategy implements LoadBalancingStrategy {
  name = 'weighted-round-robin';
  // Number of selections per endpoint id within the current cycle.
  private weights = new Map<string, number>();

  /**
   * @returns the next endpoint according to the weights, or null when no endpoint is healthy
   */
  selectEndpoint(endpoints: ServiceEndpoint[]): ServiceEndpoint | null {
    const healthyEndpoints = endpoints.filter(ep => ep.healthy);
    if (healthyEndpoints.length === 0) return null;

    const fallback = healthyEndpoints[0] ?? null;
    const totalWeight = healthyEndpoints.reduce((sum, ep) => sum + ep.weight, 0);
    // Nothing to weigh: fall back to the first healthy endpoint.
    if (totalWeight === 0) return fallback;

    const withinCycle = this.takeNext(healthyEndpoints);
    if (withinCycle) return withinCycle;

    // Every share is used up: start a new cycle. Selecting through takeNext
    // again (rather than blindly taking the first endpoint) keeps zero-weight
    // endpoints out of the rotation.
    this.weights.clear();
    return this.takeNext(healthyEndpoints) ?? fallback;
  }

  /** First endpoint that still has share left in this cycle; consumes one unit of it. */
  private takeNext(candidates: ServiceEndpoint[]): ServiceEndpoint | null {
    for (const endpoint of candidates) {
      const used = this.weights.get(endpoint.id) ?? 0;
      if (used < endpoint.weight) {
        this.weights.set(endpoint.id, used + 1);
        return endpoint;
      }
    }
    return null;
  }

  /**
   * Updates the endpoint's statistics and nudges its weight: fast successes
   * (< 100 ms) raise it by 0.1 up to 10, failures or slow responses
   * (> 1000 ms) lower it by 0.2 down to 1. A weight already outside [1, 10]
   * is never moved in the wrong direction by the clamp.
   */
  recordResult(endpoint: ServiceEndpoint, success: boolean, responseTime: number): void {
    endpoint.totalRequests++;
    endpoint.responseTime = responseTime;
    if (success) {
      endpoint.successfulRequests++;
    } else {
      endpoint.failedRequests++;
    }
    endpoint.successRate = endpoint.successfulRequests / endpoint.totalRequests;

    // Dynamic weight adjustment.
    if (success && responseTime < 100) {
      endpoint.weight = Math.max(endpoint.weight, Math.min(10, endpoint.weight + 0.1));
    } else if (!success || responseTime > 1000) {
      endpoint.weight = Math.min(endpoint.weight, Math.max(1, endpoint.weight - 0.2));
    }
  }
}
/**
 * Least-connections strategy: routes each request to the healthy endpoint
 * with the fewest requests currently in flight, as tracked by this strategy.
 */
export class LeastConnectionsStrategy implements LoadBalancingStrategy {
  name = 'least-connections';
  // In-flight request count per endpoint id.
  private activeConnections = new Map<string, number>();

  /**
   * Picks the healthy endpoint with the lowest in-flight count (the earliest
   * one in the list wins ties) and counts the new request against it.
   *
   * @returns the chosen endpoint, or null when no endpoint is healthy
   */
  selectEndpoint(endpoints: ServiceEndpoint[]): ServiceEndpoint | null {
    const candidates = endpoints.filter(ep => ep.healthy);
    if (candidates.length === 0) {
      return null;
    }

    const inFlight = (ep: ServiceEndpoint): number => this.activeConnections.get(ep.id) || 0;
    // Strict comparison keeps the earlier endpoint on a tie.
    const chosen = candidates.reduce((best, ep) => (inFlight(ep) < inFlight(best) ? ep : best));

    this.activeConnections.set(chosen.id, inFlight(chosen) + 1);
    return chosen;
  }

  /**
   * Marks one request on `endpoint` as finished (never dropping the in-flight
   * count below zero) and updates the endpoint's statistics.
   */
  recordResult(endpoint: ServiceEndpoint, success: boolean, responseTime: number): void {
    const remaining = (this.activeConnections.get(endpoint.id) || 0) - 1;
    this.activeConnections.set(endpoint.id, Math.max(0, remaining));

    endpoint.totalRequests += 1;
    endpoint.responseTime = responseTime;
    if (success) {
      endpoint.successfulRequests += 1;
    } else {
      endpoint.failedRequests += 1;
    }
    endpoint.successRate = endpoint.successfulRequests / endpoint.totalRequests;
  }
}
/**
 * Routes calls to one of several endpoints per service using a named
 * load-balancing strategy, and probes every endpoint's `/health` URL every
 * 30 seconds to keep its `healthy` flag current.
 *
 * Health checking starts in the constructor; call `stop()` to end it.
 */
export class LoadBalancer {
  private endpoints = new Map<string, ServiceEndpoint[]>();
  private strategies = new Map<string, LoadBalancingStrategy>();
  private defaultStrategy: string = 'round-robin';
  private healthCheckInterval: NodeJS.Timeout | null = null;
  // Upper bound for a single health probe, in milliseconds.
  private healthCheckTimeoutMs = 5000;
  private logger: Logger;

  /**
   * Registers the built-in strategies and starts periodic health checks.
   *
   * @param logger parent logger; a child logger named 'load-balancer' is derived from it
   */
  constructor(logger: Logger) {
    this.logger = logger.child({}, 'load-balancer');
    this.registerStrategy(new RoundRobinStrategy());
    this.registerStrategy(new WeightedRoundRobinStrategy());
    this.registerStrategy(new LeastConnectionsStrategy());
    this.startHealthChecks();
  }

  /** Makes a strategy available under its `name`, replacing any strategy of the same name. */
  registerStrategy(strategy: LoadBalancingStrategy): void {
    this.strategies.set(strategy.name, strategy);
    this.logger.debug('Load balancing strategy registered', { name: strategy.name });
  }

  /**
   * Sets the endpoint list of a service. Note that this replaces any list
   * stored earlier for the same service rather than appending to it.
   */
  addEndpoints(serviceName: string, endpoints: ServiceEndpoint[]): void {
    this.endpoints.set(serviceName, endpoints);
    this.logger.info('Service endpoints added', {
      serviceName,
      count: endpoints.length,
      endpoints: endpoints.map(ep => ep.url),
    });
  }

  /** Removes the endpoint with the given id from a service; unknown services are ignored. */
  removeEndpoint(serviceName: string, endpointId: string): void {
    const endpoints = this.endpoints.get(serviceName);
    if (endpoints) {
      const filtered = endpoints.filter(ep => ep.id !== endpointId);
      this.endpoints.set(serviceName, filtered);
      this.logger.info('Service endpoint removed', { serviceName, endpointId });
    }
  }

  /**
   * Selects an endpoint for `serviceName` and runs `requestFn` against it.
   * The outcome and duration are reported to the strategy whether the
   * request succeeds or throws.
   *
   * @param serviceName service to call
   * @param requestFn performs the actual request against the chosen endpoint
   * @param strategyName strategy to use; defaults to round-robin
   * @returns whatever `requestFn` resolves to
   * @throws Error when the service has no endpoints, the strategy is unknown
   * or no endpoint is healthy; errors thrown by `requestFn` are rethrown
   */
  async callService(
    serviceName: string,
    requestFn: (endpoint: ServiceEndpoint) => Promise<any>,
    strategyName?: string
  ): Promise<any> {
    const endpoints = this.endpoints.get(serviceName);
    if (!endpoints || endpoints.length === 0) {
      throw new Error(`No endpoints available for service: ${serviceName}`);
    }

    const resolvedName = strategyName || this.defaultStrategy;
    const strategy = this.strategies.get(resolvedName);
    if (!strategy) {
      throw new Error(`Unknown load balancing strategy: ${resolvedName}`);
    }

    const endpoint = strategy.selectEndpoint(endpoints);
    if (!endpoint) {
      throw new Error(`No healthy endpoints available for service: ${serviceName}`);
    }

    const startTime = Date.now();
    let success = false;
    try {
      const result = await requestFn(endpoint);
      success = true;
      return result;
    } finally {
      // Runs for both outcomes; `success` stays false when requestFn threw.
      const responseTime = Date.now() - startTime;
      strategy.recordResult(endpoint, success, responseTime);
      this.logger.debug('Service call completed', {
        serviceName,
        endpointId: endpoint.id,
        success,
        responseTime,
        strategy: strategy.name,
      });
    }
  }

  /** Schedules a health check of every endpoint every 30 seconds. */
  private startHealthChecks(): void {
    this.healthCheckInterval = setInterval(async () => {
      await this.performHealthChecks();
    }, 30000);
  }

  /** Probes all endpoints of all services concurrently; never rejects. */
  private async performHealthChecks(): Promise<void> {
    const promises: Promise<void>[] = [];
    for (const [serviceName, endpoints] of this.endpoints) {
      for (const endpoint of endpoints) {
        promises.push(this.checkEndpointHealth(serviceName, endpoint));
      }
    }
    await Promise.allSettled(promises);
  }

  /**
   * Requests `${endpoint.url}/health` and updates the endpoint's `healthy`,
   * `lastCheck` and (on a response) `responseTime` fields. A non-2xx answer,
   * a network error or exceeding the probe timeout marks it unhealthy.
   */
  private async checkEndpointHealth(serviceName: string, endpoint: ServiceEndpoint): Promise<void> {
    const startTime = Date.now();
    try {
      // fetch has no `timeout` option; an abort signal enforces the limit.
      const response = await fetch(`${endpoint.url}/health`, {
        method: 'GET',
        signal: AbortSignal.timeout(this.healthCheckTimeoutMs),
      });
      const healthy = response.ok;
      const responseTime = Date.now() - startTime;
      // The body is not needed; cancel it so the connection can be released.
      void response.body?.cancel().catch(() => undefined);

      if (endpoint.healthy !== healthy) {
        this.logger.info('Endpoint health status changed', {
          serviceName,
          endpointId: endpoint.id,
          url: endpoint.url,
          healthy,
          previousHealth: endpoint.healthy,
        });
      }
      endpoint.healthy = healthy;
      endpoint.lastCheck = new Date();
      endpoint.responseTime = responseTime;
    } catch (error) {
      // Only warn on the healthy -> unhealthy transition to avoid log spam.
      if (endpoint.healthy) {
        this.logger.warn('Endpoint health check failed', {
          serviceName,
          endpointId: endpoint.id,
          url: endpoint.url,
          error: error instanceof Error ? error.message : String(error),
        });
      }
      endpoint.healthy = false;
      endpoint.lastCheck = new Date();
    }
  }

  /**
   * @returns the stored endpoint list of a service (the live array, not a
   * copy), or null when the service is unknown
   */
  getEndpointStatistics(serviceName: string): ServiceEndpoint[] | null {
    return this.endpoints.get(serviceName) || null;
  }

  /** Summarises how many services and endpoints are known and which strategies exist. */
  getLoadBalancerStatistics(): {
    services: number;
    totalEndpoints: number;
    healthyEndpoints: number;
    strategies: string[];
    defaultStrategy: string;
  } {
    let totalEndpoints = 0;
    let healthyEndpoints = 0;
    for (const endpoints of this.endpoints.values()) {
      totalEndpoints += endpoints.length;
      healthyEndpoints += endpoints.filter(ep => ep.healthy).length;
    }
    return {
      services: this.endpoints.size,
      totalEndpoints,
      healthyEndpoints,
      strategies: Array.from(this.strategies.keys()),
      defaultStrategy: this.defaultStrategy,
    };
  }

  /** Stops the periodic health checks. */
  stop(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }
    this.logger.info('Load balancer stopped');
  }
}
本章总结
第10章深入学习了MCP Server的性能优化和扩展性设计:
核心知识点
- 性能监控:建立了完整的性能指标收集和分析体系
- 缓存系统:实现了多层缓存架构和各种缓存策略
- 并发处理:构建了请求队列、工作池等并发处理机制
- 内存管理:实现了对象池、内存监控和泄漏检测
- 微服务架构:设计了服务发现、负载均衡等分布式系统组件
实践要点
- 建立全面的性能监控和分析体系
- 实现智能的多层缓存系统
- 合理设计并发处理和任务调度机制
- 主动进行内存管理和优化
- 采用微服务架构支持水平扩展
- 实现负载均衡和高可用保障
通过本章学习,掌握了构建高性能、可扩展MCP Server的完整技术体系,为处理大规模、高并发场景提供了坚实的技术基础。