Chapter 5: Code Complexity Analysis

Haiyue

Learning Objectives
  • Understand the concept and calculation of cyclomatic complexity
  • Master Pylint’s complexity checking features
  • Learn to analyze and reduce code complexity
  • Master design principles for functions and classes

Key Concepts

Fundamentals of Cyclomatic Complexity

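Cyclomatic complexity measures how many linearly independent paths run through a piece of code. For a single function it equals 1 plus the number of decision points: each if/elif, loop, and except clause adds one, so a function with no branches has a complexity of 1.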
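The counting rule can be sketched with the standard library's ast module. This is a rough estimate under stated assumptions, not how Pylint computes the metric: the function name estimate_complexity and the choice of node types are illustrative, and boolean operators and conditional expressions are ignored.

```python
import ast

# Node types treated as decision points in this sketch (an assumption;
# real tools apply their own, slightly different rules).
_DECISION_NODES = (ast.If, ast.For, ast.While, ast.ExceptHandler)


def estimate_complexity(source: str) -> int:
    """Return 1 plus the number of decision points found in `source`."""
    tree = ast.parse(source)
    decisions = sum(isinstance(node, _DECISION_NODES) for node in ast.walk(tree))
    return 1 + decisions


SAMPLE = '''
def classify(n):
    if n < 0:
        return "negative"
    elif n == 0:
        return "zero"
    for _ in range(n):
        pass
    return "positive"
'''

# An elif is parsed as a nested If node, so it is counted like any other branch.
print(estimate_complexity(SAMPLE))  # 1 + if + elif + for = 4
```

The sketch counts over the whole source string, so it is only meaningful when the string holds a single function.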

Pylint Complexity Check Types

Check Type            | Message ID | Default Threshold | Description
Cyclomatic Complexity | R1260      | 10                | Cyclomatic complexity of function/method (too-complex; requires the pylint.extensions.mccabe plugin)
Function Length       | R0915      | 50                | Number of statements in function
Parameter Count       | R0913      | 5                 | Number of function parameters
Local Variables       | R0914      | 15                | Number of local variables in function
Branch Count          | R0912      | 12                | Number of branches in function
Nesting Depth         | R1702      | 5                 | Nesting levels of code blocks (R0101 in older Pylint releases)

Code Examples

Cyclomatic Complexity Analysis and Optimization

# High complexity code example (Cyclomatic Complexity = 9)
def process_user_data(user_data):  # pylint: disable=too-many-branches
    """Process user data - high complexity version"""
    if not user_data:  # +1
        return None

    if 'email' not in user_data:  # +1
        if 'username' in user_data:  # +1
            email = f"{user_data['username']}@example.com"
        else:
            return None
    else:
        email = user_data['email']

    if '@' not in email:  # +1
        return None

    if user_data.get('age', 0) < 13:  # +1
        return None
    elif user_data.get('age', 0) > 120:  # +1
        return None

    if user_data.get('country') == 'CN':  # +1
        if user_data.get('id_card'):  # +1
            return {
                'email': email,
                'age': user_data['age'],
                'verified': True
            }

    return {
        'email': email,
        'age': user_data.get('age', 0),
        'verified': False
    }

# Complexity analysis: base complexity 1 + 8 decision points = 9

# Optimized low complexity code (Cyclomatic Complexity = 4)
def process_user_data_optimized(user_data):
    """Process user data - optimized version"""
    if not user_data:  # +1
        return None

    email = _extract_email(user_data)
    if not email:  # +1
        return None

    age = user_data.get('age', 0)
    if not _is_valid_age(age):  # +1
        return None

    verified = _check_verification(user_data)

    return {
        'email': email,
        'age': age,
        'verified': verified
    }

def _extract_email(user_data):
    """Extract user email"""
    if 'email' in user_data:  # +1
        email = user_data['email']
        return email if '@' in email else None

    if 'username' in user_data:  # +1
        return f"{user_data['username']}@example.com"

    return None

def _is_valid_age(age):
    """Check age validity"""
    return 13 <= age <= 120

def _check_verification(user_data):
    """Check user verification status"""
    if user_data.get('country') == 'CN':  # +1
        return bool(user_data.get('id_card'))
    return False

# Main function complexity: 1 + 3 = 4
# _extract_email complexity: 1 + 2 = 3 (both `if` statements count)
# _is_valid_age complexity: 1 (no branches)
# _check_verification complexity: 1 + 1 = 2

Function Length Optimization

# Too long function example (50+ statements)
def generate_report(data):  # pylint: disable=too-many-statements
    """Generate report - too long version"""
    # Data validation (10 lines)
    # Check the type first so non-list input (including None) raises TypeError
    if not isinstance(data, list):
        raise TypeError("Data must be a list")
    if not data:
        raise ValueError("Data list cannot be empty")

    # Data preprocessing (15 lines)
    cleaned_data = []
    for item in data:
        if isinstance(item, dict):
            if 'value' in item and 'category' in item:
                cleaned_item = {
                    'value': float(item['value']),
                    'category': str(item['category']).strip().lower(),
                    'timestamp': item.get('timestamp', 'unknown')
                }
                cleaned_data.append(cleaned_item)

    # Statistical calculations (20 lines)
    categories = {}
    total_value = 0
    max_value = float('-inf')
    min_value = float('inf')

    for item in cleaned_data:
        category = item['category']
        value = item['value']

        if category not in categories:
            categories[category] = {
                'count': 0,
                'total': 0,
                'values': []
            }

        categories[category]['count'] += 1
        categories[category]['total'] += value
        categories[category]['values'].append(value)

        total_value += value
        max_value = max(max_value, value)
        min_value = min(min_value, value)

    # Report generation (15 lines)
    report = {
        'summary': {
            'total_items': len(cleaned_data),
            'total_value': total_value,
            'average_value': total_value / len(cleaned_data),
            'max_value': max_value,
            'min_value': min_value,
            'categories_count': len(categories)
        },
        'categories': {}
    }

    for category, stats in categories.items():
        report['categories'][category] = {
            'count': stats['count'],
            'total': stats['total'],
            'average': stats['total'] / stats['count'],
            'max': max(stats['values']),
            'min': min(stats['values'])
        }

    return report

# Optimized function split
class ReportGenerator:
    """Report generator"""

    def generate_report(self, data):
        """Generate report - main entry point"""
        validated_data = self._validate_data(data)
        cleaned_data = self._preprocess_data(validated_data)
        statistics = self._calculate_statistics(cleaned_data)
        return self._format_report(statistics, cleaned_data)

    def _validate_data(self, data):
        """Validate input data"""
        # Check the type first so non-list input (including None) raises TypeError
        if not isinstance(data, list):
            raise TypeError("Data must be a list")
        if not data:
            raise ValueError("Data list cannot be empty")
        return data

    def _preprocess_data(self, data):
        """Preprocess data"""
        cleaned_data = []
        for item in data:
            if self._is_valid_item(item):
                cleaned_item = self._clean_item(item)
                cleaned_data.append(cleaned_item)
        return cleaned_data

    def _is_valid_item(self, item):
        """Check if data item is valid"""
        return (isinstance(item, dict) and
                'value' in item and
                'category' in item)

    def _clean_item(self, item):
        """Clean individual data item"""
        return {
            'value': float(item['value']),
            'category': str(item['category']).strip().lower(),
            'timestamp': item.get('timestamp', 'unknown')
        }

    def _calculate_statistics(self, data):
        """Calculate statistics"""
        categories = {}
        basic_stats = self._calculate_basic_stats(data)

        for item in data:
            category = item['category']
            if category not in categories:
                categories[category] = {
                    'count': 0,
                    'total': 0,
                    'values': []
                }

            categories[category]['count'] += 1
            categories[category]['total'] += item['value']
            categories[category]['values'].append(item['value'])

        return {
            'basic': basic_stats,
            'categories': categories
        }

    def _calculate_basic_stats(self, data):
        """Calculate basic statistics"""
        values = [item['value'] for item in data]
        return {
            'total_items': len(data),
            'total_value': sum(values),
            'average_value': sum(values) / len(data),
            'max_value': max(values),
            'min_value': min(values)
        }

    def _format_report(self, statistics, data):
        """Format report output"""
        report = {
            'summary': {
                **statistics['basic'],
                'categories_count': len(statistics['categories'])
            },
            'categories': {}
        }

        for category, stats in statistics['categories'].items():
            report['categories'][category] = {
                'count': stats['count'],
                'total': stats['total'],
                'average': stats['total'] / stats['count'],
                'max': max(stats['values']),
                'min': min(stats['values'])
            }

        return report
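The per-category bookkeeping above still carries an explicit "is this key new?" branch. As a hedged alternative, collections.defaultdict can create the counters on first access. The helper name group_by_category and the sample data are illustrative and not part of the original class.

```python
from collections import defaultdict


def group_by_category(items):
    """Aggregate count, total and values per category without a membership check."""
    # A missing key is created on first access with fresh counters.
    categories = defaultdict(lambda: {'count': 0, 'total': 0, 'values': []})
    for item in items:
        stats = categories[item['category']]
        stats['count'] += 1
        stats['total'] += item['value']
        stats['values'].append(item['value'])
    # Convert back to a plain dict so later lookups of unknown keys raise KeyError.
    return dict(categories)


sample = [
    {'category': 'a', 'value': 1.0},
    {'category': 'b', 'value': 2.0},
    {'category': 'a', 'value': 3.0},
]
grouped = group_by_category(sample)
print(grouped['a'])  # {'count': 2, 'total': 4.0, 'values': [1.0, 3.0]}
```

Removing the branch lowers the loop's complexity by one, at the cost of a slightly less obvious initialization.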

Parameter Count Optimization

# Function with too many parameters
def create_user_account(  # pylint: disable=too-many-arguments
    username, email, password, first_name, last_name,
    age, country, phone, address, postal_code,
    newsletter_subscription=True, marketing_emails=False):
    """Create user account - too many parameters version"""
    # Implementation code...
    pass

# Optimization solution 1: Use dataclass
from dataclasses import dataclass
from typing import Optional

@dataclass
class UserProfile:
    """User profile information"""
    username: str
    email: str
    password: str
    first_name: str
    last_name: str
    age: int
    country: str
    phone: Optional[str] = None
    address: Optional[str] = None
    postal_code: Optional[str] = None

@dataclass
class UserPreferences:
    """User preference settings"""
    newsletter_subscription: bool = True
    marketing_emails: bool = False

def create_user_account_optimized(
    profile: UserProfile,
    preferences: Optional[UserPreferences] = None):
    """Create user account - optimized version"""
    if preferences is None:
        preferences = UserPreferences()

    # Implementation code...
    return {
        'profile': profile,
        'preferences': preferences,
        'status': 'created'
    }

# Usage example
profile = UserProfile(
    username="johndoe",
    email="john@example.com",
    password="secure_password",
    first_name="John",
    last_name="Doe",
    age=30,
    country="US"
)

preferences = UserPreferences(
    newsletter_subscription=False,
    marketing_emails=True
)

user = create_user_account_optimized(profile, preferences)

# Optimization solution 2: Use configuration dictionary
def create_user_account_dict(user_data: dict, preferences: Optional[dict] = None):
    """Create user account - dictionary parameter version"""
    # Required field validation
    required_fields = [
        'username', 'email', 'password', 'first_name',
        'last_name', 'age', 'country'
    ]

    for field in required_fields:
        if field not in user_data:
            raise ValueError(f"Missing required field: {field}")

    # Default preference settings
    default_preferences = {
        'newsletter_subscription': True,
        'marketing_emails': False
    }

    if preferences:
        default_preferences.update(preferences)

    # Create account logic
    return {
        'user_data': user_data,
        'preferences': default_preferences,
        'status': 'created'
    }

# Usage example
user_info = {
    'username': 'johndoe',
    'email': 'john@example.com',
    'password': 'secure_password',
    'first_name': 'John',
    'last_name': 'Doe',
    'age': 30,
    'country': 'US',
    'phone': '+1234567890'
}

user_prefs = {
    'newsletter_subscription': False,
    'marketing_emails': True
}

user = create_user_account_dict(user_info, user_prefs)

Local Variable Count Optimization

# Function with too many local variables
def calculate_financial_metrics(data):  # pylint: disable=too-many-locals
    """Calculate financial metrics - too many variables version"""
    # Basic data
    revenue = data['revenue']
    costs = data['costs']
    expenses = data['expenses']
    assets = data['assets']
    liabilities = data['liabilities']

    # Intermediate calculations
    gross_profit = revenue - costs
    net_profit = gross_profit - expenses
    total_equity = assets - liabilities

    # Ratio calculations
    gross_margin = gross_profit / revenue if revenue > 0 else 0
    net_margin = net_profit / revenue if revenue > 0 else 0
    profit_margin = net_profit / revenue if revenue > 0 else 0

    # ROI related
    roa = net_profit / assets if assets > 0 else 0
    roe = net_profit / total_equity if total_equity > 0 else 0

    # Liquidity indicators
    current_ratio = data.get('current_assets', 0) / data.get('current_liabilities', 1)
    quick_ratio = (data.get('current_assets', 0) - data.get('inventory', 0)) / data.get('current_liabilities', 1)

    # Efficiency indicators
    asset_turnover = revenue / assets if assets > 0 else 0
    equity_turnover = revenue / total_equity if total_equity > 0 else 0

    return {
        'profitability': {
            'gross_margin': gross_margin,
            'net_margin': net_margin,
            'profit_margin': profit_margin
        },
        'returns': {
            'roa': roa,
            'roe': roe
        },
        'liquidity': {
            'current_ratio': current_ratio,
            'quick_ratio': quick_ratio
        },
        'efficiency': {
            'asset_turnover': asset_turnover,
            'equity_turnover': equity_turnover
        }
    }

# Optimization: Use class and method splitting
class FinancialMetricsCalculator:
    """Financial metrics calculator"""

    def __init__(self, financial_data):
        """Initialize financial data"""
        self.data = financial_data
        self._validate_data()

    def _validate_data(self):
        """Validate financial data completeness"""
        required_fields = ['revenue', 'costs', 'expenses', 'assets', 'liabilities']
        for field in required_fields:
            if field not in self.data:
                raise ValueError(f"Missing required financial data field: {field}")

    def calculate_all_metrics(self):
        """Calculate all financial metrics"""
        return {
            'profitability': self.calculate_profitability(),
            'returns': self.calculate_returns(),
            'liquidity': self.calculate_liquidity(),
            'efficiency': self.calculate_efficiency()
        }

    def calculate_profitability(self):
        """Calculate profitability metrics"""
        revenue = self.data['revenue']
        costs = self.data['costs']
        expenses = self.data['expenses']

        gross_profit = revenue - costs
        net_profit = gross_profit - expenses

        return {
            'gross_margin': self._safe_divide(gross_profit, revenue),
            'net_margin': self._safe_divide(net_profit, revenue),
            'profit_margin': self._safe_divide(net_profit, revenue)
        }

    def calculate_returns(self):
        """Calculate return on investment metrics"""
        net_profit = self._get_net_profit()
        assets = self.data['assets']
        equity = self._get_total_equity()

        return {
            'roa': self._safe_divide(net_profit, assets),
            'roe': self._safe_divide(net_profit, equity)
        }

    def calculate_liquidity(self):
        """Calculate liquidity metrics"""
        current_assets = self.data.get('current_assets', 0)
        current_liabilities = self.data.get('current_liabilities', 1)
        inventory = self.data.get('inventory', 0)

        return {
            'current_ratio': self._safe_divide(current_assets, current_liabilities),
            'quick_ratio': self._safe_divide(current_assets - inventory, current_liabilities)
        }

    def calculate_efficiency(self):
        """Calculate efficiency metrics"""
        revenue = self.data['revenue']
        assets = self.data['assets']
        equity = self._get_total_equity()

        return {
            'asset_turnover': self._safe_divide(revenue, assets),
            'equity_turnover': self._safe_divide(revenue, equity)
        }

    def _get_net_profit(self):
        """Get net profit"""
        return self.data['revenue'] - self.data['costs'] - self.data['expenses']

    def _get_total_equity(self):
        """Get total equity"""
        return self.data['assets'] - self.data['liabilities']

    @staticmethod
    def _safe_divide(numerator, denominator):
        """Safe division to avoid division by zero error"""
        return numerator / denominator if denominator != 0 else 0

# Usage example
financial_data = {
    'revenue': 1000000,
    'costs': 600000,
    'expenses': 200000,
    'assets': 1500000,
    'liabilities': 800000,
    'current_assets': 500000,
    'current_liabilities': 300000,
    'inventory': 100000
}

calculator = FinancialMetricsCalculator(financial_data)
metrics = calculator.calculate_all_metrics()

Nesting Depth Optimization

# Deeply nested code
def process_nested_data(data):  # pylint: disable=too-many-nested-blocks
    """Process nested data - deep nesting version"""
    results = []

    if data:
        for category in data:
            if 'items' in category:
                for item in category['items']:
                    if 'active' in item and item['active']:
                        if 'price' in item:
                            if item['price'] > 0:
                                if 'discount' in item:
                                    if item['discount'] > 0:
                                        final_price = item['price'] * (1 - item['discount'])
                                        if final_price > 10:
                                            results.append({
                                                'name': item.get('name', 'Unknown'),
                                                'final_price': final_price,
                                                'category': category.get('name', 'Uncategorized')
                                            })

    return results

# Optimization: Reduce nesting depth
def process_nested_data_optimized(data):
    """Process nested data - optimized version"""
    if not data:
        return []

    results = []
    for category in data:
        category_results = _process_category(category)
        results.extend(category_results)

    return results

def _process_category(category):
    """Process single category"""
    if 'items' not in category:
        return []

    results = []
    for item in category['items']:
        processed_item = _process_item(item, category)
        if processed_item:
            results.append(processed_item)

    return results

def _process_item(item, category):
    """Process single item"""
    # Early return to reduce nesting
    if not _is_valid_item(item):
        return None

    final_price = _calculate_final_price(item)
    if final_price is None or final_price <= 10:
        return None

    return {
        'name': item.get('name', 'Unknown'),
        'final_price': final_price,
        'category': category.get('name', 'Uncategorized')
    }

def _is_valid_item(item):
    """Check if item is valid"""
    return (item.get('active', False) and
            'price' in item and
            item['price'] > 0)

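def _calculate_final_price(item):
    """Calculate the discounted price, or None when no positive discount applies"""
    discount = item.get('discount', 0)

    # The deeply nested version only keeps items with a positive discount,
    # so signal "skip this item" instead of returning the full price.
    if discount <= 0:
        return None

    return item['price'] * (1 - discount)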
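When extracting helpers feels heavier than the problem deserves, guard clauses inside the loop can flatten the same logic in a single function. The following is a minimal sketch under that assumption. The name collect_discounted_items, the min_price parameter, and the sample data are illustrative.

```python
def collect_discounted_items(data, min_price=10):
    """Keep active, discounted items whose final price exceeds min_price."""
    results = []
    for category in data or []:
        for item in category.get('items', []):
            # Each guard skips an item as early as possible,
            # so the happy path stays at one indentation level.
            if not item.get('active'):
                continue
            price = item.get('price', 0)
            discount = item.get('discount', 0)
            if price <= 0 or discount <= 0:
                continue
            final_price = price * (1 - discount)
            if final_price <= min_price:
                continue
            results.append({
                'name': item.get('name', 'Unknown'),
                'final_price': final_price,
                'category': category.get('name', 'Uncategorized'),
            })
    return results


sample_data = [
    {'name': 'Books', 'items': [
        {'name': 'Novel', 'active': True, 'price': 100, 'discount': 0.5},
        {'name': 'Draft', 'active': False, 'price': 100, 'discount': 0.5},
        {'name': 'Atlas', 'active': True, 'price': 100},
        {'name': 'Pamphlet', 'active': True, 'price': 12, 'discount': 0.5},
    ]},
    {'name': 'Empty'},
]
print(collect_discounted_items(sample_data))
# [{'name': 'Novel', 'final_price': 50.0, 'category': 'Books'}]
```

This variant nests three levels deep instead of nine, but it keeps more branches in one function than the helper-based version, so the two approaches trade nesting depth against branch count.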

Configuring Complexity Thresholds

# .pylintrc configuration example
"""
[DESIGN]
# Maximum function parameter count
max-args = 5

# Maximum local variable count
max-locals = 15

# Maximum return statement count
max-returns = 6

# Maximum branch count
max-branches = 12

# Maximum statement count
max-statements = 50

# Maximum parent class count
max-parents = 7

# Maximum attribute count
max-attributes = 7

# Minimum public method count
min-public-methods = 2

# Maximum public method count
max-public-methods = 20

# Maximum number of boolean expressions in a single if statement
max-bool-expr = 5
"""

# Project-specific configuration example
def configure_complexity_for_project():
    """Project-specific complexity configuration example"""
    # For data processing projects, may need more relaxed configuration
    data_processing_config = {
        'max-args': 8,  # Data processing functions may need more parameters
        'max-locals': 20,  # Data transformation may need more local variables
        'max-statements': 60,  # Data processing flow may be longer
    }

    # For web API projects, stricter configuration
    web_api_config = {
        'max-args': 4,  # API functions should remain simple
        'max-locals': 10,  # Reduce local variable count
        'max-statements': 30,  # Keep functions concise
    }

    # For algorithm implementation, may need special configuration
    algorithm_config = {
        'max-branches': 15,  # Algorithms may have more branches
        'max-statements': 80,  # Algorithm implementation may be longer
        'max-bool-expr': 8,  # Complex conditional checks
    }

    return {
        'data_processing': data_processing_config,
        'web_api': web_api_config,
        'algorithm': algorithm_config
    }
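
Threshold dictionaries like these have no effect until they reach Pylint. One hedged way to apply them is to turn each entry into the matching command-line option. The helper build_pylint_args and the target path src/ are illustrative assumptions, and the sketch only builds the argument list without running anything.

```python
def build_pylint_args(overrides, targets):
    """Translate a threshold dict into a pylint command line (as a list)."""
    # Sort the option names so the generated command is reproducible.
    options = [f"--{name}={value}" for name, value in sorted(overrides.items())]
    return ["pylint", *options, *targets]


web_api_config = {
    'max-args': 4,
    'max-locals': 10,
    'max-statements': 30,
}

command = build_pylint_args(web_api_config, ["src/"])
print(command)
# ['pylint', '--max-args=4', '--max-locals=10', '--max-statements=30', 'src/']
```

The resulting list can be handed to subprocess.run. For settings that should apply to every run, keeping them in the project's .pylintrc is usually simpler.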

Complexity Optimization Strategies
  1. Function Splitting: Break down large functions into multiple smaller functions
  2. Extract Methods: Extract repeated logic into independent methods
  3. Early Returns: Use early returns to reduce nesting
  4. Data Structure Optimization: Use appropriate data structures to reduce complexity
  5. Design Patterns: Apply design patterns to simplify complex logic
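
As a small illustration of the data structure strategy, a lookup table can replace an if/elif chain that maps a key to a value. The shipping methods, rates, and function name below are invented for the example.

```python
# Rate per kilogram for each shipping method (made-up numbers).
SHIPPING_RATES = {
    'standard': 1.0,
    'express': 2.5,
    'overnight': 5.0,
}


def shipping_cost(method, weight_kg):
    """Look the rate up in a table instead of branching on every method."""
    try:
        rate = SHIPPING_RATES[method]
    except KeyError:
        raise ValueError(f"Unknown shipping method: {method}") from None
    return rate * weight_kg


print(shipping_cost('express', 2))  # 5.0
```

Adding a new method now means adding one dictionary entry, and the function's complexity stays constant no matter how many methods exist.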

Precautions
  1. Don’t Over-Split: Too many tiny functions force readers to jump around to follow a single idea
  2. Maintain Logical Integrity: Keep related logic together when splitting
  3. Consider Readability: The refactored code should be easier to understand than the original
  4. Balance Performance: Extra function calls add overhead in hot paths, so measure before splitting further

Code complexity control is an important means of improving code quality. Through reasonable refactoring and design, the maintainability and readability of code can be significantly improved.