Chapter 11: Performance Testing and Concurrency Testing

Learning Objectives
  • Master writing performance tests
  • Learn concurrency test implementation
  • Understand test performance optimization techniques
  • Master the use of pytest-benchmark

Key Concepts

Performance Test Types

| Test Type | Objective | Metrics |
| --- | --- | --- |
| Benchmark Testing | Establish a performance baseline | Execution time, memory usage |
| Load Testing | Verify performance under expected load | Response time, throughput |
| Stress Testing | Find system limits | Maximum concurrency, crash point |
| Stability Testing | Verify long-term stability | Memory leaks, performance degradation |
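
One practical way to keep these categories separate in a pytest suite is to register a custom marker for each type; the marker names below are illustrative, not a built-in convention.

# pytest.ini -- illustrative markers for the test types above
[pytest]
markers =
    perf_baseline: benchmark tests that establish a performance baseline
    perf_load: load tests run against the expected traffic level
    perf_stress: stress tests that probe system limits
    perf_soak: long-running stability (soak) tests

Individual categories can then be selected with, for example, pytest -m perf_baseline.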

Concurrency Test Scenarios

  • Thread safety: Verify code correctness in multi-threaded environment
  • Race conditions: Detect concurrent access issues with shared resources
  • Deadlock detection: Identify potential deadlock situations
  • Async operations: Test async code behavior

Performance Testing Tools

# Core tools
pytest-benchmark    # Micro benchmarking
pytest-xdist       # Parallel test execution
pytest-asyncio     # Async test support
memory-profiler    # Memory analysis
line-profiler      # Line-level performance analysis
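
These are separate plugins, so they must be installed before the examples in this chapter will run. A typical setup and a few common invocations might look like this (package names as published on PyPI, plus psutil, which the monitoring examples below rely on):

# Install the tools used in this chapter
pip install pytest-benchmark pytest-xdist pytest-asyncio memory-profiler line-profiler psutil

# Run only the benchmark tests, or skip them during a regular run
pytest --benchmark-only
pytest --benchmark-skip

# Run the non-benchmark tests in parallel with pytest-xdist
pytest -n auto --benchmark-skip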

Example Code

Basic Performance Testing

# test_performance_basic.py
import pytest
import time
import threading
import concurrent.futures
from collections import deque
import psutil
import os

# Algorithms to test
def linear_search(arr, target):
    """Linear search"""
    for i, item in enumerate(arr):
        if item == target:
            return i
    return -1

def binary_search(arr, target):
    """Binary search (requires sorted array)"""
    left, right = 0, len(arr) - 1

    while left <= right:
        mid = (left + right) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1

    return -1

def bubble_sort(arr):
    """Bubble sort"""
    n = len(arr)
    arr_copy = arr.copy()

    for i in range(n):
        for j in range(0, n - i - 1):
            if arr_copy[j] > arr_copy[j + 1]:
                arr_copy[j], arr_copy[j + 1] = arr_copy[j + 1], arr_copy[j]

    return arr_copy

def merge_sort(arr):
    """Merge sort"""
    if len(arr) <= 1:
        return arr

    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])

    return merge(left, right)

def merge(left, right):
    """Merge two sorted arrays"""
    result = []
    i = j = 0

    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1

    result.extend(left[i:])
    result.extend(right[j:])
    return result

class TestBasicPerformance:
    """Basic performance tests"""

    @pytest.fixture(scope="class")
    def small_dataset(self):
        """Small dataset"""
        return list(range(100))

    @pytest.fixture(scope="class")
    def medium_dataset(self):
        """Medium dataset"""
        return list(range(1000))

    @pytest.fixture(scope="class")
    def large_dataset(self):
        """Large dataset"""
        return list(range(10000))

    def test_linear_search_performance(self, benchmark, medium_dataset):
        """Linear search performance test"""
        target = 500
        result = benchmark(linear_search, medium_dataset, target)
        assert result == 500

    def test_binary_search_performance(self, benchmark, medium_dataset):
        """Binary search performance test"""
        sorted_data = sorted(medium_dataset)
        target = 500
        result = benchmark(binary_search, sorted_data, target)
        assert result == 500

    def test_bubble_sort_small(self, benchmark, small_dataset):
        """Bubble sort small dataset performance"""
        import random
        data = small_dataset.copy()
        random.shuffle(data)

        result = benchmark(bubble_sort, data)
        assert result == sorted(data)

    def test_merge_sort_large(self, benchmark, large_dataset):
        """Merge sort large dataset performance"""
        import random
        data = large_dataset.copy()
        random.shuffle(data)

        result = benchmark(merge_sort, data)
        assert result == sorted(data)

    @pytest.mark.parametrize("size", [100, 500, 1000])
    def test_sorting_algorithm_comparison(self, benchmark, size):
        """Sorting algorithm performance comparison"""
        import random
        data = list(range(size))
        random.shuffle(data)

        # Test merge sort here
        result = benchmark(merge_sort, data)
        assert len(result) == size
        assert result == sorted(data)

    def test_memory_usage_monitoring(self, medium_dataset):
        """Memory usage monitoring"""
        process = psutil.Process(os.getpid())

        # Record initial memory usage
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Execute memory-intensive operation
        data_copies = []
        for _ in range(10):
            data_copies.append(medium_dataset.copy())

        # Record peak memory usage
        peak_memory = process.memory_info().rss / 1024 / 1024  # MB

        memory_increase = peak_memory - initial_memory

        # Clean up memory
        del data_copies

        # Verify memory usage is reasonable
        assert memory_increase < 50  # Memory growth should not exceed 50MB

        print(f"Memory increase: {memory_increase:.2f} MB")

Concurrency Test Implementation

# test_concurrency.py
import pytest
import threading
import time
import queue
import asyncio
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing

# Code to test for concurrency
class ThreadSafeCounter:
    """Thread-safe counter"""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        """Increment counter"""
        with self._lock:
            old_value = self._value
            time.sleep(0.001)  # Simulate some processing time
            self._value = old_value + 1

    def decrement(self):
        """Decrement counter"""
        with self._lock:
            old_value = self._value
            time.sleep(0.001)
            self._value = old_value - 1

    @property
    def value(self):
        """Get current value"""
        with self._lock:
            return self._value

class UnsafeCounter:
    """Non-thread-safe counter (for comparison)"""

    def __init__(self):
        self._value = 0

    def increment(self):
        old_value = self._value
        time.sleep(0.001)
        self._value = old_value + 1

    def decrement(self):
        old_value = self._value
        time.sleep(0.001)
        self._value = old_value - 1

    @property
    def value(self):
        return self._value

class ProducerConsumerQueue:
    """Producer-consumer queue"""

    def __init__(self, maxsize=0):
        self.queue = queue.Queue(maxsize)
        self.total_produced = 0
        self.total_consumed = 0
        self._lock = threading.Lock()

    def produce(self, item):
        """Produce item"""
        self.queue.put(item)
        with self._lock:
            self.total_produced += 1

    def consume(self, timeout=1):
        """Consume item"""
        try:
            item = self.queue.get(timeout=timeout)
            with self._lock:
                self.total_consumed += 1
            return item
        except queue.Empty:
            return None

    def get_stats(self):
        """Get statistics"""
        with self._lock:
            return {
                'produced': self.total_produced,
                'consumed': self.total_consumed,
                'queue_size': self.queue.qsize()
            }
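
# Module-level CPU-bound helper: ProcessPoolExecutor pickles the callable it
# is given, so this function must live at module level rather than inside a
# test method.
def cpu_intensive_task(n):
    """CPU-intensive task: sum of squares below n"""
    result = 0
    for i in range(n):
        result += i * i
    return result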

class TestConcurrency:
    """Concurrency tests"""

    def test_thread_safe_counter(self):
        """Test thread-safe counter"""
        counter = ThreadSafeCounter()
        threads = []

        def increment_worker():
            for _ in range(100):
                counter.increment()

        def decrement_worker():
            for _ in range(50):
                counter.decrement()

        # Create multiple threads
        for _ in range(5):
            threads.append(threading.Thread(target=increment_worker))

        for _ in range(3):
            threads.append(threading.Thread(target=decrement_worker))

        # Start all threads
        for thread in threads:
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

        # Verify result: 5*100 - 3*50 = 350
        assert counter.value == 350

    def test_unsafe_counter_race_condition(self):
        """Test non-thread-safe counter race condition"""
        counter = UnsafeCounter()
        threads = []

        def increment_worker():
            for _ in range(100):
                counter.increment()

        # Create multiple threads
        for _ in range(5):
            threads.append(threading.Thread(target=increment_worker))

        # Start all threads
        for thread in threads:
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

        # Non-thread-safe counter usually doesn't get expected result
        expected = 500
        actual = counter.value

        print(f"Expected: {expected}, Actual: {actual}")

        # Due to race condition, actual value is usually less than expected
        assert actual <= expected

        # In most cases, result will be incorrect due to race condition
        # We don't make a strong assertion here, just demonstrate the problem

    def test_producer_consumer_pattern(self):
        """Test producer-consumer pattern"""
        pc_queue = ProducerConsumerQueue(maxsize=10)

        def producer(name, count):
            """Producer function"""
            for i in range(count):
                item = f"{name}_item_{i}"
                pc_queue.produce(item)
                time.sleep(0.01)

        def consumer(name, max_items):
            """Consumer function"""
            consumed = 0
            while consumed < max_items:
                item = pc_queue.consume(timeout=2)
                if item is None:
                    break
                consumed += 1
                time.sleep(0.01)

        # Create producer and consumer threads
        threads = []

        # 2 producers, each producing 20 items
        for i in range(2):
            thread = threading.Thread(
                target=producer,
                args=(f"producer_{i}", 20)
            )
            threads.append(thread)

        # 3 consumers, each consuming up to 15 items
        for i in range(3):
            thread = threading.Thread(
                target=consumer,
                args=(f"consumer_{i}", 15)
            )
            threads.append(thread)

        # Start all threads
        for thread in threads:
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

        # Verify production and consumption balance
        stats = pc_queue.get_stats()

        assert stats['produced'] == 40  # 2 * 20
        assert stats['consumed'] <= stats['produced']
        assert stats['queue_size'] == stats['produced'] - stats['consumed']

        print(f"Statistics: {stats}")

    def test_thread_pool_executor(self):
        """Test thread pool executor"""
        def worker_task(task_id):
            """Worker task"""
            time.sleep(0.1)
            return f"Task {task_id} completed"

        with ThreadPoolExecutor(max_workers=4) as executor:
            # Submit tasks
            futures = []
            for i in range(10):
                future = executor.submit(worker_task, i)
                futures.append(future)

            # Collect results
            results = []
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                results.append(result)

        # Verify all tasks completed
        assert len(results) == 10
        assert all("completed" in result for result in results)

    def test_process_pool_executor(self):
        """Test process pool executor"""
        # cpu_intensive_task is defined at module level above:
        # ProcessPoolExecutor must be able to pickle the submitted callable,
        # so it cannot be a nested function here.

        with ProcessPoolExecutor(max_workers=2) as executor:
            # Submit CPU-intensive tasks
            futures = []
            for i in range(1000, 5000, 1000):
                future = executor.submit(cpu_intensive_task, i)
                futures.append(future)

            # Collect results
            results = []
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                results.append(result)

        # Verify all tasks completed
        assert len(results) == 4
        assert all(isinstance(result, int) and result > 0 for result in results)

    @pytest.mark.asyncio
    async def test_async_concurrency(self):
        """Test async concurrency"""
        async def async_task(task_id, delay):
            """Async task"""
            await asyncio.sleep(delay)
            return f"Async task {task_id} completed after {delay}s"

        # Create multiple async tasks
        tasks = []
        for i in range(5):
            task = async_task(i, 0.1)
            tasks.append(task)

        # Execute all tasks concurrently
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()

        # Verify results
        assert len(results) == 5
        assert all("completed" in result for result in results)

        # Verify concurrent execution time benefit
        total_time = end_time - start_time
        assert total_time < 0.2  # Should be much less than sequential 0.5 seconds

    def test_deadlock_detection(self):
        """Deadlock detection test"""
        lock1 = threading.Lock()
        lock2 = threading.Lock()
        deadlock_occurred = threading.Event()

        def worker1():
            """Worker thread 1"""
            try:
                with lock1:
                    time.sleep(0.1)
                    if lock2.acquire(timeout=0.5):  # Use timeout to avoid real deadlock
                        lock2.release()
                    else:
                        deadlock_occurred.set()
            except Exception as e:
                deadlock_occurred.set()

        def worker2():
            """Worker thread 2"""
            try:
                with lock2:
                    time.sleep(0.1)
                    if lock1.acquire(timeout=0.5):  # Use timeout to avoid real deadlock
                        lock1.release()
                    else:
                        deadlock_occurred.set()
            except Exception as e:
                deadlock_occurred.set()

        # Start two threads that may deadlock
        thread1 = threading.Thread(target=worker1)
        thread2 = threading.Thread(target=worker2)

        thread1.start()
        thread2.start()

        thread1.join()
        thread2.join()

        # Verify if deadlock situation detected
        if deadlock_occurred.is_set():
            print("Potential deadlock detected")
        else:
            print("No deadlock occurred")

        # This test mainly demonstrates deadlock detection mechanism
        assert True  # Test always passes
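
The async test above depends on pytest-asyncio. In the plugin's strict mode (the default in recent versions) every coroutine test needs the @pytest.mark.asyncio marker shown above; alternatively the mode can be set once in the project configuration. A minimal example, assuming a pytest.ini at the project root:

# pytest.ini
[pytest]
asyncio_mode = auto    # "strict" (default) requires @pytest.mark.asyncio on each test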

Performance Benchmark Testing

# test_benchmark_advanced.py
import pytest
import time
import random
import string
from collections import deque, defaultdict
import heapq

# Data structure performance tests
class DataStructurePerformance:
    """Data structure performance test class"""

    @staticmethod
    def list_operations(data, operations):
        """List operations"""
        lst = list(data)
        for op, value in operations:
            if op == 'append':
                lst.append(value)
            elif op == 'insert':
                lst.insert(0, value)
            elif op == 'remove' and value in lst:
                lst.remove(value)
        return lst

    @staticmethod
    def deque_operations(data, operations):
        """Deque operations"""
        dq = deque(data)
        for op, value in operations:
            if op == 'append':
                dq.append(value)
            elif op == 'insert':
                dq.appendleft(value)
            elif op == 'remove' and value in dq:
                dq.remove(value)
        return list(dq)

    @staticmethod
    def set_operations(data, operations):
        """Set operations"""
        s = set(data)
        for op, value in operations:
            if op == 'add':
                s.add(value)
            elif op == 'remove' and value in s:
                s.remove(value)
        return s

    @staticmethod
    def dict_operations(data, operations):
        """Dictionary operations"""
        d = {i: v for i, v in enumerate(data)}
        for op, key, value in operations:
            if op == 'set':
                d[key] = value
            elif op == 'get':
                _ = d.get(key)
            elif op == 'delete' and key in d:
                del d[key]
        return d

class TestBenchmarkAdvanced:
    """Advanced benchmark tests"""

    @pytest.fixture(scope="class")
    def sample_data(self):
        """Sample data"""
        return list(range(1000))

    @pytest.fixture(scope="class")
    def operations_data(self):
        """Operations data"""
        operations = []
        for _ in range(100):
            op = random.choice(['append', 'insert', 'remove'])
            value = random.randint(1, 1000)
            operations.append((op, value))
        return operations

    def test_list_vs_deque_performance(self, benchmark, sample_data, operations_data):
        """List vs deque performance comparison"""
        # benchmark measures a single callable per test, so this test times the
        # list implementation; the parametrized test below covers both list and
        # deque so their results can be compared in the benchmark report.
        result = benchmark(
            DataStructurePerformance.list_operations,
            sample_data,
            operations_data
        )
        assert len(result) >= 900  # allows for up to 100 remove operations

    @pytest.mark.parametrize("data_structure", ["list", "deque"])
    def test_data_structure_comparison(self, benchmark, sample_data, operations_data, data_structure):
        """Data structure performance comparison parametrized test"""
        methods = {
            'list': DataStructurePerformance.list_operations,
            'deque': DataStructurePerformance.deque_operations
        }

        method = methods[data_structure]
        result = benchmark(method, sample_data, operations_data)
        assert len(result) >= 900

    def test_string_concatenation_methods(self, benchmark):
        """String concatenation method performance test"""
        def string_concat_plus():
            """Concatenate using + operator"""
            result = ""
            for i in range(1000):
                result += str(i)
            return result

        def string_concat_join():
            """Concatenate using join method"""
            parts = []
            for i in range(1000):
                parts.append(str(i))
            return "".join(parts)

        def string_concat_comprehension():
            """Concatenate using a list comprehension and join"""
            parts = [str(i) for i in range(1000)]
            return "".join(parts)

        # Test join method (usually fastest)
        result = benchmark(string_concat_join)
        assert len(result) > 0

    def test_search_algorithms_benchmark(self, benchmark):
        """Search algorithm performance benchmark test"""
        # Prepare test data
        size = 10000
        data = list(range(size))
        random.shuffle(data)
        sorted_data = sorted(data)
        target = size // 2

        def linear_search_impl():
            for i, item in enumerate(data):
                if item == target:
                    return i
            return -1

        def binary_search_impl():
            left, right = 0, len(sorted_data) - 1
            while left <= right:
                mid = (left + right) // 2
                if sorted_data[mid] == target:
                    return mid
                elif sorted_data[mid] < target:
                    left = mid + 1
                else:
                    right = mid - 1
            return -1

        # Test binary search
        result = benchmark(binary_search_impl)
        assert result != -1

    def test_heap_operations_benchmark(self, benchmark):
        """Heap operations performance test"""
        def heap_operations():
            heap = []

            # Insert elements
            for i in range(1000):
                heapq.heappush(heap, random.randint(1, 10000))

            # Pop minimum elements
            results = []
            for _ in range(100):
                if heap:
                    results.append(heapq.heappop(heap))

            return results

        result = benchmark(heap_operations)
        assert len(result) == 100
        assert result == sorted(result)  # Verify heap ordering

    def test_memory_intensive_operation(self, benchmark):
        """Memory-intensive operation test"""
        def memory_operation():
            # Create many objects
            data = []
            for _ in range(10000):
                data.append({
                    'id': random.randint(1, 100000),
                    'name': ''.join(random.choices(string.ascii_letters, k=10)),
                    'values': [random.random() for _ in range(10)]
                })

            # Process data
            processed = []
            for item in data:
                if item['id'] % 2 == 0:
                    processed.append({
                        'id': item['id'],
                        'name_upper': item['name'].upper(),
                        'avg_value': sum(item['values']) / len(item['values'])
                    })

            return len(processed)

        result = benchmark(memory_operation)
        assert result > 0

    def test_benchmark_with_setup(self, benchmark):
        """Benchmark test with a setup function"""
        def setup():
            # Prepare fresh test data for every round; benchmark.pedantic
            # expects setup to return an (args, kwargs) tuple for the target
            test_data = {
                'data': [random.randint(1, 1000) for _ in range(1000)],
                'lookup': set(random.randint(1, 1000) for _ in range(100))
            }
            return (test_data,), {}

        def test_function(test_data):
            # Actual function to test
            data = test_data['data']
            lookup = test_data['lookup']

            found = []
            for item in data:
                if item in lookup:
                    found.append(item)

            return len(found)

        # pytest-benchmark requires iterations=1 when a setup function is used
        result = benchmark.pedantic(
            test_function,
            setup=setup,
            rounds=5,
            iterations=1
        )

        assert result >= 0

    def test_custom_timer_benchmark(self, benchmark):
        """Custom timer benchmark test"""
        @benchmark
        def timed_operation():
            # Simulate an operation that needs precise timing
            start = time.perf_counter()

            # Execute some computation
            result = 0
            for i in range(10000):
                result += i ** 0.5

            end = time.perf_counter()

            # Return result and execution time
            return result, end - start

        # Applying the benchmark fixture as a decorator runs the function
        # immediately and binds its return value, so no call is needed here
        result, duration = timed_operation
        assert result > 0
        assert duration > 0
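
When several benchmarks measure alternatives for the same operation, pytest-benchmark can group them into one comparison table. The group argument of the benchmark marker is part of the plugin's marker options; the two tests below are a small illustrative sketch, not part of the suite above.

# test_benchmark_groups.py -- illustrative grouping of related benchmarks
import pytest

@pytest.mark.benchmark(group="membership")
def test_list_membership(benchmark):
    data = list(range(10000))
    assert benchmark(lambda: 9999 in data)   # O(n) scan of a list

@pytest.mark.benchmark(group="membership")
def test_set_membership(benchmark):
    data = set(range(10000))
    assert benchmark(lambda: 9999 in data)   # O(1) average lookup in a set

Running pytest --benchmark-group-by=group then reports the two tests side by side in the same table.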

Performance Monitoring and Analysis

# test_performance_monitoring.py
import pytest
import psutil
import os
import random
import threading
import time
from contextlib import contextmanager
import gc

class PerformanceMonitor:
    """Performance monitor"""

    def __init__(self):
        self.process = psutil.Process(os.getpid())
        self.baseline_memory = None
        self.peak_memory = None
        self.start_time = None
        self.end_time = None

    def start_monitoring(self):
        """Start monitoring"""
        gc.collect()  # Trigger garbage collection
        self.baseline_memory = self.process.memory_info().rss / 1024 / 1024
        self.peak_memory = self.baseline_memory
        self.start_time = time.perf_counter()

    def update_peak_memory(self):
        """Update peak memory"""
        current_memory = self.process.memory_info().rss / 1024 / 1024
        if current_memory > self.peak_memory:
            self.peak_memory = current_memory

    def stop_monitoring(self):
        """Stop monitoring"""
        self.end_time = time.perf_counter()
        self.update_peak_memory()

    def get_metrics(self):
        """Get performance metrics"""
        return {
            'duration': self.end_time - self.start_time if self.end_time else None,
            'baseline_memory_mb': self.baseline_memory,
            'peak_memory_mb': self.peak_memory,
            'memory_increase_mb': self.peak_memory - self.baseline_memory if self.baseline_memory else None,
            'cpu_percent': self.process.cpu_percent()
        }

@contextmanager
def performance_monitor():
    """Performance monitoring context manager"""
    monitor = PerformanceMonitor()
    monitor.start_monitoring()

    # Start monitoring thread
    monitoring = True

    def monitor_thread():
        while monitoring:
            monitor.update_peak_memory()
            time.sleep(0.1)

    thread = threading.Thread(target=monitor_thread, daemon=True)
    thread.start()

    try:
        yield monitor
    finally:
        monitoring = False
        monitor.stop_monitoring()
        thread.join(timeout=1)

class TestPerformanceMonitoring:
    """Performance monitoring tests"""

    def test_memory_usage_tracking(self):
        """Memory usage tracking test"""
        with performance_monitor() as monitor:
            # Execute memory-intensive operation
            large_data = []
            for _ in range(100):
                large_data.append([random.randint(1, 1000) for _ in range(1000)])

            # Verify data
            assert len(large_data) == 100
            assert all(len(sublist) == 1000 for sublist in large_data)

        # Analyze performance metrics
        metrics = monitor.get_metrics()

        print(f"Execution time: {metrics['duration']:.3f} seconds")
        print(f"Baseline memory: {metrics['baseline_memory_mb']:.2f} MB")
        print(f"Peak memory: {metrics['peak_memory_mb']:.2f} MB")
        print(f"Memory increase: {metrics['memory_increase_mb']:.2f} MB")

        # Verify memory usage is reasonable
        assert metrics['duration'] < 5.0  # Execution time not exceeding 5 seconds
        assert metrics['memory_increase_mb'] < 100  # Memory increase not exceeding 100MB

    def test_cpu_intensive_monitoring(self):
        """CPU-intensive monitoring test"""
        with performance_monitor() as monitor:
            # CPU-intensive computation
            def fibonacci(n):
                if n <= 1:
                    return n
                return fibonacci(n-1) + fibonacci(n-2)

            result = fibonacci(25)
            assert result > 0

        metrics = monitor.get_metrics()

        print(f"CPU usage: {metrics['cpu_percent']:.2f}%")
        print(f"Computation time: {metrics['duration']:.3f} seconds")

        # Verify CPU usage
        assert metrics['duration'] > 0

    def test_concurrent_performance_monitoring(self):
        """Concurrent performance monitoring test"""
        def worker_task(task_id, duration):
            """Worker task"""
            start = time.time()
            while time.time() - start < duration:
                # Simulate workload
                _ = [i**2 for i in range(1000)]
            return task_id

        with performance_monitor() as monitor:
            threads = []

            # Create multiple concurrent tasks
            for i in range(4):
                thread = threading.Thread(
                    target=worker_task,
                    args=(i, 0.5)
                )
                threads.append(thread)
                thread.start()

            # Wait for all tasks to complete
            for thread in threads:
                thread.join()

        metrics = monitor.get_metrics()

        print(f"Concurrent execution time: {metrics['duration']:.3f} seconds")
        print(f"Memory usage: {metrics['memory_increase_mb']:.2f} MB")

        # Verify concurrent performance
        assert metrics['duration'] < 1.0  # Concurrent execution should be faster than sequential

    @pytest.mark.benchmark
    def test_benchmark_with_monitoring(self, benchmark):
        """Performance monitoring combined with benchmark testing"""
        def monitored_operation():
            with performance_monitor() as monitor:
                # Execute operation being tested
                data = [random.randint(1, 1000) for _ in range(10000)]
                sorted_data = sorted(data)

                return sorted_data, monitor.get_metrics()

        result, metrics = benchmark(monitored_operation)

        # Verify result
        assert len(result) == 10000
        assert result == sorted(result)

        # Print performance metrics
        print(f"Benchmark memory increase: {metrics['memory_increase_mb']:.2f} MB")

Performance Testing Best Practices

  1. Establish a baseline: Record a performance baseline and track trends over time
  2. Environment consistency: Keep the test environment consistent and repeatable
  3. Multiple measurements: Take multiple measurements to obtain reliable results
  4. Resource monitoring: Monitor CPU, memory, I/O and other system resources
  5. Progressive testing: Start with small datasets and gradually increase the test scale

Important Notes

  1. Test isolation: Performance tests may affect the execution of other tests
  2. System load: Background system load skews performance results
  3. Garbage collection: Python garbage collection can distort performance measurements (see the sketch below)
  4. Concurrent safety: Ensure thread safety in concurrent tests
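
One way to keep garbage collection from polluting timing results is to disable it around the measured code; the helper below is a minimal sketch of that idea. pytest-benchmark also provides a --benchmark-disable-gc option that applies the same idea to every benchmark in a run.

# gc_timing.py -- minimal sketch of measuring with garbage collection disabled
import gc
import time

def measure_without_gc(func, *args, **kwargs):
    """Run func once with GC disabled; return (result, elapsed seconds)."""
    gc.collect()           # start from a clean heap
    gc.disable()           # avoid collection pauses during the measurement
    try:
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
    finally:
        gc.enable()        # always restore normal GC behaviour
    return result, elapsed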

Performance testing and concurrency testing are essential for keeping an application stable in production. Systematic performance testing makes it possible to identify and resolve performance problems early.