Chapter 4: Tools Development

Haiyue
36min

Chapter 4: Tools Development

Learning Objectives
  • Understand the concept and mechanism of Tools
  • Learn how to define tool schemas and parameters
  • Master tool execution and result handling
  • Implement common tool types (file operations, network requests, etc.)
  • Learn error handling and validation for tools

Knowledge Summary

Tools Concept Explained

Tools is one of the core features of the MCP protocol, allowing AI models to invoke external tools to perform specific operations. Tools provide a standardized way for AI to interact with external systems.

🔄 正在渲染 Mermaid 图表...

Tool Type Classification

Tool TypeMain FunctionsUse CasesSecurity Level
File OperationsRead/write, create, delete filesDocument processing, configuration managementMedium
Network RequestsHTTP/API callsData retrieval, service integrationHigh
System CommandsExecute system commandsBuild, deploy, monitoringVery High
Data ProcessingCompute, transform, analyzeData science, report generationLow
Database OperationsQuery, update databaseData management, analysisHigh

Tool Schema Specification

Schema Elements

Each tool must define:

  1. name - Unique tool identifier
  2. description - Tool function description
  3. inputSchema - JSON Schema for input parameters
  4. metadata - Optional metadata information

Tool Development Practice

1. Basic Tool Framework

# Basic tool development framework
import json
import asyncio
from typing import Dict, Any, List, Optional, Callable
from abc import ABC, abstractmethod
import jsonschema
from jsonschema import validate, ValidationError

class ToolResult:
    """Tool execution result"""

    def __init__(self, content: List[Dict[str, Any]], is_error: bool = False):
        self.content = content
        self.is_error = is_error

    def to_dict(self) -> Dict[str, Any]:
        return {
            "content": self.content,
            "isError": self.is_error
        }

class MCPTool(ABC):
    """MCP tool base class"""

    def __init__(self, name: str, description: str, input_schema: Dict[str, Any]):
        self.name = name
        self.description = description
        self.input_schema = input_schema
        self.metadata = {}

    @abstractmethod
    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute tool logic"""
        pass

    def validate_arguments(self, arguments: Dict[str, Any]) -> bool:
        """Validate if arguments conform to schema"""
        try:
            validate(instance=arguments, schema=self.input_schema)
            return True
        except ValidationError as e:
            raise ValueError(f"Parameter validation failed: {e.message}")

    def get_definition(self) -> Dict[str, Any]:
        """Get tool definition"""
        definition = {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema
        }
        if self.metadata:
            definition["metadata"] = self.metadata
        return definition

class ToolRegistry:
    """Tool registry"""

    def __init__(self):
        self.tools: Dict[str, MCPTool] = {}

    def register(self, tool: MCPTool) -> None:
        """Register tool"""
        self.tools[tool.name] = tool
        print(f"✅ Registered tool: {tool.name}")

    def get_tool(self, name: str) -> Optional[MCPTool]:
        """Get tool"""
        return self.tools.get(name)

    def list_tools(self) -> List[Dict[str, Any]]:
        """List all tools"""
        return [tool.get_definition() for tool in self.tools.values()]

    async def execute_tool(self, name: str, arguments: Dict[str, Any]) -> ToolResult:
        """Execute tool"""
        tool = self.get_tool(name)
        if not tool:
            return ToolResult([{
                "type": "text",
                "text": f"Tool '{name}' does not exist"
            }], is_error=True)

        try:
            # Validate arguments
            tool.validate_arguments(arguments)

            # Execute tool
            result = await tool.execute(arguments)
            return result

        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"Tool execution failed: {str(e)}"
            }], is_error=True)

# Create global tool registry
tool_registry = ToolRegistry()

2. File Operation Tools

# File operation tool implementation
import os
import pathlib
from typing import Union

class FileReadTool(MCPTool):
    """File read tool"""

    def __init__(self):
        super().__init__(
            name="file_read",
            description="Read the contents of a specified file",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "File path to read"
                    },
                    "encoding": {
                        "type": "string",
                        "description": "File encoding",
                        "default": "utf-8"
                    }
                },
                "required": ["path"]
            }
        )
        self.metadata = {
            "category": "file_operations",
            "risk_level": "medium"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute file read"""
        path = arguments["path"]
        encoding = arguments.get("encoding", "utf-8")

        try:
            # Security check: prevent path traversal attacks
            normalized_path = os.path.normpath(path)
            if ".." in normalized_path:
                raise ValueError("Unsafe file path")

            # Check if file exists
            if not os.path.exists(normalized_path):
                raise FileNotFoundError(f"File does not exist: {normalized_path}")

            # Read file content
            with open(normalized_path, 'r', encoding=encoding) as file:
                content = file.read()

            return ToolResult([{
                "type": "text",
                "text": content
            }])

        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"Failed to read file: {str(e)}"
            }], is_error=True)

class FileWriteTool(MCPTool):
    """File write tool"""

    def __init__(self):
        super().__init__(
            name="file_write",
            description="Write content to a specified file",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "File path to write to"
                    },
                    "content": {
                        "type": "string",
                        "description": "Content to write"
                    },
                    "encoding": {
                        "type": "string",
                        "description": "File encoding",
                        "default": "utf-8"
                    },
                    "append": {
                        "type": "boolean",
                        "description": "Append mode",
                        "default": False
                    }
                },
                "required": ["path", "content"]
            }
        )
        self.metadata = {
            "category": "file_operations",
            "risk_level": "high"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute file write"""
        path = arguments["path"]
        content = arguments["content"]
        encoding = arguments.get("encoding", "utf-8")
        append_mode = arguments.get("append", False)

        try:
            # Security check
            normalized_path = os.path.normpath(path)
            if ".." in normalized_path:
                raise ValueError("Unsafe file path")

            # Ensure directory exists
            directory = os.path.dirname(normalized_path)
            if directory and not os.path.exists(directory):
                os.makedirs(directory, exist_ok=True)

            # Write file
            mode = 'a' if append_mode else 'w'
            with open(normalized_path, mode, encoding=encoding) as file:
                file.write(content)

            operation = "appended to" if append_mode else "written to"
            return ToolResult([{
                "type": "text",
                "text": f"Successfully {operation} file: {normalized_path}"
            }])

        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"Failed to write file: {str(e)}"
            }], is_error=True)

class FileListTool(MCPTool):
    """File list tool"""

    def __init__(self):
        super().__init__(
            name="file_list",
            description="List files and folders in a specified directory",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "Directory path to list",
                        "default": "."
                    },
                    "include_hidden": {
                        "type": "boolean",
                        "description": "Include hidden files",
                        "default": False
                    },
                    "recursive": {
                        "type": "boolean",
                        "description": "Recursively list subdirectories",
                        "default": False
                    }
                }
            }
        )
        self.metadata = {
            "category": "file_operations",
            "risk_level": "low"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute file list"""
        path = arguments.get("path", ".")
        include_hidden = arguments.get("include_hidden", False)
        recursive = arguments.get("recursive", False)

        try:
            normalized_path = os.path.normpath(path)
            if not os.path.exists(normalized_path):
                raise FileNotFoundError(f"Directory does not exist: {normalized_path}")

            if not os.path.isdir(normalized_path):
                raise ValueError(f"Path is not a directory: {normalized_path}")

            files_info = []

            if recursive:
                for root, dirs, files in os.walk(normalized_path):
                    # Process directories
                    for dir_name in dirs:
                        if include_hidden or not dir_name.startswith('.'):
                            full_path = os.path.join(root, dir_name)
                            rel_path = os.path.relpath(full_path, normalized_path)
                            files_info.append({
                                "name": rel_path,
                                "type": "directory",
                                "size": None
                            })

                    # Process files
                    for file_name in files:
                        if include_hidden or not file_name.startswith('.'):
                            full_path = os.path.join(root, file_name)
                            rel_path = os.path.relpath(full_path, normalized_path)
                            stat_info = os.stat(full_path)
                            files_info.append({
                                "name": rel_path,
                                "type": "file",
                                "size": stat_info.st_size,
                                "modified": stat_info.st_mtime
                            })
            else:
                for item in os.listdir(normalized_path):
                    if include_hidden or not item.startswith('.'):
                        item_path = os.path.join(normalized_path, item)
                        stat_info = os.stat(item_path)

                        files_info.append({
                            "name": item,
                            "type": "directory" if os.path.isdir(item_path) else "file",
                            "size": stat_info.st_size if os.path.isfile(item_path) else None,
                            "modified": stat_info.st_mtime
                        })

            # Sort by name
            files_info.sort(key=lambda x: x["name"])

            # Format output
            result_text = f"Directory: {normalized_path}\\n"
            result_text += f"Found {len(files_info)} items\\n\\n"

            for info in files_info:
                type_icon = "📁" if info["type"] == "directory" else "📄"
                size_info = f" ({info['size']} bytes)" if info["size"] is not None else ""
                result_text += f"{type_icon} {info['name']}{size_info}\\n"

            return ToolResult([{
                "type": "text",
                "text": result_text
            }])

        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"Failed to list files: {str(e)}"
            }], is_error=True)

# Register file operation tools
file_read_tool = FileReadTool()
file_write_tool = FileWriteTool()
file_list_tool = FileListTool()

tool_registry.register(file_read_tool)
tool_registry.register(file_write_tool)
tool_registry.register(file_list_tool)

3. Network Request Tools

# Network request tool implementation
import aiohttp
import json
from urllib.parse import urlparse

class HTTPRequestTool(MCPTool):
    """HTTP request tool"""

    def __init__(self):
        super().__init__(
            name="http_request",
            description="Send HTTP request and return response",
            input_schema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "Request URL"
                    },
                    "method": {
                        "type": "string",
                        "description": "HTTP method",
                        "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
                        "default": "GET"
                    },
                    "headers": {
                        "type": "object",
                        "description": "Request headers",
                        "default": {}
                    },
                    "data": {
                        "type": "object",
                        "description": "Request data (JSON)"
                    },
                    "timeout": {
                        "type": "number",
                        "description": "Timeout (seconds)",
                        "default": 30
                    }
                },
                "required": ["url"]
            }
        )
        self.metadata = {
            "category": "network",
            "risk_level": "high"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute HTTP request"""
        url = arguments["url"]
        method = arguments.get("method", "GET").upper()
        headers = arguments.get("headers", {})
        data = arguments.get("data")
        timeout = arguments.get("timeout", 30)

        try:
            # URL security check
            parsed_url = urlparse(url)
            if parsed_url.scheme not in ['http', 'https']:
                raise ValueError("Only HTTP/HTTPS protocols are supported")

            # Set default User-Agent
            if 'User-Agent' not in headers:
                headers['User-Agent'] = 'MCP-Server/1.0'

            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
                async with session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=data if data else None
                ) as response:

                    # Get response content
                    content_type = response.headers.get('Content-Type', '')

                    if 'application/json' in content_type:
                        response_data = await response.json()
                        response_text = json.dumps(response_data, indent=2, ensure_ascii=False)
                    else:
                        response_text = await response.text()

                    # Build result
                    result_content = []

                    # Status information
                    status_text = f"HTTP {response.status} {response.reason}\\n"
                    status_text += f"URL: {url}\\n"
                    status_text += f"Method: {method}\\n"
                    status_text += f"Content-Type: {content_type}\\n\\n"

                    result_content.append({
                        "type": "text",
                        "text": status_text
                    })

                    # Response body
                    if response_text:
                        result_content.append({
                            "type": "text",
                            "text": f"Response content:\\n{response_text}"
                        })

                    return ToolResult(result_content)

        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"HTTP request failed: {str(e)}"
            }], is_error=True)

class WebScrapeTool(MCPTool):
    """Web scraping tool"""

    def __init__(self):
        super().__init__(
            name="web_scrape",
            description="Scrape web page content and extract text",
            input_schema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "Web page URL to scrape"
                    },
                    "selector": {
                        "type": "string",
                        "description": "CSS selector (optional)"
                    },
                    "extract_links": {
                        "type": "boolean",
                        "description": "Extract links",
                        "default": False
                    }
                },
                "required": ["url"]
            }
        )
        self.metadata = {
            "category": "network",
            "risk_level": "high"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute web scraping"""
        url = arguments["url"]
        selector = arguments.get("selector")
        extract_links = arguments.get("extract_links", False)

        try:
            from bs4 import BeautifulSoup

            # Send HTTP request
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Exception(f"HTTP {response.status}: {response.reason}")

                    html_content = await response.text()

            # Parse HTML
            soup = BeautifulSoup(html_content, 'html.parser')

            result_content = []

            # Basic information
            title = soup.find('title')
            if title:
                result_content.append({
                    "type": "text",
                    "text": f"Page title: {title.get_text()}\\n"
                })

            # Extract content based on selector
            if selector:
                elements = soup.select(selector)
                if elements:
                    content_text = "\\n".join([elem.get_text().strip() for elem in elements])
                    result_content.append({
                        "type": "text",
                        "text": f"Selector '{selector}' matched content:\\n{content_text}"
                    })
                else:
                    result_content.append({
                        "type": "text",
                        "text": f"Selector '{selector}' found no matches"
                    })
            else:
                # Extract main text content
                main_text = soup.get_text()
                # Clean excess whitespace
                cleaned_text = '\\n'.join(line.strip() for line in main_text.split('\\n') if line.strip())
                result_content.append({
                    "type": "text",
                    "text": f"Page text content:\\n{cleaned_text[:2000]}{'...' if len(cleaned_text) > 2000 else ''}"
                })

            # Extract links
            if extract_links:
                links = soup.find_all('a', href=True)
                if links:
                    links_text = "\\n".join([f"- {link.get_text().strip()}: {link['href']}" for link in links[:20]])
                    result_content.append({
                        "type": "text",
                        "text": f"\\nPage links:\\n{links_text}"
                    })

            return ToolResult(result_content)

        except ImportError:
            return ToolResult([{
                "type": "text",
                "text": "Web scraping requires beautifulsoup4 library: pip install beautifulsoup4"
            }], is_error=True)
        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"Web scraping failed: {str(e)}"
            }], is_error=True)

# Register network tools
http_request_tool = HTTPRequestTool()
web_scrape_tool = WebScrapeTool()

tool_registry.register(http_request_tool)
tool_registry.register(web_scrape_tool)

4. Data Processing Tools

# Data processing tool implementation
import json
import csv
import io
from typing import Union

class JSONProcessTool(MCPTool):
    """JSON data processing tool"""

    def __init__(self):
        super().__init__(
            name="json_process",
            description="Process JSON data: parse, format, query",
            input_schema={
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "JSON string or data"
                    },
                    "operation": {
                        "type": "string",
                        "description": "Operation type",
                        "enum": ["parse", "format", "minify", "query"],
                        "default": "format"
                    },
                    "query_path": {
                        "type": "string",
                        "description": "JSONPath query path (query operation only)"
                    },
                    "indent": {
                        "type": "number",
                        "description": "Format indent spaces",
                        "default": 2
                    }
                },
                "required": ["data"]
            }
        )
        self.metadata = {
            "category": "data_processing",
            "risk_level": "low"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute JSON processing"""
        data = arguments["data"]
        operation = arguments.get("operation", "format")
        query_path = arguments.get("query_path")
        indent = arguments.get("indent", 2)

        try:
            # Parse JSON data
            if isinstance(data, str):
                json_data = json.loads(data)
            else:
                json_data = data

            result_text = ""

            if operation == "parse":
                result_text = f"JSON parsed successfully\\nData type: {type(json_data).__name__}\\n"
                if isinstance(json_data, dict):
                    result_text += f"Number of keys: {len(json_data)}\\n"
                    result_text += f"Key list: {list(json_data.keys())}"
                elif isinstance(json_data, list):
                    result_text += f"Array length: {len(json_data)}"

            elif operation == "format":
                formatted_json = json.dumps(json_data, indent=indent, ensure_ascii=False)
                result_text = f"Formatted JSON:\\n{formatted_json}"

            elif operation == "minify":
                minified_json = json.dumps(json_data, separators=(',', ':'), ensure_ascii=False)
                result_text = f"Minified JSON:\\n{minified_json}"

            elif operation == "query":
                if not query_path:
                    raise ValueError("query operation requires query_path parameter")

                # Simple JSONPath implementation
                result = self._json_path_query(json_data, query_path)
                result_text = f"Query path '{query_path}' result:\\n{json.dumps(result, indent=2, ensure_ascii=False)}"

            return ToolResult([{
                "type": "text",
                "text": result_text
            }])

        except json.JSONDecodeError as e:
            return ToolResult([{
                "type": "text",
                "text": f"JSON parse error: {str(e)}"
            }], is_error=True)
        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"JSON processing failed: {str(e)}"
            }], is_error=True)

    def _json_path_query(self, data: Any, path: str) -> Any:
        """Simple JSONPath query implementation"""
        parts = path.strip('$').split('.')
        current = data

        for part in parts:
            if part == '':
                continue

            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list) and part.isdigit():
                index = int(part)
                current = current[index] if 0 <= index < len(current) else None
            else:
                return None

        return current

class CSVProcessTool(MCPTool):
    """CSV data processing tool"""

    def __init__(self):
        super().__init__(
            name="csv_process",
            description="Process CSV data: parse, convert, statistics",
            input_schema={
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "CSV string data"
                    },
                    "operation": {
                        "type": "string",
                        "description": "Operation type",
                        "enum": ["parse", "to_json", "stats", "filter"],
                        "default": "parse"
                    },
                    "delimiter": {
                        "type": "string",
                        "description": "CSV delimiter",
                        "default": ","
                    },
                    "has_header": {
                        "type": "boolean",
                        "description": "Has header row",
                        "default": True
                    },
                    "filter_column": {
                        "type": "string",
                        "description": "Filter column name (filter operation only)"
                    },
                    "filter_value": {
                        "type": "string",
                        "description": "Filter value (filter operation only)"
                    }
                },
                "required": ["data"]
            }
        )
        self.metadata = {
            "category": "data_processing",
            "risk_level": "low"
        }

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        """Execute CSV processing"""
        data = arguments["data"]
        operation = arguments.get("operation", "parse")
        delimiter = arguments.get("delimiter", ",")
        has_header = arguments.get("has_header", True)
        filter_column = arguments.get("filter_column")
        filter_value = arguments.get("filter_value")

        try:
            # Parse CSV data
            csv_reader = csv.reader(io.StringIO(data), delimiter=delimiter)
            rows = list(csv_reader)

            if not rows:
                raise ValueError("CSV data is empty")

            headers = rows[0] if has_header else [f"Column{i+1}" for i in range(len(rows[0]))]
            data_rows = rows[1:] if has_header else rows

            result_text = ""

            if operation == "parse":
                result_text = f"CSV parse result:\\n"
                result_text += f"Number of columns: {len(headers)}\\n"
                result_text += f"Number of data rows: {len(data_rows)}\\n"
                result_text += f"Column names: {', '.join(headers)}\\n\\n"

                # Show first 5 rows
                result_text += "First 5 rows:\\n"
                for i, row in enumerate(data_rows[:5]):
                    result_text += f"Row {i+1}: {dict(zip(headers, row))}\\n"

            elif operation == "to_json":
                json_data = []
                for row in data_rows:
                    json_data.append(dict(zip(headers, row)))

                result_text = f"CSV to JSON result:\\n{json.dumps(json_data, indent=2, ensure_ascii=False)}"

            elif operation == "stats":
                result_text = f"CSV statistics:\\n"
                result_text += f"Total rows: {len(data_rows)}\\n"
                result_text += f"Total columns: {len(headers)}\\n\\n"

                # Column statistics
                for i, header in enumerate(headers):
                    column_values = [row[i] for row in data_rows if i < len(row)]
                    non_empty = [v for v in column_values if v.strip()]
                    result_text += f"Column '{header}': {len(non_empty)}/{len(column_values)} non-empty values\\n"

            elif operation == "filter":
                if not filter_column or filter_value is None:
                    raise ValueError("filter operation requires filter_column and filter_value parameters")

                if filter_column not in headers:
                    raise ValueError(f"Column '{filter_column}' does not exist")

                column_index = headers.index(filter_column)
                filtered_rows = [row for row in data_rows
                               if column_index < len(row) and filter_value in row[column_index]]

                result_text = f"Filter result (column '{filter_column}' contains '{filter_value}'):\\n"
                result_text += f"Matched rows: {len(filtered_rows)}\\n\\n"

                for i, row in enumerate(filtered_rows[:10]):  # Show first 10 rows
                    result_text += f"Row {i+1}: {dict(zip(headers, row))}\\n"

            return ToolResult([{
                "type": "text",
                "text": result_text
            }])

        except Exception as e:
            return ToolResult([{
                "type": "text",
                "text": f"CSV processing failed: {str(e)}"
            }], is_error=True)

# Register data processing tools
json_process_tool = JSONProcessTool()
csv_process_tool = CSVProcessTool()

tool_registry.register(json_process_tool)
tool_registry.register(csv_process_tool)

5. Tool Testing and Validation

# Tool testing framework
async def test_tools():
    """Test all registered tools"""
    print("🧪 Starting tool tests\\n")

    # 1. Test file read tool
    print("📄 Testing file read tool:")
    result = await tool_registry.execute_tool("file_read", {
        "path": "/etc/hosts"  # Test reading system hosts file
    })
    print(f"Result: {result.to_dict()}\\n")

    # 2. Test HTTP request tool
    print("🌐 Testing HTTP request tool:")
    result = await tool_registry.execute_tool("http_request", {
        "url": "https://httpbin.org/json",
        "method": "GET"
    })
    print(f"Result: {result.to_dict()}\\n")

    # 3. Test JSON processing tool
    print("📊 Testing JSON processing tool:")
    test_json = '{"name": "test", "data": [1, 2, 3]}'
    result = await tool_registry.execute_tool("json_process", {
        "data": test_json,
        "operation": "format"
    })
    print(f"Result: {result.to_dict()}\\n")

    # 4. Test tool list
    print("📋 Available tool list:")
    tools = tool_registry.list_tools()
    for tool in tools:
        print(f"- {tool['name']}: {tool['description']}")

    print("\\n✅ Tool tests completed")

# Run tests (uncomment for actual use)
# asyncio.run(test_tools())

Tool Security and Permission Control

Security Best Practices

Security Considerations
  1. Input Validation - Strictly validate all user inputs
  2. Path Security - Prevent directory traversal attacks
  3. Permission Control - Limit tool operation scope
  4. Resource Limits - Prevent resource exhaustion attacks
  5. Error Handling - Avoid information leakage
# Permission control and security mechanisms
class SecurityPolicy:
    """Security policy"""

    def __init__(self):
        self.allowed_paths = ["/tmp", "/home/user"]
        self.blocked_domains = ["localhost", "127.0.0.1"]
        self.max_file_size = 10 * 1024 * 1024  # 10MB
        self.max_request_size = 1024 * 1024    # 1MB

    def validate_file_path(self, path: str) -> bool:
        """Validate file path security"""
        normalized = os.path.normpath(path)

        # Check path traversal
        if ".." in normalized:
            return False

        # Check allowed path prefixes
        return any(normalized.startswith(allowed) for allowed in self.allowed_paths)

    def validate_url(self, url: str) -> bool:
        """Validate URL security"""
        parsed = urlparse(url)

        # Check protocol
        if parsed.scheme not in ['http', 'https']:
            return False

        # Check blocked domains
        return parsed.hostname not in self.blocked_domains

# Apply security policy to tools
security_policy = SecurityPolicy()

print("🔒 Tool security mechanisms configured")

Through this chapter, we have mastered the development methods of MCP Tools in depth, including tool definition, parameter validation, execution logic, and security control. These tools provide powerful external operation capabilities for AI models and are core functional components of MCP Server.