Chapter 12: YOLO Practical Deployment and Applications
Learning Objectives
- Master deployment solutions for different platforms (server, mobile, edge devices)
- Learn design and implementation of real-time detection systems
- Understand production environment monitoring and maintenance
- Become familiar with API design and service deployment
12.1 Deployment Architecture Design
12.1.1 Deployment Architecture Overview
*(Deployment architecture diagram: the original Mermaid chart did not render.)*
12.1.2 Deployment Strategy Selection
Deployment Solution Comparison
| Deployment Method | Advantages | Disadvantages | Applicable Scenarios |
|---|---|---|---|
| Cloud Deployment | High performance, easy maintenance, scalable | Network dependency, high latency | Batch processing, non-real-time applications |
| Edge Deployment | Low latency, data security, offline availability | Limited computing power, difficult maintenance | Real-time applications, privacy-sensitive |
| Mobile Deployment | No network dependency, fast response | Extremely limited resources | Personal applications, offline scenarios |
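The choice between these modes usually comes down to latency budget, privacy constraints, and connectivity. As a rough illustration (a toy heuristic, not a rule), the selection logic from the table above can be sketched in a few lines of Python:

```python
def choose_deployment(latency_budget_ms: float, privacy_sensitive: bool, has_network: bool) -> str:
    """Toy heuristic mirroring the comparison table above (illustrative only)."""
    if not has_network:
        return "mobile"  # offline scenarios, no network dependency
    if privacy_sensitive or latency_budget_ms < 50:
        return "edge"    # data stays local, lowest round-trip latency
    return "cloud"       # batch / non-real-time workloads, easiest to scale
```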
12.2 Server-Side Deployment
12.2.1 Docker Containerized Deployment
Dockerfile Example
```dockerfile
# YOLO model containerized deployment
FROM nvidia/cuda:11.8.0-runtime-ubuntu20.04

# Install Python and dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    libglib2.0-0 \
    libsm6 \
    libxext6 \
    libxrender-dev \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy dependency files
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Download model weights (if needed)
RUN python3 download_weights.py

# Expose port
EXPOSE 8080

# Startup command
CMD ["python3", "app.py", "--host", "0.0.0.0", "--port", "8080"]
```
Docker Compose Configuration
```yaml
# docker-compose.yml
version: '3.8'

services:
  yolo-api:
    build: .
    ports:
      - "8080:8080"
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - CUDA_VISIBLE_DEVICES=0
      - MODEL_PATH=/app/models/yolov8n.pt
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - yolo-api
```
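The compose file mounts a local nginx.conf into the nginx container. A minimal sketch of that config, assuming nginx only needs to reverse-proxy to the yolo-api service, might look like:

```nginx
# nginx.conf - minimal reverse proxy for the yolo-api service (sketch)
events {}

http {
    upstream yolo_backend {
        server yolo-api:8080;  # resolved via the compose network
    }

    server {
        listen 80;
        client_max_body_size 20M;  # allow reasonably large image uploads

        location / {
            proxy_pass http://yolo_backend;
        }
    }
}
```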
12.2.2 FastAPI Service Implementation
Basic API Service
```python
# app.py - YOLO API service
import io

import cv2
import numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from PIL import Image
from ultralytics import YOLO

app = FastAPI(title="YOLO Detection API", version="1.0.0")

# Global model instance
model = None


@app.on_event("startup")
async def load_model():
    """Load model on startup"""
    global model
    try:
        model = YOLO("yolov8n.pt")
        print("Model loaded successfully")
    except Exception as e:
        print(f"Model loading failed: {e}")
        raise


def parse_results(results):
    """Convert Ultralytics results into a list of detection dicts"""
    detections = []
    for r in results:
        boxes = r.boxes
        if boxes is not None:
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = box.conf[0].item()
                cls = box.cls[0].item()
                detections.append({
                    "bbox": [x1, y1, x2, y2],
                    "confidence": conf,
                    "class": model.names[int(cls)],
                    "class_id": int(cls),
                })
    return detections


@app.post("/detect")
async def detect_objects(file: UploadFile = File(...)):
    """Object detection API endpoint"""
    try:
        # Read uploaded image; force RGB so RGBA/grayscale uploads don't break
        image_data = await file.read()
        image = Image.open(io.BytesIO(image_data)).convert("RGB")
        image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        # Perform detection
        results = model(image_cv)
        detections = parse_results(results)

        return {"detections": detections, "count": len(detections)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/detect_batch")
async def detect_batch(files: list[UploadFile] = File(...)):
    """Batch detection API endpoint"""
    results = []
    for file in files:
        try:
            # Process single image
            image_data = await file.read()
            image = Image.open(io.BytesIO(image_data)).convert("RGB")
            image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

            # Perform detection and parse results
            detections = parse_results(model(image_cv))
            results.append({
                "filename": file.filename,
                "detections": detections,
                "count": len(detections),
            })
        except Exception as e:
            results.append({"filename": file.filename, "error": str(e)})
    return {"results": results}


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "model_loaded": model is not None}


@app.get("/model_info")
async def model_info():
    """Model information endpoint"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    return {
        "model_type": "YOLOv8",
        "classes": list(model.names.values()),
        "input_size": [640, 640],
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8080)
```
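Once the service is running, the /detect endpoint accepts a multipart image upload. A minimal client sketch (the file name test.jpg is just a placeholder):

```python
# client.py - exercising the /detect endpoint
import requests

with open("test.jpg", "rb") as f:
    resp = requests.post(
        "http://localhost:8080/detect",
        files={"file": ("test.jpg", f, "image/jpeg")},
    )
resp.raise_for_status()

for det in resp.json()["detections"]:
    print(f"{det['class']}: {det['confidence']:.2f} at {det['bbox']}")
```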
12.2.3 Load Balancing and Scaling
Kubernetes Deployment Configuration
```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: yolo-detection
spec:
  replicas: 3
  selector:
    matchLabels:
      app: yolo-detection
  template:
    metadata:
      labels:
        app: yolo-detection
    spec:
      containers:
      - name: yolo-api
        image: yolo-detection:latest
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "2Gi"
            nvidia.com/gpu: 1
          limits:
            memory: "4Gi"
            nvidia.com/gpu: 1
        env:
        - name: MODEL_PATH
          value: "/app/models/yolov8n.pt"
---
apiVersion: v1
kind: Service
metadata:
  name: yolo-service
spec:
  selector:
    app: yolo-detection
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: yolo-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: yolo-detection
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
```
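The /health endpoint defined earlier pairs naturally with Kubernetes probes. A sketch of probe settings that could be added under the yolo-api container spec (the delay values are assumptions and should be tuned to actual model load time):

```yaml
# Probe sketch for the yolo-api container, using the /health endpoint
livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 30   # leave time for model loading at startup
  periodSeconds: 15
readinessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5
```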
12.3 Mobile Deployment
12.3.1 iOS Deployment (Core ML)
Model Conversion and Integration
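Before the Swift code below can load a model, the PyTorch weights have to be converted to Core ML format. One way to do this is a sketch using Ultralytics' export API (the exact output name can vary by version):

```python
# export_coreml.py - convert YOLOv8 weights to Core ML (sketch)
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
# Produces a .mlpackage; add it to the Xcode project, which compiles it
# into the .mlmodelc bundle referenced by the Swift loader below.
model.export(format="coreml", nms=True)
```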
```swift
// YOLODetector.swift - iOS YOLO detector
import CoreML
import Vision
import UIKit

class YOLODetector {
    private var model: VNCoreMLModel?

    init() {
        loadModel()
    }

    private func loadModel() {
        guard let modelURL = Bundle.main.url(forResource: "YOLOv8", withExtension: "mlmodelc"),
              let coreMLModel = try? MLModel(contentsOf: modelURL),
              let visionModel = try? VNCoreMLModel(for: coreMLModel) else {
            print("Failed to load Core ML model")
            return
        }
        self.model = visionModel
    }

    func detectObjects(in image: UIImage, completion: @escaping ([Detection]) -> Void) {
        guard let model = self.model,
              let cgImage = image.cgImage else {
            completion([])
            return
        }

        let request = VNCoreMLRequest(model: model) { [weak self] request, error in
            if let error = error {
                print("Detection error: \(error)")
                completion([])
                return
            }
            let detections = self?.processResults(request.results) ?? []
            DispatchQueue.main.async {
                completion(detections)
            }
        }
        request.imageCropAndScaleOption = .scaleFill

        let handler = VNImageRequestHandler(cgImage: cgImage, options: [:])
        DispatchQueue.global(qos: .userInitiated).async {
            try? handler.perform([request])
        }
    }

    private func processResults(_ results: [VNObservation]?) -> [Detection] {
        guard let results = results as? [VNRecognizedObjectObservation] else {
            return []
        }
        return results.compactMap { observation in
            guard let topLabel = observation.labels.first else { return nil }
            return Detection(
                boundingBox: observation.boundingBox,
                confidence: topLabel.confidence,
                className: topLabel.identifier
            )
        }
    }
}

struct Detection {
    let boundingBox: CGRect
    let confidence: Float
    let className: String
}
```
Real-time Camera Detection
```swift
// CameraViewController.swift - Real-time detection interface
import UIKit
import AVFoundation

class CameraViewController: UIViewController {
    private var captureSession: AVCaptureSession!
    private var previewLayer: AVCaptureVideoPreviewLayer!
    private let detector = YOLODetector()
    private var overlayView: DetectionOverlayView!

    override func viewDidLoad() {
        super.viewDidLoad()
        setupCamera()
        setupUI()
    }

    private func setupCamera() {
        captureSession = AVCaptureSession()
        captureSession.sessionPreset = .high

        guard let backCamera = AVCaptureDevice.default(for: .video),
              let input = try? AVCaptureDeviceInput(device: backCamera) else {
            return
        }
        captureSession.addInput(input)

        let videoOutput = AVCaptureVideoDataOutput()
        videoOutput.setSampleBufferDelegate(self, queue: DispatchQueue(label: "camera_queue"))
        captureSession.addOutput(videoOutput)

        previewLayer = AVCaptureVideoPreviewLayer(session: captureSession)
        previewLayer.frame = view.bounds
        previewLayer.videoGravity = .resizeAspectFill
        view.layer.addSublayer(previewLayer)

        // startRunning() blocks, so call it off the main thread
        DispatchQueue.global(qos: .userInitiated).async { [weak self] in
            self?.captureSession.startRunning()
        }
    }

    private func setupUI() {
        overlayView = DetectionOverlayView(frame: view.bounds)
        view.addSubview(overlayView)
    }
}

extension CameraViewController: AVCaptureVideoDataOutputSampleBufferDelegate {
    func captureOutput(_ output: AVCaptureOutput, didOutput sampleBuffer: CMSampleBuffer, from connection: AVCaptureConnection) {
        guard let imageBuffer = CMSampleBufferGetImageBuffer(sampleBuffer) else { return }

        let ciImage = CIImage(cvImageBuffer: imageBuffer)
        let context = CIContext()
        guard let cgImage = context.createCGImage(ciImage, from: ciImage.extent) else { return }
        let uiImage = UIImage(cgImage: cgImage)

        detector.detectObjects(in: uiImage) { [weak self] detections in
            self?.overlayView.updateDetections(detections)
        }
    }
}
```
12.3.2 Android Deployment (TensorFlow Lite)
Model Integration
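As with iOS, the Android detector needs a converted model. A sketch of the TensorFlow Lite export (the generated file name varies by Ultralytics version, so rename it to yolo_model.tflite to match the loader below):

```python
# export_tflite.py - convert YOLOv8 weights to TensorFlow Lite (sketch)
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
model.export(format="tflite", imgsz=640)
```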
```kotlin
// YOLODetector.kt - Android YOLO detector
import android.content.Context
import android.graphics.Bitmap
import android.graphics.RectF
import android.util.Log
import org.tensorflow.lite.Interpreter
import java.io.FileInputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.channels.FileChannel

class YOLODetector(private val context: Context) {
    private var interpreter: Interpreter? = null
    private val inputSize = 640
    private val classNames = loadClassNames()

    init {
        loadModel()
    }

    private fun loadModel() {
        try {
            val modelBuffer = loadModelFile("yolo_model.tflite")
            val options = Interpreter.Options()
            options.setNumThreads(4)
            options.setUseNNAPI(true) // Use NNAPI acceleration
            interpreter = Interpreter(modelBuffer, options)
        } catch (e: Exception) {
            Log.e("YOLODetector", "Error loading model", e)
        }
    }

    private fun loadModelFile(modelName: String): ByteBuffer {
        val assetFileDescriptor = context.assets.openFd(modelName)
        val inputStream = FileInputStream(assetFileDescriptor.fileDescriptor)
        val fileChannel = inputStream.channel
        val startOffset = assetFileDescriptor.startOffset
        val declaredLength = assetFileDescriptor.declaredLength
        return fileChannel.map(FileChannel.MapMode.READ_ONLY, startOffset, declaredLength)
    }

    fun detectObjects(bitmap: Bitmap): List<Detection> {
        val interpreter = this.interpreter ?: return emptyList()

        // Preprocess image
        val resizedBitmap = Bitmap.createScaledBitmap(bitmap, inputSize, inputSize, true)
        val inputBuffer = preprocessImage(resizedBitmap)

        // Prepare output buffer
        val outputShape = interpreter.getOutputTensor(0).shape()
        val outputBuffer = Array(1) { Array(outputShape[1]) { FloatArray(outputShape[2]) } }

        // Execute inference
        interpreter.run(inputBuffer, outputBuffer)

        // Post-process results
        return postprocessResults(outputBuffer[0], bitmap.width, bitmap.height)
    }

    private fun preprocessImage(bitmap: Bitmap): ByteBuffer {
        val byteBuffer = ByteBuffer.allocateDirect(4 * inputSize * inputSize * 3)
        byteBuffer.order(ByteOrder.nativeOrder())
        val pixels = IntArray(inputSize * inputSize)
        bitmap.getPixels(pixels, 0, inputSize, 0, 0, inputSize, inputSize)
        for (pixel in pixels) {
            val r = (pixel shr 16 and 0xFF) / 255.0f
            val g = (pixel shr 8 and 0xFF) / 255.0f
            val b = (pixel and 0xFF) / 255.0f
            byteBuffer.putFloat(r)
            byteBuffer.putFloat(g)
            byteBuffer.putFloat(b)
        }
        return byteBuffer
    }

    private fun postprocessResults(outputs: Array<FloatArray>, imageWidth: Int, imageHeight: Int): List<Detection> {
        val detections = mutableListOf<Detection>()
        val confidenceThreshold = 0.5f

        for (output in outputs) {
            if (output.size >= 6) { // x, y, w, h, confidence, class_scores...
                val centerX = output[0]
                val centerY = output[1]
                val width = output[2]
                val height = output[3]
                val confidence = output[4]

                if (confidence > confidenceThreshold) {
                    // Find highest class score
                    var maxClassScore = 0f
                    var classId = 0
                    for (i in 5 until output.size) {
                        if (output[i] > maxClassScore) {
                            maxClassScore = output[i]
                            classId = i - 5
                        }
                    }
                    if (maxClassScore * confidence > confidenceThreshold) {
                        val left = (centerX - width / 2) * imageWidth
                        val top = (centerY - height / 2) * imageHeight
                        val right = (centerX + width / 2) * imageWidth
                        val bottom = (centerY + height / 2) * imageHeight
                        detections.add(
                            Detection(
                                RectF(left, top, right, bottom),
                                maxClassScore * confidence,
                                classNames.getOrElse(classId) { "Unknown" }
                            )
                        )
                    }
                }
            }
        }
        return applyNMS(detections)
    }

    private fun applyNMS(detections: List<Detection>): List<Detection> {
        val sortedDetections = detections.sortedByDescending { it.confidence }
        val finalDetections = mutableListOf<Detection>()
        for (detection in sortedDetections) {
            var keep = true
            for (finalDetection in finalDetections) {
                if (calculateIoU(detection.boundingBox, finalDetection.boundingBox) > 0.5) {
                    keep = false
                    break
                }
            }
            if (keep) {
                finalDetections.add(detection)
            }
        }
        return finalDetections
    }

    private fun calculateIoU(box1: RectF, box2: RectF): Float {
        val intersectionArea = maxOf(0f, minOf(box1.right, box2.right) - maxOf(box1.left, box2.left)) *
                maxOf(0f, minOf(box1.bottom, box2.bottom) - maxOf(box1.top, box2.top))
        val box1Area = (box1.right - box1.left) * (box1.bottom - box1.top)
        val box2Area = (box2.right - box2.left) * (box2.bottom - box2.top)
        return intersectionArea / (box1Area + box2Area - intersectionArea)
    }

    private fun loadClassNames(): List<String> {
        return try {
            context.assets.open("class_names.txt").bufferedReader().readLines()
        } catch (e: Exception) {
            Log.e("YOLODetector", "Error loading class names", e)
            emptyList()
        }
    }
}

data class Detection(
    val boundingBox: RectF,
    val confidence: Float,
    val className: String
)
```
12.4 Edge Device Deployment
12.4.1 Jetson Nano Deployment
Environment Setup Script
```bash
#!/bin/bash
# jetson_setup.sh - Jetson Nano environment setup

# Update system
sudo apt update && sudo apt upgrade -y

# Install Python dependencies
sudo apt install -y python3-pip python3-dev

# Install PyTorch for Jetson
wget https://nvidia.box.com/shared/static/p57jwntv436lfrd78inwl7iml6p13fzh.whl -O torch-1.8.0-cp36-cp36m-linux_aarch64.whl
pip3 install torch-1.8.0-cp36-cp36m-linux_aarch64.whl

# Install torchvision
git clone --branch v0.9.0 https://github.com/pytorch/vision torchvision
cd torchvision
sudo python3 setup.py install

# Install other dependencies
pip3 install ultralytics opencv-python numpy pillow

# Set power mode (maximum performance)
sudo nvpmodel -m 0
sudo jetson_clocks

echo "Jetson Nano environment setup complete"
```
Optimized Detection Script
```python
# jetson_detector.py - Jetson optimized detector
import argparse
import glob
import time

import cv2
import torch
from ultralytics import YOLO


class JetsonYOLODetector:
    def __init__(self, model_path, device='cuda'):
        self.device = device
        self.model = YOLO(model_path)
        # Optimization settings
        if torch.cuda.is_available():
            torch.backends.cudnn.benchmark = True
            self.model.to(device)

    def detect_video(self, source=0, save_path=None):
        """Real-time video detection"""
        cap = cv2.VideoCapture(source)
        # Set camera parameters
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_FPS, 30)

        out = None
        if save_path:
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(save_path, fourcc, 20.0, (640, 480))

        fps = 0.0  # initialize so the overlay is valid before the first measurement
        fps_counter = 0
        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Perform detection
            results = self.model(frame, verbose=False)

            # Draw results
            annotated_frame = results[0].plot()

            # Calculate FPS over a 30-frame window
            fps_counter += 1
            if fps_counter % 30 == 0:
                end_time = time.time()
                fps = 30 / (end_time - start_time)
                start_time = end_time
                print(f"FPS: {fps:.2f}")

            # Display FPS
            cv2.putText(annotated_frame, f"FPS: {fps:.1f}",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            if out is not None:
                out.write(annotated_frame)

            cv2.imshow('YOLO Detection', annotated_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    def benchmark(self, test_images_path, num_runs=100):
        """Performance benchmark test"""
        image_paths = glob.glob(f"{test_images_path}/*.jpg")
        if not image_paths:
            print("No test images found")
            return

        total_time = 0
        for i in range(num_runs):
            image_path = image_paths[i % len(image_paths)]
            image = cv2.imread(image_path)

            start_time = time.time()
            self.model(image, verbose=False)
            end_time = time.time()

            total_time += (end_time - start_time)
            if i % 10 == 0:
                print(f"Processed {i}/{num_runs} images")

        avg_time = total_time / num_runs
        avg_fps = 1 / avg_time
        print("\nBenchmark Results:")
        print(f"Average inference time: {avg_time:.4f} seconds")
        print(f"Average FPS: {avg_fps:.2f}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="yolov8n.pt", help="Model path")
    parser.add_argument("--source", default=0, help="Video source")
    parser.add_argument("--save", help="Save video path")
    parser.add_argument("--benchmark", help="Benchmark images path")
    args = parser.parse_args()

    detector = JetsonYOLODetector(args.model)
    if args.benchmark:
        detector.benchmark(args.benchmark)
    else:
        detector.detect_video(args.source, args.save)
```
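Assuming the script above is saved as jetson_detector.py, typical invocations look like:

```bash
# Live camera detection (camera index 0)
python3 jetson_detector.py --model yolov8n.pt --source 0

# Benchmark against a folder of .jpg test images
python3 jetson_detector.py --model yolov8n.pt --benchmark ./test_images
```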
12.4.2 Raspberry Pi Deployment
Lightweight Deployment Solution
```python
# rpi_detector.py - Raspberry Pi optimized detector
import queue
import time
from threading import Thread

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite


class RPiYOLODetector:
    def __init__(self, model_path, num_threads=4):
        # Load TensorFlow Lite model
        self.interpreter = tflite.Interpreter(
            model_path=model_path,
            num_threads=num_threads
        )
        self.interpreter.allocate_tensors()

        # Get input/output information
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.input_shape = self.input_details[0]['shape']
        self.input_height = self.input_shape[1]
        self.input_width = self.input_shape[2]

        # Class names
        self.class_names = self.load_class_names()

        # Frame queues
        self.frame_queue = queue.Queue(maxsize=2)
        self.result_queue = queue.Queue(maxsize=2)

    def load_class_names(self):
        """Load class names"""
        try:
            with open('class_names.txt', 'r') as f:
                return [line.strip() for line in f.readlines()]
        except OSError:
            return [f"class_{i}" for i in range(80)]  # COCO default 80 classes

    def preprocess_image(self, image):
        """Image preprocessing"""
        # Resize to the model's input size
        resized = cv2.resize(image, (self.input_width, self.input_height))
        # Normalize to [0, 1] and add batch dimension
        input_data = np.expand_dims(resized, axis=0)
        input_data = (input_data / 255.0).astype(np.float32)
        return input_data

    def postprocess_results(self, outputs, image_shape, conf_threshold=0.5):
        """Post-process detection results"""
        detections = []
        # Parse output (assuming output format is [batch, num_detections, 6])
        # 6 values: x_center, y_center, width, height, confidence, class_id
        height, width = image_shape[:2]

        for output in outputs[0]:  # Take first batch
            confidence = output[4]
            if confidence > conf_threshold:
                x_center, y_center, w, h = output[:4]
                class_id = int(output[5])

                # Convert to bounding box coordinates
                x1 = int((x_center - w / 2) * width)
                y1 = int((y_center - h / 2) * height)
                x2 = int((x_center + w / 2) * width)
                y2 = int((y_center + h / 2) * height)

                detections.append({
                    'bbox': [x1, y1, x2, y2],
                    'confidence': confidence,
                    'class_id': class_id,
                    'class_name': self.class_names[class_id] if class_id < len(self.class_names) else 'unknown'
                })
        return detections

    def detect_frame(self, frame):
        """Single frame detection"""
        # Preprocess
        input_data = self.preprocess_image(frame)

        # Inference
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()

        # Get output
        outputs = [self.interpreter.get_tensor(detail['index'])
                   for detail in self.output_details]

        # Post-process
        return self.postprocess_results(outputs, frame.shape)

    def draw_detections(self, frame, detections):
        """Draw detection results"""
        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            conf = det['confidence']
            class_name = det['class_name']

            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label
            label = f"{class_name}: {conf:.2f}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        return frame

    def detection_worker(self):
        """Detection worker thread"""
        while True:
            try:
                frame = self.frame_queue.get(timeout=1)
                detections = self.detect_frame(frame)
                self.result_queue.put((frame, detections))
                self.frame_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Detection error: {e}")

    def run_camera_detection(self, camera_id=0):
        """Run camera detection"""
        cap = cv2.VideoCapture(camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
        cap.set(cv2.CAP_PROP_FPS, 15)

        # Start detection thread
        detection_thread = Thread(target=self.detection_worker, daemon=True)
        detection_thread.start()

        fps = 0.0  # initialize so the overlay is valid before the first measurement
        fps_counter = 0
        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Add frame to queue (drop frames when the detector falls behind)
            if not self.frame_queue.full():
                self.frame_queue.put(frame.copy())

            # Get detection results
            try:
                result_frame, detections = self.result_queue.get_nowait()
                annotated_frame = self.draw_detections(result_frame, detections)

                # Calculate and display FPS
                fps_counter += 1
                if fps_counter % 30 == 0:
                    end_time = time.time()
                    fps = 30 / (end_time - start_time)
                    start_time = end_time
                    print(f"FPS: {fps:.2f}")

                cv2.putText(annotated_frame, f"FPS: {fps:.1f}",
                            (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                cv2.imshow('RPi YOLO Detection', annotated_frame)
            except queue.Empty:
                cv2.imshow('RPi YOLO Detection', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    detector = RPiYOLODetector("yolo_model.tflite")
    detector.run_camera_detection()
```
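On a Raspberry Pi, INT8 quantization usually buys the biggest speedup for TFLite models. A sketch of producing a quantized model with Ultralytics (INT8 export needs a small calibration dataset; coco128.yaml is used here as a placeholder):

```python
# export_tflite_int8.py - quantized TFLite export for the Pi (sketch)
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
model.export(format="tflite", int8=True, data="coco128.yaml")
```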
12.5 Real-Time Detection System Design
12.5.1 System Architecture Design
*(Real-time system architecture diagram: the original Mermaid chart did not render.)*
12.5.2 Pipeline Processing System
```python
# real_time_system.py - Real-time detection system
import json
import queue
import threading
import time
from collections import deque

import cv2
import numpy as np
import torch
from ultralytics import YOLO


class RealTimeDetectionSystem:
    def __init__(self, model_path, max_queue_size=10, num_workers=2):
        self.model = YOLO(model_path)
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model.to(self.device)

        # Queue management
        self.input_queue = queue.Queue(maxsize=max_queue_size)
        self.output_queue = queue.Queue(maxsize=max_queue_size)

        # Worker threads
        self.workers = []
        self.num_workers = num_workers
        self.running = False

        # Performance monitoring
        self.fps_tracker = deque(maxlen=30)
        self.processing_times = deque(maxlen=100)

        # Result storage
        self.detection_history = deque(maxlen=1000)

    def worker_thread(self):
        """Detection worker thread"""
        while self.running:
            try:
                # Get input data
                frame_data = self.input_queue.get(timeout=1)
                if frame_data is None:  # Stop signal
                    break
                frame, timestamp, frame_id = frame_data

                # Perform detection
                start_time = time.time()
                results = self.model(frame, verbose=False)
                processing_time = time.time() - start_time
                self.processing_times.append(processing_time)

                # Parse results
                detections = []
                for r in results:
                    boxes = r.boxes
                    if boxes is not None:
                        for box in boxes:
                            x1, y1, x2, y2 = box.xyxy[0].tolist()
                            conf = box.conf[0].item()
                            cls = box.cls[0].item()
                            detections.append({
                                'bbox': [x1, y1, x2, y2],
                                'confidence': conf,
                                'class': self.model.names[int(cls)],
                                'class_id': int(cls)
                            })

                # Output results
                result_data = {
                    'frame': frame,
                    'detections': detections,
                    'timestamp': timestamp,
                    'frame_id': frame_id,
                    'processing_time': processing_time
                }
                if not self.output_queue.full():
                    self.output_queue.put(result_data)

                # Store history
                self.detection_history.append({
                    'timestamp': timestamp,
                    'frame_id': frame_id,
                    'detection_count': len(detections),
                    'processing_time': processing_time
                })

                self.input_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Worker thread error: {e}")

    def start(self):
        """Start system"""
        self.running = True
        for _ in range(self.num_workers):
            worker = threading.Thread(target=self.worker_thread, daemon=True)
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Stop system"""
        self.running = False
        # Send stop signal
        for _ in range(self.num_workers):
            if not self.input_queue.full():
                self.input_queue.put(None)
        # Wait for worker threads to finish
        for worker in self.workers:
            worker.join(timeout=2)

    def add_frame(self, frame, timestamp=None, frame_id=None):
        """Add frame to processing queue"""
        if timestamp is None:
            timestamp = time.time()
        if frame_id is None:
            frame_id = int(timestamp * 1000)
        if not self.input_queue.full():
            self.input_queue.put((frame, timestamp, frame_id))
            return True
        return False

    def get_result(self, timeout=0.1):
        """Get detection result"""
        try:
            return self.output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_statistics(self):
        """Get system statistics"""
        if not self.processing_times:
            return {}
        return {
            'avg_processing_time': float(np.mean(self.processing_times)),
            'max_processing_time': float(np.max(self.processing_times)),
            'min_processing_time': float(np.min(self.processing_times)),
            # Average of the recent per-window FPS measurements
            'current_fps': float(np.mean(self.fps_tracker)) if self.fps_tracker else 0.0,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
            'total_detections': len(self.detection_history)
        }

    def run_camera_detection(self, camera_id=0, display=True):
        """Run camera detection"""
        cap = cv2.VideoCapture(camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        self.start()
        frame_count = 0
        fps_start_time = time.time()

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1

                # Add frame to processing queue
                self.add_frame(frame)

                # Get detection results
                result = self.get_result()
                if result and display:
                    # Draw detection results
                    annotated_frame = self.draw_detections(
                        result['frame'],
                        result['detections']
                    )

                    # Display statistics
                    stats = self.get_statistics()
                    info_text = f"FPS: {stats.get('current_fps', 0):.1f} | " \
                                f"Proc: {stats.get('avg_processing_time', 0):.3f}s"
                    cv2.putText(annotated_frame, info_text, (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                    cv2.imshow('Real-time Detection', annotated_frame)

                # Calculate FPS over a 30-frame window
                if frame_count % 30 == 0:
                    fps_end_time = time.time()
                    fps = 30 / (fps_end_time - fps_start_time)
                    self.fps_tracker.append(fps)
                    fps_start_time = fps_end_time

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            self.stop()
            cap.release()
            cv2.destroyAllWindows()

    def draw_detections(self, frame, detections):
        """Draw detection results"""
        for det in detections:
            x1, y1, x2, y2 = map(int, det['bbox'])
            conf = det['confidence']
            class_name = det['class']

            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label
            label = f"{class_name}: {conf:.2f}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        return frame

    def export_statistics(self, filename):
        """Export statistics"""
        stats = {
            'system_stats': self.get_statistics(),
            'detection_history': list(self.detection_history),
            'performance_data': {
                'processing_times': list(self.processing_times),
                'fps_history': list(self.fps_tracker)
            }
        }
        with open(filename, 'w') as f:
            json.dump(stats, f, indent=2)


if __name__ == "__main__":
    system = RealTimeDetectionSystem("yolov8n.pt", num_workers=2)
    system.run_camera_detection(camera_id=0)
```
12.6 Production Environment Monitoring
12.6.1 Monitoring Metrics Design
*(Monitoring metrics diagram: the original Mermaid chart did not render.)*
12.6.2 Monitoring System Implementation
```python
# monitoring.py - Production environment monitoring system
import json
import logging
import sqlite3
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Dict, List

import GPUtil
import numpy as np
import psutil


@dataclass
class MetricRecord:
    timestamp: float
    metric_name: str
    value: float
    tags: Dict[str, str] = field(default_factory=dict)


class PerformanceMonitor:
    def __init__(self, db_path="metrics.db"):
        self.db_path = db_path
        self.metrics_buffer = deque(maxlen=10000)
        self.running = False
        self.monitor_thread = None

        # Initialize database
        self.init_db()

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('detection_system.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def init_db(self):
        """Initialize monitoring database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL,
                metric_name TEXT,
                value REAL,
                tags TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL,
                alert_type TEXT,
                message TEXT,
                severity TEXT
            )
        ''')
        conn.commit()
        conn.close()

    def record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record metric"""
        record = MetricRecord(
            timestamp=time.time(),
            metric_name=name,
            value=value,
            tags=tags or {}
        )
        self.metrics_buffer.append(record)

    def start_monitoring(self, interval=5):
        """Start monitoring"""
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop monitoring"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self, interval):
        """Monitoring loop"""
        while self.running:
            try:
                # Collect system metrics
                self._collect_system_metrics()
                # Persist metrics
                self._persist_metrics()
                # Check alerts
                self._check_alerts()
                time.sleep(interval)
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")

    def _collect_system_metrics(self):
        """Collect system metrics"""
        # CPU metrics
        cpu_percent = psutil.cpu_percent(interval=1)
        self.record_metric("system.cpu.usage", cpu_percent, {"unit": "percent"})

        # Memory metrics
        memory = psutil.virtual_memory()
        self.record_metric("system.memory.usage", memory.percent, {"unit": "percent"})
        self.record_metric("system.memory.available", memory.available, {"unit": "bytes"})

        # GPU metrics
        try:
            gpus = GPUtil.getGPUs()
            for i, gpu in enumerate(gpus):
                self.record_metric("system.gpu.usage", gpu.load * 100,
                                   {"gpu_id": str(i), "unit": "percent"})
                self.record_metric("system.gpu.memory", gpu.memoryUtil * 100,
                                   {"gpu_id": str(i), "unit": "percent"})
                self.record_metric("system.gpu.temperature", gpu.temperature,
                                   {"gpu_id": str(i), "unit": "celsius"})
        except Exception as e:
            self.logger.warning(f"GPU monitoring failed: {e}")

        # Disk metrics
        disk = psutil.disk_usage('/')
        self.record_metric("system.disk.usage", disk.percent, {"unit": "percent"})

    def _persist_metrics(self):
        """Persist metrics to database"""
        if not self.metrics_buffer:
            return
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Batch insert metrics
        metrics_to_insert = []
        while self.metrics_buffer:
            try:
                record = self.metrics_buffer.popleft()
                metrics_to_insert.append((
                    record.timestamp,
                    record.metric_name,
                    record.value,
                    json.dumps(record.tags)
                ))
            except IndexError:
                break

        if metrics_to_insert:
            cursor.executemany(
                'INSERT INTO metrics (timestamp, metric_name, value, tags) VALUES (?, ?, ?, ?)',
                metrics_to_insert
            )
            conn.commit()
        conn.close()

    def _check_alerts(self):
        """Check alert conditions"""
        # Get recent metrics
        recent_metrics = self.get_recent_metrics(minutes=5)

        # CPU usage alert
        cpu_metrics = [m for m in recent_metrics if m['metric_name'] == 'system.cpu.usage']
        if cpu_metrics:
            avg_cpu = sum(m['value'] for m in cpu_metrics) / len(cpu_metrics)
            if avg_cpu > 80:
                self._send_alert("HIGH_CPU_USAGE", f"CPU usage: {avg_cpu:.1f}%", "warning")

        # GPU memory alert
        gpu_memory_metrics = [m for m in recent_metrics if m['metric_name'] == 'system.gpu.memory']
        if gpu_memory_metrics:
            max_gpu_memory = max(m['value'] for m in gpu_memory_metrics)
            if max_gpu_memory > 90:
                self._send_alert("HIGH_GPU_MEMORY", f"GPU memory: {max_gpu_memory:.1f}%", "warning")

    def _send_alert(self, alert_type: str, message: str, severity: str):
        """Send alert"""
        self.logger.warning(f"ALERT [{severity.upper()}] {alert_type}: {message}")
        # Store alert to database
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            'INSERT INTO alerts (timestamp, alert_type, message, severity) VALUES (?, ?, ?, ?)',
            (time.time(), alert_type, message, severity)
        )
        conn.commit()
        conn.close()

    def get_recent_metrics(self, minutes=10):
        """Get recent metrics"""
        since_timestamp = time.time() - (minutes * 60)
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            'SELECT timestamp, metric_name, value, tags FROM metrics WHERE timestamp > ? ORDER BY timestamp DESC',
            (since_timestamp,)
        )
        metrics = []
        for row in cursor.fetchall():
            metrics.append({
                'timestamp': row[0],
                'metric_name': row[1],
                'value': row[2],
                'tags': json.loads(row[3])
            })
        conn.close()
        return metrics

    def get_performance_summary(self, hours=24):
        """Get performance summary"""
        since_timestamp = time.time() - (hours * 3600)
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Get statistics for various metrics
        cursor.execute('''
            SELECT metric_name,
                   AVG(value) as avg_value,
                   MIN(value) as min_value,
                   MAX(value) as max_value,
                   COUNT(*) as count
            FROM metrics
            WHERE timestamp > ?
            GROUP BY metric_name
        ''', (since_timestamp,))

        summary = {}
        for row in cursor.fetchall():
            summary[row[0]] = {
                'avg': row[1],
                'min': row[2],
                'max': row[3],
                'count': row[4]
            }
        conn.close()
        return summary


class DetectionMonitor(PerformanceMonitor):
    """Detection system specific monitor"""

    def __init__(self, db_path="detection_metrics.db"):
        super().__init__(db_path)
        self.detection_stats = defaultdict(int)
        self.confidence_history = deque(maxlen=1000)

    def record_detection(self, detections: List[Dict], processing_time: float):
        """Record detection results"""
        # Record processing time
        self.record_metric("detection.processing_time", processing_time, {"unit": "seconds"})

        # Record detection count
        self.record_metric("detection.count", len(detections), {"unit": "objects"})

        # Record confidence statistics
        if detections:
            confidences = [det['confidence'] for det in detections]
            self.record_metric("detection.confidence.avg", sum(confidences) / len(confidences))
            self.record_metric("detection.confidence.max", max(confidences))
            self.record_metric("detection.confidence.min", min(confidences))

            # Store confidence history
            self.confidence_history.extend(confidences)

        # Record class statistics
        class_counts = defaultdict(int)
        for det in detections:
            class_name = det['class']
            class_counts[class_name] += 1
            self.detection_stats[class_name] += 1

        for class_name, count in class_counts.items():
            self.record_metric("detection.class_count", count, {"class": class_name})

    def get_detection_summary(self):
        """Get detection summary"""
        summary = self.get_performance_summary()

        # Add detection-specific statistics
        summary['detection_stats'] = dict(self.detection_stats)
        if self.confidence_history:
            summary['confidence_distribution'] = {
                'mean': sum(self.confidence_history) / len(self.confidence_history),
                'std': float(np.std(list(self.confidence_history))),
                'min': min(self.confidence_history),
                'max': max(self.confidence_history)
            }
        return summary


# Usage example
if __name__ == "__main__":
    monitor = DetectionMonitor()
    monitor.start_monitoring(interval=5)

    # Simulate detection results
    for i in range(100):
        num_detections = np.random.randint(0, 10)
        detections = []
        for j in range(num_detections):
            detections.append({
                'bbox': [100, 100, 200, 200],
                'confidence': float(np.random.uniform(0.5, 1.0)),
                'class': str(np.random.choice(['person', 'car', 'bicycle']))
            })
        processing_time = float(np.random.uniform(0.01, 0.1))
        monitor.record_detection(detections, processing_time)
        time.sleep(0.1)

    # Get summary
    summary = monitor.get_detection_summary()
    print(json.dumps(summary, indent=2))

    monitor.stop_monitoring()
```
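To close the loop, the monitor can be wired into the FastAPI service from Section 12.2.2 so that every request contributes metrics. A sketch (parse_results is the helper defined in app.py; the module layout is an assumption):

```python
# Hypothetical integration of DetectionMonitor into the API service
import time

from monitoring import DetectionMonitor

monitor = DetectionMonitor()
monitor.start_monitoring(interval=5)

def detect_and_record(model, image):
    """Run inference and record per-request metrics (sketch)."""
    start = time.time()
    results = model(image, verbose=False)
    detections = parse_results(results)  # helper from app.py
    monitor.record_detection(detections, time.time() - start)
    return detections
```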
Chapter Summary
Practical deployment is the critical step in turning YOLO from theory into working applications. Through this chapter, we have mastered:
- Deployment Architecture Design: Selection and design principles for three main deployment modes: cloud, edge, and mobile
- Server-Side Deployment: Modern deployment solutions including Docker containerization, FastAPI services, and Kubernetes clusters
- Mobile Deployment: Specific implementation methods for iOS Core ML and Android TensorFlow Lite
- Edge Device Deployment: Optimized deployment for edge computing devices such as Jetson Nano and Raspberry Pi
- Real-Time Detection Systems: High-performance pipeline processing and multi-threading optimization techniques
- Production Environment Monitoring: Comprehensive performance monitoring, resource monitoring, and business monitoring systems
These deployment techniques and monitoring solutions can help us:
- Build stable and reliable production-grade detection systems
- Meet performance and resource requirements for different scenarios
- Achieve system observability and maintainability
- Ensure long-term stable operation and continuous optimization
In the next chapter, we will work through concrete industry case studies to understand YOLO's application patterns and key technical considerations across different domains.