Chapter 5: Code Complexity Analysis
Haiyue
23min
Chapter 5: Code Complexity Analysis
Learning Objectives
- Understand the concept and calculation of cyclomatic complexity
- Master Pylint’s complexity checking features
- Learn to analyze and reduce code complexity
- Master design principles for functions and classes
Key Concepts
Fundamentals of Cyclomatic Complexity
Cyclomatic complexity is an important metric for measuring program complexity:
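[Diagram not rendered.] For a single function, cyclomatic complexity = 1 + the number of decision points (for example `if`/`elif` branches, loops and `except` clauses); each decision point adds one independent path through the code.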
Pylint Complexity Check Types
| Check Type | Message ID | Default Threshold | Description |
|---|---|---|---|
| Cyclomatic Complexity | R1260 | 10 | Cyclomatic complexity of function/method (`too-complex`; requires the `pylint.extensions.mccabe` plugin) |
| Function Length | R0915 | 50 | Number of statements in function |
| Parameter Count | R0913 | 5 | Number of function parameters |
| Local Variables | R0914 | 15 | Number of local variables in function |
| Branch Count | R0912 | 12 | Number of branches in function |
| Nesting Depth | R1702 | 5 | Nesting levels of code blocks |
Code Examples
Cyclomatic Complexity Analysis and Optimization
# High complexity code example (Cyclomatic Complexity = 9)
def process_user_data(user_data): # pylint: disable=too-many-branches
    """Process user data - high complexity version.

    Every check is written inline so that each decision point (marked
    ``# +1``) can be counted towards the cyclomatic complexity.

    Args:
        user_data: Mapping accessed with ``in``, ``[]`` and ``.get``. The
            keys consulted are 'email', 'username', 'age', 'country' and
            'id_card'.

    Returns:
        A dict with 'email', 'age' and 'verified' keys, or None when the
        input is falsy, neither 'email' nor 'username' is present, the
        email contains no '@', or the age (default 0) is below 13 or
        above 120.
    """
    if not user_data: # +1
        return None
    # Fall back to an address derived from the username when no email is given.
    if 'email' not in user_data: # +1
        if 'username' in user_data: # +1
            email = f"{user_data['username']}@example.com"
        else:
            return None
    else:
        email = user_data['email']
    if '@' not in email: # +1
        return None
    # A missing age defaults to 0 and is therefore rejected here.
    if user_data.get('age', 0) < 13: # +1
        return None
    elif user_data.get('age', 0) > 120: # +1
        return None
    # Only users from 'CN' with a truthy 'id_card' are marked as verified.
    if user_data.get('country') == 'CN': # +1
        if user_data.get('id_card'): # +1
            return {
                'email': email,
                'age': user_data['age'],
                'verified': True
            }
    return {
        'email': email,
        'age': user_data.get('age', 0),
        'verified': False
    }
# Complexity analysis: Base complexity 1 + 8 branches = 9
# Optimized low complexity code (Cyclomatic Complexity = 4)
def process_user_data_optimized(user_data):
    """Build a normalized user record, delegating each check to a helper.

    Returns None when the input is falsy, when ``_extract_email`` yields
    nothing, or when ``_is_valid_age`` rejects the age (default 0).
    Otherwise returns a dict with 'email', 'age' and 'verified' keys.
    """
    if not user_data:  # +1
        return None

    address = _extract_email(user_data)
    if not address:  # +1
        return None

    years = user_data.get('age', 0)
    if not _is_valid_age(years):  # +1
        return None

    is_verified = _check_verification(user_data)
    return {'email': address, 'age': years, 'verified': is_verified}
def _extract_email(user_data):
"""Extract user email"""
if 'email' in user_data:
email = user_data['email']
return email if '@' in email else None
if 'username' in user_data: # +1
return f"{user_data['username']}@example.com"
return None
def _is_valid_age(age):
"""Check age validity"""
return 13 <= age <= 120
def _check_verification(user_data):
"""Check user verification status"""
if user_data.get('country') == 'CN': # +1
return bool(user_data.get('id_card'))
return False
# Main function complexity: 1 + 3 = 4
# _extract_email complexity: 1 + 2 = 3 (two `if` statements)
# _is_valid_age complexity: 1 (no branches)
# _check_verification complexity: 1 + 1 = 2
Function Length Optimization
# Too long function example (50+ statements)
def generate_report(data): # pylint: disable=too-many-statements
    """Generate report - too long version.

    Validation, preprocessing, statistics and formatting are deliberately
    kept in one function to illustrate an over-long function body.

    Args:
        data: Non-empty list of items. Only dict items that contain both
            'value' and 'category' are used; all other items are skipped.

    Returns:
        A dict with a 'summary' section (item count, total, average, max,
        min, number of categories) and a 'categories' section with the
        count, total, average, max and min per category. When no item
        survives preprocessing, the counts and total are 0 and the
        average, max and min are None.

    Raises:
        ValueError: If ``data`` is falsy (for example an empty list).
        TypeError: If ``data`` is not a list.
        Errors raised by ``float()`` for a non-numeric 'value' propagate.
    """
    # Data validation
    if not data:
        raise ValueError("Data cannot be empty")
    if not isinstance(data, list):
        raise TypeError("Data must be a list")

    # Data preprocessing: keep well-formed items and normalize their fields
    cleaned_data = []
    for item in data:
        if isinstance(item, dict):
            if 'value' in item and 'category' in item:
                cleaned_item = {
                    'value': float(item['value']),
                    'category': str(item['category']).strip().lower(),
                    'timestamp': item.get('timestamp', 'unknown')
                }
                cleaned_data.append(cleaned_item)

    # Statistical calculations
    categories = {}
    total_value = 0
    max_value = float('-inf')
    min_value = float('inf')
    for item in cleaned_data:
        category = item['category']
        value = item['value']
        if category not in categories:
            categories[category] = {
                'count': 0,
                'total': 0,
                'values': []
            }
        categories[category]['count'] += 1
        categories[category]['total'] += value
        categories[category]['values'].append(value)
        total_value += value
        max_value = max(max_value, value)
        min_value = min(min_value, value)

    # Report generation
    # With no valid items the average would divide by zero and max/min
    # would still hold their +/-inf sentinels, so report None instead.
    has_items = bool(cleaned_data)
    report = {
        'summary': {
            'total_items': len(cleaned_data),
            'total_value': total_value,
            'average_value': total_value / len(cleaned_data) if has_items else None,
            'max_value': max_value if has_items else None,
            'min_value': min_value if has_items else None,
            'categories_count': len(categories)
        },
        'categories': {}
    }
    for category, stats in categories.items():
        report['categories'][category] = {
            'count': stats['count'],
            'total': stats['total'],
            'average': stats['total'] / stats['count'],
            'max': max(stats['values']),
            'min': min(stats['values'])
        }
    return report
# Optimized function split
class ReportGenerator:
    """Report generator.

    Splits report generation into validation, preprocessing, statistics
    and formatting, with one small method per step.
    """

    def generate_report(self, data):
        """Generate report - main entry point.

        Args:
            data: Non-empty list of items. Only dict items that contain
                both 'value' and 'category' are used; others are skipped.

        Returns:
            A dict with a 'summary' section and a per-category
            'categories' section. When no item survives preprocessing,
            the counts and total are 0 and the average, max and min are
            None.

        Raises:
            ValueError: If ``data`` is falsy (for example an empty list).
            TypeError: If ``data`` is not a list.
            Errors raised by ``float()`` for a non-numeric 'value'
            propagate.
        """
        validated_data = self._validate_data(data)
        cleaned_data = self._preprocess_data(validated_data)
        statistics = self._calculate_statistics(cleaned_data)
        return self._format_report(statistics, cleaned_data)

    def _validate_data(self, data):
        """Validate input data and return it unchanged."""
        if not data:
            raise ValueError("Data cannot be empty")
        if not isinstance(data, list):
            raise TypeError("Data must be a list")
        return data

    def _preprocess_data(self, data):
        """Return the cleaned form of every valid item, in input order."""
        return [
            self._clean_item(item)
            for item in data
            if self._is_valid_item(item)
        ]

    def _is_valid_item(self, item):
        """Check if data item is a dict with 'value' and 'category'."""
        return (isinstance(item, dict) and
                'value' in item and
                'category' in item)

    def _clean_item(self, item):
        """Normalize one item: float value, trimmed lower-case category."""
        return {
            'value': float(item['value']),
            'category': str(item['category']).strip().lower(),
            'timestamp': item.get('timestamp', 'unknown')
        }

    def _calculate_statistics(self, data):
        """Calculate overall statistics and per-category accumulators."""
        categories = {}
        for item in data:
            bucket = categories.setdefault(
                item['category'],
                {'count': 0, 'total': 0, 'values': []}
            )
            bucket['count'] += 1
            bucket['total'] += item['value']
            bucket['values'].append(item['value'])
        return {
            'basic': self._calculate_basic_stats(data),
            'categories': categories
        }

    def _calculate_basic_stats(self, data):
        """Calculate basic statistics over all cleaned items.

        For an empty list the average, max and min are undefined and are
        reported as None rather than raising.
        """
        values = [item['value'] for item in data]
        total = sum(values)
        if not values:
            return {
                'total_items': 0,
                'total_value': total,
                'average_value': None,
                'max_value': None,
                'min_value': None
            }
        return {
            'total_items': len(values),
            'total_value': total,
            'average_value': total / len(values),
            'max_value': max(values),
            'min_value': min(values)
        }

    def _format_report(self, statistics, data):
        """Format report output.

        ``data`` is not used here; the parameter is kept so the existing
        call signature stays unchanged.
        """
        report = {
            'summary': {
                **statistics['basic'],
                'categories_count': len(statistics['categories'])
            },
            'categories': {}
        }
        for category, stats in statistics['categories'].items():
            report['categories'][category] = {
                'count': stats['count'],
                'total': stats['total'],
                'average': stats['total'] / stats['count'],
                'max': max(stats['values']),
                'min': min(stats['values'])
            }
        return report
Parameter Count Optimization
# Function with too many parameters
def create_user_account(  # pylint: disable=too-many-arguments
        username, email, password, first_name, last_name,
        age, country, phone, address, postal_code,
        newsletter_subscription=True, marketing_emails=False):
    """Create user account - too many parameters version.

    Placeholder that only illustrates an over-long parameter list: it
    takes ten required values and two optional flags, does nothing with
    them and returns None.
    """
    # Implementation code...
    return None
# Optimization solution 1: Use dataclass
from dataclasses import dataclass
from typing import Optional
@dataclass
class UserProfile:
    """User profile information.

    Groups the identity, account and contact values that the long
    ``create_user_account`` signature takes one by one. The first seven
    fields are required; the contact fields default to None.
    """
    # Required account and identity fields
    username: str
    email: str
    password: str
    first_name: str
    last_name: str
    age: int
    country: str
    # Optional contact details
    phone: Optional[str] = None
    address: Optional[str] = None
    postal_code: Optional[str] = None
@dataclass
class UserPreferences:
    """User preference settings.

    Both flags are optional and carry the same defaults as the matching
    keyword parameters of ``create_user_account``.
    """
    # Defaults to subscribed
    newsletter_subscription: bool = True
    # Defaults to not receiving marketing emails
    marketing_emails: bool = False
def create_user_account_optimized(
        profile: UserProfile,
        preferences: Optional[UserPreferences] = None):
    """Create user account - optimized version.

    Args:
        profile: The user's profile data.
        preferences: Preference settings; when None, a ``UserPreferences``
            instance with its default values is used.

    Returns:
        A dict with the given 'profile', the effective 'preferences' and
        a 'status' of 'created'.
    """
    if preferences is None:
        # Create the default per call instead of sharing one instance
        # through the parameter default.
        preferences = UserPreferences()
    # Implementation code...
    return {
        'profile': profile,
        'preferences': preferences,
        'status': 'created'
    }
# Usage example
# Only the required fields are given; phone, address and postal_code keep
# their None defaults.
profile = UserProfile(
    username="johndoe",
    email="john@example.com",
    password="secure_password",
    first_name="John",
    last_name="Doe",
    age=30,
    country="US"
)
# Both preference defaults are overridden here.
preferences = UserPreferences(
    newsletter_subscription=False,
    marketing_emails=True
)
user = create_user_account_optimized(profile, preferences)
# Optimization solution 2: Use configuration dictionary
def create_user_account_dict(user_data: dict, preferences: dict = None):
    """Create user account - dictionary parameter version.

    Args:
        user_data: Account fields; must contain 'username', 'email',
            'password', 'first_name', 'last_name', 'age' and 'country'.
        preferences: Optional overrides merged on top of the default
            preference values.

    Returns:
        A dict holding the given ``user_data``, the merged 'preferences'
        and a 'status' of 'created'.

    Raises:
        ValueError: Naming the first required field missing from
            ``user_data``.
    """
    mandatory = (
        'username', 'email', 'password', 'first_name',
        'last_name', 'age', 'country',
    )
    for field in mandatory:
        if field in user_data:
            continue
        raise ValueError(f"Missing required field: {field}")

    # Start from the defaults and let caller-supplied values win.
    effective = {
        'newsletter_subscription': True,
        'marketing_emails': False
    }
    if preferences:
        effective.update(preferences)

    return {
        'user_data': user_data,
        'preferences': effective,
        'status': 'created'
    }
# Usage example
# All seven required fields plus the optional 'phone' entry.
user_info = {
    'username': 'johndoe',
    'email': 'john@example.com',
    'password': 'secure_password',
    'first_name': 'John',
    'last_name': 'Doe',
    'age': 30,
    'country': 'US',
    'phone': '+1234567890'
}
# Both values differ from the defaults applied inside
# create_user_account_dict, so both take effect after the merge.
user_prefs = {
    'newsletter_subscription': False,
    'marketing_emails': True
}
user = create_user_account_dict(user_info, user_prefs)
Local Variable Count Optimization
# Function with too many local variables
def calculate_financial_metrics(data): # pylint: disable=too-many-locals
    """Calculate financial metrics - too many variables version.

    Every intermediate value is held in its own local variable on
    purpose, to illustrate a function with too many locals.

    Args:
        data: Mapping with the required keys 'revenue', 'costs',
            'expenses', 'assets' and 'liabilities'. The optional keys
            'current_assets' (default 0), 'current_liabilities'
            (default 1) and 'inventory' (default 0) feed the liquidity
            ratios.

    Returns:
        A dict with 'profitability', 'returns', 'liquidity' and
        'efficiency' sections. A ratio whose denominator is unusable
        (not positive for revenue, assets and equity; zero for current
        liabilities) is reported as 0.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    # Basic data
    revenue = data['revenue']
    costs = data['costs']
    expenses = data['expenses']
    assets = data['assets']
    liabilities = data['liabilities']
    current_assets = data.get('current_assets', 0)
    current_liabilities = data.get('current_liabilities', 1)
    inventory = data.get('inventory', 0)

    # Intermediate calculations
    gross_profit = revenue - costs
    net_profit = gross_profit - expenses
    total_equity = assets - liabilities

    # Ratio calculations
    gross_margin = gross_profit / revenue if revenue > 0 else 0
    net_margin = net_profit / revenue if revenue > 0 else 0
    # NOTE(review): computed exactly like net_margin; kept because the
    # key is part of the returned structure.
    profit_margin = net_profit / revenue if revenue > 0 else 0

    # ROI related
    roa = net_profit / assets if assets > 0 else 0
    roe = net_profit / total_equity if total_equity > 0 else 0

    # Liquidity indicators
    # An explicit current_liabilities of 0 would divide by zero; report 0
    # in that case, like the other ratios do.
    current_ratio = (
        current_assets / current_liabilities
        if current_liabilities != 0 else 0
    )
    quick_ratio = (
        (current_assets - inventory) / current_liabilities
        if current_liabilities != 0 else 0
    )

    # Efficiency indicators
    asset_turnover = revenue / assets if assets > 0 else 0
    equity_turnover = revenue / total_equity if total_equity > 0 else 0

    return {
        'profitability': {
            'gross_margin': gross_margin,
            'net_margin': net_margin,
            'profit_margin': profit_margin
        },
        'returns': {
            'roa': roa,
            'roe': roe
        },
        'liquidity': {
            'current_ratio': current_ratio,
            'quick_ratio': quick_ratio
        },
        'efficiency': {
            'asset_turnover': asset_turnover,
            'equity_turnover': equity_turnover
        }
    }
# Optimization: Use class and method splitting
class FinancialMetricsCalculator:
    """Financial metrics calculator.

    Wraps one financial data mapping and exposes each metric group
    (profitability, returns, liquidity, efficiency) as its own method.
    """

    def __init__(self, financial_data):
        """Store the financial data and verify the required fields exist."""
        self.data = financial_data
        self._validate_data()

    def _validate_data(self):
        """Raise ValueError naming the first missing required field."""
        for field in ('revenue', 'costs', 'expenses', 'assets', 'liabilities'):
            if field in self.data:
                continue
            raise ValueError(f"Missing required financial data field: {field}")

    def calculate_all_metrics(self):
        """Return all four metric groups in one dict."""
        profitability = self.calculate_profitability()
        returns = self.calculate_returns()
        liquidity = self.calculate_liquidity()
        efficiency = self.calculate_efficiency()
        return {
            'profitability': profitability,
            'returns': returns,
            'liquidity': liquidity,
            'efficiency': efficiency
        }

    def calculate_profitability(self):
        """Return gross, net and profit margin relative to revenue."""
        sales = self.data['revenue']
        direct_costs = self.data['costs']
        overhead = self.data['expenses']
        gross = sales - direct_costs
        net = gross - overhead
        divide = self._safe_divide
        return {
            'gross_margin': divide(gross, sales),
            'net_margin': divide(net, sales),
            'profit_margin': divide(net, sales)
        }

    def calculate_returns(self):
        """Return net profit relative to assets (roa) and equity (roe)."""
        profit = self._get_net_profit()
        total_assets = self.data['assets']
        owners_equity = self._get_total_equity()
        return {
            'roa': self._safe_divide(profit, total_assets),
            'roe': self._safe_divide(profit, owners_equity)
        }

    def calculate_liquidity(self):
        """Return current and quick ratio from the optional liquidity inputs.

        Missing inputs default to 0 for 'current_assets' and 'inventory'
        and to 1 for 'current_liabilities'.
        """
        liquid = self.data.get('current_assets', 0)
        due = self.data.get('current_liabilities', 1)
        stock = self.data.get('inventory', 0)
        return {
            'current_ratio': self._safe_divide(liquid, due),
            'quick_ratio': self._safe_divide(liquid - stock, due)
        }

    def calculate_efficiency(self):
        """Return revenue relative to assets and to equity."""
        sales = self.data['revenue']
        total_assets = self.data['assets']
        owners_equity = self._get_total_equity()
        return {
            'asset_turnover': self._safe_divide(sales, total_assets),
            'equity_turnover': self._safe_divide(sales, owners_equity)
        }

    def _get_net_profit(self):
        """Return revenue minus costs minus expenses."""
        figures = self.data
        return figures['revenue'] - figures['costs'] - figures['expenses']

    def _get_total_equity(self):
        """Return assets minus liabilities."""
        figures = self.data
        return figures['assets'] - figures['liabilities']

    @staticmethod
    def _safe_divide(numerator, denominator):
        """Divide, returning 0 instead of raising when the denominator is 0."""
        if denominator != 0:
            return numerator / denominator
        return 0
# Usage example
# Sample input: the five required fields plus the three optional
# liquidity inputs.
financial_data = {
    'revenue': 1000000,
    'costs': 600000,
    'expenses': 200000,
    'assets': 1500000,
    'liabilities': 800000,
    'current_assets': 500000,
    'current_liabilities': 300000,
    'inventory': 100000
}
# The constructor validates the required fields before any calculation.
calculator = FinancialMetricsCalculator(financial_data)
metrics = calculator.calculate_all_metrics()
Nesting Depth Optimization
# Deeply nested code
def process_nested_data(data): # pylint: disable=too-many-nested-blocks
    """Process nested data - deep nesting version.

    Every condition opens a new nesting level on purpose, to illustrate
    code that exceeds the nesting-depth limit.

    Args:
        data: Iterable of category mappings. A category contributes only
            if it has an 'items' entry; its optional 'name' labels the
            results.

    Returns:
        A list of dicts with 'name', 'final_price' and 'category' for
        every item that is active, has a price above 0, has a discount
        above 0 and whose discounted price is above 10. Empty when
        ``data`` is falsy.
    """
    results = []
    if data:
        for category in data:
            if 'items' in category:
                for item in category['items']:
                    if 'active' in item and item['active']:
                        if 'price' in item:
                            if item['price'] > 0:
                                # Items without a positive discount are skipped entirely.
                                if 'discount' in item:
                                    if item['discount'] > 0:
                                        # The discount is applied as a fraction of the price.
                                        final_price = item['price'] * (1 - item['discount'])
                                        if final_price > 10:
                                            results.append({
                                                'name': item.get('name', 'Unknown'),
                                                'final_price': final_price,
                                                'category': category.get('name', 'Uncategorized')
                                            })
    return results
# Optimization: Reduce nesting depth
def process_nested_data_optimized(data):
    """Process nested data - optimized version.

    Returns the concatenated results of ``_process_category`` for every
    category in ``data``, or an empty list when ``data`` is falsy.
    """
    if not data:
        return []
    return [
        entry
        for group in data
        for entry in _process_category(group)
    ]
def _process_category(category):
    """Return the processed items of one category.

    A category without an 'items' entry yields an empty list; items for
    which ``_process_item`` returns a falsy value are dropped.
    """
    if 'items' not in category:
        return []
    outcomes = (_process_item(entry, category) for entry in category['items'])
    return [outcome for outcome in outcomes if outcome]
def _process_item(item, category):
    """Turn one item into a result record, or None if it does not qualify.

    An item qualifies when ``_is_valid_item`` accepts it and
    ``_calculate_final_price`` yields a price that is neither None nor
    at most 10.
    """
    if _is_valid_item(item):
        discounted = _calculate_final_price(item)
        rejected = discounted is None or discounted <= 10
        if not rejected:
            return {
                'name': item.get('name', 'Unknown'),
                'final_price': discounted,
                'category': category.get('name', 'Uncategorized')
            }
    return None
def _is_valid_item(item):
"""Check if item is valid"""
return (item.get('active', False) and
'price' in item and
item['price'] > 0)
def _calculate_final_price(item):
"""Calculate final price"""
price = item['price']
discount = item.get('discount', 0)
if discount <= 0:
return price
return price * (1 - discount)
Configuring Complexity Thresholds
# .pylintrc configuration example
"""
[DESIGN]
# Maximum function parameter count
max-args = 5
# Maximum local variable count
max-locals = 15
# Maximum return statement count
max-returns = 6
# Maximum branch count
max-branches = 12
# Maximum statement count
max-statements = 50
# Maximum parent class count
max-parents = 7
# Maximum attribute count
max-attributes = 7
# Minimum public method count
min-public-methods = 2
# Maximum public method count
max-public-methods = 20
# Maximum boolean expression count
max-bool-expr = 5
"""
# Project-specific configuration example
def configure_complexity_for_project():
    """Return example Pylint threshold overrides per project type.

    The result maps 'data_processing', 'web_api' and 'algorithm' to a
    dict of option names and the limit suggested for that kind of
    project.
    """
    return {
        # Data processing: more relaxed limits
        'data_processing': {
            'max-args': 8,         # functions may need more parameters
            'max-locals': 20,      # transformations may need more locals
            'max-statements': 60,  # processing flows may be longer
        },
        # Web API: stricter limits
        'web_api': {
            'max-args': 4,         # API functions should remain simple
            'max-locals': 10,      # fewer local variables
            'max-statements': 30,  # keep functions concise
        },
        # Algorithm implementation: special limits
        'algorithm': {
            'max-branches': 15,    # algorithms may have more branches
            'max-statements': 80,  # implementations may be longer
            'max-bool-expr': 8,    # complex conditional checks
        },
    }
Complexity Optimization Strategies
- Function Splitting: Break down large functions into multiple smaller functions
- Extract Methods: Extract repeated logic into independent methods
- Early Returns: Use early returns to reduce nesting
- Data Structure Optimization: Use appropriate data structures to reduce complexity
- Design Patterns: Apply design patterns to simplify complex logic
Precautions
- Don’t Over-Split: Avoid creating too many tiny functions
- Maintain Logical Integrity: Keep related logic together when splitting
- Consider Readability: Optimized code should be easier to understand
- Balance Performance: Avoid excessive splitting affecting performance
Code complexity control is an important means of improving code quality. Through reasonable refactoring and design, the maintainability and readability of code can be significantly improved.