Chapter 11: Performance Testing and Concurrency Testing
Learning Objectives
- Master writing performance tests
- Learn how to implement concurrency tests
- Understand techniques for optimizing test performance
- Master the use of pytest-benchmark
Key Concepts
Performance Test Types
| Test Type | Objective | Metrics |
|---|---|---|
| Benchmark Testing | Establish performance baseline | Execution time, memory usage |
| Load Testing | Verify performance under expected load | Response time, throughput |
| Stress Testing | Find system limits | Maximum concurrency, crash point |
| Stability Testing | Long-term stability | Memory leaks, performance degradation |
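As a minimal illustration of the load-testing row above, the sketch below fires a batch of concurrent calls against a stand-in operation and derives response time and throughput. This is a sketch in the spirit of this chapter's examples, not part of them; `fake_request` is a hypothetical placeholder for whatever operation you want to put under load, and the thresholds are only examples.

# test_load_sketch.py - minimal load-test sketch (illustrative)
import time
from concurrent.futures import ThreadPoolExecutor

def fake_request():
    """Hypothetical stand-in for the operation under load."""
    time.sleep(0.01)

def timed_call(_):
    """Run one request and return its response time."""
    start = time.perf_counter()
    fake_request()
    return time.perf_counter() - start

def test_simple_load_profile():
    n_requests = 200
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=20) as executor:
        latencies = list(executor.map(timed_call, range(n_requests)))
    elapsed = time.perf_counter() - start
    throughput = n_requests / elapsed              # requests per second
    avg_latency = sum(latencies) / len(latencies)  # average response time
    assert throughput > 0
    assert avg_latency < 0.5  # generous bound; tune for your environment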
Concurrency Test Scenarios
- Thread safety: Verify code correctness in multi-threaded environment
- Race conditions: Detect concurrent access issues with shared resources
- Deadlock detection: Identify potential deadlock situations (see the sketch after this list)
- Async operations: Test async code behavior
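For the deadlock-detection scenario, a practical safety net is the pytest-timeout plugin, which fails a test that hangs instead of blocking the whole suite. The sketch below assumes pytest-timeout is installed in addition to the tools listed in the next section; the timeout values are examples.

# Requires the pytest-timeout plugin (pip install pytest-timeout)
import threading
import pytest

@pytest.mark.timeout(5)  # fail the test if it hangs for more than 5 seconds
def test_worker_does_not_hang():
    done = threading.Event()

    def worker():
        # ... code that could deadlock goes here ...
        done.set()

    t = threading.Thread(target=worker)
    t.start()
    t.join(timeout=2)  # bounded wait instead of joining forever
    assert done.is_set(), "worker did not finish - possible deadlock"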
Performance Testing Tools
# Core tools
pytest-benchmark # Micro benchmarking
pytest-xdist # Parallel test execution
pytest-asyncio # Async test support
memory-profiler # Memory analysis
line-profiler # Line-level performance analysis
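These packages can be installed with pip; one possible setup (assuming a standard PyPI environment) is shown below, along with typical invocations: run only the benchmarks sorted by mean time, or parallelize the rest of the suite with pytest-xdist while skipping benchmarks.

pip install pytest-benchmark pytest-xdist pytest-asyncio memory-profiler line-profiler

pytest --benchmark-only --benchmark-sort=mean   # run benchmarks only
pytest -n auto --benchmark-skip                 # run other tests in parallel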
Example Code
Basic Performance Testing
# test_performance_basic.py
import pytest
import time
import threading
import concurrent.futures
from collections import deque
import psutil
import os
# Algorithms to test
def linear_search(arr, target):
"""Linear search"""
for i, item in enumerate(arr):
if item == target:
return i
return -1
def binary_search(arr, target):
"""Binary search (requires sorted array)"""
left, right = 0, len(arr) - 1
while left <= right:
mid = (left + right) // 2
if arr[mid] == target:
return mid
elif arr[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1
def bubble_sort(arr):
"""Bubble sort"""
n = len(arr)
arr_copy = arr.copy()
for i in range(n):
for j in range(0, n - i - 1):
if arr_copy[j] > arr_copy[j + 1]:
arr_copy[j], arr_copy[j + 1] = arr_copy[j + 1], arr_copy[j]
return arr_copy
def merge_sort(arr):
"""Merge sort"""
if len(arr) <= 1:
return arr
mid = len(arr) // 2
left = merge_sort(arr[:mid])
right = merge_sort(arr[mid:])
return merge(left, right)
def merge(left, right):
"""Merge two sorted arrays"""
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
class TestBasicPerformance:
"""Basic performance tests"""
@pytest.fixture(scope="class")
def small_dataset(self):
"""Small dataset"""
return list(range(100))
@pytest.fixture(scope="class")
def medium_dataset(self):
"""Medium dataset"""
return list(range(1000))
@pytest.fixture(scope="class")
def large_dataset(self):
"""Large dataset"""
return list(range(10000))
def test_linear_search_performance(self, benchmark, medium_dataset):
"""Linear search performance test"""
target = 500
result = benchmark(linear_search, medium_dataset, target)
assert result == 500
def test_binary_search_performance(self, benchmark, medium_dataset):
"""Binary search performance test"""
sorted_data = sorted(medium_dataset)
target = 500
result = benchmark(binary_search, sorted_data, target)
assert result == 500
def test_bubble_sort_small(self, benchmark, small_dataset):
"""Bubble sort small dataset performance"""
import random
data = small_dataset.copy()
random.shuffle(data)
result = benchmark(bubble_sort, data)
assert result == sorted(data)
def test_merge_sort_large(self, benchmark, large_dataset):
"""Merge sort large dataset performance"""
import random
data = large_dataset.copy()
random.shuffle(data)
result = benchmark(merge_sort, data)
assert result == sorted(data)
@pytest.mark.parametrize("size", [100, 500, 1000])
def test_sorting_algorithm_comparison(self, benchmark, size):
"""Sorting algorithm performance comparison"""
import random
data = list(range(size))
random.shuffle(data)
# Test merge sort here
result = benchmark(merge_sort, data)
assert len(result) == size
assert result == sorted(data)
def test_memory_usage_monitoring(self, medium_dataset):
"""Memory usage monitoring"""
process = psutil.Process(os.getpid())
# Record initial memory usage
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# Execute memory-intensive operation
data_copies = []
for _ in range(10):
data_copies.append(medium_dataset.copy())
# Record peak memory usage
peak_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = peak_memory - initial_memory
# Clean up memory
del data_copies
# Verify memory usage is reasonable
assert memory_increase < 50 # Memory growth should not exceed 50MB
print(f"Memory increase: {memory_increase:.2f} MB")
Concurrency Test Implementation
# test_concurrency.py
import pytest
import threading
import time
import queue
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import multiprocessing
# Code to test for concurrency
class ThreadSafeCounter:
"""Thread-safe counter"""
def __init__(self):
self._value = 0
self._lock = threading.Lock()
def increment(self):
"""Increment counter"""
with self._lock:
old_value = self._value
time.sleep(0.001) # Simulate some processing time
self._value = old_value + 1
def decrement(self):
"""Decrement counter"""
with self._lock:
old_value = self._value
time.sleep(0.001)
self._value = old_value - 1
@property
def value(self):
"""Get current value"""
with self._lock:
return self._value
class UnsafeCounter:
"""Non-thread-safe counter (for comparison)"""
def __init__(self):
self._value = 0
def increment(self):
old_value = self._value
time.sleep(0.001)
self._value = old_value + 1
def decrement(self):
old_value = self._value
time.sleep(0.001)
self._value = old_value - 1
@property
def value(self):
return self._value
class ProducerConsumerQueue:
"""Producer-consumer queue"""
def __init__(self, maxsize=0):
self.queue = queue.Queue(maxsize)
self.total_produced = 0
self.total_consumed = 0
self._lock = threading.Lock()
def produce(self, item):
"""Produce item"""
self.queue.put(item)
with self._lock:
self.total_produced += 1
def consume(self, timeout=1):
"""Consume item"""
try:
item = self.queue.get(timeout=timeout)
with self._lock:
self.total_consumed += 1
return item
except queue.Empty:
return None
def get_stats(self):
"""Get statistics"""
with self._lock:
return {
'produced': self.total_produced,
'consumed': self.total_consumed,
'queue_size': self.queue.qsize()
}
class TestConcurrency:
"""Concurrency tests"""
def test_thread_safe_counter(self):
"""Test thread-safe counter"""
counter = ThreadSafeCounter()
threads = []
def increment_worker():
for _ in range(100):
counter.increment()
def decrement_worker():
for _ in range(50):
counter.decrement()
# Create multiple threads
for _ in range(5):
threads.append(threading.Thread(target=increment_worker))
for _ in range(3):
threads.append(threading.Thread(target=decrement_worker))
# Start all threads
for thread in threads:
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Verify result: 5*100 - 3*50 = 350
assert counter.value == 350
def test_unsafe_counter_race_condition(self):
"""Test non-thread-safe counter race condition"""
counter = UnsafeCounter()
threads = []
def increment_worker():
for _ in range(100):
counter.increment()
# Create multiple threads
for _ in range(5):
threads.append(threading.Thread(target=increment_worker))
# Start all threads
for thread in threads:
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Non-thread-safe counter usually doesn't get expected result
expected = 500
actual = counter.value
print(f"Expected: {expected}, Actual: {actual}")
# Due to race condition, actual value is usually less than expected
assert actual <= expected
# In most cases, result will be incorrect due to race condition
# We don't make a strong assertion here, just demonstrate the problem
def test_producer_consumer_pattern(self):
"""Test producer-consumer pattern"""
pc_queue = ProducerConsumerQueue(maxsize=10)
def producer(name, count):
"""Producer function"""
for i in range(count):
item = f"{name}_item_{i}"
pc_queue.produce(item)
time.sleep(0.01)
def consumer(name, max_items):
"""Consumer function"""
consumed = 0
while consumed < max_items:
item = pc_queue.consume(timeout=2)
if item is None:
break
consumed += 1
time.sleep(0.01)
# Create producer and consumer threads
threads = []
# 2 producers, each producing 20 items
for i in range(2):
thread = threading.Thread(
target=producer,
args=(f"producer_{i}", 20)
)
threads.append(thread)
# 3 consumers, each consuming up to 15 items
for i in range(3):
thread = threading.Thread(
target=consumer,
args=(f"consumer_{i}", 15)
)
threads.append(thread)
# Start all threads
for thread in threads:
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Verify production and consumption balance
stats = pc_queue.get_stats()
assert stats['produced'] == 40 # 2 * 20
assert stats['consumed'] <= stats['produced']
assert stats['queue_size'] == stats['produced'] - stats['consumed']
print(f"Statistics: {stats}")
def test_thread_pool_executor(self):
"""Test thread pool executor"""
def worker_task(task_id):
"""Worker task"""
time.sleep(0.1)
return f"Task {task_id} completed"
with ThreadPoolExecutor(max_workers=4) as executor:
# Submit tasks
futures = []
for i in range(10):
future = executor.submit(worker_task, i)
futures.append(future)
# Collect results
results = []
            for future in as_completed(futures):
result = future.result()
results.append(result)
# Verify all tasks completed
assert len(results) == 10
assert all("completed" in result for result in results)
def test_process_pool_executor(self):
"""Test process pool executor"""
        def cpu_intensive_task(n):
            """CPU-intensive task.

            Note: ProcessPoolExecutor pickles the callable, so a function
            defined inside a test only works where the 'fork' start method is
            available; in portable code, define the task at module level.
            """
result = 0
for i in range(n):
result += i * i
return result
with ProcessPoolExecutor(max_workers=2) as executor:
# Submit CPU-intensive tasks
futures = []
for i in range(1000, 5000, 1000):
future = executor.submit(cpu_intensive_task, i)
futures.append(future)
# Collect results
results = []
            for future in as_completed(futures):
result = future.result()
results.append(result)
# Verify all tasks completed
assert len(results) == 4
assert all(isinstance(result, int) and result > 0 for result in results)
@pytest.mark.asyncio
async def test_async_concurrency(self):
"""Test async concurrency"""
async def async_task(task_id, delay):
"""Async task"""
await asyncio.sleep(delay)
return f"Async task {task_id} completed after {delay}s"
# Create multiple async tasks
tasks = []
for i in range(5):
task = async_task(i, 0.1)
tasks.append(task)
# Execute all tasks concurrently
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
# Verify results
assert len(results) == 5
assert all("completed" in result for result in results)
# Verify concurrent execution time benefit
total_time = end_time - start_time
assert total_time < 0.2 # Should be much less than sequential 0.5 seconds
def test_deadlock_detection(self):
"""Deadlock detection test"""
lock1 = threading.Lock()
lock2 = threading.Lock()
deadlock_occurred = threading.Event()
def worker1():
"""Worker thread 1"""
try:
with lock1:
time.sleep(0.1)
if lock2.acquire(timeout=0.5): # Use timeout to avoid real deadlock
lock2.release()
else:
deadlock_occurred.set()
except Exception as e:
deadlock_occurred.set()
def worker2():
"""Worker thread 2"""
try:
with lock2:
time.sleep(0.1)
if lock1.acquire(timeout=0.5): # Use timeout to avoid real deadlock
lock1.release()
else:
deadlock_occurred.set()
except Exception as e:
deadlock_occurred.set()
# Start two threads that may deadlock
thread1 = threading.Thread(target=worker1)
thread2 = threading.Thread(target=worker2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# Verify if deadlock situation detected
if deadlock_occurred.is_set():
print("Potential deadlock detected")
else:
print("No deadlock occurred")
# This test mainly demonstrates deadlock detection mechanism
assert True # Test always passes
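Race conditions are nondeterministic, so a test like test_unsafe_counter_race_condition may pass by luck on a single run. One way to raise the odds of exposing intermittent failures is to run a concurrency test many times. This sketch assumes the pytest-repeat plugin is installed (it is not in the tool list above) and that it lives in the same test_concurrency.py module, so ThreadSafeCounter is in scope.

# Requires pytest-repeat (pip install pytest-repeat)
import pytest

@pytest.mark.repeat(20)  # run this test 20 times to surface intermittent failures
def test_counter_under_repeated_contention():
    counter = ThreadSafeCounter()  # defined earlier in this module
    threads = [threading.Thread(target=counter.increment) for _ in range(50)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert counter.value == 50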
Performance Benchmark Testing
# test_benchmark_advanced.py
import pytest
import time
import random
import string
from collections import deque, defaultdict
import heapq
# Data structure performance tests
class DataStructurePerformance:
"""Data structure performance test class"""
@staticmethod
def list_operations(data, operations):
"""List operations"""
lst = list(data)
for op, value in operations:
if op == 'append':
lst.append(value)
elif op == 'insert':
lst.insert(0, value)
elif op == 'remove' and value in lst:
lst.remove(value)
return lst
@staticmethod
def deque_operations(data, operations):
"""Deque operations"""
dq = deque(data)
for op, value in operations:
if op == 'append':
dq.append(value)
elif op == 'insert':
dq.appendleft(value)
elif op == 'remove' and value in dq:
dq.remove(value)
return list(dq)
@staticmethod
def set_operations(data, operations):
"""Set operations"""
s = set(data)
for op, value in operations:
if op == 'add':
s.add(value)
elif op == 'remove' and value in s:
s.remove(value)
return s
@staticmethod
def dict_operations(data, operations):
"""Dictionary operations"""
d = {i: v for i, v in enumerate(data)}
for op, key, value in operations:
if op == 'set':
d[key] = value
elif op == 'get':
_ = d.get(key)
elif op == 'delete' and key in d:
del d[key]
return d
class TestBenchmarkAdvanced:
"""Advanced benchmark tests"""
@pytest.fixture(scope="class")
def sample_data(self):
"""Sample data"""
return list(range(1000))
@pytest.fixture(scope="class")
def operations_data(self):
"""Operations data"""
operations = []
for _ in range(100):
op = random.choice(['append', 'insert', 'remove'])
value = random.randint(1, 1000)
operations.append((op, value))
return operations
def test_list_vs_deque_performance(self, benchmark, sample_data, operations_data):
"""List vs deque performance comparison"""
        # Benchmark the list implementation here; the parametrized test below
        # runs the same workload against both list and deque for comparison.
result = benchmark(
DataStructurePerformance.list_operations,
sample_data,
operations_data
)
assert len(result) >= 900 # Considering possible delete operations
@pytest.mark.parametrize("data_structure", ["list", "deque"])
def test_data_structure_comparison(self, benchmark, sample_data, operations_data, data_structure):
"""Data structure performance comparison parametrized test"""
methods = {
'list': DataStructurePerformance.list_operations,
'deque': DataStructurePerformance.deque_operations
}
method = methods[data_structure]
result = benchmark(method, sample_data, operations_data)
assert len(result) >= 900
def test_string_concatenation_methods(self, benchmark):
"""String concatenation method performance test"""
def string_concat_plus():
"""Concatenate using + operator"""
result = ""
for i in range(1000):
result += str(i)
return result
def string_concat_join():
"""Concatenate using join method"""
parts = []
for i in range(1000):
parts.append(str(i))
return "".join(parts)
        def string_concat_format():
            """Concatenate using f-strings in a list comprehension"""
            parts = [f"{i}" for i in range(1000)]
            return "".join(parts)
# Test join method (usually fastest)
result = benchmark(string_concat_join)
assert len(result) > 0
def test_search_algorithms_benchmark(self, benchmark):
"""Search algorithm performance benchmark test"""
# Prepare test data
size = 10000
data = list(range(size))
random.shuffle(data)
sorted_data = sorted(data)
target = size // 2
def linear_search_impl():
for i, item in enumerate(data):
if item == target:
return i
return -1
def binary_search_impl():
left, right = 0, len(sorted_data) - 1
while left <= right:
mid = (left + right) // 2
if sorted_data[mid] == target:
return mid
elif sorted_data[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1
# Test binary search
result = benchmark(binary_search_impl)
assert result != -1
def test_heap_operations_benchmark(self, benchmark):
"""Heap operations performance test"""
def heap_operations():
heap = []
# Insert elements
for i in range(1000):
heapq.heappush(heap, random.randint(1, 10000))
# Pop minimum elements
results = []
for _ in range(100):
if heap:
results.append(heapq.heappop(heap))
return results
result = benchmark(heap_operations)
assert len(result) == 100
assert result == sorted(result) # Verify heap ordering
def test_memory_intensive_operation(self, benchmark):
"""Memory-intensive operation test"""
def memory_operation():
# Create many objects
data = []
for _ in range(10000):
data.append({
'id': random.randint(1, 100000),
'name': ''.join(random.choices(string.ascii_letters, k=10)),
'values': [random.random() for _ in range(10)]
})
# Process data
processed = []
for item in data:
if item['id'] % 2 == 0:
processed.append({
'id': item['id'],
'name_upper': item['name'].upper(),
'avg_value': sum(item['values']) / len(item['values'])
})
return len(processed)
result = benchmark(memory_operation)
assert result > 0
    def test_benchmark_with_setup(self, benchmark):
        """Benchmark test with a per-round setup function (pedantic mode)"""
        def setup():
            # Prepare fresh test data for every round; a pedantic setup
            # function must return an (args, kwargs) tuple for the target
            test_data = {
                'data': [random.randint(1, 1000) for _ in range(1000)],
                'lookup': set(random.randint(1, 1000) for _ in range(100))
            }
            return (test_data,), {}
        def test_function(test_data):
            # Actual function to test
            data = test_data['data']
            lookup = test_data['lookup']
            found = []
            for item in data:
                if item in lookup:
                    found.append(item)
            return len(found)
        result = benchmark.pedantic(
            test_function,
            setup=setup,
            rounds=5,
            iterations=1  # iterations must be 1 when a setup function is used
        )
        assert result >= 0
def test_custom_timer_benchmark(self, benchmark):
"""Custom timer benchmark test"""
        # Applying the fixture as a decorator runs the benchmark immediately
        # and rebinds the name to the decorated function's return value
        @benchmark
def timed_operation():
# Simulate an operation that needs precise timing
start = time.perf_counter()
# Execute some computation
result = 0
for i in range(10000):
result += i ** 0.5
end = time.perf_counter()
# Return result and execution time
return result, end - start
result, duration = timed_operation
assert result > 0
assert duration > 0
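pytest-benchmark also accepts per-test options through its benchmark marker. A short sketch (the group name, round count, and workload are illustrative) that groups related benchmarks and raises the number of measurement rounds:

import random
import pytest

@pytest.mark.benchmark(group="sorting", min_rounds=10, warmup=True)
def test_sorted_builtin_benchmark(benchmark):
    data = [random.randint(1, 10000) for _ in range(5000)]
    result = benchmark(sorted, data)
    assert result[0] <= result[-1]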
Performance Monitoring and Analysis
# test_performance_monitoring.py
import pytest
import psutil
import os
import threading
import time
from contextlib import contextmanager
import gc
import random
class PerformanceMonitor:
"""Performance monitor"""
def __init__(self):
self.process = psutil.Process(os.getpid())
self.baseline_memory = None
self.peak_memory = None
self.start_time = None
self.end_time = None
def start_monitoring(self):
"""Start monitoring"""
gc.collect() # Trigger garbage collection
self.baseline_memory = self.process.memory_info().rss / 1024 / 1024
self.peak_memory = self.baseline_memory
self.start_time = time.perf_counter()
def update_peak_memory(self):
"""Update peak memory"""
current_memory = self.process.memory_info().rss / 1024 / 1024
if current_memory > self.peak_memory:
self.peak_memory = current_memory
def stop_monitoring(self):
"""Stop monitoring"""
self.end_time = time.perf_counter()
self.update_peak_memory()
def get_metrics(self):
"""Get performance metrics"""
return {
'duration': self.end_time - self.start_time if self.end_time else None,
'baseline_memory_mb': self.baseline_memory,
'peak_memory_mb': self.peak_memory,
'memory_increase_mb': self.peak_memory - self.baseline_memory if self.baseline_memory else None,
            'cpu_percent': self.process.cpu_percent()  # measures since the previous call; 0.0 the first time
}
@contextmanager
def performance_monitor():
"""Performance monitoring context manager"""
monitor = PerformanceMonitor()
monitor.start_monitoring()
# Start monitoring thread
monitoring = True
def monitor_thread():
while monitoring:
monitor.update_peak_memory()
time.sleep(0.1)
thread = threading.Thread(target=monitor_thread, daemon=True)
thread.start()
try:
yield monitor
finally:
monitoring = False
monitor.stop_monitoring()
thread.join(timeout=1)
class TestPerformanceMonitoring:
"""Performance monitoring tests"""
def test_memory_usage_tracking(self):
"""Memory usage tracking test"""
with performance_monitor() as monitor:
# Execute memory-intensive operation
large_data = []
for _ in range(100):
large_data.append([random.randint(1, 1000) for _ in range(1000)])
# Verify data
assert len(large_data) == 100
assert all(len(sublist) == 1000 for sublist in large_data)
# Analyze performance metrics
metrics = monitor.get_metrics()
print(f"Execution time: {metrics['duration']:.3f} seconds")
print(f"Baseline memory: {metrics['baseline_memory_mb']:.2f} MB")
print(f"Peak memory: {metrics['peak_memory_mb']:.2f} MB")
print(f"Memory increase: {metrics['memory_increase_mb']:.2f} MB")
# Verify memory usage is reasonable
assert metrics['duration'] < 5.0 # Execution time not exceeding 5 seconds
assert metrics['memory_increase_mb'] < 100 # Memory increase not exceeding 100MB
def test_cpu_intensive_monitoring(self):
"""CPU-intensive monitoring test"""
with performance_monitor() as monitor:
# CPU-intensive computation
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
result = fibonacci(25)
assert result > 0
metrics = monitor.get_metrics()
print(f"CPU usage: {metrics['cpu_percent']:.2f}%")
print(f"Computation time: {metrics['duration']:.3f} seconds")
# Verify CPU usage
assert metrics['duration'] > 0
def test_concurrent_performance_monitoring(self):
"""Concurrent performance monitoring test"""
def worker_task(task_id, duration):
"""Worker task"""
start = time.time()
while time.time() - start < duration:
# Simulate workload
_ = [i**2 for i in range(1000)]
return task_id
with performance_monitor() as monitor:
threads = []
# Create multiple concurrent tasks
for i in range(4):
thread = threading.Thread(
target=worker_task,
args=(i, 0.5)
)
threads.append(thread)
thread.start()
# Wait for all tasks to complete
for thread in threads:
thread.join()
metrics = monitor.get_metrics()
print(f"Concurrent execution time: {metrics['duration']:.3f} seconds")
print(f"Memory usage: {metrics['memory_increase_mb']:.2f} MB")
        # Verify concurrent performance: four 0.5 s tasks run in parallel should
        # finish in roughly 0.5 s of wall-clock time, versus ~2 s sequentially
        assert metrics['duration'] < 1.0
@pytest.mark.benchmark
def test_benchmark_with_monitoring(self, benchmark):
"""Performance monitoring combined with benchmark testing"""
def monitored_operation():
with performance_monitor() as monitor:
# Execute operation being tested
data = [random.randint(1, 1000) for _ in range(10000)]
sorted_data = sorted(data)
return sorted_data, monitor.get_metrics()
result, metrics = benchmark(monitored_operation)
# Verify result
assert len(result) == 10000
assert result == sorted(result)
# Print performance metrics
print(f"Benchmark memory increase: {metrics['memory_increase_mb']:.2f} MB")
Performance Testing Best Practices
- Establish baseline: Record a performance baseline and track trends against it (see the commands after this list)
- Environment consistency: Ensure test environment consistency and repeatability
- Multiple measurements: Perform multiple measurements to obtain reliable results
- Resource monitoring: Monitor CPU, memory, I/O and other system resources
- Progressive testing: Start with small data, gradually increase test scale
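One way to put the baseline advice into practice with pytest-benchmark is to save a run and compare later runs against it; the 10% threshold below is an example, not a recommendation.

# Save the current results as a named baseline (stored under .benchmarks/)
pytest --benchmark-only --benchmark-save=baseline

# Later runs: compare against the most recent saved run and fail on a >10% regression of the mean
pytest --benchmark-only --benchmark-compare --benchmark-compare-fail=mean:10%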
Important Notes
- Test isolation: Performance tests may affect execution of other tests
- System load: Background load on the machine skews performance results
- Garbage collection: Python's garbage collector can distort performance measurements (see the sketch after this list)
- Concurrent safety: Ensure thread safety in concurrent tests
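A small sketch of keeping the garbage collector out of a hand-rolled measurement; pytest-benchmark exposes a similar switch via its --benchmark-disable-gc option. The helper name is our own.

import gc
import time

def measure_without_gc(func, *args, **kwargs):
    """Time a single call with the garbage collector paused."""
    gc.collect()   # start from a clean slate
    gc.disable()   # keep collection pauses out of the measurement
    try:
        start = time.perf_counter()
        func(*args, **kwargs)
        return time.perf_counter() - start
    finally:
        gc.enable()  # always restore normal collection

# Example: duration = measure_without_gc(sorted, list(range(100_000, 0, -1)))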
Performance testing and concurrency testing are essential for keeping an application stable in production. Systematic performance testing makes it possible to identify and resolve performance problems early.