Chapter 4: Tools Development
Haiyue
Learning Objectives
- Understand the concept and mechanism of Tools
- Learn how to define tool schemas and parameters
- Master tool execution and result handling
- Implement common tool types (file operations, network requests, etc.)
- Learn error handling and validation for tools
Knowledge Summary
Tools Concept Explained
Tools are one of the core features of the MCP protocol, allowing AI models to invoke externally defined operations in a controlled way. They provide a standardized interface for AI to interact with external systems.
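In a typical exchange, the client first lists the server's tools and then asks the server to run one. The sketch below shows the rough shape of the two JSON-RPC messages involved (tools/list and tools/call) written as Python dictionaries; the file_read tool and its arguments are illustrative placeholders, not part of any specific server.
# Rough shape of the tool discovery and invocation messages (illustrative)
list_request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}

call_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/call",
    "params": {
        "name": "file_read",                   # illustrative tool name
        "arguments": {"path": "README.md"}     # arguments validated against the tool's inputSchema
    }
}

call_result = {
    "content": [{"type": "text", "text": "...file contents..."}],
    "isError": False
}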
Tool Type Classification
| Tool Type | Main Functions | Use Cases | Security Level |
|---|---|---|---|
| File Operations | Read/write, create, delete files | Document processing, configuration management | Medium |
| Network Requests | HTTP/API calls | Data retrieval, service integration | High |
| System Commands | Execute system commands | Build, deploy, monitoring | Very High |
| Data Processing | Compute, transform, analyze | Data science, report generation | Low |
| Database Operations | Query, update database | Data management, analysis | High |
Tool Schema Specification
Schema Elements
Each tool must define the following fields (an illustrative definition follows this list):
- name - Unique tool identifier
- description - Tool function description
- inputSchema - JSON Schema for input parameters
- metadata - Optional metadata information
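For example, a hypothetical weather-lookup tool could be described as follows; the field names follow the list above, while the tool itself is assumed purely for illustration.
# Illustrative tool definition (hypothetical get_weather tool)
get_weather_definition = {
    "name": "get_weather",
    "description": "Get the current weather for a city",
    "inputSchema": {
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "City name"},
            "units": {"type": "string", "enum": ["metric", "imperial"], "default": "metric"}
        },
        "required": ["city"]
    }
}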
Tool Development Practice
1. Basic Tool Framework
# Basic tool development framework
import json
import asyncio
from typing import Dict, Any, List, Optional, Callable
from abc import ABC, abstractmethod
import jsonschema
from jsonschema import validate, ValidationError
class ToolResult:
"""Tool execution result"""
def __init__(self, content: List[Dict[str, Any]], is_error: bool = False):
self.content = content
self.is_error = is_error
def to_dict(self) -> Dict[str, Any]:
return {
"content": self.content,
"isError": self.is_error
}
class MCPTool(ABC):
"""MCP tool base class"""
def __init__(self, name: str, description: str, input_schema: Dict[str, Any]):
self.name = name
self.description = description
self.input_schema = input_schema
self.metadata = {}
@abstractmethod
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute tool logic"""
pass
def validate_arguments(self, arguments: Dict[str, Any]) -> bool:
"""Validate if arguments conform to schema"""
try:
validate(instance=arguments, schema=self.input_schema)
return True
except ValidationError as e:
raise ValueError(f"Parameter validation failed: {e.message}")
def get_definition(self) -> Dict[str, Any]:
"""Get tool definition"""
definition = {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema
}
if self.metadata:
definition["metadata"] = self.metadata
return definition
class ToolRegistry:
"""Tool registry"""
def __init__(self):
self.tools: Dict[str, MCPTool] = {}
def register(self, tool: MCPTool) -> None:
"""Register tool"""
self.tools[tool.name] = tool
print(f"✅ Registered tool: {tool.name}")
def get_tool(self, name: str) -> Optional[MCPTool]:
"""Get tool"""
return self.tools.get(name)
def list_tools(self) -> List[Dict[str, Any]]:
"""List all tools"""
return [tool.get_definition() for tool in self.tools.values()]
async def execute_tool(self, name: str, arguments: Dict[str, Any]) -> ToolResult:
"""Execute tool"""
tool = self.get_tool(name)
if not tool:
return ToolResult([{
"type": "text",
"text": f"Tool '{name}' does not exist"
}], is_error=True)
try:
# Validate arguments
tool.validate_arguments(arguments)
# Execute tool
result = await tool.execute(arguments)
return result
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"Tool execution failed: {str(e)}"
}], is_error=True)
# Create global tool registry
tool_registry = ToolRegistry()
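To sanity-check the framework, here is a minimal sketch of a custom tool built on the classes above; the echo tool is purely illustrative and not part of this chapter's tool set.
# Minimal illustrative tool built on the framework above
class EchoTool(MCPTool):
    """Echo back the supplied message"""

    def __init__(self):
        super().__init__(
            name="echo",
            description="Return the provided message unchanged",
            input_schema={
                "type": "object",
                "properties": {
                    "message": {"type": "string", "description": "Text to echo"}
                },
                "required": ["message"]
            }
        )

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        # Wrap the input message in the standard ToolResult content format
        return ToolResult([{"type": "text", "text": arguments["message"]}])

async def demo_echo():
    tool_registry.register(EchoTool())
    result = await tool_registry.execute_tool("echo", {"message": "hello"})
    print(result.to_dict())  # {'content': [{'type': 'text', 'text': 'hello'}], 'isError': False}

# asyncio.run(demo_echo())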
2. File Operation Tools
# File operation tool implementation
import os
import pathlib
from typing import Union
class FileReadTool(MCPTool):
"""File read tool"""
def __init__(self):
super().__init__(
name="file_read",
description="Read the contents of a specified file",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path to read"
},
"encoding": {
"type": "string",
"description": "File encoding",
"default": "utf-8"
}
},
"required": ["path"]
}
)
self.metadata = {
"category": "file_operations",
"risk_level": "medium"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute file read"""
path = arguments["path"]
encoding = arguments.get("encoding", "utf-8")
try:
# Security check: prevent path traversal attacks
normalized_path = os.path.normpath(path)
if ".." in normalized_path:
raise ValueError("Unsafe file path")
# Check if file exists
if not os.path.exists(normalized_path):
raise FileNotFoundError(f"File does not exist: {normalized_path}")
# Read file content
with open(normalized_path, 'r', encoding=encoding) as file:
content = file.read()
return ToolResult([{
"type": "text",
"text": content
}])
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"Failed to read file: {str(e)}"
}], is_error=True)
class FileWriteTool(MCPTool):
"""File write tool"""
def __init__(self):
super().__init__(
name="file_write",
description="Write content to a specified file",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path to write to"
},
"content": {
"type": "string",
"description": "Content to write"
},
"encoding": {
"type": "string",
"description": "File encoding",
"default": "utf-8"
},
"append": {
"type": "boolean",
"description": "Append mode",
"default": False
}
},
"required": ["path", "content"]
}
)
self.metadata = {
"category": "file_operations",
"risk_level": "high"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute file write"""
path = arguments["path"]
content = arguments["content"]
encoding = arguments.get("encoding", "utf-8")
append_mode = arguments.get("append", False)
try:
# Security check
normalized_path = os.path.normpath(path)
if ".." in normalized_path:
raise ValueError("Unsafe file path")
# Ensure directory exists
directory = os.path.dirname(normalized_path)
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
# Write file
mode = 'a' if append_mode else 'w'
with open(normalized_path, mode, encoding=encoding) as file:
file.write(content)
operation = "appended to" if append_mode else "written to"
return ToolResult([{
"type": "text",
"text": f"Successfully {operation} file: {normalized_path}"
}])
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"Failed to write file: {str(e)}"
}], is_error=True)
class FileListTool(MCPTool):
"""File list tool"""
def __init__(self):
super().__init__(
name="file_list",
description="List files and folders in a specified directory",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Directory path to list",
"default": "."
},
"include_hidden": {
"type": "boolean",
"description": "Include hidden files",
"default": False
},
"recursive": {
"type": "boolean",
"description": "Recursively list subdirectories",
"default": False
}
}
}
)
self.metadata = {
"category": "file_operations",
"risk_level": "low"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute file list"""
path = arguments.get("path", ".")
include_hidden = arguments.get("include_hidden", False)
recursive = arguments.get("recursive", False)
try:
normalized_path = os.path.normpath(path)
if not os.path.exists(normalized_path):
raise FileNotFoundError(f"Directory does not exist: {normalized_path}")
if not os.path.isdir(normalized_path):
raise ValueError(f"Path is not a directory: {normalized_path}")
files_info = []
if recursive:
for root, dirs, files in os.walk(normalized_path):
# Process directories
for dir_name in dirs:
if include_hidden or not dir_name.startswith('.'):
full_path = os.path.join(root, dir_name)
rel_path = os.path.relpath(full_path, normalized_path)
files_info.append({
"name": rel_path,
"type": "directory",
"size": None
})
# Process files
for file_name in files:
if include_hidden or not file_name.startswith('.'):
full_path = os.path.join(root, file_name)
rel_path = os.path.relpath(full_path, normalized_path)
stat_info = os.stat(full_path)
files_info.append({
"name": rel_path,
"type": "file",
"size": stat_info.st_size,
"modified": stat_info.st_mtime
})
else:
for item in os.listdir(normalized_path):
if include_hidden or not item.startswith('.'):
item_path = os.path.join(normalized_path, item)
stat_info = os.stat(item_path)
files_info.append({
"name": item,
"type": "directory" if os.path.isdir(item_path) else "file",
"size": stat_info.st_size if os.path.isfile(item_path) else None,
"modified": stat_info.st_mtime
})
# Sort by name
files_info.sort(key=lambda x: x["name"])
# Format output
result_text = f"Directory: {normalized_path}\n"
result_text += f"Found {len(files_info)} items\n\n"
for info in files_info:
type_icon = "📁" if info["type"] == "directory" else "📄"
size_info = f" ({info['size']} bytes)" if info["size"] is not None else ""
result_text += f"{type_icon} {info['name']}{size_info}\n"
return ToolResult([{
"type": "text",
"text": result_text
}])
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"Failed to list files: {str(e)}"
}], is_error=True)
# Register file operation tools
file_read_tool = FileReadTool()
file_write_tool = FileWriteTool()
file_list_tool = FileListTool()
tool_registry.register(file_read_tool)
tool_registry.register(file_write_tool)
tool_registry.register(file_list_tool)
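As a quick usage sketch of the tools just registered (the path and content below are illustrative, and the write is limited only by the basic checks above):
# Illustrative round trip through the file tools via the registry
async def demo_file_tools():
    await tool_registry.execute_tool("file_write", {
        "path": "demo/notes.txt",            # illustrative relative path
        "content": "Hello from MCP tools\n"
    })
    read_result = await tool_registry.execute_tool("file_read", {"path": "demo/notes.txt"})
    list_result = await tool_registry.execute_tool("file_list", {"path": "demo"})
    print(read_result.to_dict())
    print(list_result.to_dict())

# asyncio.run(demo_file_tools())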
3. Network Request Tools
# Network request tool implementation
import aiohttp
import json
from urllib.parse import urlparse
class HTTPRequestTool(MCPTool):
"""HTTP request tool"""
def __init__(self):
super().__init__(
name="http_request",
description="Send HTTP request and return response",
input_schema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "Request URL"
},
"method": {
"type": "string",
"description": "HTTP method",
"enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
"default": "GET"
},
"headers": {
"type": "object",
"description": "Request headers",
"default": {}
},
"data": {
"type": "object",
"description": "Request data (JSON)"
},
"timeout": {
"type": "number",
"description": "Timeout (seconds)",
"default": 30
}
},
"required": ["url"]
}
)
self.metadata = {
"category": "network",
"risk_level": "high"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute HTTP request"""
url = arguments["url"]
method = arguments.get("method", "GET").upper()
headers = arguments.get("headers", {})
data = arguments.get("data")
timeout = arguments.get("timeout", 30)
try:
# URL security check
parsed_url = urlparse(url)
if parsed_url.scheme not in ['http', 'https']:
raise ValueError("Only HTTP/HTTPS protocols are supported")
# Set default User-Agent
if 'User-Agent' not in headers:
headers['User-Agent'] = 'MCP-Server/1.0'
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
async with session.request(
method=method,
url=url,
headers=headers,
json=data if data else None
) as response:
# Get response content
content_type = response.headers.get('Content-Type', '')
if 'application/json' in content_type:
response_data = await response.json()
response_text = json.dumps(response_data, indent=2, ensure_ascii=False)
else:
response_text = await response.text()
# Build result
result_content = []
# Status information
status_text = f"HTTP {response.status} {response.reason}\n"
status_text += f"URL: {url}\n"
status_text += f"Method: {method}\n"
status_text += f"Content-Type: {content_type}\n\n"
result_content.append({
"type": "text",
"text": status_text
})
# Response body
if response_text:
result_content.append({
"type": "text",
"text": f"Response content:\n{response_text}"
})
return ToolResult(result_content)
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"HTTP request failed: {str(e)}"
}], is_error=True)
class WebScrapeTool(MCPTool):
"""Web scraping tool"""
def __init__(self):
super().__init__(
name="web_scrape",
description="Scrape web page content and extract text",
input_schema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "Web page URL to scrape"
},
"selector": {
"type": "string",
"description": "CSS selector (optional)"
},
"extract_links": {
"type": "boolean",
"description": "Extract links",
"default": False
}
},
"required": ["url"]
}
)
self.metadata = {
"category": "network",
"risk_level": "high"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute web scraping"""
url = arguments["url"]
selector = arguments.get("selector")
extract_links = arguments.get("extract_links", False)
try:
from bs4 import BeautifulSoup
# Send HTTP request
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"HTTP {response.status}: {response.reason}")
html_content = await response.text()
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
result_content = []
# Basic information
title = soup.find('title')
if title:
result_content.append({
"type": "text",
"text": f"Page title: {title.get_text()}\n"
})
# Extract content based on selector
if selector:
elements = soup.select(selector)
if elements:
content_text = "\n".join([elem.get_text().strip() for elem in elements])
result_content.append({
"type": "text",
"text": f"Selector '{selector}' matched content:\n{content_text}"
})
else:
result_content.append({
"type": "text",
"text": f"Selector '{selector}' found no matches"
})
else:
# Extract main text content
main_text = soup.get_text()
# Clean excess whitespace
cleaned_text = '\n'.join(line.strip() for line in main_text.split('\n') if line.strip())
result_content.append({
"type": "text",
"text": f"Page text content:\n{cleaned_text[:2000]}{'...' if len(cleaned_text) > 2000 else ''}"
})
# Extract links
if extract_links:
links = soup.find_all('a', href=True)
if links:
links_text = "\n".join([f"- {link.get_text().strip()}: {link['href']}" for link in links[:20]])
result_content.append({
"type": "text",
"text": f"\nPage links:\n{links_text}"
})
return ToolResult(result_content)
except ImportError:
return ToolResult([{
"type": "text",
"text": "Web scraping requires beautifulsoup4 library: pip install beautifulsoup4"
}], is_error=True)
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"Web scraping failed: {str(e)}"
}], is_error=True)
# Register network tools
http_request_tool = HTTPRequestTool()
web_scrape_tool = WebScrapeTool()
tool_registry.register(http_request_tool)
tool_registry.register(web_scrape_tool)
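A brief usage sketch for the HTTP tool (requires network access; httpbin.org is a public echo service, and the call here is only illustrative):
# Illustrative call to the HTTP request tool
async def demo_http_tool():
    result = await tool_registry.execute_tool("http_request", {
        "url": "https://httpbin.org/get",
        "method": "GET",
        "timeout": 10
    })
    print(result.to_dict())

# asyncio.run(demo_http_tool())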
4. Data Processing Tools
# Data processing tool implementation
import json
import csv
import io
from typing import Union
class JSONProcessTool(MCPTool):
"""JSON data processing tool"""
def __init__(self):
super().__init__(
name="json_process",
description="Process JSON data: parse, format, query",
input_schema={
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "JSON string or data"
},
"operation": {
"type": "string",
"description": "Operation type",
"enum": ["parse", "format", "minify", "query"],
"default": "format"
},
"query_path": {
"type": "string",
"description": "JSONPath query path (query operation only)"
},
"indent": {
"type": "number",
"description": "Format indent spaces",
"default": 2
}
},
"required": ["data"]
}
)
self.metadata = {
"category": "data_processing",
"risk_level": "low"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute JSON processing"""
data = arguments["data"]
operation = arguments.get("operation", "format")
query_path = arguments.get("query_path")
indent = arguments.get("indent", 2)
try:
# Parse JSON data
if isinstance(data, str):
json_data = json.loads(data)
else:
json_data = data
result_text = ""
if operation == "parse":
result_text = f"JSON parsed successfully\nData type: {type(json_data).__name__}\n"
if isinstance(json_data, dict):
result_text += f"Number of keys: {len(json_data)}\n"
result_text += f"Key list: {list(json_data.keys())}"
elif isinstance(json_data, list):
result_text += f"Array length: {len(json_data)}"
elif operation == "format":
formatted_json = json.dumps(json_data, indent=indent, ensure_ascii=False)
result_text = f"Formatted JSON:\n{formatted_json}"
elif operation == "minify":
minified_json = json.dumps(json_data, separators=(',', ':'), ensure_ascii=False)
result_text = f"Minified JSON:\n{minified_json}"
elif operation == "query":
if not query_path:
raise ValueError("query operation requires query_path parameter")
# Simple JSONPath implementation
result = self._json_path_query(json_data, query_path)
result_text = f"Query path '{query_path}' result:\n{json.dumps(result, indent=2, ensure_ascii=False)}"
return ToolResult([{
"type": "text",
"text": result_text
}])
except json.JSONDecodeError as e:
return ToolResult([{
"type": "text",
"text": f"JSON parse error: {str(e)}"
}], is_error=True)
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"JSON processing failed: {str(e)}"
}], is_error=True)
def _json_path_query(self, data: Any, path: str) -> Any:
"""Simple JSONPath query implementation"""
parts = path.strip('$').split('.')
current = data
for part in parts:
if part == '':
continue
if isinstance(current, dict):
current = current.get(part)
elif isinstance(current, list) and part.isdigit():
index = int(part)
current = current[index] if 0 <= index < len(current) else None
else:
return None
return current
class CSVProcessTool(MCPTool):
"""CSV data processing tool"""
def __init__(self):
super().__init__(
name="csv_process",
description="Process CSV data: parse, convert, statistics",
input_schema={
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "CSV string data"
},
"operation": {
"type": "string",
"description": "Operation type",
"enum": ["parse", "to_json", "stats", "filter"],
"default": "parse"
},
"delimiter": {
"type": "string",
"description": "CSV delimiter",
"default": ","
},
"has_header": {
"type": "boolean",
"description": "Has header row",
"default": True
},
"filter_column": {
"type": "string",
"description": "Filter column name (filter operation only)"
},
"filter_value": {
"type": "string",
"description": "Filter value (filter operation only)"
}
},
"required": ["data"]
}
)
self.metadata = {
"category": "data_processing",
"risk_level": "low"
}
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute CSV processing"""
data = arguments["data"]
operation = arguments.get("operation", "parse")
delimiter = arguments.get("delimiter", ",")
has_header = arguments.get("has_header", True)
filter_column = arguments.get("filter_column")
filter_value = arguments.get("filter_value")
try:
# Parse CSV data
csv_reader = csv.reader(io.StringIO(data), delimiter=delimiter)
rows = list(csv_reader)
if not rows:
raise ValueError("CSV data is empty")
headers = rows[0] if has_header else [f"Column{i+1}" for i in range(len(rows[0]))]
data_rows = rows[1:] if has_header else rows
result_text = ""
if operation == "parse":
result_text = f"CSV parse result:\n"
result_text += f"Number of columns: {len(headers)}\n"
result_text += f"Number of data rows: {len(data_rows)}\n"
result_text += f"Column names: {', '.join(headers)}\n\n"
# Show first 5 rows
result_text += "First 5 rows:\n"
for i, row in enumerate(data_rows[:5]):
result_text += f"Row {i+1}: {dict(zip(headers, row))}\n"
elif operation == "to_json":
json_data = []
for row in data_rows:
json_data.append(dict(zip(headers, row)))
result_text = f"CSV to JSON result:\n{json.dumps(json_data, indent=2, ensure_ascii=False)}"
elif operation == "stats":
result_text = f"CSV statistics:\n"
result_text += f"Total rows: {len(data_rows)}\n"
result_text += f"Total columns: {len(headers)}\n\n"
# Column statistics
for i, header in enumerate(headers):
column_values = [row[i] for row in data_rows if i < len(row)]
non_empty = [v for v in column_values if v.strip()]
result_text += f"Column '{header}': {len(non_empty)}/{len(column_values)} non-empty values\n"
elif operation == "filter":
if not filter_column or filter_value is None:
raise ValueError("filter operation requires filter_column and filter_value parameters")
if filter_column not in headers:
raise ValueError(f"Column '{filter_column}' does not exist")
column_index = headers.index(filter_column)
filtered_rows = [row for row in data_rows
if column_index < len(row) and filter_value in row[column_index]]
result_text = f"Filter result (column '{filter_column}' contains '{filter_value}'):\n"
result_text += f"Matched rows: {len(filtered_rows)}\n\n"
for i, row in enumerate(filtered_rows[:10]): # Show first 10 rows
result_text += f"Row {i+1}: {dict(zip(headers, row))}\n"
return ToolResult([{
"type": "text",
"text": result_text
}])
except Exception as e:
return ToolResult([{
"type": "text",
"text": f"CSV processing failed: {str(e)}"
}], is_error=True)
# Register data processing tools
json_process_tool = JSONProcessTool()
csv_process_tool = CSVProcessTool()
tool_registry.register(json_process_tool)
tool_registry.register(csv_process_tool)
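A short usage sketch for the two data tools (the sample data is illustrative):
# Illustrative calls to the data processing tools
async def demo_data_tools():
    json_result = await tool_registry.execute_tool("json_process", {
        "data": '{"name": "Alice", "scores": [90, 85]}',
        "operation": "query",
        "query_path": "$.scores.0"   # returns 90 with the simple JSONPath helper above
    })
    print(json_result.to_dict())

    csv_result = await tool_registry.execute_tool("csv_process", {
        "data": "name,age\nAlice,30\nBob,25",
        "operation": "to_json"
    })
    print(csv_result.to_dict())

# asyncio.run(demo_data_tools())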
5. Tool Testing and Validation
# Tool testing framework
async def test_tools():
"""Test all registered tools"""
print("🧪 Starting tool tests\n")
# 1. Test file read tool
print("📄 Testing file read tool:")
result = await tool_registry.execute_tool("file_read", {
"path": "/etc/hosts" # Test reading system hosts file
})
print(f"Result: {result.to_dict()}\n")
# 2. Test HTTP request tool
print("🌐 Testing HTTP request tool:")
result = await tool_registry.execute_tool("http_request", {
"url": "https://httpbin.org/json",
"method": "GET"
})
print(f"Result: {result.to_dict()}\n")
# 3. Test JSON processing tool
print("📊 Testing JSON processing tool:")
test_json = '{"name": "test", "data": [1, 2, 3]}'
result = await tool_registry.execute_tool("json_process", {
"data": test_json,
"operation": "format"
})
print(f"Result: {result.to_dict()}\n")
# 4. Test tool list
print("📋 Available tool list:")
tools = tool_registry.list_tools()
for tool in tools:
print(f"- {tool['name']}: {tool['description']}")
print("\n✅ Tool tests completed")
# Run tests (uncomment for actual use)
# asyncio.run(test_tools())
Tool Security and Permission Control
Security Best Practices
- Input Validation - Strictly validate all user inputs
- Path Security - Prevent directory traversal attacks
- Permission Control - Limit tool operation scope
- Resource Limits - Prevent resource exhaustion attacks
- Error Handling - Avoid information leakage
# Permission control and security mechanisms
class SecurityPolicy:
"""Security policy"""
def __init__(self):
self.allowed_paths = ["/tmp", "/home/user"]
self.blocked_domains = ["localhost", "127.0.0.1"]
self.max_file_size = 10 * 1024 * 1024 # 10MB
self.max_request_size = 1024 * 1024 # 1MB
def validate_file_path(self, path: str) -> bool:
"""Validate file path security"""
normalized = os.path.normpath(path)
# Check path traversal
if ".." in normalized:
return False
# Check allowed path prefixes
return any(normalized.startswith(allowed) for allowed in self.allowed_paths)
def validate_url(self, url: str) -> bool:
"""Validate URL security"""
parsed = urlparse(url)
# Check protocol
if parsed.scheme not in ['http', 'https']:
return False
# Check blocked domains
return parsed.hostname not in self.blocked_domains
# Apply security policy to tools
security_policy = SecurityPolicy()
print("🔒 Tool security mechanisms configured")
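The policy is most useful when it is actually consulted inside a tool before any file or network access. The sketch below shows one possible way to wrap the FileReadTool defined earlier; the subclass name and wiring are assumptions for illustration, not part of the chapter's registry.
# Sketch: enforcing SecurityPolicy before delegating to the original tool
class SecureFileReadTool(FileReadTool):
    """FileReadTool variant that consults the global security policy first"""

    async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
        path = arguments["path"]
        if not security_policy.validate_file_path(path):
            # Reject the request before touching the file system
            return ToolResult([{
                "type": "text",
                "text": f"Access denied by security policy: {path}"
            }], is_error=True)
        return await super().execute(arguments)

# A size check against security_policy.max_file_size could be added the same way before reading.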
In this chapter we worked through MCP Tools development in depth: defining tools and their schemas, validating parameters, implementing execution logic, and applying security controls. Tools give AI models powerful, controlled access to external operations and are a core functional component of an MCP Server.