Chapter 13: Industry Application Case Studies

Haiyue

Learning Objectives

  1. Understand YOLO’s application in autonomous driving
  2. Master the design principles of intelligent surveillance systems
  3. Learn about object detection applications in industrial quality inspection
  4. Explore applications in medical imaging, retail, sports, and other fields

13.1 Autonomous Driving Application Cases

13.1.1 Object Detection Requirements in Autonomous Driving

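In place of a diagram, the core detection requirements can be summarized in code. This is a minimal sketch — the class list, camera coverage, and latency budget below are illustrative assumptions, not figures from this chapter:

```python
# Illustrative requirement summary for an autonomous-driving detector.
# All values here are ballpark assumptions for discussion purposes.
DETECTION_REQUIREMENTS = {
    "latency_budget_ms": 50,  # per-frame end-to-end budget (assumed)
    "target_classes": [
        "car", "truck", "bus", "motorcycle",  # vehicles
        "person", "bicycle",                  # vulnerable road users
        "traffic light", "stop sign",         # traffic control
    ],
    "camera_coverage": ["front", "rear", "left", "right"],
    "per_object_outputs": ["bbox", "class", "confidence", "distance", "velocity"],
}

def meets_latency_budget(inference_ms: float) -> bool:
    """Check a measured per-frame time against the assumed budget."""
    return inference_ms <= DETECTION_REQUIREMENTS["latency_budget_ms"]

print(meets_latency_budget(32.0))  # True
```

The system built in the following sections covers these pieces in turn: multi-camera capture, per-class confidence thresholds, and monocular distance estimation.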

13.1.2 Multi-camera Fusion Detection System

System Architecture Design

# autonomous_driving_system.py - Autonomous driving detection system
import cv2
import numpy as np
from ultralytics import YOLO
import queue
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import time

@dataclass
class DetectionResult:
    camera_id: int
    timestamp: float
    bbox: List[float]
    confidence: float
    class_name: str
    distance: Optional[float] = None  # Distance estimate (meters)
    velocity: Optional[float] = None  # Velocity estimate (m/s)

@dataclass
class CameraConfig:
    camera_id: int
    position: str  # 'front', 'rear', 'left', 'right'
    fov: float  # Field of view
    height: float  # Camera height
    angle: float  # Mounting angle

class AutonomousDrivingDetector:
    def __init__(self, model_path: str, camera_configs: List[CameraConfig]):
        self.model = YOLO(model_path)
        self.camera_configs = {config.camera_id: config for config in camera_configs}

        # Object class mapping
        self.vehicle_classes = ['car', 'truck', 'bus', 'motorcycle']
        self.person_classes = ['person', 'bicycle']
        self.traffic_classes = ['traffic light', 'stop sign', 'traffic sign']

        # Tracking history
        self.tracking_history = {}
        self.detection_queues = {config.camera_id: queue.Queue() for config in camera_configs}

        # Safety-related detection thresholds
        self.safety_thresholds = {
            'emergency_brake_distance': 5.0,  # Emergency braking distance (meters)
            'warning_distance': 10.0,  # Warning distance (meters)
            'person_confidence_threshold': 0.8,  # Pedestrian detection confidence threshold
            'vehicle_confidence_threshold': 0.7  # Vehicle detection confidence threshold
        }

    def detect_frame(self, frame: np.ndarray, camera_id: int) -> List[DetectionResult]:
        """
        Single frame detection
        """
        camera_config = self.camera_configs[camera_id]
        results = self.model(frame, verbose=False)

        detections = []
        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = box.cls[0].item()
                    class_name = self.model.names[int(cls)]

                    # Set different confidence thresholds based on object type
                    threshold = self.get_confidence_threshold(class_name)
                    if conf < threshold:
                        continue

                    # Estimate distance
                    distance = self.estimate_distance(
                        bbox=[x1, y1, x2, y2],
                        class_name=class_name,
                        camera_config=camera_config,
                        frame_shape=frame.shape
                    )

                    detection = DetectionResult(
                        camera_id=camera_id,
                        timestamp=time.time(),
                        bbox=[x1, y1, x2, y2],
                        confidence=conf,
                        class_name=class_name,
                        distance=distance
                    )

                    detections.append(detection)

        return detections

    def get_confidence_threshold(self, class_name: str) -> float:
        """
        Get confidence threshold based on object type
        """
        if class_name in self.person_classes:
            return self.safety_thresholds['person_confidence_threshold']
        elif class_name in self.vehicle_classes:
            return self.safety_thresholds['vehicle_confidence_threshold']
        else:
            return 0.5

    def estimate_distance(self, bbox: List[float], class_name: str,
                         camera_config: CameraConfig, frame_shape: Tuple[int, int, int]) -> float:
        """
        Distance estimation based on monocular vision
        """
        # Simplified distance estimation model (requires more complex calibration in actual applications)
        x1, y1, x2, y2 = bbox
        object_height_pixels = y2 - y1
        frame_height = frame_shape[0]

        # Estimate real height based on object type
        real_height_map = {
            'person': 1.7,  # Average height of a person
            'car': 1.5,     # Average height of a car
            'truck': 3.0,   # Average height of a truck
            'bus': 3.2,     # Average height of a bus
            'bicycle': 1.0  # Average height of a bicycle
        }

        real_height = real_height_map.get(class_name, 1.5)

        # Simplified distance calculation formula
        focal_length = frame_height  # Simplified assumption
        distance = (real_height * focal_length) / object_height_pixels

        return max(distance, 1.0)  # Minimum distance limit

    def fusion_detection(self, multi_camera_detections: Dict[int, List[DetectionResult]]) -> List[Dict]:
        """
        Multi-camera detection result fusion
        """
        fused_results = []

        # Assign importance weights based on camera position
        camera_weights = {
            'front': 1.0,
            'left': 0.8,
            'right': 0.8,
            'rear': 0.6
        }

        for camera_id, detections in multi_camera_detections.items():
            camera_config = self.camera_configs[camera_id]
            weight = camera_weights.get(camera_config.position, 0.5)

            for det in detections:
                # Safety risk assessment
                risk_level = self.assess_safety_risk(det)

                fused_result = {
                    'detection': det,
                    'camera_position': camera_config.position,
                    'weight': weight,
                    'risk_level': risk_level,
                    'action_required': self.determine_action(det, risk_level)
                }

                fused_results.append(fused_result)

        # Sort by risk level (critical first), then by camera weight;
        # a numeric rank is needed because the level names do not sort alphabetically
        risk_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1, 'unknown': 0}
        fused_results.sort(key=lambda x: (risk_order.get(x['risk_level'], 0), x['weight']), reverse=True)

        return fused_results

    def assess_safety_risk(self, detection: DetectionResult) -> str:
        """
        Assess safety risk level
        """
        distance = detection.distance
        class_name = detection.class_name

        if distance is None:
            return 'unknown'

        # More stringent risk assessment for pedestrians and cyclists
        if class_name in self.person_classes:
            if distance < 3.0:
                return 'critical'
            elif distance < 8.0:
                return 'high'
            elif distance < 15.0:
                return 'medium'
            else:
                return 'low'

        # Risk assessment for vehicles
        elif class_name in self.vehicle_classes:
            if distance < self.safety_thresholds['emergency_brake_distance']:
                return 'critical'
            elif distance < self.safety_thresholds['warning_distance']:
                return 'high'
            elif distance < 20.0:
                return 'medium'
            else:
                return 'low'

        return 'low'

    def determine_action(self, detection: DetectionResult, risk_level: str) -> str:
        """
        Determine action to be taken based on risk level
        """
        actions = {
            'critical': 'emergency_brake',
            'high': 'slow_down',
            'medium': 'monitor',
            'low': 'continue',
            'unknown': 'monitor'
        }

        return actions.get(risk_level, 'monitor')

    def run_multi_camera_detection(self, camera_sources: Dict[int, object]):
        """
        Run multi-camera detection system
        """
        # Create capture objects for each camera
        captures = {}
        for camera_id, source in camera_sources.items():
            cap = cv2.VideoCapture(source)
            if cap.isOpened():
                captures[camera_id] = cap
            else:
                print(f"Failed to open camera {camera_id}")

        try:
            while True:
                multi_camera_detections = {}

                # Capture frames from all cameras
                for camera_id, cap in captures.items():
                    ret, frame = cap.read()
                    if ret:
                        # Perform detection
                        detections = self.detect_frame(frame, camera_id)
                        multi_camera_detections[camera_id] = detections

                        # Visualize single camera results
                        annotated_frame = self.draw_detections(frame, detections, camera_id)
                        cv2.imshow(f'Camera {camera_id}', annotated_frame)

                # Fuse detection results
                if multi_camera_detections:
                    fused_results = self.fusion_detection(multi_camera_detections)

                    # Process fused results
                    self.process_fused_results(fused_results)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            for cap in captures.values():
                cap.release()
            cv2.destroyAllWindows()

    def draw_detections(self, frame: np.ndarray, detections: List[DetectionResult],
                       camera_id: int) -> np.ndarray:
        """
        Draw detection results
        """
        for det in detections:
            x1, y1, x2, y2 = map(int, det.bbox)

            # Choose color based on risk level
            risk_level = self.assess_safety_risk(det)
            colors = {
                'critical': (0, 0, 255),    # Red
                'high': (0, 165, 255),      # Orange
                'medium': (0, 255, 255),    # Yellow
                'low': (0, 255, 0),         # Green
                'unknown': (128, 128, 128)  # Gray
            }
            color = colors.get(risk_level, (0, 255, 0))

            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # Draw label information
            label = f"{det.class_name}: {det.confidence:.2f}"
            if det.distance is not None:
                label += f" | {det.distance:.1f}m"

            cv2.putText(frame, label, (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Add camera information
        camera_info = f"Camera {camera_id} ({self.camera_configs[camera_id].position})"
        cv2.putText(frame, camera_info, (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        return frame

    def process_fused_results(self, fused_results: List[Dict]):
        """
        Process fused detection results
        """
        # Check if emergency braking is required
        critical_detections = [r for r in fused_results if r['risk_level'] == 'critical']
        if critical_detections:
            print("⚠️  CRITICAL: Emergency brake required!")
            for result in critical_detections:
                det = result['detection']
                print(f"   - {det.class_name} at {det.distance:.1f}m ({result['camera_position']} camera)")

        # Check if deceleration is required
        high_risk_detections = [r for r in fused_results if r['risk_level'] == 'high']
        if high_risk_detections:
            print("⚠️  WARNING: Slow down recommended")
            for result in high_risk_detections:
                det = result['detection']
                print(f"   - {det.class_name} at {det.distance:.1f}m ({result['camera_position']} camera)")

# Usage example
if __name__ == "__main__":
    # Configure cameras
    camera_configs = [
        CameraConfig(0, 'front', 60.0, 1.2, 0.0),
        CameraConfig(1, 'left', 45.0, 1.2, -45.0),
        CameraConfig(2, 'right', 45.0, 1.2, 45.0),
    ]

    # Initialize detection system
    detector = AutonomousDrivingDetector("yolov8n.pt", camera_configs)

    # Camera source configuration
    camera_sources = {
        0: 0,  # Front camera
        1: 1,  # Left camera
        2: 2,  # Right camera
    }

    # Run detection system
    detector.run_multi_camera_detection(camera_sources)
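The monocular estimate in `estimate_distance` above is the pinhole relation distance = real_height × focal_length / pixel_height. Pulled out as a standalone function (the example values are illustrative), the arithmetic can be checked without a camera:

```python
def pinhole_distance(real_height_m: float, focal_length_px: float,
                     object_height_px: float) -> float:
    """Monocular distance estimate as used in AutonomousDrivingDetector,
    with the same 1 m minimum-distance clamp."""
    return max((real_height_m * focal_length_px) / object_height_px, 1.0)

# A 1.7 m pedestrian spanning 170 px in a 720 px-high frame, where the
# frame height doubles as the simplified focal length:
print(round(pinhole_distance(1.7, 720.0, 170.0), 2))  # 7.2
```

In a real vehicle the focal length would come from camera calibration rather than the frame height, which is why the chapter stresses that this model is a simplification.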

13.1.3 Traffic Sign Recognition Optimization

# traffic_sign_detector.py - Traffic sign detection optimization
import numpy as np
from ultralytics import YOLO
from typing import Dict, List

class TrafficSignDetector:
    def __init__(self, model_path: str):
        self.model = YOLO(model_path)

        # Traffic sign class mapping
        self.sign_categories = {
            'regulatory': ['stop', 'yield', 'no_entry', 'speed_limit'],
            'warning': ['curve', 'intersection', 'school_zone'],
            'informational': ['highway_sign', 'direction_sign']
        }

        # Speed limit sign number recognition
        self.speed_limits = [20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

    def detect_traffic_signs(self, frame: np.ndarray) -> List[Dict]:
        """
        Detect traffic signs and perform OCR recognition
        """
        results = self.model(frame, verbose=False)
        signs = []

        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = box.cls[0].item()
                    class_name = self.model.names[int(cls)]

                    # Only process traffic signs
                    if self.is_traffic_sign(class_name):
                        # Extract sign region
                        sign_region = frame[int(y1):int(y2), int(x1):int(x2)]

                        # Special handling for speed limit signs
                        if 'speed' in class_name.lower():
                            speed_value = self.recognize_speed_limit(sign_region)
                            if speed_value:
                                class_name = f"speed_limit_{speed_value}"

                        sign_info = {
                            'bbox': [x1, y1, x2, y2],
                            'confidence': conf,
                            'class': class_name,
                            'category': self.get_sign_category(class_name),
                            'priority': self.get_sign_priority(class_name)
                        }

                        signs.append(sign_info)

        # Sort by priority
        signs.sort(key=lambda x: x['priority'], reverse=True)
        return signs

    def is_traffic_sign(self, class_name: str) -> bool:
        """
        Determine if it is a traffic sign
        """
        traffic_keywords = ['sign', 'stop', 'yield', 'speed', 'limit', 'warning']
        return any(keyword in class_name.lower() for keyword in traffic_keywords)

    def get_sign_category(self, class_name: str) -> str:
        """
        Get sign category
        """
        for category, signs in self.sign_categories.items():
            if any(sign in class_name.lower() for sign in signs):
                return category
        return 'other'

    def get_sign_priority(self, class_name: str) -> int:
        """
        Get sign priority
        """
        priority_map = {
            'stop': 10,
            'yield': 9,
            'speed_limit': 8,
            'no_entry': 7,
            'warning': 6,
            'informational': 3
        }

        for keyword, priority in priority_map.items():
            if keyword in class_name.lower():
                return priority
        return 1

    def recognize_speed_limit(self, sign_region: np.ndarray) -> int:
        """
        Recognize the speed-limit number (simplified placeholder)
        """
        # A production system would run OCR on sign_region here
        # (e.g., with PaddleOCR or EasyOCR); for demonstration we
        # return a random plausible value instead.
        import random
        return random.choice(self.speed_limits)
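Because `detect_traffic_signs` sorts by priority, regulatory signs surface first in the result list. The keyword-based lookup can be exercised on its own; the sign names below are illustrative:

```python
def sign_priority(class_name: str) -> int:
    """Keyword-based priority lookup, mirroring TrafficSignDetector."""
    priority_map = {'stop': 10, 'yield': 9, 'speed_limit': 8,
                    'no_entry': 7, 'warning': 6, 'informational': 3}
    for keyword, priority in priority_map.items():
        if keyword in class_name.lower():
            return priority
    return 1  # unrecognized signs drop to the bottom

signs = ['direction_sign', 'speed_limit_60', 'stop', 'warning_curve']
signs.sort(key=sign_priority, reverse=True)
print(signs)  # ['stop', 'speed_limit_60', 'warning_curve', 'direction_sign']
```

Sorting by a numeric priority rather than by class name keeps safety-critical signs (stop, yield) ahead of informational ones regardless of how the classes are labeled.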

13.2 Intelligent Surveillance System Cases

13.2.1 Video Surveillance Anomaly Detection

# intelligent_surveillance.py - Intelligent surveillance system
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque
import time
from typing import Dict, List, Tuple
import json

class IntelligentSurveillanceSystem:
    def __init__(self, model_path: str, config_file: str = None):
        self.model = YOLO(model_path)

        # Monitoring configuration
        self.config = self.load_config(config_file) if config_file else self.default_config()

        # Tracking related
        self.track_history = defaultdict(lambda: deque(maxlen=30))
        self.person_count_history = deque(maxlen=100)
        self.alert_cooldown = {}  # Prevent duplicate alerts

        # Anomaly detection status
        self.background_subtractor = cv2.createBackgroundSubtractorMOG2()
        self.motion_threshold = 5000  # Motion pixel threshold

    def default_config(self) -> Dict:
        """
        Default monitoring configuration
        """
        return {
            'zones': {
                'entrance': {'coords': [0, 0, 100, 100], 'type': 'counting'},
                'restricted': {'coords': [200, 200, 400, 400], 'type': 'intrusion'},
                'exit': {'coords': [500, 0, 600, 100], 'type': 'counting'}
            },
            'alerts': {
                'max_persons': 10,
                'loitering_time': 300,  # 5 minutes
                'motion_detection': True,
                'person_counting': True,
                'intrusion_detection': True
            },
            'business_hours': {
                'start': '08:00',
                'end': '18:00'
            }
        }

    def load_config(self, config_file: str) -> Dict:
        """
        Load monitoring configuration
        """
        with open(config_file, 'r') as f:
            return json.load(f)

    def detect_and_track(self, frame: np.ndarray) -> Tuple[List[Dict], np.ndarray]:
        """
        Detect and track people
        """
        results = self.model.track(frame, persist=True, verbose=False)
        detections = []

        annotated_frame = frame.copy()

        for r in results:
            boxes = r.boxes
            if boxes is not None and boxes.id is not None:
                track_ids = boxes.id.int().cpu().tolist()
                confidences = boxes.conf.float().cpu().tolist()
                classes = boxes.cls.int().cpu().tolist()
                xyxy = boxes.xyxy.cpu().tolist()

                for track_id, conf, cls, bbox in zip(track_ids, confidences, classes, xyxy):
                    class_name = self.model.names[cls]

                    # Only track people
                    if class_name == 'person' and conf > 0.5:
                        x1, y1, x2, y2 = bbox
                        center = ((x1 + x2) / 2, (y1 + y2) / 2)

                        # Update tracking history
                        self.track_history[track_id].append({
                            'timestamp': time.time(),
                            'center': center,
                            'bbox': bbox,
                            'confidence': conf
                        })

                        detection = {
                            'track_id': track_id,
                            'bbox': bbox,
                            'center': center,
                            'confidence': conf,
                            'class': class_name
                        }

                        detections.append(detection)

                        # Draw tracking results
                        self.draw_track(annotated_frame, track_id, bbox, center)

        return detections, annotated_frame

    def draw_track(self, frame: np.ndarray, track_id: int, bbox: List[float], center: Tuple[float, float]):
        """
        Draw tracking trajectory
        """
        x1, y1, x2, y2 = map(int, bbox)
        cx, cy = map(int, center)

        # Draw bounding box
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Draw center point
        cv2.circle(frame, (cx, cy), 5, (0, 0, 255), -1)

        # Draw trajectory
        if track_id in self.track_history:
            track_points = [pos['center'] for pos in self.track_history[track_id]]
            if len(track_points) > 1:
                for i in range(1, len(track_points)):
                    pt1 = tuple(map(int, track_points[i-1]))
                    pt2 = tuple(map(int, track_points[i]))
                    cv2.line(frame, pt1, pt2, (255, 0, 0), 2)

        # Draw ID
        cv2.putText(frame, f"ID:{track_id}", (x1, y1-10), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

    def analyze_behavior(self, detections: List[Dict]) -> List[Dict]:
        """
        Analyze behavior patterns
        """
        alerts = []
        current_time = time.time()

        # Person counting analysis
        person_count = len(detections)
        self.person_count_history.append(person_count)

        if person_count > self.config['alerts']['max_persons']:
            alerts.append({
                'type': 'overcrowding',
                'message': f"Person count ({person_count}) exceeds threshold",
                'severity': 'high',
                'timestamp': current_time
            })

        # Loitering detection
        for detection in detections:
            track_id = detection['track_id']
            if track_id in self.track_history:
                track_data = list(self.track_history[track_id])

                # Check if staying in the same area for too long
                if len(track_data) >= 20:  # At least 20 frames history
                    positions = [pos['center'] for pos in track_data[-20:]]
                    if self.is_loitering(positions):
                        alert_key = f"loitering_{track_id}"
                        if self.should_send_alert(alert_key):
                            alerts.append({
                                'type': 'loitering',
                                'track_id': track_id,
                                'message': f"Person {track_id} loitering detected",
                                'severity': 'medium',
                                'timestamp': current_time
                            })

        # Zone intrusion detection
        for zone_name, zone_config in self.config['zones'].items():
            if zone_config['type'] == 'intrusion':
                for detection in detections:
                    if self.is_in_zone(detection['center'], zone_config['coords']):
                        alert_key = f"intrusion_{zone_name}_{detection['track_id']}"
                        if self.should_send_alert(alert_key):
                            alerts.append({
                                'type': 'intrusion',
                                'zone': zone_name,
                                'track_id': detection['track_id'],
                                'message': f"Intrusion detected in {zone_name} zone",
                                'severity': 'high',
                                'timestamp': current_time
                            })

        return alerts

    def is_loitering(self, positions: List[Tuple[float, float]]) -> bool:
        """
        Detect if loitering
        """
        if len(positions) < 10:
            return False

        # Calculate standard deviation of positions
        x_coords = [pos[0] for pos in positions]
        y_coords = [pos[1] for pos in positions]

        x_std = np.std(x_coords)
        y_std = np.std(y_coords)

        # If movement range is small, it is considered loitering
        return x_std < 30 and y_std < 30

    def is_in_zone(self, point: Tuple[float, float], zone_coords: List[int]) -> bool:
        """
        Check if point is within the specified zone
        """
        x, y = point
        x1, y1, x2, y2 = zone_coords
        return x1 <= x <= x2 and y1 <= y <= y2

    def should_send_alert(self, alert_key: str, cooldown_seconds: int = 30) -> bool:
        """
        Check if an alert should be sent (prevent duplicate alerts)
        """
        current_time = time.time()
        if alert_key in self.alert_cooldown:
            if current_time - self.alert_cooldown[alert_key] < cooldown_seconds:
                return False

        self.alert_cooldown[alert_key] = current_time
        return True

    def detect_motion(self, frame: np.ndarray) -> bool:
        """
        Motion detection
        """
        fg_mask = self.background_subtractor.apply(frame)
        motion_pixels = cv2.countNonZero(fg_mask)
        return motion_pixels > self.motion_threshold

    def draw_zones(self, frame: np.ndarray):
        """
        Draw monitoring zones
        """
        for zone_name, zone_config in self.config['zones'].items():
            x1, y1, x2, y2 = zone_config['coords']
            color = (0, 0, 255) if zone_config['type'] == 'intrusion' else (255, 255, 0)

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, zone_name, (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    def run_surveillance(self, video_source=0):
        """
        Run surveillance system
        """
        cap = cv2.VideoCapture(video_source)
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video source: {video_source}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Detect and track
            detections, annotated_frame = self.detect_and_track(frame)

            # Behavior analysis
            alerts = self.analyze_behavior(detections)

            # Motion detection
            motion_detected = self.detect_motion(frame)

            # Draw monitoring zones
            self.draw_zones(annotated_frame)

            # Display statistics
            person_count = len(detections)
            info_text = f"Persons: {person_count} | Motion: {'Yes' if motion_detected else 'No'}"
            cv2.putText(annotated_frame, info_text, (10, 30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            # Process alerts
            if alerts:
                for alert in alerts:
                    print(f"🚨 ALERT: {alert['message']}")
                    # Display alerts on the interface
                    alert_text = f"ALERT: {alert['type']}"
                    cv2.putText(annotated_frame, alert_text, (10, 70), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

            cv2.imshow('Intelligent Surveillance', annotated_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

# Usage example
if __name__ == "__main__":
    surveillance = IntelligentSurveillanceSystem("yolov8n.pt")
    surveillance.run_surveillance(0)
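The loitering test inside `analyze_behavior` reduces to a spread check on a track's recent positions. A standalone sketch with synthetic trajectories (the 30-pixel threshold mirrors the one used above):

```python
import numpy as np

def is_loitering(positions, std_threshold: float = 30.0) -> bool:
    """Flag a track whose recent positions barely move, mirroring
    IntelligentSurveillanceSystem.is_loitering."""
    if len(positions) < 10:
        return False  # not enough history to decide
    x_std = np.std([p[0] for p in positions])
    y_std = np.std([p[1] for p in positions])
    return bool(x_std < std_threshold and y_std < std_threshold)

# Someone pacing within a few pixels is flagged; someone walking across
# the frame is not.
stationary = [(100 + (i % 3), 200 + (i % 2)) for i in range(20)]
walking = [(100 + 15 * i, 200) for i in range(20)]
print(is_loitering(stationary), is_loitering(walking))  # True False
```

The pixel threshold is camera-dependent: a wide-angle overview camera needs a smaller value than a zoomed-in entrance camera, so in practice it belongs in the per-zone configuration.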

13.3 Industrial Quality Inspection Application Cases

13.3.1 Electronic Product Defect Detection

# industrial_qc_system.py - Industrial quality inspection system
import cv2
import numpy as np
from ultralytics import YOLO
import time
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import json
import sqlite3

@dataclass
class DefectDetection:
    product_id: str
    timestamp: float
    defect_type: str
    bbox: List[float]
    confidence: float
    severity: str  # 'minor', 'major', 'critical'
    image_path: Optional[str] = None

class IndustrialQCSystem:
    def __init__(self, model_path: str, db_path: str = "qc_database.db"):
        self.model = YOLO(model_path)
        self.db_path = db_path

        # Defect classification configuration
        self.defect_categories = {
            'surface_defects': ['scratch', 'dent', 'stain', 'corrosion'],
            'assembly_defects': ['missing_component', 'misalignment', 'loose_connection'],
            'dimensional_defects': ['oversized', 'undersized', 'warped'],
            'electrical_defects': ['short_circuit', 'open_circuit', 'poor_connection']
        }

        # Severity determination
        self.severity_rules = {
            'scratch': {'minor': 0.5, 'major': 0.7, 'critical': 0.9},
            'missing_component': {'minor': 0.0, 'major': 0.6, 'critical': 0.8},
            'short_circuit': {'minor': 0.0, 'major': 0.0, 'critical': 0.5}
        }

        # Quality statistics
        self.daily_stats = {
            'total_inspected': 0,
            'defects_found': 0,
            'pass_rate': 0.0
        }

        # Initialize database
        self.init_database()

    def init_database(self):
        """
        Initialize quality inspection database
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS inspections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id TEXT,
                timestamp REAL,
                result TEXT,
                defect_count INTEGER,
                pass_fail TEXT
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS defects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                inspection_id INTEGER,
                defect_type TEXT,
                confidence REAL,
                severity TEXT,
                bbox TEXT,
                FOREIGN KEY (inspection_id) REFERENCES inspections (id)
            )
        ''')

        conn.commit()
        conn.close()

    def detect_defects(self, image: np.ndarray, product_id: str) -> List[DefectDetection]:
        """
        Detect product defects
        """
        results = self.model(image, verbose=False)
        defects = []

        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = box.cls[0].item()
                    defect_type = self.model.names[int(cls)]

                    # Determine severity
                    severity = self.determine_severity(defect_type, conf)

                    defect = DefectDetection(
                        product_id=product_id,
                        timestamp=time.time(),
                        defect_type=defect_type,
                        bbox=[x1, y1, x2, y2],
                        confidence=conf,
                        severity=severity
                    )

                    defects.append(defect)

        return defects

    def determine_severity(self, defect_type: str, confidence: float) -> str:
        """
        Determine severity based on defect type and confidence
        """
        if defect_type in self.severity_rules:
            rules = self.severity_rules[defect_type]
            if confidence >= rules['critical']:
                return 'critical'
            elif confidence >= rules['major']:
                return 'major'
            else:
                return 'minor'
        else:
            # Default rule: treat high-confidence detections as major
            return 'major' if confidence >= 0.8 else 'minor'

    def inspect_product(self, image: np.ndarray, product_id: str) -> Dict:
        """
        Complete product inspection process
        """
        inspection_start = time.time()

        # Detect defects
        defects = self.detect_defects(image, product_id)

        # Quality assessment
        inspection_result = self.quality_assessment(defects)

        # Record inspection results
        self.record_inspection(product_id, defects, inspection_result)

        # Update statistics
        self.update_statistics(inspection_result['pass_fail'])

        inspection_time = time.time() - inspection_start

        return {
            'product_id': product_id,
            'defects': defects,
            'result': inspection_result,
            'inspection_time': inspection_time,
            'timestamp': inspection_start
        }

    def quality_assessment(self, defects: List[DefectDetection]) -> Dict:
        """
        Quality assessment and judgment
        """
        if not defects:
            return {
                'pass_fail': 'PASS',
                'grade': 'A',
                'defect_count': 0,
                'critical_defects': 0,
                'major_defects': 0,
                'minor_defects': 0
            }

        # Count defects of different severities
        critical_count = len([d for d in defects if d.severity == 'critical'])
        major_count = len([d for d in defects if d.severity == 'major'])
        minor_count = len([d for d in defects if d.severity == 'minor'])

        # Judgment rules
        if critical_count > 0:
            pass_fail = 'FAIL'
            grade = 'D'
        elif major_count > 2:
            pass_fail = 'FAIL'
            grade = 'C'
        elif major_count > 0 or minor_count > 5:
            pass_fail = 'CONDITIONAL'
            grade = 'B'
        else:
            pass_fail = 'PASS'
            grade = 'A'

        return {
            'pass_fail': pass_fail,
            'grade': grade,
            'defect_count': len(defects),
            'critical_defects': critical_count,
            'major_defects': major_count,
            'minor_defects': minor_count
        }

    def record_inspection(self, product_id: str, defects: List[DefectDetection], result: Dict):
        """
        Record inspection results to database
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Insert inspection record
        cursor.execute('''
            INSERT INTO inspections (product_id, timestamp, result, defect_count, pass_fail)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            product_id,
            time.time(),
            json.dumps(result),
            result['defect_count'],
            result['pass_fail']
        ))

        inspection_id = cursor.lastrowid

        # Insert defect record
        for defect in defects:
            cursor.execute('''
                INSERT INTO defects (inspection_id, defect_type, confidence, severity, bbox)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                inspection_id,
                defect.defect_type,
                defect.confidence,
                defect.severity,
                json.dumps(defect.bbox)
            ))

        conn.commit()
        conn.close()

    def update_statistics(self, pass_fail: str):
        """
        Update quality statistics
        """
        self.daily_stats['total_inspected'] += 1

        if pass_fail == 'FAIL':
            self.daily_stats['defects_found'] += 1

        # Calculate pass rate
        self.daily_stats['pass_rate'] = (
            (self.daily_stats['total_inspected'] - self.daily_stats['defects_found']) /
            self.daily_stats['total_inspected'] * 100
        )

    def visualize_inspection(self, image: np.ndarray, defects: List[DefectDetection],
                           result: Dict) -> np.ndarray:
        """
        Visualize inspection results
        """
        annotated_image = image.copy()

        # Draw defects
        for defect in defects:
            x1, y1, x2, y2 = map(int, defect.bbox)

            # Choose color based on severity
            colors = {
                'critical': (0, 0, 255),    # Red
                'major': (0, 165, 255),     # Orange
                'minor': (0, 255, 255)      # Yellow
            }
            color = colors.get(defect.severity, (0, 255, 0))

            # Draw bounding box
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)

            # Draw label
            label = f"{defect.defect_type} ({defect.severity}): {defect.confidence:.2f}"
            cv2.putText(annotated_image, label, (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Draw inspection result
        result_text = f"{result['pass_fail']} - Grade: {result['grade']}"
        result_color = (0, 255, 0) if result['pass_fail'] == 'PASS' else (0, 0, 255)
        cv2.putText(annotated_image, result_text, (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1.0, result_color, 2)

        # Draw defect statistics
        stats_text = f"Defects: {result['defect_count']} (C:{result['critical_defects']}, M:{result['major_defects']}, m:{result['minor_defects']})"
        cv2.putText(annotated_image, stats_text, (10, 70), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        return annotated_image

    def run_conveyor_inspection(self, camera_source=0):
        """
        Conveyor belt inspection system
        """
        cap = cv2.VideoCapture(camera_source)
        product_counter = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Detect product trigger (simplified: simulate by pressing spacebar)
            key = cv2.waitKey(1) & 0xFF
            if key == ord(' '):  # Spacebar triggers detection
                product_counter += 1
                product_id = f"PROD_{product_counter:06d}"

                print(f"\n🔍 Inspecting product: {product_id}")

                # Perform inspection
                inspection_result = self.inspect_product(frame, product_id)

                # Display results
                annotated_frame = self.visualize_inspection(
                    frame, inspection_result['defects'], inspection_result['result']
                )
                cv2.imshow('Conveyor Belt - Press SPACE to inspect', annotated_frame)

                # Print inspection report
                self.print_inspection_report(inspection_result)

                cv2.waitKey(2000)  # Show annotated result for 2 seconds

            # Display real-time video and statistics
            info_frame = frame.copy()
            self.draw_statistics(info_frame)

            cv2.imshow('Conveyor Belt - Press SPACE to inspect', info_frame)

            if key == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

    def draw_statistics(self, frame: np.ndarray):
        """
        Draw statistics
        """
        stats = self.daily_stats
        stats_text = [
            f"Inspected: {stats['total_inspected']}",
            f"Defective: {stats['defects_found']}",
            f"Pass Rate: {stats['pass_rate']:.1f}%"
        ]

        for i, text in enumerate(stats_text):
            cv2.putText(frame, text, (10, 30 + i * 30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    def print_inspection_report(self, inspection_result: Dict):
        """
        Print inspection report
        """
        result = inspection_result['result']
        defects = inspection_result['defects']

        print(f"Product ID: {inspection_result['product_id']}")
        print(f"Inspection Time: {inspection_result['inspection_time']:.3f}s")
        print(f"Result: {result['pass_fail']} (Grade: {result['grade']})")
        print(f"Total Defects: {result['defect_count']}")

        if defects:
            print("Defect Details:")
            for defect in defects:
                print(f"  - {defect.defect_type} ({defect.severity}): {defect.confidence:.3f}")

    def generate_quality_report(self, days: int = 1) -> Dict:
        """
        Generate quality report
        """
        end_time = time.time()
        start_time = end_time - (days * 24 * 3600)

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Get inspection data within the time range
        cursor.execute('''
            SELECT pass_fail, COUNT(*) as count
            FROM inspections
            WHERE timestamp BETWEEN ? AND ?
            GROUP BY pass_fail
        ''', (start_time, end_time))

        pass_fail_stats = dict(cursor.fetchall())

        # Get defect type statistics
        cursor.execute('''
            SELECT d.defect_type, COUNT(*) as count, AVG(d.confidence) as avg_confidence
            FROM defects d
            JOIN inspections i ON d.inspection_id = i.id
            WHERE i.timestamp BETWEEN ? AND ?
            GROUP BY d.defect_type
            ORDER BY count DESC
        ''', (start_time, end_time))

        defect_stats = cursor.fetchall()

        conn.close()

        total_inspections = sum(pass_fail_stats.values())
        pass_count = pass_fail_stats.get('PASS', 0)
        pass_rate = (pass_count / total_inspections * 100) if total_inspections > 0 else 0

        return {
            'period_days': days,
            'total_inspections': total_inspections,
            'pass_rate': pass_rate,
            'pass_fail_breakdown': pass_fail_stats,
            'top_defects': defect_stats[:10],
            'generated_at': time.time()
        }

# Usage example
if __name__ == "__main__":
    qc_system = IndustrialQCSystem("yolov8n.pt")

    # Run conveyor belt inspection
    qc_system.run_conveyor_inspection(0)

    # Generate quality report
    report = qc_system.generate_quality_report(days=7)
    print("\n📊 Weekly Quality Report:")
    print(json.dumps(report, indent=2))
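`record_inspection` and `generate_quality_report` assume two SQLite tables, `inspections` and `defects`, which the class presumably creates elsewhere (e.g. in `__init__`). A minimal schema consistent with the INSERT and SELECT statements above — column types are assumptions — might look like:

```python
import sqlite3

def init_qc_database(db_path: str = "qc_inspections.db"):
    """Create the tables that record_inspection / generate_quality_report expect.
    Columns mirror the INSERT statements above; the types are assumptions."""
    conn = sqlite3.connect(db_path)
    conn.executescript('''
        CREATE TABLE IF NOT EXISTS inspections (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT,
            timestamp REAL,
            result TEXT,          -- JSON-encoded assessment dict
            defect_count INTEGER,
            pass_fail TEXT
        );
        CREATE TABLE IF NOT EXISTS defects (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            inspection_id INTEGER REFERENCES inspections(id),
            defect_type TEXT,
            confidence REAL,
            severity TEXT,
            bbox TEXT             -- JSON-encoded [x1, y1, x2, y2]
        );
    ''')
    conn.commit()
    conn.close()
```

The `defects.inspection_id` foreign key is what lets `generate_quality_report` join defect rows back to their inspection records.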

13.4 Medical Imaging Application Cases

13.4.1 Medical Image Lesion Detection

# medical_imaging_detector.py - Medical imaging detection system
import cv2
import numpy as np
from ultralytics import YOLO
import pydicom
from typing import Dict, List, Tuple, Optional
import json
from dataclasses import dataclass
import time

@dataclass
class MedicalDetection:
    patient_id: str
    study_id: str
    image_id: str
    finding_type: str
    bbox: List[float]
    confidence: float
    urgency: str  # 'routine', 'urgent', 'emergency'
    radiologist_review: bool = False

class MedicalImagingDetector:
    def __init__(self, model_path: str):
        self.model = YOLO(model_path)

        # Medical finding classification
        self.finding_categories = {
            'lung_nodules': ['nodule', 'mass', 'opacity'],
            'fractures': ['fracture', 'break', 'crack'],
            'infections': ['pneumonia', 'infiltrate', 'consolidation'],
            'tumors': ['tumor', 'neoplasm', 'cancer'],
            'vascular': ['aneurysm', 'embolism', 'thrombosis']
        }

        # Urgency determination rules
        self.urgency_rules = {
            'pneumothorax': 'emergency',
            'massive_stroke': 'emergency',
            'aortic_dissection': 'emergency',
            'pulmonary_embolism': 'urgent',
            'pneumonia': 'urgent',
            'nodule': 'routine',
            'fracture': 'urgent'
        }

    def load_dicom_image(self, dicom_path: str) -> Tuple[np.ndarray, Dict]:
        """
        Load DICOM medical image
        """
        # Read DICOM file
        ds = pydicom.dcmread(dicom_path)

        # Extract image data
        image = ds.pixel_array

        # Normalize image (convert to 8-bit), guarding against constant images
        if image.dtype != np.uint8:
            img_min, img_max = image.min(), image.max()
            if img_max > img_min:
                image = ((image - img_min) / (img_max - img_min) * 255).astype(np.uint8)
            else:
                image = np.zeros_like(image, dtype=np.uint8)

        # Convert to 3 channels (if grayscale)
        if len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

        # Extract metadata
        metadata = {
            'patient_id': getattr(ds, 'PatientID', 'Unknown'),
            'study_id': getattr(ds, 'StudyInstanceUID', 'Unknown'),
            'series_id': getattr(ds, 'SeriesInstanceUID', 'Unknown'),
            'image_id': getattr(ds, 'SOPInstanceUID', 'Unknown'),
            'modality': getattr(ds, 'Modality', 'Unknown'),
            'body_part': getattr(ds, 'BodyPartExamined', 'Unknown'),
            'acquisition_date': getattr(ds, 'AcquisitionDate', 'Unknown')
        }

        return image, metadata

    def detect_findings(self, image: np.ndarray, metadata: Dict) -> List[MedicalDetection]:
        """
        Detect pathological findings in medical images
        """
        results = self.model(image, verbose=False)
        findings = []

        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = box.cls[0].item()
                    finding_type = self.model.names[int(cls)]

                    # Medical images require higher confidence thresholds
                    if conf < 0.7:
                        continue

                    # Determine urgency
                    urgency = self.determine_urgency(finding_type, conf)

                    # Determine if radiologist review is needed
                    needs_review = self.needs_radiologist_review(finding_type, conf)

                    detection = MedicalDetection(
                        patient_id=metadata['patient_id'],
                        study_id=metadata['study_id'],
                        image_id=metadata['image_id'],
                        finding_type=finding_type,
                        bbox=[x1, y1, x2, y2],
                        confidence=conf,
                        urgency=urgency,
                        radiologist_review=needs_review
                    )

                    findings.append(detection)

        return findings

    def determine_urgency(self, finding_type: str, confidence: float) -> str:
        """
        Determine the urgency of findings
        """
        # Check urgency for specific diseases
        for condition, urgency in self.urgency_rules.items():
            if condition.lower() in finding_type.lower():
                # Escalate high-confidence urgent findings to emergency
                if confidence > 0.9 and urgency == 'urgent':
                    return 'emergency'
                return urgency

        # Default to confidence-based judgment
        if confidence > 0.95:
            return 'urgent'
        else:
            return 'routine'

    def needs_radiologist_review(self, finding_type: str, confidence: float) -> bool:
        """
        Determine if radiologist review is needed
        """
        # Emergencies always require review
        urgency = self.determine_urgency(finding_type, confidence)
        if urgency in ['emergency', 'urgent']:
            return True

        # Low confidence findings require review
        if confidence < 0.8:
            return True

        # Specific types of findings require review
        high_stakes_findings = ['tumor', 'cancer', 'mass', 'aneurysm']
        if any(term in finding_type.lower() for term in high_stakes_findings):
            return True

        return False

    def generate_medical_report(self, image: np.ndarray, findings: List[MedicalDetection],
                              metadata: Dict) -> Dict:
        """
        Generate medical examination report
        """
        report = {
            'patient_info': {
                'patient_id': metadata['patient_id'],
                'study_id': metadata['study_id'],
                'modality': metadata['modality'],
                'body_part': metadata['body_part'],
                'exam_date': metadata['acquisition_date']
            },
            'ai_analysis': {
                'model_version': 'YOLOv8-Medical-v1.0',
                'analysis_timestamp': time.time(),
                'total_findings': len(findings),
                'findings_by_urgency': self.categorize_by_urgency(findings),
                'findings_requiring_review': len([f for f in findings if f.radiologist_review])
            },
            'findings': [],
            'recommendations': self.generate_recommendations(findings)
        }

        # Detailed findings list
        for finding in findings:
            finding_detail = {
                'type': finding.finding_type,
                'location_bbox': finding.bbox,
                'confidence': finding.confidence,
                'urgency': finding.urgency,
                'requires_radiologist_review': finding.radiologist_review,
                'clinical_notes': self.get_clinical_notes(finding.finding_type)
            }
            report['findings'].append(finding_detail)

        return report

    def categorize_by_urgency(self, findings: List[MedicalDetection]) -> Dict:
        """
        Categorize findings by urgency
        """
        categories = {'emergency': 0, 'urgent': 0, 'routine': 0}
        for finding in findings:
            categories[finding.urgency] += 1
        return categories

    def generate_recommendations(self, findings: List[MedicalDetection]) -> List[str]:
        """
        Generate clinical recommendations
        """
        recommendations = []

        # Check for emergencies
        emergency_findings = [f for f in findings if f.urgency == 'emergency']
        if emergency_findings:
            recommendations.append("🚨 IMMEDIATE ATTENTION REQUIRED - Emergency findings detected")
            recommendations.append("Contact attending physician immediately")

        # Check for urgent follow-ups
        urgent_findings = [f for f in findings if f.urgency == 'urgent']
        if urgent_findings:
            recommendations.append("⚡ Urgent follow-up recommended within 24-48 hours")

        # Check for radiologist review
        review_needed = [f for f in findings if f.radiologist_review]
        if review_needed:
            recommendations.append("👨‍⚕️ Radiologist review recommended for AI findings")

        # Recommendations based on specific finding types
        finding_types = [f.finding_type for f in findings]

        if any('nodule' in ft.lower() for ft in finding_types):
            recommendations.append("📋 Consider follow-up CT in 3-6 months for nodule surveillance")

        if any('fracture' in ft.lower() for ft in finding_types):
            recommendations.append("🦴 Orthopedic consultation may be required")

        if any('pneumonia' in ft.lower() for ft in finding_types):
            recommendations.append("💊 Consider antibiotic therapy and follow-up chest imaging")

        return recommendations

    def get_clinical_notes(self, finding_type: str) -> str:
        """
        Get clinical notes
        """
        clinical_notes = {
            'nodule': 'Pulmonary nodule identified. Size and characteristics should be evaluated for malignancy risk.',
            'pneumonia': 'Inflammatory changes consistent with pneumonia. Clinical correlation recommended.',
            'fracture': 'Bone discontinuity consistent with fracture. Assess for displacement and complications.',
            'mass': 'Space-occupying lesion identified. Further characterization with contrast studies may be needed.',
            'pneumothorax': 'Air in pleural space. Assess size and consider chest tube placement if significant.'
        }

        for key, note in clinical_notes.items():
            if key.lower() in finding_type.lower():
                return note

        return 'Abnormal finding detected. Clinical correlation and further evaluation recommended.'

    def visualize_medical_findings(self, image: np.ndarray, findings: List[MedicalDetection]) -> np.ndarray:
        """
        Visualize medical findings
        """
        annotated_image = image.copy()

        for finding in findings:
            x1, y1, x2, y2 = map(int, finding.bbox)

            # Choose color based on urgency
            colors = {
                'emergency': (0, 0, 255),    # Red
                'urgent': (0, 165, 255),     # Orange
                'routine': (0, 255, 255)     # Yellow
            }
            color = colors.get(finding.urgency, (0, 255, 0))

            # Draw bounding box
            thickness = 3 if finding.urgency == 'emergency' else 2
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, thickness)

            # Draw label
            label = f"{finding.finding_type}: {finding.confidence:.2f}"
            if finding.radiologist_review:
                label += " [REVIEW]"

            cv2.putText(annotated_image, label, (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            # Emergency situation special marker
            if finding.urgency == 'emergency':
                cv2.putText(annotated_image, "EMERGENCY", (x1, y2+25), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

        return annotated_image

    def batch_analysis(self, dicom_directory: str) -> List[Dict]:
        """
        Batch analyze DICOM files
        """
        import os
        import glob

        dicom_files = glob.glob(os.path.join(dicom_directory, "*.dcm"))
        results = []

        for dicom_file in dicom_files:
            try:
                print(f"Processing: {os.path.basename(dicom_file)}")

                # Load DICOM image
                image, metadata = self.load_dicom_image(dicom_file)

                # Detect findings
                findings = self.detect_findings(image, metadata)

                # Generate report
                report = self.generate_medical_report(image, findings, metadata)

                # Save visualization results
                if findings:
                    annotated_image = self.visualize_medical_findings(image, findings)
                    output_path = f"analysis_{metadata['patient_id']}_{metadata['image_id']}.jpg"
                    cv2.imwrite(output_path, annotated_image)
                    report['visualization_path'] = output_path

                results.append(report)

            except Exception as e:
                print(f"Error processing {dicom_file}: {e}")

        return results

# Usage example
if __name__ == "__main__":
    detector = MedicalImagingDetector("yolov8n-medical.pt")

    # Single DICOM file analysis
    image, metadata = detector.load_dicom_image("sample.dcm")
    findings = detector.detect_findings(image, metadata)
    report = detector.generate_medical_report(image, findings, metadata)

    print("🏥 Medical Imaging Analysis Report")
    print("=" * 50)
    print(json.dumps(report, indent=2))

    # Visualize results
    if findings:
        annotated_image = detector.visualize_medical_findings(image, findings)
        cv2.imshow('Medical Imaging Analysis', annotated_image)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Batch analysis
    # batch_results = detector.batch_analysis("/path/to/dicom/directory")
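The triage logic above (the urgency rule table plus the review heuristics) can be exercised without a trained model or DICOM data. Here is a minimal standalone sketch that restates the same rules as pure functions — the function names `triage` and `needs_review` are illustrative, not part of the class API:

```python
# Standalone sketch of MedicalImagingDetector's triage rules,
# restated as pure functions so they can be sanity-checked in isolation.

URGENCY_RULES = {
    'pneumothorax': 'emergency',
    'massive_stroke': 'emergency',
    'aortic_dissection': 'emergency',
    'pulmonary_embolism': 'urgent',
    'pneumonia': 'urgent',
    'nodule': 'routine',
    'fracture': 'urgent',
}

def triage(finding_type: str, confidence: float) -> str:
    """Mirror determine_urgency: rule lookup, escalation, then defaults."""
    for condition, urgency in URGENCY_RULES.items():
        if condition in finding_type.lower():
            # High-confidence urgent findings are escalated to emergency
            if confidence > 0.9 and urgency == 'urgent':
                return 'emergency'
            return urgency
    return 'urgent' if confidence > 0.95 else 'routine'

def needs_review(finding_type: str, confidence: float) -> bool:
    """Mirror needs_radiologist_review: urgency, low confidence, high-stakes terms."""
    if triage(finding_type, confidence) in ('emergency', 'urgent'):
        return True
    if confidence < 0.8:
        return True
    return any(term in finding_type.lower()
               for term in ('tumor', 'cancer', 'mass', 'aneurysm'))

print(triage('pneumonia', 0.95))   # urgent rule escalated -> 'emergency'
print(triage('nodule', 0.85))      # -> 'routine'
```

Note that the escalation step means a 0.95-confidence pneumonia outranks its nominal 'urgent' rule — worth keeping in mind when tuning confidence thresholds.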

13.5 Retail and E-commerce Application Cases

13.5.1 Smart Shelf Monitoring System
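The `RetailMonitoringSystem` below reads its store layout from a JSON file. The expected top-level keys can be inferred from the code (`product_catalog`, `shelf_zones`, `zone_configs`); the concrete products, coordinates, and thresholds in this sketch are purely illustrative:

```python
import json

# Illustrative store layout config for RetailMonitoringSystem.
# The keys match what the class reads; all values are made-up examples.
store_layout = {
    "product_catalog": {
        "SKU001": {
            "name": "Cola 330ml",
            "category": "beverages",
            "price": 1.99,
            "detection_keywords": ["bottle", "can"],  # matched against model class names
        }
    },
    "shelf_zones": {
        # zone name -> [x1, y1, x2, y2] in image pixel coordinates
        "beverages_top": [0, 0, 640, 240],
        "beverages_bottom": [0, 240, 640, 480],
    },
    "zone_configs": {
        "beverages_top": {
            "stock_thresholds": {"full": 5000, "medium": 3000, "low": 1000}
        }
    },
}

with open("store_layout.json", "w") as f:
    json.dump(store_layout, f, indent=2)
```

Zones without an entry in `zone_configs` fall back to the default stock thresholds hard-coded in `assess_stock_level`.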

# retail_monitoring_system.py - Retail monitoring system
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque
import time
import json
from typing import Dict, List, Tuple
from dataclasses import dataclass

@dataclass
class ProductDetection:
    product_id: str
    product_name: str
    bbox: List[float]
    confidence: float
    shelf_zone: str
    stock_level: str  # 'full', 'medium', 'low', 'empty'

class RetailMonitoringSystem:
    def __init__(self, model_path: str, store_layout_config: str):
        self.model = YOLO(model_path)

        # Load store layout configuration
        with open(store_layout_config, 'r') as f:
            self.store_config = json.load(f)

        # Product catalog
        self.product_catalog = self.store_config.get('product_catalog', {})

        # Shelf zone definition
        self.shelf_zones = self.store_config.get('shelf_zones', {})

        # Inventory monitoring history
        self.stock_history = defaultdict(lambda: deque(maxlen=100))

        # Restock alerts
        self.restock_alerts = {}

    def detect_products(self, image: np.ndarray) -> List[ProductDetection]:
        """
        Detect products on shelves
        """
        results = self.model(image, verbose=False)
        detections = []

        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = box.cls[0].item()
                    class_name = self.model.names[int(cls)]

                    # Map to product ID
                    product_info = self.get_product_info(class_name)
                    if product_info:
                        # Determine shelf zone
                        shelf_zone = self.determine_shelf_zone([x1, y1, x2, y2])

                        # Assess stock level
                        stock_level = self.assess_stock_level(
                            product_info['product_id'], [x1, y1, x2, y2], shelf_zone
                        )

                        detection = ProductDetection(
                            product_id=product_info['product_id'],
                            product_name=product_info['name'],
                            bbox=[x1, y1, x2, y2],
                            confidence=conf,
                            shelf_zone=shelf_zone,
                            stock_level=stock_level
                        )

                        detections.append(detection)

        return detections

    def get_product_info(self, class_name: str) -> Dict:
        """
        Get product information based on detection class
        """
        for product_id, product_data in self.product_catalog.items():
            if class_name.lower() in product_data.get('detection_keywords', []):
                return {
                    'product_id': product_id,
                    'name': product_data['name'],
                    'category': product_data['category'],
                    'price': product_data['price']
                }
        return None

    def determine_shelf_zone(self, bbox: List[float]) -> str:
        """
        Determine the shelf zone where the product is located
        """
        x1, y1, x2, y2 = bbox
        center_x = (x1 + x2) / 2
        center_y = (y1 + y2) / 2

        for zone_name, zone_coords in self.shelf_zones.items():
            zx1, zy1, zx2, zy2 = zone_coords
            if zx1 <= center_x <= zx2 and zy1 <= center_y <= zy2:
                return zone_name

        return 'unknown'

    def assess_stock_level(self, product_id: str, bbox: List[float], shelf_zone: str) -> str:
        """
        Assess stock level
        """
        # Simplified stock assessment: based on bounding box size and position
        x1, y1, x2, y2 = bbox
        area = (x2 - x1) * (y2 - y1)

        # Adjust thresholds based on product type and shelf zone
        zone_config = self.store_config.get('zone_configs', {}).get(shelf_zone, {})
        area_thresholds = zone_config.get('stock_thresholds', {
            'full': 5000,
            'medium': 3000,
            'low': 1000
        })

        if area > area_thresholds['full']:
            return 'full'
        elif area > area_thresholds['medium']:
            return 'medium'
        elif area > area_thresholds['low']:
            return 'low'
        else:
            return 'empty'

    def analyze_shelf_status(self, detections: List[ProductDetection]) -> Dict:
        """
        Analyze shelf status
        """
        shelf_analysis = defaultdict(lambda: {
            'total_products': 0,
            'stock_levels': defaultdict(int),
            'products': []
        })

        for detection in detections:
            zone = detection.shelf_zone
            shelf_analysis[zone]['total_products'] += 1
            shelf_analysis[zone]['stock_levels'][detection.stock_level] += 1
            shelf_analysis[zone]['products'].append({
                'product_id': detection.product_id,
                'product_name': detection.product_name,
                'stock_level': detection.stock_level,
                'confidence': detection.confidence
            })

        # Generate restock recommendations
        restock_recommendations = []
        for zone, analysis in shelf_analysis.items():
            low_stock_count = analysis['stock_levels']['low'] + analysis['stock_levels']['empty']
            total_count = analysis['total_products']

            if total_count > 0 and low_stock_count / total_count > 0.3:  # More than 30% low stock
                restock_recommendations.append({
                    'zone': zone,
                    'urgency': 'high' if low_stock_count / total_count > 0.5 else 'medium',
                    'low_stock_products': [p for p in analysis['products'] if p['stock_level'] in ['low', 'empty']]
                })

        return {
            'shelf_analysis': dict(shelf_analysis),
            'restock_recommendations': restock_recommendations,
            'overall_health': self.calculate_overall_health(shelf_analysis)
        }

    def calculate_overall_health(self, shelf_analysis: Dict) -> Dict:
        """
        Calculate overall shelf health status
        """
        total_products = sum(analysis['total_products'] for analysis in shelf_analysis.values())
        total_low_stock = sum(
            analysis['stock_levels']['low'] + analysis['stock_levels']['empty']
            for analysis in shelf_analysis.values()
        )

        if total_products == 0:
            health_score = 0
        else:
            health_score = max(0, 100 - (total_low_stock / total_products * 100))

        health_status = 'excellent' if health_score >= 90 else \
                       'good' if health_score >= 75 else \
                       'fair' if health_score >= 60 else 'poor'

        return {
            'score': health_score,
            'status': health_status,
            'total_products': total_products,
            'low_stock_items': total_low_stock
        }

    def customer_behavior_analysis(self, image: np.ndarray) -> Dict:
        """
        Customer behavior analysis
        """
        # Detect customers
        results = self.model(image, verbose=False)
        customers = []

        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    cls = box.cls[0].item()
                    class_name = self.model.names[int(cls)]

                    if class_name == 'person':  # Customer detection
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        conf = box.conf[0].item()

                        # Analyze customer location
                        zone = self.determine_shelf_zone([x1, y1, x2, y2])
                        customers.append({
                            'bbox': [x1, y1, x2, y2],
                            'confidence': conf,
                            'zone': zone
                        })

        # Analyze hot zones
        zone_popularity = defaultdict(int)
        for customer in customers:
            if customer['zone'] != 'unknown':
                zone_popularity[customer['zone']] += 1

        return {
            'customer_count': len(customers),
            'zone_popularity': dict(zone_popularity),
            'customers': customers
        }

    def visualize_retail_monitoring(self, image: np.ndarray, detections: List[ProductDetection],
                                  shelf_analysis: Dict, customer_analysis: Dict) -> np.ndarray:
        """
        Visualize retail monitoring results
        """
        annotated_image = image.copy()

        # Draw shelf zones
        for zone_name, zone_coords in self.shelf_zones.items():
            x1, y1, x2, y2 = map(int, zone_coords)  # coords from JSON may be floats

            # Choose color based on restock urgency (default 'good' if no recommendation)
            zone_health = 'good'
            for rec in shelf_analysis.get('restock_recommendations', []):
                if rec['zone'] == zone_name:
                    zone_health = rec['urgency']
                    break

            colors = {
                'good': (0, 255, 0),      # Green
                'medium': (0, 255, 255),  # Yellow
                'high': (0, 0, 255)       # Red
            }
            color = colors.get(zone_health, (128, 128, 128))

            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)
            cv2.putText(annotated_image, zone_name, (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # Draw product detections
        for detection in detections:
            x1, y1, x2, y2 = map(int, detection.bbox)

            # Choose color based on stock level
            stock_colors = {
                'full': (0, 255, 0),      # Green
                'medium': (0, 255, 255),  # Yellow
                'low': (0, 165, 255),     # Orange
                'empty': (0, 0, 255)      # Red
            }
            color = stock_colors.get(detection.stock_level, (128, 128, 128))

            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)

            # Product label
            label = f"{detection.product_name} ({detection.stock_level})"
            cv2.putText(annotated_image, label, (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Draw customer locations
        for customer in customer_analysis['customers']:
            x1, y1, x2, y2 = map(int, customer['bbox'])
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (255, 0, 255), 2)
            cv2.putText(annotated_image, "Customer", (x1, y1-10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)

        # Display statistics
        health = shelf_analysis['overall_health']
        info_text = [
            f"Health Score: {health['score']:.1f}% ({health['status']})",
            f"Products: {health['total_products']} | Low Stock: {health['low_stock_items']}",
            f"Customers: {customer_analysis['customer_count']}"
        ]

        for i, text in enumerate(info_text):
            cv2.putText(annotated_image, text, (10, 30 + i * 25), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        return annotated_image

    def run_retail_monitoring(self, camera_source=0):
        """
        Run retail monitoring system
        """
        cap = cv2.VideoCapture(camera_source)
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open camera source: {camera_source}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Product detection
            detections = self.detect_products(frame)

            # Shelf analysis
            shelf_analysis = self.analyze_shelf_status(detections)

            # Customer behavior analysis
            customer_analysis = self.customer_behavior_analysis(frame)

            # Visualize
            annotated_frame = self.visualize_retail_monitoring(
                frame, detections, shelf_analysis, customer_analysis
            )

            # Process restock alerts
            for recommendation in shelf_analysis.get('restock_recommendations', []):
                if recommendation['urgency'] == 'high':
                    print(f"🚨 High Priority Restock: Zone {recommendation['zone']}")

            cv2.imshow('Retail Monitoring System', annotated_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

    def generate_daily_report(self) -> Dict:
        """
        Generate daily report
        """
        # In a real deployment this would aggregate historical detection data;
        # the values below are placeholders for illustration
        return {
            'date': time.strftime('%Y-%m-%d'),
            'total_restocks_needed': 5,
            'popular_zones': ['zone_a', 'zone_c'],
            'peak_customer_hours': ['10:00-12:00', '14:00-16:00'],
            'inventory_turnover': 85.2
        }

# Usage example and configuration file
store_layout_config = {
    "shelf_zones": {
        "zone_a": [0, 0, 200, 300],
        "zone_b": [200, 0, 400, 300],
        "zone_c": [400, 0, 600, 300]
    },
    "product_catalog": {
        "PROD001": {
            "name": "Cola",
            "category": "Beverages",
            "price": 1.99,
            "detection_keywords": ["bottle", "cola", "soda"]
        },
        "PROD002": {
            "name": "Bread",
            "category": "Bakery",
            "price": 2.49,
            "detection_keywords": ["bread", "loaf"]
        }
    },
    "zone_configs": {
        "zone_a": {"stock_thresholds": {"full": 8000, "medium": 5000, "low": 2000}},
        "zone_b": {"stock_thresholds": {"full": 6000, "medium": 4000, "low": 1500}},
        "zone_c": {"stock_thresholds": {"full": 7000, "medium": 4500, "low": 1800}}
    }
}
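Before wiring a configuration like this into the system, it is worth sanity-checking that each zone's stock thresholds are consistently ordered, since the stock-level classification depends on them. A minimal sketch (the `validate_zone_configs` helper is hypothetical, assuming thresholds must satisfy full > medium > low > 0):

```python
# Hypothetical sanity check for a store layout config: every zone's
# stock thresholds must be strictly ordered (full > medium > low > 0),
# otherwise stock-level classification becomes ambiguous.
def validate_zone_configs(config: dict) -> list:
    """Return the names of zones whose thresholds are mis-ordered."""
    bad_zones = []
    for zone, zone_cfg in config.get("zone_configs", {}).items():
        t = zone_cfg["stock_thresholds"]
        if not (t["full"] > t["medium"] > t["low"] > 0):
            bad_zones.append(zone)
    return bad_zones

sample = {
    "zone_configs": {
        "zone_a": {"stock_thresholds": {"full": 8000, "medium": 5000, "low": 2000}},
        "zone_b": {"stock_thresholds": {"full": 1000, "medium": 4000, "low": 1500}},
    }
}
print(validate_zone_configs(sample))  # ['zone_b']
```

Running such a check right after loading `store_layout.json` catches configuration mistakes before they silently skew the shelf analysis.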

# Save the configuration file and launch the system (guarded so that
# importing this module does not write files or open the camera)
if __name__ == "__main__":
    with open('store_layout.json', 'w') as f:
        json.dump(store_layout_config, f, indent=2)

    system = RetailMonitoringSystem("yolov8n.pt", "store_layout.json")
    system.run_retail_monitoring(0)
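`customer_behavior_analysis` above assigns each detected customer to a shelf zone via `determine_shelf_zone` (defined earlier in the chapter). The core idea can be sketched as a center-point containment test against the configured zone rectangles; this standalone version is an illustrative approximation, not the class method itself:

```python
from typing import Dict, List

# Sketch of zone assignment: a bbox belongs to the zone whose
# rectangle contains the bbox center; 'unknown' if no zone matches.
def determine_shelf_zone(bbox: List[float], shelf_zones: Dict[str, List[int]]) -> str:
    cx = (bbox[0] + bbox[2]) / 2
    cy = (bbox[1] + bbox[3]) / 2
    for zone_name, (zx1, zy1, zx2, zy2) in shelf_zones.items():
        if zx1 <= cx <= zx2 and zy1 <= cy <= zy2:
            return zone_name
    return 'unknown'

zones = {"zone_a": [0, 0, 200, 300], "zone_b": [200, 0, 400, 300]}
print(determine_shelf_zone([50, 100, 150, 250], zones))   # zone_a
print(determine_shelf_zone([500, 100, 600, 250], zones))  # unknown
```

Using the bbox center rather than full overlap keeps the assignment unambiguous when a customer's bounding box straddles two adjacent zones.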

Chapter Summary

The industry case studies in this chapter have given us an in-depth look at how YOLO is applied in practice across different fields, along with the key technical considerations in each:

  1. Autonomous Driving Applications: Multi-camera fusion, real-time requirements, safety risk assessment
  2. Intelligent Surveillance Systems: Behavior analysis, anomaly detection, zone management
  3. Industrial Quality Inspection: Defect classification, quality assessment, production process integration
  4. Medical Imaging: High precision requirements, clinical decision support, urgency determination
  5. Retail Applications: Inventory monitoring, customer behavior analysis, intelligent replenishment

Each application area has its specific technical challenges and solutions:

  • Safety-critical scenarios require extremely high reliability and real-time performance
  • Medical applications demand high precision and interpretability
  • Industrial environments focus on stability and integrability
  • Retail scenarios emphasize business value and user experience

These cases demonstrate the broad applicability and strong potential of YOLO, and provide practical reference points for applying it in real projects.

In the next chapter, we will explore cutting-edge developments and future trends in YOLO, looking at where the technology is headed and its potential breakthroughs.