Chapter 13: Industry Application Case Studies
Haiyue
72min
Chapter 13: Industry Application Case Studies
Learning Objectives
- Understand YOLO’s application in autonomous driving
- Master the design ideas of intelligent surveillance systems
- Learn about object detection applications in industrial quality inspection
- Explore applications in medical imaging, retail, sports, and other fields
13.1 Autonomous Driving Application Cases
13.1.1 Object Detection Requirements in Autonomous Driving
[Figure: Mermaid diagram of object detection requirements in autonomous driving — the diagram did not render in this export.]
13.1.2 Multi-camera Fusion Detection System
System Architecture Design
# autonomous_driving_system.py - Autonomous driving detection system
import queue
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import torch
from ultralytics import YOLO
@dataclass
class DetectionResult:
    """A single object detection from one camera, with optional motion estimates."""

    camera_id: int     # ID of the camera that produced this detection
    timestamp: float   # UNIX time the frame was processed
    bbox: List[float]  # [x1, y1, x2, y2] in pixel coordinates
    confidence: float  # Model confidence score in [0, 1]
    class_name: str    # Model class label (e.g. 'car', 'person')
    distance: Optional[float] = None  # Distance estimation in meters (None if unknown)
    velocity: Optional[float] = None  # Velocity estimation (None if unknown)
@dataclass
class CameraConfig:
    """Static mounting and lens configuration for one vehicle camera."""

    camera_id: int
    position: str  # 'front', 'rear', 'left', 'right'
    fov: float  # Field of view (degrees, per the usage example below)
    height: float  # Camera height above ground
    angle: float  # Mounting angle relative to the vehicle's forward axis
class AutonomousDrivingDetector:
    """Multi-camera YOLO detection pipeline for an autonomous-driving stack.

    Runs per-frame detection, estimates object distance from a simplified
    monocular pinhole model, fuses detections across cameras, and maps each
    detection to a safety risk level and a recommended action.
    """

    # Numeric ordering for risk levels so sorting puts the most severe first.
    # Sorting the raw strings with reverse=True would order them alphabetically
    # ('unknown' > 'medium' > 'low' > 'high' > 'critical'), which is wrong.
    RISK_RANK = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1, 'unknown': 0}

    def __init__(self, model_path: str, camera_configs: List["CameraConfig"]):
        """
        Args:
            model_path: Path to the YOLO weights file.
            camera_configs: One CameraConfig per physical camera.
        """
        self.model = YOLO(model_path)
        self.camera_configs = {config.camera_id: config for config in camera_configs}
        # Object class mapping
        self.vehicle_classes = ['car', 'truck', 'bus', 'motorcycle']
        self.person_classes = ['person', 'bicycle']
        self.traffic_classes = ['traffic light', 'stop sign', 'traffic sign']
        # Tracking history
        self.tracking_history = {}
        self.detection_queues = {config.camera_id: queue.Queue() for config in camera_configs}
        # Safety-related detection thresholds
        self.safety_thresholds = {
            'emergency_brake_distance': 5.0,      # Emergency braking distance (meters)
            'warning_distance': 10.0,             # Warning distance (meters)
            'person_confidence_threshold': 0.8,   # Pedestrian detection confidence threshold
            'vehicle_confidence_threshold': 0.7   # Vehicle detection confidence threshold
        }

    def detect_frame(self, frame: np.ndarray, camera_id: int) -> List["DetectionResult"]:
        """Run detection on a single frame from one camera.

        Returns detections that pass the per-class confidence threshold,
        each annotated with an estimated distance.
        """
        camera_config = self.camera_configs[camera_id]
        results = self.model(frame, verbose=False)
        detections = []
        for r in results:
            boxes = r.boxes
            if boxes is None:
                continue
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = box.conf[0].item()
                cls = box.cls[0].item()
                class_name = self.model.names[int(cls)]
                # Set different confidence thresholds based on object type
                if conf < self.get_confidence_threshold(class_name):
                    continue
                # Estimate distance from the box height (monocular heuristic)
                distance = self.estimate_distance(
                    bbox=[x1, y1, x2, y2],
                    class_name=class_name,
                    camera_config=camera_config,
                    frame_shape=frame.shape
                )
                detections.append(DetectionResult(
                    camera_id=camera_id,
                    timestamp=time.time(),
                    bbox=[x1, y1, x2, y2],
                    confidence=conf,
                    class_name=class_name,
                    distance=distance
                ))
        return detections

    def get_confidence_threshold(self, class_name: str) -> float:
        """Return the confidence threshold for the object type.

        Pedestrians/cyclists and vehicles use stricter safety thresholds;
        everything else falls back to 0.5.
        """
        if class_name in self.person_classes:
            return self.safety_thresholds['person_confidence_threshold']
        elif class_name in self.vehicle_classes:
            return self.safety_thresholds['vehicle_confidence_threshold']
        else:
            return 0.5

    def estimate_distance(self, bbox: List[float], class_name: str,
                          camera_config: "CameraConfig", frame_shape: Tuple[int, int, int]) -> float:
        """Estimate distance (meters) with a simplified monocular pinhole model.

        Uses an assumed real-world object height and treats the frame height
        as the focal length. A real system needs proper camera calibration.
        """
        x1, y1, x2, y2 = bbox
        object_height_pixels = y2 - y1
        # Degenerate (zero-height) boxes would divide by zero; treat them as
        # too far away to judge rather than crashing the pipeline.
        if object_height_pixels <= 0:
            return float('inf')
        frame_height = frame_shape[0]
        # Estimate real height based on object type
        real_height_map = {
            'person': 1.7,   # Average height of a person
            'car': 1.5,      # Average height of a car
            'truck': 3.0,    # Average height of a truck
            'bus': 3.2,      # Average height of a bus
            'bicycle': 1.0   # Average height of a bicycle
        }
        real_height = real_height_map.get(class_name, 1.5)
        # Simplified distance calculation formula (similar triangles)
        focal_length = frame_height  # Simplified assumption
        distance = (real_height * focal_length) / object_height_pixels
        return max(distance, 1.0)  # Minimum distance limit

    def fusion_detection(self, multi_camera_detections: Dict[int, List["DetectionResult"]]) -> List[Dict]:
        """Fuse per-camera detections into one risk-sorted list.

        Each detection is weighted by its camera's position and assessed for
        safety risk; the result is sorted most-severe-first.
        """
        fused_results = []
        # Assign importance weights based on camera position
        camera_weights = {
            'front': 1.0,
            'left': 0.8,
            'right': 0.8,
            'rear': 0.6
        }
        for camera_id, detections in multi_camera_detections.items():
            camera_config = self.camera_configs[camera_id]
            weight = camera_weights.get(camera_config.position, 0.5)
            for det in detections:
                # Safety risk assessment
                risk_level = self.assess_safety_risk(det)
                fused_results.append({
                    'detection': det,
                    'camera_position': camera_config.position,
                    'weight': weight,
                    'risk_level': risk_level,
                    'action_required': self.determine_action(det, risk_level)
                })
        # Sort by severity rank first, then camera weight (most critical first).
        fused_results.sort(
            key=lambda x: (self.RISK_RANK.get(x['risk_level'], 0), x['weight']),
            reverse=True
        )
        return fused_results

    def assess_safety_risk(self, detection: "DetectionResult") -> str:
        """Classify a detection as 'critical' | 'high' | 'medium' | 'low' | 'unknown'.

        Pedestrians/cyclists use tighter distance bands than vehicles.
        """
        distance = detection.distance
        class_name = detection.class_name
        if distance is None:
            return 'unknown'
        # More stringent risk assessment for pedestrians and cyclists
        if class_name in self.person_classes:
            if distance < 3.0:
                return 'critical'
            elif distance < 8.0:
                return 'high'
            elif distance < 15.0:
                return 'medium'
            else:
                return 'low'
        # Risk assessment for vehicles
        elif class_name in self.vehicle_classes:
            if distance < self.safety_thresholds['emergency_brake_distance']:
                return 'critical'
            elif distance < self.safety_thresholds['warning_distance']:
                return 'high'
            elif distance < 20.0:
                return 'medium'
            else:
                return 'low'
        return 'low'

    def determine_action(self, detection: "DetectionResult", risk_level: str) -> str:
        """Map a risk level to the driving action to take."""
        actions = {
            'critical': 'emergency_brake',
            'high': 'slow_down',
            'medium': 'monitor',
            'low': 'continue',
            'unknown': 'monitor'
        }
        return actions.get(risk_level, 'monitor')

    def run_multi_camera_detection(self, camera_sources: Dict[int, any]):
        """Main loop: capture from all cameras, detect, fuse, visualize.

        Blocks until 'q' is pressed; always releases captures and windows.
        """
        # Create capture objects for each camera
        captures = {}
        for camera_id, source in camera_sources.items():
            cap = cv2.VideoCapture(source)
            if cap.isOpened():
                captures[camera_id] = cap
            else:
                print(f"Failed to open camera {camera_id}")
        try:
            while True:
                multi_camera_detections = {}
                # Capture frames from all cameras
                for camera_id, cap in captures.items():
                    ret, frame = cap.read()
                    if ret:
                        # Perform detection
                        detections = self.detect_frame(frame, camera_id)
                        multi_camera_detections[camera_id] = detections
                        # Visualize single camera results
                        annotated_frame = self.draw_detections(frame, detections, camera_id)
                        cv2.imshow(f'Camera {camera_id}', annotated_frame)
                # Fuse detection results
                if multi_camera_detections:
                    fused_results = self.fusion_detection(multi_camera_detections)
                    # Process fused results
                    self.process_fused_results(fused_results)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            for cap in captures.values():
                cap.release()
            cv2.destroyAllWindows()

    def draw_detections(self, frame: np.ndarray, detections: List["DetectionResult"],
                        camera_id: int) -> np.ndarray:
        """Draw boxes colored by risk level plus labels onto the frame in place."""
        for det in detections:
            x1, y1, x2, y2 = map(int, det.bbox)
            # Choose color based on risk level
            risk_level = self.assess_safety_risk(det)
            colors = {
                'critical': (0, 0, 255),    # Red
                'high': (0, 165, 255),      # Orange
                'medium': (0, 255, 255),    # Yellow
                'low': (0, 255, 0),         # Green
                'unknown': (128, 128, 128)  # Gray
            }
            color = colors.get(risk_level, (0, 255, 0))
            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            # Draw label information
            label = f"{det.class_name}: {det.confidence:.2f}"
            if det.distance is not None:
                label += f" | {det.distance:.1f}m"
            cv2.putText(frame, label, (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        # Add camera information
        camera_info = f"Camera {camera_id} ({self.camera_configs[camera_id].position})"
        cv2.putText(frame, camera_info, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        return frame

    def process_fused_results(self, fused_results: List[Dict]):
        """Print warnings for critical (emergency brake) and high (slow down) risks."""
        # Check if emergency braking is required
        critical_detections = [r for r in fused_results if r['risk_level'] == 'critical']
        if critical_detections:
            print("⚠️ CRITICAL: Emergency brake required!")
            for result in critical_detections:
                det = result['detection']
                print(f" - {det.class_name} at {det.distance:.1f}m ({result['camera_position']} camera)")
        # Check if deceleration is required
        high_risk_detections = [r for r in fused_results if r['risk_level'] == 'high']
        if high_risk_detections:
            print("⚠️ WARNING: Slow down recommended")
            for result in high_risk_detections:
                det = result['detection']
                print(f" - {det.class_name} at {det.distance:.1f}m ({result['camera_position']} camera)")
# Usage example: wire three cameras into the detector and run the main loop.
if __name__ == "__main__":
    # Configure cameras: (camera_id, position, fov, height, mounting angle)
    camera_configs = [
        CameraConfig(0, 'front', 60.0, 1.2, 0.0),
        CameraConfig(1, 'left', 45.0, 1.2, -45.0),
        CameraConfig(2, 'right', 45.0, 1.2, 45.0),
    ]
    # Initialize detection system
    detector = AutonomousDrivingDetector("yolov8n.pt", camera_configs)
    # Camera source configuration (camera_id -> OpenCV capture source)
    camera_sources = {
        0: 0,  # Front camera
        1: 1,  # Left camera
        2: 2,  # Right camera
    }
    # Run detection system (blocks until 'q' is pressed)
    detector.run_multi_camera_detection(camera_sources)
13.1.3 Traffic Sign Recognition Optimization
# traffic_sign_detector.py - Traffic sign detection optimization
class TrafficSignDetector:
    """YOLO-based traffic-sign detector with category/priority post-processing.

    Speed-limit signs are additionally passed through a (simulated) OCR step
    to read the numeric limit.
    """

    def __init__(self, model_path: str):
        """
        Args:
            model_path: Path to the YOLO weights file.
        """
        self.model = YOLO(model_path)
        # Traffic sign class mapping
        self.sign_categories = {
            'regulatory': ['stop', 'yield', 'no_entry', 'speed_limit'],
            'warning': ['curve', 'intersection', 'school_zone'],
            'informational': ['highway_sign', 'direction_sign']
        }
        # Speed limit sign number recognition
        self.speed_limits = [20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

    def detect_traffic_signs(self, frame: np.ndarray) -> List[Dict]:
        """Detect traffic signs in a frame, returning them sorted by priority.

        Speed-limit signs get their numeric value appended to the class name.
        """
        results = self.model(frame, verbose=False)
        signs = []
        frame_h, frame_w = frame.shape[:2]
        for r in results:
            boxes = r.boxes
            if boxes is None:
                continue
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = box.conf[0].item()
                cls = box.cls[0].item()
                class_name = self.model.names[int(cls)]
                # Only process traffic signs
                if not self.is_traffic_sign(class_name):
                    continue
                # Clamp coordinates to the frame: detector boxes may extend
                # slightly past the image edge, and negative indices would
                # silently wrap around when slicing the array.
                ix1, iy1 = max(0, int(x1)), max(0, int(y1))
                ix2, iy2 = min(frame_w, int(x2)), min(frame_h, int(y2))
                sign_region = frame[iy1:iy2, ix1:ix2]
                # Special handling for speed limit signs
                if 'speed' in class_name.lower():
                    speed_value = self.recognize_speed_limit(sign_region)
                    if speed_value:
                        class_name = f"speed_limit_{speed_value}"
                signs.append({
                    'bbox': [x1, y1, x2, y2],
                    'confidence': conf,
                    'class': class_name,
                    'category': self.get_sign_category(class_name),
                    'priority': self.get_sign_priority(class_name)
                })
        # Sort by priority (highest first)
        signs.sort(key=lambda x: x['priority'], reverse=True)
        return signs

    def is_traffic_sign(self, class_name: str) -> bool:
        """Return True if the class name looks like a traffic sign."""
        traffic_keywords = ['sign', 'stop', 'yield', 'speed', 'limit', 'warning']
        return any(keyword in class_name.lower() for keyword in traffic_keywords)

    def get_sign_category(self, class_name: str) -> str:
        """Return the sign's category ('regulatory'/'warning'/'informational'/'other')."""
        for category, signs in self.sign_categories.items():
            if any(sign in class_name.lower() for sign in signs):
                return category
        return 'other'

    def get_sign_priority(self, class_name: str) -> int:
        """Return the sign's handling priority (10 = highest, 1 = default)."""
        priority_map = {
            'stop': 10,
            'yield': 9,
            'speed_limit': 8,
            'no_entry': 7,
            'warning': 6,
            'informational': 3
        }
        for keyword, priority in priority_map.items():
            if keyword in class_name.lower():
                return priority
        return 1

    def recognize_speed_limit(self, sign_region: np.ndarray) -> int:
        """Recognize the speed-limit number (simulated placeholder).

        A real implementation should run OCR (e.g. PaddleOCR, EasyOCR) on the
        cropped sign region; this stub returns a random plausible value.
        """
        import random
        return random.choice(self.speed_limits)
13.2 Intelligent Surveillance System Cases
13.2.1 Video Surveillance Anomaly Detection
# intelligent_surveillance.py - Intelligent surveillance system
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque
import time
from typing import Dict, List, Tuple
import json
class IntelligentSurveillanceSystem:
    """YOLO-tracking-based surveillance: person counting, loitering and
    zone-intrusion detection, plus background-subtraction motion detection.
    """

    def __init__(self, model_path: str, config_file: str = None):
        """
        Args:
            model_path: Path to the YOLO weights file.
            config_file: Optional JSON config path; defaults are used if None.
        """
        self.model = YOLO(model_path)
        # Monitoring configuration
        self.config = self.load_config(config_file) if config_file else self.default_config()
        # Tracking related: per-track position history (last 30 frames each)
        self.track_history = defaultdict(lambda: deque(maxlen=30))
        self.person_count_history = deque(maxlen=100)
        self.alert_cooldown = {}  # Prevent duplicate alerts
        # Anomaly detection status
        self.background_subtractor = cv2.createBackgroundSubtractorMOG2()
        self.motion_threshold = 5000  # Motion pixel threshold

    def default_config(self) -> Dict:
        """Return the built-in default monitoring configuration."""
        return {
            'zones': {
                'entrance': {'coords': [0, 0, 100, 100], 'type': 'counting'},
                'restricted': {'coords': [200, 200, 400, 400], 'type': 'intrusion'},
                'exit': {'coords': [500, 0, 600, 100], 'type': 'counting'}
            },
            'alerts': {
                'max_persons': 10,
                'loitering_time': 300,  # 5 minutes
                'motion_detection': True,
                'person_counting': True,
                'intrusion_detection': True
            },
            'business_hours': {
                'start': '08:00',
                'end': '18:00'
            }
        }

    def load_config(self, config_file: str) -> Dict:
        """Load the monitoring configuration from a JSON file."""
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def detect_and_track(self, frame: np.ndarray) -> Tuple[List[Dict], np.ndarray]:
        """Detect and track people in one frame.

        Returns (detections, annotated frame copy); only 'person' tracks with
        confidence > 0.5 are kept, and their history is updated.
        """
        results = self.model.track(frame, persist=True, verbose=False)
        detections = []
        annotated_frame = frame.copy()
        for r in results:
            boxes = r.boxes
            if boxes is None or boxes.id is None:
                continue
            track_ids = boxes.id.int().cpu().tolist()
            confidences = boxes.conf.float().cpu().tolist()
            classes = boxes.cls.int().cpu().tolist()
            xyxy = boxes.xyxy.cpu().tolist()
            for track_id, conf, cls, bbox in zip(track_ids, confidences, classes, xyxy):
                class_name = self.model.names[cls]
                # Only track people
                if class_name != 'person' or conf <= 0.5:
                    continue
                x1, y1, x2, y2 = bbox
                center = ((x1 + x2) / 2, (y1 + y2) / 2)
                # Update tracking history
                self.track_history[track_id].append({
                    'timestamp': time.time(),
                    'center': center,
                    'bbox': bbox,
                    'confidence': conf
                })
                detections.append({
                    'track_id': track_id,
                    'bbox': bbox,
                    'center': center,
                    'confidence': conf,
                    'class': class_name
                })
                # Draw tracking results
                self.draw_track(annotated_frame, track_id, bbox, center)
        return detections, annotated_frame

    def draw_track(self, frame: np.ndarray, track_id: int, bbox: List[float], center: Tuple[float, float]):
        """Draw one track's box, center point, recent trajectory, and ID."""
        x1, y1, x2, y2 = map(int, bbox)
        cx, cy = map(int, center)
        # Draw bounding box
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        # Draw center point
        cv2.circle(frame, (cx, cy), 5, (0, 0, 255), -1)
        # Draw trajectory
        if track_id in self.track_history:
            track_points = [pos['center'] for pos in self.track_history[track_id]]
            if len(track_points) > 1:
                for i in range(1, len(track_points)):
                    pt1 = tuple(map(int, track_points[i-1]))
                    pt2 = tuple(map(int, track_points[i]))
                    cv2.line(frame, pt1, pt2, (255, 0, 0), 2)
        # Draw ID
        cv2.putText(frame, f"ID:{track_id}", (x1, y1-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

    def analyze_behavior(self, detections: List[Dict]) -> List[Dict]:
        """Analyze detections for overcrowding, loitering, and zone intrusion.

        Returns a list of alert dicts; duplicates are suppressed by cooldown.
        """
        alerts = []
        current_time = time.time()
        # Person counting analysis
        person_count = len(detections)
        self.person_count_history.append(person_count)
        if person_count > self.config['alerts']['max_persons']:
            alerts.append({
                'type': 'overcrowding',
                'message': f"Person count ({person_count}) exceeds threshold",
                'severity': 'high',
                'timestamp': current_time
            })
        # Loitering detection
        # NOTE(review): config['alerts']['loitering_time'] is not used here;
        # loitering is judged on the last 20 tracked frames — confirm intent.
        for detection in detections:
            track_id = detection['track_id']
            if track_id in self.track_history:
                track_data = list(self.track_history[track_id])
                # Check if staying in the same area for too long
                if len(track_data) >= 20:  # At least 20 frames history
                    positions = [pos['center'] for pos in track_data[-20:]]
                    if self.is_loitering(positions):
                        alert_key = f"loitering_{track_id}"
                        if self.should_send_alert(alert_key):
                            alerts.append({
                                'type': 'loitering',
                                'track_id': track_id,
                                'message': f"Person {track_id} loitering detected",
                                'severity': 'medium',
                                'timestamp': current_time
                            })
        # Zone intrusion detection
        for zone_name, zone_config in self.config['zones'].items():
            if zone_config['type'] == 'intrusion':
                for detection in detections:
                    if self.is_in_zone(detection['center'], zone_config['coords']):
                        alert_key = f"intrusion_{zone_name}_{detection['track_id']}"
                        if self.should_send_alert(alert_key):
                            alerts.append({
                                'type': 'intrusion',
                                'zone': zone_name,
                                'track_id': detection['track_id'],
                                'message': f"Intrusion detected in {zone_name} zone",
                                'severity': 'high',
                                'timestamp': current_time
                            })
        return alerts

    def is_loitering(self, positions: List[Tuple[float, float]]) -> bool:
        """Return True if the position history stays within a small area."""
        if len(positions) < 10:
            return False
        # Calculate standard deviation of positions
        x_coords = [pos[0] for pos in positions]
        y_coords = [pos[1] for pos in positions]
        x_std = np.std(x_coords)
        y_std = np.std(y_coords)
        # If movement range is small, it is considered loitering
        return x_std < 30 and y_std < 30

    def is_in_zone(self, point: Tuple[float, float], zone_coords: List[int]) -> bool:
        """Return True if the point lies inside the [x1, y1, x2, y2] zone."""
        x, y = point
        x1, y1, x2, y2 = zone_coords
        return x1 <= x <= x2 and y1 <= y <= y2

    def should_send_alert(self, alert_key: str, cooldown_seconds: int = 30) -> bool:
        """Return True (and arm the cooldown) unless the same alert fired recently."""
        current_time = time.time()
        if alert_key in self.alert_cooldown:
            if current_time - self.alert_cooldown[alert_key] < cooldown_seconds:
                return False
        self.alert_cooldown[alert_key] = current_time
        return True

    def detect_motion(self, frame: np.ndarray) -> bool:
        """Return True if foreground motion exceeds the pixel threshold."""
        fg_mask = self.background_subtractor.apply(frame)
        motion_pixels = cv2.countNonZero(fg_mask)
        return motion_pixels > self.motion_threshold

    def draw_zones(self, frame: np.ndarray):
        """Draw configured zones (red = intrusion, yellow = counting) on the frame."""
        for zone_name, zone_config in self.config['zones'].items():
            x1, y1, x2, y2 = zone_config['coords']
            color = (0, 0, 255) if zone_config['type'] == 'intrusion' else (255, 255, 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, zone_name, (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    def run_surveillance(self, video_source=0):
        """Main loop: detect, track, analyze, and display until 'q' is pressed.

        The capture and windows are always released, even on error.
        """
        cap = cv2.VideoCapture(video_source)
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Detect and track
                detections, annotated_frame = self.detect_and_track(frame)
                # Behavior analysis
                alerts = self.analyze_behavior(detections)
                # Motion detection
                motion_detected = self.detect_motion(frame)
                # Draw monitoring zones
                self.draw_zones(annotated_frame)
                # Display statistics
                person_count = len(detections)
                info_text = f"Persons: {person_count} | Motion: {'Yes' if motion_detected else 'No'}"
                cv2.putText(annotated_frame, info_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                # Process alerts (stack one line per alert so they don't overlap)
                for i, alert in enumerate(alerts):
                    print(f"🚨 ALERT: {alert['message']}")
                    alert_text = f"ALERT: {alert['type']}"
                    cv2.putText(annotated_frame, alert_text, (10, 70 + i * 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
                cv2.imshow('Intelligent Surveillance', annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            cap.release()
            cv2.destroyAllWindows()
# Usage example: run the surveillance loop on the default webcam (device 0).
if __name__ == "__main__":
    surveillance = IntelligentSurveillanceSystem("yolov8n.pt")
    surveillance.run_surveillance(0)
13.3 Industrial Quality Inspection Application Cases
13.3.1 Electronic Product Defect Detection
# industrial_qc_system.py - Industrial quality inspection system
import cv2
import numpy as np
from ultralytics import YOLO
import time
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import json
import sqlite3
@dataclass
class DefectDetection:
    """One defect found on a single inspected product."""

    product_id: str
    timestamp: float
    defect_type: str
    bbox: List[float]  # [x1, y1, x2, y2] pixel coordinates
    confidence: float
    severity: str  # 'minor', 'major', 'critical'
    image_path: Optional[str] = None  # Optional path to a saved evidence image
class IndustrialQCSystem:
    """YOLO-based product quality inspection with SQLite record keeping.

    Detects defects per product image, grades the product, persists results,
    and maintains running pass-rate statistics.
    """

    def __init__(self, model_path: str, db_path: str = "qc_database.db"):
        """
        Args:
            model_path: Path to the YOLO weights file.
            db_path: SQLite database file for inspection records.
        """
        self.model = YOLO(model_path)
        self.db_path = db_path
        # Defect classification configuration
        self.defect_categories = {
            'surface_defects': ['scratch', 'dent', 'stain', 'corrosion'],
            'assembly_defects': ['missing_component', 'misalignment', 'loose_connection'],
            'dimensional_defects': ['oversized', 'undersized', 'warped'],
            'electrical_defects': ['short_circuit', 'open_circuit', 'poor_connection']
        }
        # Severity determination: per-defect confidence thresholds
        self.severity_rules = {
            'scratch': {'minor': 0.5, 'major': 0.7, 'critical': 0.9},
            'missing_component': {'minor': 0.0, 'major': 0.6, 'critical': 0.8},
            'short_circuit': {'minor': 0.0, 'major': 0.0, 'critical': 0.5}
        }
        # Quality statistics (running totals for this process)
        self.daily_stats = {
            'total_inspected': 0,
            'defects_found': 0,
            'pass_rate': 0.0
        }
        # Initialize database
        self.init_database()

    def init_database(self):
        """Create the inspections/defects tables if they do not exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS inspections (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    product_id TEXT,
                    timestamp REAL,
                    result TEXT,
                    defect_count INTEGER,
                    pass_fail TEXT
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS defects (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    inspection_id INTEGER,
                    defect_type TEXT,
                    confidence REAL,
                    severity TEXT,
                    bbox TEXT,
                    FOREIGN KEY (inspection_id) REFERENCES inspections (id)
                )
            ''')
            conn.commit()
        finally:
            # Always close, even if table creation fails.
            conn.close()

    def detect_defects(self, image: np.ndarray, product_id: str) -> List["DefectDetection"]:
        """Run the model on a product image and return all detected defects."""
        results = self.model(image, verbose=False)
        defects = []
        for r in results:
            boxes = r.boxes
            if boxes is None:
                continue
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = box.conf[0].item()
                cls = box.cls[0].item()
                defect_type = self.model.names[int(cls)]
                # Determine severity
                severity = self.determine_severity(defect_type, conf)
                defects.append(DefectDetection(
                    product_id=product_id,
                    timestamp=time.time(),
                    defect_type=defect_type,
                    bbox=[x1, y1, x2, y2],
                    confidence=conf,
                    severity=severity
                ))
        return defects

    def determine_severity(self, defect_type: str, confidence: float) -> str:
        """Map (defect type, confidence) to 'minor' | 'major' | 'critical'."""
        if defect_type in self.severity_rules:
            rules = self.severity_rules[defect_type]
            if confidence >= rules['critical']:
                return 'critical'
            elif confidence >= rules['major']:
                return 'major'
            else:
                return 'minor'
        # Default rule for defect types without explicit thresholds.
        if confidence >= 0.8:
            return 'major'
        return 'minor'

    def inspect_product(self, image: np.ndarray, product_id: str) -> Dict:
        """Full inspection pipeline: detect, assess, persist, update stats.

        Returns a summary dict with defects, verdict, and timing.
        """
        inspection_start = time.time()
        # Detect defects
        defects = self.detect_defects(image, product_id)
        # Quality assessment
        inspection_result = self.quality_assessment(defects)
        # Record inspection results
        self.record_inspection(product_id, defects, inspection_result)
        # Update statistics
        self.update_statistics(inspection_result['pass_fail'])
        inspection_time = time.time() - inspection_start
        return {
            'product_id': product_id,
            'defects': defects,
            'result': inspection_result,
            'inspection_time': inspection_time,
            'timestamp': inspection_start
        }

    def quality_assessment(self, defects: List["DefectDetection"]) -> Dict:
        """Grade the product from its defect list.

        Rules: any critical defect fails (D); >2 majors fail (C); any major
        or >5 minors is CONDITIONAL (B); otherwise PASS (A).
        """
        if not defects:
            return {
                'pass_fail': 'PASS',
                'grade': 'A',
                'defect_count': 0,
                'critical_defects': 0,
                'major_defects': 0,
                'minor_defects': 0
            }
        # Count defects of different severities
        critical_count = len([d for d in defects if d.severity == 'critical'])
        major_count = len([d for d in defects if d.severity == 'major'])
        minor_count = len([d for d in defects if d.severity == 'minor'])
        # Judgment rules
        if critical_count > 0:
            pass_fail = 'FAIL'
            grade = 'D'
        elif major_count > 2:
            pass_fail = 'FAIL'
            grade = 'C'
        elif major_count > 0 or minor_count > 5:
            pass_fail = 'CONDITIONAL'
            grade = 'B'
        else:
            pass_fail = 'PASS'
            grade = 'A'
        return {
            'pass_fail': pass_fail,
            'grade': grade,
            'defect_count': len(defects),
            'critical_defects': critical_count,
            'major_defects': major_count,
            'minor_defects': minor_count
        }

    def record_inspection(self, product_id: str, defects: List["DefectDetection"], result: Dict):
        """Persist one inspection and its defects to the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Insert inspection record
            cursor.execute('''
                INSERT INTO inspections (product_id, timestamp, result, defect_count, pass_fail)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                product_id,
                time.time(),
                json.dumps(result),
                result['defect_count'],
                result['pass_fail']
            ))
            inspection_id = cursor.lastrowid
            # Insert defect records linked to the inspection row
            for defect in defects:
                cursor.execute('''
                    INSERT INTO defects (inspection_id, defect_type, confidence, severity, bbox)
                    VALUES (?, ?, ?, ?, ?)
                ''', (
                    inspection_id,
                    defect.defect_type,
                    defect.confidence,
                    defect.severity,
                    json.dumps(defect.bbox)
                ))
            conn.commit()
        finally:
            # Always close; a failed insert must not leak the connection.
            conn.close()

    def update_statistics(self, pass_fail: str):
        """Update running totals and pass rate.

        NOTE(review): only 'FAIL' counts toward defects_found — CONDITIONAL
        products count as passes here; confirm that is the intended metric.
        """
        self.daily_stats['total_inspected'] += 1
        if pass_fail == 'FAIL':
            self.daily_stats['defects_found'] += 1
        # Calculate pass rate (total_inspected is >= 1 here)
        self.daily_stats['pass_rate'] = (
            (self.daily_stats['total_inspected'] - self.daily_stats['defects_found']) /
            self.daily_stats['total_inspected'] * 100
        )

    def visualize_inspection(self, image: np.ndarray, defects: List["DefectDetection"],
                             result: Dict) -> np.ndarray:
        """Return a copy of the image annotated with defects and the verdict."""
        annotated_image = image.copy()
        # Draw defects
        for defect in defects:
            x1, y1, x2, y2 = map(int, defect.bbox)
            # Choose color based on severity
            colors = {
                'critical': (0, 0, 255),  # Red
                'major': (0, 165, 255),   # Orange
                'minor': (0, 255, 255)    # Yellow
            }
            color = colors.get(defect.severity, (0, 255, 0))
            # Draw bounding box
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)
            # Draw label
            label = f"{defect.defect_type} ({defect.severity}): {defect.confidence:.2f}"
            cv2.putText(annotated_image, label, (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        # Draw inspection result
        result_text = f"{result['pass_fail']} - Grade: {result['grade']}"
        result_color = (0, 255, 0) if result['pass_fail'] == 'PASS' else (0, 0, 255)
        cv2.putText(annotated_image, result_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, result_color, 2)
        # Draw defect statistics
        stats_text = f"Defects: {result['defect_count']} (C:{result['critical_defects']}, M:{result['major_defects']}, m:{result['minor_defects']})"
        cv2.putText(annotated_image, stats_text, (10, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        return annotated_image

    def run_conveyor_inspection(self, camera_source=0):
        """Interactive conveyor loop: spacebar inspects a frame, 'q' quits.

        The capture and windows are always released, even on error.
        """
        cap = cv2.VideoCapture(camera_source)
        product_counter = 0
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Detect product trigger (simplified: simulate by pressing spacebar)
                key = cv2.waitKey(1) & 0xFF
                if key == ord(' '):  # Spacebar triggers detection
                    product_counter += 1
                    product_id = f"PROD_{product_counter:06d}"
                    print(f"\n🔍 Inspecting product: {product_id}")
                    # Perform inspection
                    inspection_result = self.inspect_product(frame, product_id)
                    # Display results
                    annotated_frame = self.visualize_inspection(
                        frame, inspection_result['defects'], inspection_result['result']
                    )
                    # Print inspection report
                    self.print_inspection_report(inspection_result)
                    cv2.waitKey(2000)  # Display results for 2 seconds
                # Display real-time video and statistics
                info_frame = frame.copy()
                self.draw_statistics(info_frame)
                cv2.imshow('Conveyor Belt - Press SPACE to inspect', info_frame)
                if key == ord('q'):
                    break
        finally:
            cap.release()
            cv2.destroyAllWindows()

    def draw_statistics(self, frame: np.ndarray):
        """Overlay the running inspection statistics on the frame."""
        stats = self.daily_stats
        stats_text = [
            f"Inspected: {stats['total_inspected']}",
            f"Defective: {stats['defects_found']}",
            f"Pass Rate: {stats['pass_rate']:.1f}%"
        ]
        for i, text in enumerate(stats_text):
            cv2.putText(frame, text, (10, 30 + i * 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    def print_inspection_report(self, inspection_result: Dict):
        """Print a human-readable report for one inspection."""
        result = inspection_result['result']
        defects = inspection_result['defects']
        print(f"Product ID: {inspection_result['product_id']}")
        print(f"Inspection Time: {inspection_result['inspection_time']:.3f}s")
        print(f"Result: {result['pass_fail']} (Grade: {result['grade']})")
        print(f"Total Defects: {result['defect_count']}")
        if defects:
            print("Defect Details:")
            for defect in defects:
                print(f"  - {defect.defect_type} ({defect.severity}): {defect.confidence:.3f}")

    def generate_quality_report(self, days: int = 1) -> Dict:
        """Aggregate pass/fail and defect statistics over the last `days` days."""
        end_time = time.time()
        start_time = end_time - (days * 24 * 3600)
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Get inspection data within the time range
            cursor.execute('''
                SELECT pass_fail, COUNT(*) as count
                FROM inspections
                WHERE timestamp BETWEEN ? AND ?
                GROUP BY pass_fail
            ''', (start_time, end_time))
            pass_fail_stats = dict(cursor.fetchall())
            # Get defect type statistics
            cursor.execute('''
                SELECT d.defect_type, COUNT(*) as count, AVG(d.confidence) as avg_confidence
                FROM defects d
                JOIN inspections i ON d.inspection_id = i.id
                WHERE i.timestamp BETWEEN ? AND ?
                GROUP BY d.defect_type
                ORDER BY count DESC
            ''', (start_time, end_time))
            defect_stats = cursor.fetchall()
        finally:
            conn.close()
        total_inspections = sum(pass_fail_stats.values())
        pass_count = pass_fail_stats.get('PASS', 0)
        pass_rate = (pass_count / total_inspections * 100) if total_inspections > 0 else 0
        return {
            'period_days': days,
            'total_inspections': total_inspections,
            'pass_rate': pass_rate,
            'pass_fail_breakdown': pass_fail_stats,
            'top_defects': defect_stats[:10],
            'generated_at': time.time()
        }
# Usage example: run interactive conveyor inspection, then print a report.
if __name__ == "__main__":
    qc_system = IndustrialQCSystem("yolov8n.pt")
    # Run conveyor belt inspection (blocks until 'q' is pressed)
    qc_system.run_conveyor_inspection(0)
    # Generate quality report for the past 7 days from the SQLite records
    report = qc_system.generate_quality_report(days=7)
    print("\n📊 Weekly Quality Report:")
    print(json.dumps(report, indent=2))
13.4 Medical Imaging Application Cases
13.4.1 Medical Image Lesion Detection
# medical_imaging_detector.py - Medical imaging detection system
import cv2
import numpy as np
from ultralytics import YOLO
import pydicom
from typing import Dict, List, Tuple, Optional
import json
from dataclasses import dataclass
import time
@dataclass
class MedicalDetection:
    """One model finding on a single medical image, with triage metadata."""

    patient_id: str
    study_id: str
    image_id: str
    finding_type: str  # Model class label for the finding
    bbox: List[float]  # [x1, y1, x2, y2] pixel coordinates
    confidence: float
    urgency: str  # 'routine', 'urgent', 'emergency'
    radiologist_review: bool = False  # Whether a human must confirm this finding
class MedicalImagingDetector:
def __init__(self, model_path: str):
    """Load the YOLO model and set up finding/urgency lookup tables.

    Args:
        model_path: Path to the YOLO weights used for lesion detection.
    """
    self.model = YOLO(model_path)
    # Medical finding classification: category -> keywords seen in labels
    self.finding_categories = {
        'lung_nodules': ['nodule', 'mass', 'opacity'],
        'fractures': ['fracture', 'break', 'crack'],
        'infections': ['pneumonia', 'infiltrate', 'consolidation'],
        'tumors': ['tumor', 'neoplasm', 'cancer'],
        'vascular': ['aneurysm', 'embolism', 'thrombosis']
    }
    # Urgency determination rules: substring of the finding name -> triage level
    self.urgency_rules = {
        'pneumothorax': 'emergency',
        'massive_stroke': 'emergency',
        'aortic_dissection': 'emergency',
        'pulmonary_embolism': 'urgent',
        'pneumonia': 'urgent',
        'nodule': 'routine',
        'fracture': 'urgent'
    }
def load_dicom_image(self, dicom_path: str) -> Tuple[np.ndarray, Dict]:
    """Load a DICOM file as an 8-bit 3-channel image plus key metadata.

    Args:
        dicom_path: Path to the .dcm file.

    Returns:
        (image, metadata): image is uint8 RGB; metadata holds patient/study
        identifiers, modality, body part, and acquisition date (or 'Unknown').
    """
    # Read DICOM file
    ds = pydicom.dcmread(dicom_path)
    # Extract image data
    image = ds.pixel_array
    # Normalize image (convert to 8-bit)
    if image.dtype != np.uint8:
        lo, hi = image.min(), image.max()
        if hi > lo:
            image = ((image - lo) / (hi - lo) * 255).astype(np.uint8)
        else:
            # Constant image: avoid dividing by zero during normalization.
            image = np.zeros_like(image, dtype=np.uint8)
    # Convert to 3 channels (if grayscale)
    if len(image.shape) == 2:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    # Extract metadata (fall back to 'Unknown' for absent DICOM tags)
    metadata = {
        'patient_id': getattr(ds, 'PatientID', 'Unknown'),
        'study_id': getattr(ds, 'StudyInstanceUID', 'Unknown'),
        'series_id': getattr(ds, 'SeriesInstanceUID', 'Unknown'),
        'image_id': getattr(ds, 'SOPInstanceUID', 'Unknown'),
        'modality': getattr(ds, 'Modality', 'Unknown'),
        'body_part': getattr(ds, 'BodyPartExamined', 'Unknown'),
        'acquisition_date': getattr(ds, 'AcquisitionDate', 'Unknown')
    }
    return image, metadata
def detect_findings(self, image: np.ndarray, metadata: Dict) -> List[MedicalDetection]:
"""
Detect pathological findings in medical images
"""
results = self.model(image, verbose=False)
findings = []
for r in results:
boxes = r.boxes
if boxes is not None:
for box in boxes:
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = box.conf[0].item()
cls = box.cls[0].item()
finding_type = self.model.names[int(cls)]
# Medical images require higher confidence thresholds
if conf < 0.7:
continue
# Determine urgency
urgency = self.determine_urgency(finding_type, conf)
# Determine if radiologist review is needed
needs_review = self.needs_radiologist_review(finding_type, conf)
detection = MedicalDetection(
patient_id=metadata['patient_id'],
study_id=metadata['study_id'],
image_id=metadata['image_id'],
finding_type=finding_type,
bbox=[x1, y1, x2, y2],
confidence=conf,
urgency=urgency,
radiologist_review=needs_review
)
findings.append(detection)
return findings
def determine_urgency(self, finding_type: str, confidence: float) -> str:
"""
Determine the urgency of findings
"""
# Check urgency for specific diseases
for condition, urgency in self.urgency_rules.items():
if condition.lower() in finding_type.lower():
# High confidence emergencies
if confidence > 0.9 and urgency == 'urgent':
return 'emergency'
return urgency
# Default to confidence-based judgment
if confidence > 0.95:
return 'urgent'
else:
return 'routine'
def needs_radiologist_review(self, finding_type: str, confidence: float) -> bool:
"""
Determine if radiologist review is needed
"""
# Emergencies always require review
urgency = self.determine_urgency(finding_type, confidence)
if urgency in ['emergency', 'urgent']:
return True
# Low confidence findings require review
if confidence < 0.8:
return True
# Specific types of findings require review
high_stakes_findings = ['tumor', 'cancer', 'mass', 'aneurysm']
if any(term in finding_type.lower() for term in high_stakes_findings):
return True
return False
def generate_medical_report(self, image: np.ndarray, findings: List[MedicalDetection],
metadata: Dict) -> Dict:
"""
Generate medical examination report
"""
report = {
'patient_info': {
'patient_id': metadata['patient_id'],
'study_id': metadata['study_id'],
'modality': metadata['modality'],
'body_part': metadata['body_part'],
'exam_date': metadata['acquisition_date']
},
'ai_analysis': {
'model_version': 'YOLOv8-Medical-v1.0',
'analysis_timestamp': time.time(),
'total_findings': len(findings),
'findings_by_urgency': self.categorize_by_urgency(findings),
'findings_requiring_review': len([f for f in findings if f.radiologist_review])
},
'findings': [],
'recommendations': self.generate_recommendations(findings)
}
# Detailed findings list
for finding in findings:
finding_detail = {
'type': finding.finding_type,
'location_bbox': finding.bbox,
'confidence': finding.confidence,
'urgency': finding.urgency,
'requires_radiologist_review': finding.radiologist_review,
'clinical_notes': self.get_clinical_notes(finding.finding_type)
}
report['findings'].append(finding_detail)
return report
def categorize_by_urgency(self, findings: List[MedicalDetection]) -> Dict:
"""
Categorize findings by urgency
"""
categories = {'emergency': 0, 'urgent': 0, 'routine': 0}
for finding in findings:
categories[finding.urgency] += 1
return categories
def generate_recommendations(self, findings: List[MedicalDetection]) -> List[str]:
"""
Generate clinical recommendations
"""
recommendations = []
# Check for emergencies
emergency_findings = [f for f in findings if f.urgency == 'emergency']
if emergency_findings:
recommendations.append("🚨 IMMEDIATE ATTENTION REQUIRED - Emergency findings detected")
recommendations.append("Contact attending physician immediately")
# Check for urgent follow-ups
urgent_findings = [f for f in findings if f.urgency == 'urgent']
if urgent_findings:
recommendations.append("⚡ Urgent follow-up recommended within 24-48 hours")
# Check for radiologist review
review_needed = [f for f in findings if f.radiologist_review]
if review_needed:
recommendations.append("👨⚕️ Radiologist review recommended for AI findings")
# Recommendations based on specific finding types
finding_types = [f.finding_type for f in findings]
if any('nodule' in ft.lower() for ft in finding_types):
recommendations.append("📋 Consider follow-up CT in 3-6 months for nodule surveillance")
if any('fracture' in ft.lower() for ft in finding_types):
recommendations.append("🦴 Orthopedic consultation may be required")
if any('pneumonia' in ft.lower() for ft in finding_types):
recommendations.append("💊 Consider antibiotic therapy and follow-up chest imaging")
return recommendations
def get_clinical_notes(self, finding_type: str) -> str:
"""
Get clinical notes
"""
clinical_notes = {
'nodule': 'Pulmonary nodule identified. Size and characteristics should be evaluated for malignancy risk.',
'pneumonia': 'Inflammatory changes consistent with pneumonia. Clinical correlation recommended.',
'fracture': 'Bone discontinuity consistent with fracture. Assess for displacement and complications.',
'mass': 'Space-occupying lesion identified. Further characterization with contrast studies may be needed.',
'pneumothorax': 'Air in pleural space. Assess size and consider chest tube placement if significant.'
}
for key, note in clinical_notes.items():
if key.lower() in finding_type.lower():
return note
return 'Abnormal finding detected. Clinical correlation and further evaluation recommended.'
def visualize_medical_findings(self, image: np.ndarray, findings: List[MedicalDetection]) -> np.ndarray:
"""
Visualize medical findings
"""
annotated_image = image.copy()
for finding in findings:
x1, y1, x2, y2 = map(int, finding.bbox)
# Choose color based on urgency
colors = {
'emergency': (0, 0, 255), # Red
'urgent': (0, 165, 255), # Orange
'routine': (0, 255, 255) # Yellow
}
color = colors.get(finding.urgency, (0, 255, 0))
# Draw bounding box
thickness = 3 if finding.urgency == 'emergency' else 2
cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, thickness)
# Draw label
label = f"{finding.finding_type}: {finding.confidence:.2f}"
if finding.radiologist_review:
label += " [REVIEW]"
cv2.putText(annotated_image, label, (x1, y1-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# Emergency situation special marker
if finding.urgency == 'emergency':
cv2.putText(annotated_image, "EMERGENCY", (x1, y2+25),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
return annotated_image
def batch_analysis(self, dicom_directory: str) -> List[Dict]:
"""
Batch analyze DICOM files
"""
import os
import glob
dicom_files = glob.glob(os.path.join(dicom_directory, "*.dcm"))
results = []
for dicom_file in dicom_files:
try:
print(f"Processing: {os.path.basename(dicom_file)}")
# Load DICOM image
image, metadata = self.load_dicom_image(dicom_file)
# Detect findings
findings = self.detect_findings(image, metadata)
# Generate report
report = self.generate_medical_report(image, findings, metadata)
# Save visualization results
if findings:
annotated_image = self.visualize_medical_findings(image, findings)
output_path = f"analysis_{metadata['patient_id']}_{metadata['image_id']}.jpg"
cv2.imwrite(output_path, annotated_image)
report['visualization_path'] = output_path
results.append(report)
except Exception as e:
print(f"Error processing {dicom_file}: {e}")
return results
# Usage example
if __name__ == "__main__":
    # Demo: analyze a single DICOM study end-to-end.
    analyzer = MedicalImagingDetector("yolov8n-medical.pt")

    scan, meta = analyzer.load_dicom_image("sample.dcm")
    detected = analyzer.detect_findings(scan, meta)
    summary = analyzer.generate_medical_report(scan, detected, meta)

    print("🏥 Medical Imaging Analysis Report")
    print("=" * 50)
    print(json.dumps(summary, indent=2))

    # Show the annotated image only when something was found.
    if detected:
        overlay = analyzer.visualize_medical_findings(scan, detected)
        cv2.imshow('Medical Imaging Analysis', overlay)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Batch analysis
    # batch_results = analyzer.batch_analysis("/path/to/dicom/directory")
13.5 Retail and E-commerce Application Cases
13.5.1 Smart Shelf Monitoring System
# retail_monitoring_system.py - Retail monitoring system
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque
import time
import json
from typing import Dict, List, Tuple
from dataclasses import dataclass
@dataclass
class ProductDetection:
    """One detected product instance on a monitored shelf."""

    product_id: str     # catalog key, e.g. 'PROD001'
    product_name: str   # human-readable name from the product catalog
    bbox: List[float]   # bounding box [x1, y1, x2, y2] in pixel coordinates
    confidence: float   # detection confidence in [0, 1]
    shelf_zone: str     # zone name from the store layout, or 'unknown'
    stock_level: str  # 'full', 'medium', 'low', 'empty'
class RetailMonitoringSystem:
    """YOLO-based shelf-stock and customer monitoring for a retail store.

    Combines product detection, per-zone stock assessment, restock
    recommendations, and simple customer hot-zone analysis, driven by a JSON
    store-layout configuration.
    """

    def __init__(self, model_path: str, store_layout_config: str):
        """
        Args:
            model_path: Path to the YOLO weights file.
            store_layout_config: Path to a JSON file with 'shelf_zones',
                'product_catalog', and optional 'zone_configs' sections.
        """
        self.model = YOLO(model_path)
        # Load store layout configuration
        with open(store_layout_config, 'r') as f:
            self.store_config = json.load(f)
        # Product catalog: product_id -> {name, category, price, detection_keywords}
        self.product_catalog = self.store_config.get('product_catalog', {})
        # Shelf zones: zone name -> [x1, y1, x2, y2] pixel rectangle
        self.shelf_zones = self.store_config.get('shelf_zones', {})
        # Inventory monitoring history, bounded per key to the last 100 samples
        self.stock_history = defaultdict(lambda: deque(maxlen=100))
        # Restock alerts (reserved for alert de-duplication)
        self.restock_alerts = {}

    def detect_products(self, image: np.ndarray) -> List["ProductDetection"]:
        """Detect catalogued products on shelves in one frame.

        Detections whose class has no catalog mapping are dropped.
        """
        results = self.model(image, verbose=False)
        detections = []
        for r in results:
            boxes = r.boxes
            if boxes is None:
                continue
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = box.conf[0].item()
                cls = box.cls[0].item()
                class_name = self.model.names[int(cls)]
                # Map detected class to a catalog product
                product_info = self.get_product_info(class_name)
                if product_info:
                    # Locate shelf zone and estimate stock level
                    shelf_zone = self.determine_shelf_zone([x1, y1, x2, y2])
                    stock_level = self.assess_stock_level(
                        product_info['product_id'], [x1, y1, x2, y2], shelf_zone
                    )
                    detections.append(ProductDetection(
                        product_id=product_info['product_id'],
                        product_name=product_info['name'],
                        bbox=[x1, y1, x2, y2],
                        confidence=conf,
                        shelf_zone=shelf_zone,
                        stock_level=stock_level
                    ))
        return detections

    def get_product_info(self, class_name: str) -> Dict:
        """Map a detector class name to catalog info, or None if unmapped.

        A product matches when the lowercased class name appears in its
        'detection_keywords' list.
        """
        for product_id, product_data in self.product_catalog.items():
            if class_name.lower() in product_data.get('detection_keywords', []):
                return {
                    'product_id': product_id,
                    'name': product_data['name'],
                    'category': product_data['category'],
                    'price': product_data['price']
                }
        return None

    def determine_shelf_zone(self, bbox: List[float]) -> str:
        """Return the shelf zone containing the bbox center, or 'unknown'."""
        x1, y1, x2, y2 = bbox
        center_x = (x1 + x2) / 2
        center_y = (y1 + y2) / 2
        for zone_name, zone_coords in self.shelf_zones.items():
            zx1, zy1, zx2, zy2 = zone_coords
            if zx1 <= center_x <= zx2 and zy1 <= center_y <= zy2:
                return zone_name
        return 'unknown'

    def assess_stock_level(self, product_id: str, bbox: List[float], shelf_zone: str) -> str:
        """Estimate stock level from bounding-box area.

        Simplified heuristic: larger visible product area means fuller stock.
        Per-zone thresholds come from 'zone_configs' with built-in defaults.
        """
        x1, y1, x2, y2 = bbox
        area = (x2 - x1) * (y2 - y1)
        # Adjust thresholds based on shelf zone configuration
        zone_config = self.store_config.get('zone_configs', {}).get(shelf_zone, {})
        area_thresholds = zone_config.get('stock_thresholds', {
            'full': 5000,
            'medium': 3000,
            'low': 1000
        })
        if area > area_thresholds['full']:
            return 'full'
        elif area > area_thresholds['medium']:
            return 'medium'
        elif area > area_thresholds['low']:
            return 'low'
        else:
            return 'empty'

    def analyze_shelf_status(self, detections: List["ProductDetection"]) -> Dict:
        """Aggregate detections per zone and derive restock recommendations.

        Returns a dict with 'shelf_analysis', 'restock_recommendations'
        (zones with >30% low/empty products; 'high' urgency above 50%), and
        an 'overall_health' summary.
        """
        shelf_analysis = defaultdict(lambda: {
            'total_products': 0,
            'stock_levels': defaultdict(int),
            'products': []
        })
        for detection in detections:
            zone = detection.shelf_zone
            shelf_analysis[zone]['total_products'] += 1
            shelf_analysis[zone]['stock_levels'][detection.stock_level] += 1
            shelf_analysis[zone]['products'].append({
                'product_id': detection.product_id,
                'product_name': detection.product_name,
                'stock_level': detection.stock_level,
                'confidence': detection.confidence
            })
        # Generate restock recommendations
        restock_recommendations = []
        for zone, analysis in shelf_analysis.items():
            low_stock_count = analysis['stock_levels']['low'] + analysis['stock_levels']['empty']
            total_count = analysis['total_products']
            if total_count > 0 and low_stock_count / total_count > 0.3:  # More than 30% low stock
                restock_recommendations.append({
                    'zone': zone,
                    'urgency': 'high' if low_stock_count / total_count > 0.5 else 'medium',
                    'low_stock_products': [p for p in analysis['products'] if p['stock_level'] in ['low', 'empty']]
                })
        return {
            'shelf_analysis': dict(shelf_analysis),
            'restock_recommendations': restock_recommendations,
            'overall_health': self.calculate_overall_health(shelf_analysis)
        }

    def calculate_overall_health(self, shelf_analysis: Dict) -> Dict:
        """Score overall shelf health (0-100) from the per-zone analysis.

        Score is 100 minus the percentage of low/empty products; a shelf with
        no detected products scores 0 ('poor').
        """
        total_products = sum(analysis['total_products'] for analysis in shelf_analysis.values())
        total_low_stock = sum(
            analysis['stock_levels']['low'] + analysis['stock_levels']['empty']
            for analysis in shelf_analysis.values()
        )
        if total_products == 0:
            health_score = 0
        else:
            health_score = max(0, 100 - (total_low_stock / total_products * 100))
        health_status = 'excellent' if health_score >= 90 else \
                        'good' if health_score >= 75 else \
                        'fair' if health_score >= 60 else 'poor'
        return {
            'score': health_score,
            'status': health_status,
            'total_products': total_products,
            'low_stock_items': total_low_stock
        }

    def customer_behavior_analysis(self, image: np.ndarray) -> Dict:
        """Detect customers ('person' class) and tally zone popularity.

        Returns 'customer_count', 'zone_popularity' (known zones only), and
        the raw per-customer detections.
        """
        results = self.model(image, verbose=False)
        customers = []
        for r in results:
            boxes = r.boxes
            if boxes is None:
                continue
            for box in boxes:
                cls = box.cls[0].item()
                class_name = self.model.names[int(cls)]
                if class_name == 'person':  # Customer detection
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    # Locate which shelf zone the customer is standing in
                    zone = self.determine_shelf_zone([x1, y1, x2, y2])
                    customers.append({
                        'bbox': [x1, y1, x2, y2],
                        'confidence': conf,
                        'zone': zone
                    })
        # Analyze hot zones (ignore customers outside any defined zone)
        zone_popularity = defaultdict(int)
        for customer in customers:
            if customer['zone'] != 'unknown':
                zone_popularity[customer['zone']] += 1
        return {
            'customer_count': len(customers),
            'zone_popularity': dict(zone_popularity),
            'customers': customers
        }

    def visualize_retail_monitoring(self, image: np.ndarray, detections: List["ProductDetection"],
                                    shelf_analysis: Dict, customer_analysis: Dict) -> np.ndarray:
        """Render zones, product stock levels, customers, and stats onto a frame copy."""
        annotated_image = image.copy()
        # Draw shelf zones, colored by restock urgency (BGR)
        for zone_name, zone_coords in self.shelf_zones.items():
            x1, y1, x2, y2 = zone_coords
            zone_health = 'good'  # Default when no restock recommendation exists
            for rec in shelf_analysis.get('restock_recommendations', []):
                if rec['zone'] == zone_name:
                    zone_health = rec['urgency']
            colors = {
                'good': (0, 255, 0),     # Green
                'medium': (0, 255, 255), # Yellow
                'high': (0, 0, 255)      # Red
            }
            color = colors.get(zone_health, (128, 128, 128))
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)
            cv2.putText(annotated_image, zone_name, (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        # Draw product detections, colored by stock level (BGR)
        for detection in detections:
            x1, y1, x2, y2 = map(int, detection.bbox)
            stock_colors = {
                'full': (0, 255, 0),     # Green
                'medium': (0, 255, 255), # Yellow
                'low': (0, 165, 255),    # Orange
                'empty': (0, 0, 255)     # Red
            }
            color = stock_colors.get(detection.stock_level, (128, 128, 128))
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)
            # Product label
            label = f"{detection.product_name} ({detection.stock_level})"
            cv2.putText(annotated_image, label, (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        # Draw customer locations (magenta)
        for customer in customer_analysis['customers']:
            x1, y1, x2, y2 = map(int, customer['bbox'])
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (255, 0, 255), 2)
            cv2.putText(annotated_image, "Customer", (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)
        # Overlay summary statistics in the top-left corner
        health = shelf_analysis['overall_health']
        info_text = [
            f"Health Score: {health['score']:.1f}% ({health['status']})",
            f"Products: {health['total_products']} | Low Stock: {health['low_stock_items']}",
            f"Customers: {customer_analysis['customer_count']}"
        ]
        for i, text in enumerate(info_text):
            cv2.putText(annotated_image, text, (10, 30 + i * 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        return annotated_image

    def run_retail_monitoring(self, camera_source=0):
        """Run the live monitoring loop until the stream ends or 'q' is pressed.

        Args:
            camera_source: cv2.VideoCapture source (device index or URL).
        """
        cap = cv2.VideoCapture(camera_source)
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Product detection and shelf analysis
                detections = self.detect_products(frame)
                shelf_analysis = self.analyze_shelf_status(detections)
                # Customer behavior analysis
                customer_analysis = self.customer_behavior_analysis(frame)
                # Visualize
                annotated_frame = self.visualize_retail_monitoring(
                    frame, detections, shelf_analysis, customer_analysis
                )
                # Surface high-priority restock alerts
                for recommendation in shelf_analysis['restock_recommendations']:
                    if recommendation['urgency'] == 'high':
                        print(f"🚨 High Priority Restock: Zone {recommendation['zone']}")
                cv2.imshow('Retail Monitoring System', annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Always release the camera and close windows, even on error
            # (the original leaked the capture handle on exceptions).
            cap.release()
            cv2.destroyAllWindows()

    def generate_daily_report(self) -> Dict:
        """Generate a daily summary report.

        NOTE(review): placeholder implementation returning canned numbers;
        a real system would aggregate from self.stock_history.
        """
        return {
            'date': time.strftime('%Y-%m-%d'),
            'total_restocks_needed': 5,
            'popular_zones': ['zone_a', 'zone_c'],
            'peak_customer_hours': ['10:00-12:00', '14:00-16:00'],
            'inventory_turnover': 85.2
        }
# Usage example and configuration file.
# NOTE(review): the JSON file below is written at import time (module-level
# side effect); the __main__ demo relies on 'store_layout.json' existing.
store_layout_config = {
    # Shelf zones as pixel rectangles: [x1, y1, x2, y2]
    "shelf_zones": {
        "zone_a": [0, 0, 200, 300],
        "zone_b": [200, 0, 400, 300],
        "zone_c": [400, 0, 600, 300]
    },
    # Product catalog: id -> metadata; 'detection_keywords' maps YOLO class
    # names to the product (see RetailMonitoringSystem.get_product_info).
    "product_catalog": {
        "PROD001": {
            "name": "Cola",
            "category": "Beverages",
            "price": 1.99,
            "detection_keywords": ["bottle", "cola", "soda"]
        },
        "PROD002": {
            "name": "Bread",
            "category": "Bakery",
            "price": 2.49,
            "detection_keywords": ["bread", "loaf"]
        }
    },
    # Per-zone bbox-area thresholds used by assess_stock_level
    "zone_configs": {
        "zone_a": {"stock_thresholds": {"full": 8000, "medium": 5000, "low": 2000}},
        "zone_b": {"stock_thresholds": {"full": 6000, "medium": 4000, "low": 1500}},
        "zone_c": {"stock_thresholds": {"full": 7000, "medium": 4500, "low": 1800}}
    }
}
# Save configuration file
with open('store_layout.json', 'w') as f:
    json.dump(store_layout_config, f, indent=2)
if __name__ == "__main__":
    # Demo: run live monitoring against the default camera (device 0)
    system = RetailMonitoringSystem("yolov8n.pt", "store_layout.json")
    system.run_retail_monitoring(0)
Chapter Summary
Through the industry application case studies in this chapter, we have gained an in-depth understanding of YOLO's practical application patterns and key technical considerations across different fields:
- Autonomous Driving Applications: Multi-camera fusion, real-time requirements, safety risk assessment
- Intelligent Surveillance Systems: Behavior analysis, anomaly detection, zone management
- Industrial Quality Inspection: Defect classification, quality assessment, production process integration
- Medical Imaging: High precision requirements, clinical decision support, urgency determination
- Retail Applications: Inventory monitoring, customer behavior analysis, intelligent replenishment
Each application area has its specific technical challenges and solutions:
- Safety-critical scenarios require extremely high reliability and real-time performance
- Medical applications demand high precision and interpretability
- Industrial environments focus on stability and integrability
- Retail scenarios emphasize business value and user experience
These cases demonstrate the wide applicability and strong potential of YOLO technology, providing valuable references and guidance for us to apply YOLO in actual projects.
In the next chapter, we will explore the cutting-edge technological developments and future trends of YOLO, understanding the development direction and potential breakthroughs of this technology.