Chapter 12: YOLO Practical Deployment and Applications

Haiyue

Learning Objectives

  1. Master deployment solutions for different platforms (server, mobile, edge devices)
  2. Learn design and implementation of real-time detection systems
  3. Understand production environment monitoring and maintenance
  4. Become familiar with API design and service deployment

12.1 Deployment Architecture Design

12.1.1 Deployment Architecture Overview

[Mermaid diagram: deployment architecture overview (not rendered)]

12.1.2 Deployment Strategy Selection

Deployment Solution Comparison

| Deployment Method | Advantages | Disadvantages | Applicable Scenarios |
| --- | --- | --- | --- |
| Cloud Deployment | High performance, easy maintenance, scalable | Network dependency, high latency | Batch processing, non-real-time applications |
| Edge Deployment | Low latency, data security, offline availability | Limited computing power, difficult maintenance | Real-time applications, privacy-sensitive |
| Mobile Deployment | No network dependency, fast response | Extremely limited resources | Personal applications, offline scenarios |
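
The comparison above can be read as a simple decision rule. The sketch below is one possible encoding of it, not a prescribed procedure: the `Requirements` fields and the `choose_deployment` function are hypothetical names introduced for illustration, and real projects usually weigh cost and hardware availability as well.

```python
from dataclasses import dataclass


@dataclass
class Requirements:
    needs_realtime: bool     # results are needed within a frame or two
    must_work_offline: bool  # no network connection can be assumed
    privacy_sensitive: bool  # raw images should not leave the site
    runs_on_phone: bool      # target hardware is a personal handheld device


def choose_deployment(req: Requirements) -> str:
    """Map requirements to a row of the comparison table."""
    if req.runs_on_phone:
        return "mobile"
    if req.needs_realtime or req.must_work_offline or req.privacy_sensitive:
        return "edge"
    # Batch and non-real-time workloads tolerate network latency
    return "cloud"


print(choose_deployment(Requirements(False, False, False, False)))
print(choose_deployment(Requirements(True, False, True, False)))
```

The order of the checks matters: a phone app is classified as mobile even when it also needs real-time results, because the hardware constraint dominates.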

12.2 Server-Side Deployment

12.2.1 Docker Containerized Deployment

Dockerfile Example

# YOLO model containerized deployment
FROM nvidia/cuda:11.8.0-runtime-ubuntu20.04

# Install Python and dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    libglib2.0-0 \
    libsm6 \
    libxext6 \
    libxrender-dev \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy dependency files
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Download model weights (if needed)
RUN python3 download_weights.py

# Expose port
EXPOSE 8080

# Startup command
CMD ["python3", "app.py", "--host", "0.0.0.0", "--port", "8080"]

Docker Compose Configuration

# docker-compose.yml
version: '3.8'

services:
  yolo-api:
    build: .
    ports:
      - "8080:8080"
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - CUDA_VISIBLE_DEVICES=0
      - MODEL_PATH=/app/models/yolov8n.pt
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - yolo-api

12.2.2 FastAPI Service Implementation

Basic API Service

# app.py - YOLO API service
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import cv2
import numpy as np
import torch
from ultralytics import YOLO
import io
from PIL import Image
import base64

app = FastAPI(title="YOLO Detection API", version="1.0.0")

# Global model instance
model = None

@app.on_event("startup")
async def load_model():
    """Load model on startup"""
    global model
    try:
        model = YOLO("yolov8n.pt")
        print("Model loaded successfully")
    except Exception as e:
        print(f"Model loading failed: {e}")
        raise

@app.post("/detect")
async def detect_objects(file: UploadFile = File(...)):
    """
    Object detection API endpoint
    """
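    # Reject requests early if startup failed to load the model
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    try:
        # Read uploaded image; force RGB so grayscale/RGBA uploads convert cleanly
        image_data = await file.read()
        image = Image.open(io.BytesIO(image_data)).convert("RGB")

        # Convert to OpenCV format (BGR)
        image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        # Perform detection
        results = model(image_cv)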

        # Parse results
        detections = []
        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = box.cls[0].item()
                    class_name = model.names[int(cls)]

                    detections.append({
                        "bbox": [x1, y1, x2, y2],
                        "confidence": conf,
                        "class": class_name,
                        "class_id": int(cls)
                    })

        return {
            "detections": detections,
            "count": len(detections)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/detect_batch")
async def detect_batch(files: list[UploadFile] = File(...)):
    """
    Batch detection API endpoint
    """
    results = []
    for file in files:
        try:
            # Process single image
            image_data = await file.read()
            image = Image.open(io.BytesIO(image_data)).convert("RGB")
            image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

            # Perform detection
            detection_results = model(image_cv)

            # Parse results
            detections = []
            for r in detection_results:
                boxes = r.boxes
                if boxes is not None:
                    for box in boxes:
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        conf = box.conf[0].item()
                        cls = box.cls[0].item()
                        class_name = model.names[int(cls)]

                        detections.append({
                            "bbox": [x1, y1, x2, y2],
                            "confidence": conf,
                            "class": class_name,
                            "class_id": int(cls)
                        })

            results.append({
                "filename": file.filename,
                "detections": detections,
                "count": len(detections)
            })

        except Exception as e:
            results.append({
                "filename": file.filename,
                "error": str(e)
            })

    return {"results": results}

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "model_loaded": model is not None}

@app.get("/model_info")
async def model_info():
    """Model information endpoint"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    return {
        "model_type": "YOLOv8",
        "classes": list(model.names.values()),
        "input_size": [640, 640]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
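
On the client side, the JSON returned by /detect usually needs a second filtering step. The sketch below assumes the response shape produced by the handler above (`detections` entries with `bbox`, `confidence`, `class`, `class_id`); the helper names `filter_detections` and `count_by_class` are hypothetical, and the sample payload contains made-up values for illustration only.

```python
import json
from collections import Counter


def filter_detections(response: dict, min_confidence: float = 0.5) -> list:
    """Keep only detections at or above the confidence threshold."""
    return [d for d in response.get("detections", [])
            if d["confidence"] >= min_confidence]


def count_by_class(detections: list) -> dict:
    """Count detections per class name."""
    return dict(Counter(d["class"] for d in detections))


# Illustrative payload in the shape returned by /detect (values are invented)
sample = json.loads("""
{
  "detections": [
    {"bbox": [10, 20, 110, 220], "confidence": 0.91, "class": "person", "class_id": 0},
    {"bbox": [200, 40, 300, 260], "confidence": 0.78, "class": "person", "class_id": 0},
    {"bbox": [50, 300, 120, 360], "confidence": 0.32, "class": "dog", "class_id": 16}
  ],
  "count": 3
}
""")

kept = filter_detections(sample, min_confidence=0.5)
print(count_by_class(kept))
```

With the `requests` library, a live response could be obtained with `requests.post("http://localhost:8080/detect", files={"file": open(path, "rb")}).json()`; the form field must be named `file` to match the handler's parameter.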

12.2.3 Load Balancing and Scaling

Kubernetes Deployment Configuration

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: yolo-detection
spec:
  replicas: 3
  selector:
    matchLabels:
      app: yolo-detection
  template:
    metadata:
      labels:
        app: yolo-detection
    spec:
      containers:
      - name: yolo-api
        image: yolo-detection:latest
        ports:
        - containerPort: 8080
        resources:
          requests:
            cpu: "1"  # A CPU request is required for the CPU-utilization HPA below
            memory: "2Gi"
            nvidia.com/gpu: 1
          limits:
            memory: "4Gi"
            nvidia.com/gpu: 1
        env:
        - name: MODEL_PATH
          value: "/app/models/yolov8n.pt"

---
apiVersion: v1
kind: Service
metadata:
  name: yolo-service
spec:
  selector:
    app: yolo-detection
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: yolo-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: yolo-detection
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
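
The HorizontalPodAutoscaler above targets 70% average CPU utilization with 2 to 10 replicas. Kubernetes documents the scaling rule as desiredReplicas = ceil(currentReplicas * currentMetric / targetMetric), with a default tolerance of 10% around the target. The sketch below is a simplified model of that arithmetic, not the controller itself: it ignores stabilization windows, pod readiness, and missing metrics, and the function name `desired_replicas` is hypothetical.

```python
import math


def desired_replicas(current_replicas: int,
                     current_utilization: float,
                     target_utilization: float = 70.0,
                     min_replicas: int = 2,
                     max_replicas: int = 10,
                     tolerance: float = 0.1) -> int:
    """Simplified HPA arithmetic for a single utilization metric."""
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        # Close enough to the target: keep the current replica count
        desired = current_replicas
    else:
        desired = math.ceil(current_replicas * ratio)
    # Clamp to the bounds configured in the HPA spec
    return max(min_replicas, min(max_replicas, desired))


print(desired_replicas(3, 140))  # load is twice the target
print(desired_replicas(3, 72))   # within tolerance
print(desired_replicas(3, 20))   # scale-down is clamped to min_replicas
```

Because YOLO inference is GPU-bound, CPU utilization can be a poor proxy for load; a custom metric such as request latency or queue depth may track demand more closely.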

12.3 Mobile Deployment

12.3.1 iOS Deployment (Core ML)

Model Conversion and Integration

// YOLODetector.swift - iOS YOLO detector
import CoreML
import Vision
import UIKit

class YOLODetector {
    private var model: VNCoreMLModel?

    init() {
        loadModel()
    }

    private func loadModel() {
        guard let modelURL = Bundle.main.url(forResource: "YOLOv8", withExtension: "mlmodelc"),
              let coreMLModel = try? MLModel(contentsOf: modelURL),
              let visionModel = try? VNCoreMLModel(for: coreMLModel) else {
            print("Failed to load Core ML model")
            return
        }
        self.model = visionModel
    }

    func detectObjects(in image: UIImage, completion: @escaping ([Detection]) -> Void) {
        guard let model = self.model,
              let cgImage = image.cgImage else {
            completion([])
            return
        }

        let request = VNCoreMLRequest(model: model) { [weak self] request, error in
            if let error = error {
                print("Detection error: \(error)")
                completion([])
                return
            }

            let detections = self?.processResults(request.results) ?? []
            DispatchQueue.main.async {
                completion(detections)
            }
        }

        request.imageCropAndScaleOption = .scaleFill

        let handler = VNImageRequestHandler(cgImage: cgImage, options: [:])
        DispatchQueue.global(qos: .userInitiated).async {
            try? handler.perform([request])
        }
    }

    private func processResults(_ results: [VNObservation]?) -> [Detection] {
        guard let results = results as? [VNRecognizedObjectObservation] else {
            return []
        }

        return results.compactMap { observation in
            guard let topLabel = observation.labels.first else { return nil }

            return Detection(
                boundingBox: observation.boundingBox,
                confidence: topLabel.confidence,
                className: topLabel.identifier
            )
        }
    }
}

struct Detection {
    let boundingBox: CGRect
    let confidence: Float
    let className: String
}
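
The detector above loads a compiled Core ML model, but the conversion step itself is not shown. A minimal sketch, assuming the Ultralytics `export` API is used for both mobile targets: `EXPORT_TARGETS` and `export_for` are hypothetical names, and the weights file `yolov8n.pt` is taken from the server examples. The `nms=True` option embeds non-maximum suppression in the Core ML model, which the Swift code likely needs because it reads `VNRecognizedObjectObservation` results.

```python
# Export settings per mobile platform (illustrative, adjust per project)
EXPORT_TARGETS = {
    "ios": {"format": "coreml", "nms": True},
    "android": {"format": "tflite", "imgsz": 640},
}


def export_for(platform: str, weights: str = "yolov8n.pt") -> str:
    """Export the given weights for a platform and return the output path."""
    # Imported lazily so the settings table can be inspected without the package
    from ultralytics import YOLO

    settings = EXPORT_TARGETS[platform]
    return YOLO(weights).export(**settings)


if __name__ == "__main__":
    for name, settings in EXPORT_TARGETS.items():
        print(name, settings)
```

Calling `export_for("ios")` writes a Core ML package next to the weights; Xcode compiles it to `.mlmodelc` when the file is added to the app target, and the resource name in `Bundle.main.url(forResource:withExtension:)` must match the exported file name.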

Real-time Camera Detection

// CameraViewController.swift - Real-time detection interface
import UIKit
import AVFoundation

class CameraViewController: UIViewController {
    private var captureSession: AVCaptureSession!
    private var previewLayer: AVCaptureVideoPreviewLayer!
    private let detector = YOLODetector()
    private var overlayView: DetectionOverlayView!

    override func viewDidLoad() {
        super.viewDidLoad()
        setupCamera()
        setupUI()
    }

    private func setupCamera() {
        captureSession = AVCaptureSession()
        captureSession.sessionPreset = .high

        guard let backCamera = AVCaptureDevice.default(for: .video),
              let input = try? AVCaptureDeviceInput(device: backCamera) else {
            return
        }

        captureSession.addInput(input)

        let videoOutput = AVCaptureVideoDataOutput()
        videoOutput.setSampleBufferDelegate(self, queue: DispatchQueue(label: "camera_queue"))
        captureSession.addOutput(videoOutput)

        previewLayer = AVCaptureVideoPreviewLayer(session: captureSession)
        previewLayer.frame = view.bounds
        previewLayer.videoGravity = .resizeAspectFill
        view.layer.addSublayer(previewLayer)

        captureSession.startRunning()
    }

    private func setupUI() {
        overlayView = DetectionOverlayView(frame: view.bounds)
        view.addSubview(overlayView)
    }
}

extension CameraViewController: AVCaptureVideoDataOutputSampleBufferDelegate {
    func captureOutput(_ output: AVCaptureOutput, didOutput sampleBuffer: CMSampleBuffer, from connection: AVCaptureConnection) {
        guard let imageBuffer = CMSampleBufferGetImageBuffer(sampleBuffer) else { return }

        let ciImage = CIImage(cvImageBuffer: imageBuffer)
        let context = CIContext()
        guard let cgImage = context.createCGImage(ciImage, from: ciImage.extent) else { return }

        let uiImage = UIImage(cgImage: cgImage)

        detector.detectObjects(in: uiImage) { [weak self] detections in
            self?.overlayView.updateDetections(detections)
        }
    }
}

12.3.2 Android Deployment (TensorFlow Lite)

Model Integration

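// YOLODetector.kt - Android YOLO detector
import android.content.Context
import android.graphics.Bitmap
import android.graphics.RectF
import android.util.Log
import org.tensorflow.lite.Interpreter
import java.io.FileInputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.channels.FileChannel
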
class YOLODetector(private val context: Context) {
    private var interpreter: Interpreter? = null
    private val inputSize = 640
    private val classNames = loadClassNames()

    init {
        loadModel()
    }

    private fun loadModel() {
        try {
            val modelBuffer = loadModelFile("yolo_model.tflite")
            val options = Interpreter.Options()
            options.setNumThreads(4)
            options.setUseNNAPI(true) // Use NNAPI acceleration

            interpreter = Interpreter(modelBuffer, options)
        } catch (e: Exception) {
            Log.e("YOLODetector", "Error loading model", e)
        }
    }

    private fun loadModelFile(modelName: String): ByteBuffer {
        val assetFileDescriptor = context.assets.openFd(modelName)
        val inputStream = FileInputStream(assetFileDescriptor.fileDescriptor)
        val fileChannel = inputStream.channel
        val startOffset = assetFileDescriptor.startOffset
        val declaredLength = assetFileDescriptor.declaredLength
        return fileChannel.map(FileChannel.MapMode.READ_ONLY, startOffset, declaredLength)
    }

    fun detectObjects(bitmap: Bitmap): List<Detection> {
        val interpreter = this.interpreter ?: return emptyList()

        // Preprocess image
        val resizedBitmap = Bitmap.createScaledBitmap(bitmap, inputSize, inputSize, true)
        val inputBuffer = preprocessImage(resizedBitmap)

        // Prepare output buffer
        val outputShape = interpreter.getOutputTensor(0).shape()
        val outputBuffer = Array(1) { Array(outputShape[1]) { FloatArray(outputShape[2]) } }

        // Execute inference
        interpreter.run(inputBuffer, outputBuffer)

        // Post-process results
        return postprocessResults(outputBuffer[0], bitmap.width, bitmap.height)
    }

    private fun preprocessImage(bitmap: Bitmap): ByteBuffer {
        val byteBuffer = ByteBuffer.allocateDirect(4 * inputSize * inputSize * 3)
        byteBuffer.order(ByteOrder.nativeOrder())

        val pixels = IntArray(inputSize * inputSize)
        bitmap.getPixels(pixels, 0, inputSize, 0, 0, inputSize, inputSize)

        for (pixel in pixels) {
            val r = (pixel shr 16 and 0xFF) / 255.0f
            val g = (pixel shr 8 and 0xFF) / 255.0f
            val b = (pixel and 0xFF) / 255.0f

            byteBuffer.putFloat(r)
            byteBuffer.putFloat(g)
            byteBuffer.putFloat(b)
        }

        return byteBuffer
    }

    private fun postprocessResults(outputs: Array<FloatArray>, imageWidth: Int, imageHeight: Int): List<Detection> {
        val detections = mutableListOf<Detection>()
        val confidenceThreshold = 0.5f

        for (output in outputs) {
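            // Assumes a YOLOv5-style row: x, y, w, h, objectness, class scores...
            // (YOLOv8 exports have no objectness term and use a different output layout)
            if (output.size >= 6) {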
                val centerX = output[0]
                val centerY = output[1]
                val width = output[2]
                val height = output[3]
                val confidence = output[4]

                if (confidence > confidenceThreshold) {
                    // Find highest class score
                    var maxClassScore = 0f
                    var classId = 0
                    for (i in 5 until output.size) {
                        if (output[i] > maxClassScore) {
                            maxClassScore = output[i]
                            classId = i - 5
                        }
                    }

                    if (maxClassScore * confidence > confidenceThreshold) {
                        val left = (centerX - width / 2) * imageWidth
                        val top = (centerY - height / 2) * imageHeight
                        val right = (centerX + width / 2) * imageWidth
                        val bottom = (centerY + height / 2) * imageHeight

                        detections.add(
                            Detection(
                                RectF(left, top, right, bottom),
                                maxClassScore * confidence,
                                classNames.getOrElse(classId) { "Unknown" }
                            )
                        )
                    }
                }
            }
        }

        return applyNMS(detections)
    }

    private fun applyNMS(detections: List<Detection>): List<Detection> {
        val sortedDetections = detections.sortedByDescending { it.confidence }
        val finalDetections = mutableListOf<Detection>()

        for (detection in sortedDetections) {
            var keep = true
            for (finalDetection in finalDetections) {
                if (calculateIoU(detection.boundingBox, finalDetection.boundingBox) > 0.5) {
                    keep = false
                    break
                }
            }
            if (keep) {
                finalDetections.add(detection)
            }
        }

        return finalDetections
    }

    private fun calculateIoU(box1: RectF, box2: RectF): Float {
        val intersectionArea = maxOf(0f, minOf(box1.right, box2.right) - maxOf(box1.left, box2.left)) *
                maxOf(0f, minOf(box1.bottom, box2.bottom) - maxOf(box1.top, box2.top))

        val box1Area = (box1.right - box1.left) * (box1.bottom - box1.top)
        val box2Area = (box2.right - box2.left) * (box2.bottom - box2.top)

        return intersectionArea / (box1Area + box2Area - intersectionArea)
    }

    private fun loadClassNames(): List<String> {
        return try {
            context.assets.open("class_names.txt").bufferedReader().readLines()
        } catch (e: Exception) {
            Log.e("YOLODetector", "Error loading class names", e)
            emptyList()
        }
    }
}

data class Detection(
    val boundingBox: RectF,
    val confidence: Float,
    val className: String
)
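
When porting post-processing to a mobile platform, it helps to have a small reference implementation to compare against. The sketch below mirrors the IoU and greedy NMS logic of the Kotlin detector in plain Python; the function names `iou` and `nms` and the sample boxes are illustrative. Unlike the Kotlin version, it guards against a zero union area, which would otherwise produce a division by zero for degenerate boxes.

```python
def iou(a, b):
    """Intersection over union of two [x1, y1, x2, y2] boxes."""
    inter_w = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    inter_h = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = inter_w * inter_h
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0


def nms(detections, iou_threshold=0.5):
    """Greedy, class-agnostic NMS: keep a box unless it overlaps a kept one."""
    kept = []
    for det in sorted(detections, key=lambda d: d["confidence"], reverse=True):
        if all(iou(det["bbox"], k["bbox"]) <= iou_threshold for k in kept):
            kept.append(det)
    return kept


boxes = [
    {"bbox": [0, 0, 10, 10], "confidence": 0.9},
    {"bbox": [1, 1, 11, 11], "confidence": 0.8},   # overlaps the first box
    {"bbox": [20, 20, 30, 30], "confidence": 0.7},
]
result = nms(boxes)
print([d["confidence"] for d in result])
```

The first two boxes share an 81-unit intersection out of a 119-unit union (IoU about 0.68), so the lower-confidence one is suppressed. Both versions are class-agnostic; running NMS per class avoids suppressing overlapping objects of different classes.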

12.4 Edge Device Deployment

12.4.1 Jetson Nano Deployment

Environment Setup Script

#!/bin/bash
# jetson_setup.sh - Jetson Nano environment setup

# Update system
sudo apt update && sudo apt upgrade -y

# Install Python dependencies
sudo apt install -y python3-pip python3-dev

# Install PyTorch for Jetson
wget https://nvidia.box.com/shared/static/p57jwntv436lfrd78inwl7iml6p13fzh.whl -O torch-1.8.0-cp36-cp36m-linux_aarch64.whl
pip3 install torch-1.8.0-cp36-cp36m-linux_aarch64.whl

# Install torchvision
git clone --branch v0.9.0 https://github.com/pytorch/vision torchvision
cd torchvision
sudo python3 setup.py install
cd ..

# Install other dependencies
pip3 install ultralytics opencv-python numpy pillow

# Set power mode (maximum performance)
sudo nvpmodel -m 0
sudo jetson_clocks

echo "Jetson Nano environment setup complete"

Optimized Detection Script

# jetson_detector.py - Jetson optimized detector
import torch
import cv2
import numpy as np
from ultralytics import YOLO
import time
import argparse

class JetsonYOLODetector:
    def __init__(self, model_path, device='cuda'):
        self.device = device
        self.model = YOLO(model_path)

        # Optimization settings
        if torch.cuda.is_available():
            torch.backends.cudnn.benchmark = True
            self.model.to(device)

    def detect_video(self, source=0, save_path=None):
        """
        Real-time video detection
        """
        cap = cv2.VideoCapture(source)

        # Set camera parameters
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_FPS, 30)

        if save_path:
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(save_path, fourcc, 20.0, (640, 480))

        fps_counter = 0
        fps = 0.0  # Shown until the first 30-frame window has been measured
        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Perform detection
            results = self.model(frame, verbose=False)

            # Draw results
            annotated_frame = results[0].plot()

            # Calculate FPS
            fps_counter += 1
            if fps_counter % 30 == 0:
                end_time = time.time()
                fps = 30 / (end_time - start_time)
                start_time = end_time
                print(f"FPS: {fps:.2f}")

            # Display FPS
            cv2.putText(annotated_frame, f"FPS: {fps:.1f}",
                       (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            if save_path:
                out.write(annotated_frame)

            cv2.imshow('YOLO Detection', annotated_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        if save_path:
            out.release()
        cv2.destroyAllWindows()

    def benchmark(self, test_images_path, num_runs=100):
        """
        Performance benchmark test
        """
        import glob

        image_paths = glob.glob(f"{test_images_path}/*.jpg")
        if not image_paths:
            print("No test images found")
            return

        total_time = 0
        for i in range(num_runs):
            image_path = image_paths[i % len(image_paths)]
            image = cv2.imread(image_path)

            start_time = time.time()
            results = self.model(image, verbose=False)
            end_time = time.time()

            total_time += (end_time - start_time)

            if i % 10 == 0:
                print(f"Processed {i}/{num_runs} images")

        avg_time = total_time / num_runs
        avg_fps = 1 / avg_time

        print("\nBenchmark Results:")
        print(f"Average inference time: {avg_time:.4f} seconds")
        print(f"Average FPS: {avg_fps:.2f}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="yolov8n.pt", help="Model path")
    parser.add_argument("--source", default=0, help="Video source")
    parser.add_argument("--save", help="Save video path")
    parser.add_argument("--benchmark", help="Benchmark images path")

    args = parser.parse_args()

    detector = JetsonYOLODetector(args.model)

    if args.benchmark:
        detector.benchmark(args.benchmark)
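    else:
        # Camera indices arrive as strings from the CLI; convert them for OpenCV
        source = int(args.source) if str(args.source).isdigit() else args.source
        detector.detect_video(source, args.save)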
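
The script above refreshes its FPS figure only once every 30 frames. A sliding-window meter is one alternative that yields a value from the second frame onward. The sketch below is a standalone helper, not part of the Ultralytics API; the class name `FPSMeter` and its `window` parameter are hypothetical.

```python
import time
from collections import deque


class FPSMeter:
    """Estimate frames per second over the last `window` frame timestamps."""

    def __init__(self, window: int = 30):
        self.timestamps = deque(maxlen=window)

    def tick(self, now: float = None) -> float:
        """Record one frame and return the current FPS estimate."""
        if now is None:
            now = time.perf_counter()
        self.timestamps.append(now)
        if len(self.timestamps) < 2:
            return 0.0  # Not enough samples yet
        elapsed = self.timestamps[-1] - self.timestamps[0]
        # N timestamps span N - 1 frame intervals
        return (len(self.timestamps) - 1) / elapsed if elapsed > 0 else 0.0


# Simulated frames arriving every 50 ms
meter = FPSMeter(window=5)
fps = 0.0
for i in range(10):
    fps = meter.tick(now=i * 0.05)
print(round(fps, 2))
```

In the detection loop, `fps = meter.tick()` would replace the counter arithmetic; passing `now` explicitly is only needed for testing with synthetic timestamps.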

12.4.2 Raspberry Pi Deployment

Lightweight Deployment Solution

# rpi_detector.py - Raspberry Pi optimized detector
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
import time
from threading import Thread
import queue

class RPiYOLODetector:
    def __init__(self, model_path, num_threads=4):
        # Load TensorFlow Lite model
        self.interpreter = tflite.Interpreter(
            model_path=model_path,
            num_threads=num_threads
        )
        self.interpreter.allocate_tensors()

        # Get input/output information
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        self.input_shape = self.input_details[0]['shape']
        self.input_height = self.input_shape[1]
        self.input_width = self.input_shape[2]

        # Class names
        self.class_names = self.load_class_names()

        # Frame queues
        self.frame_queue = queue.Queue(maxsize=2)
        self.result_queue = queue.Queue(maxsize=2)

    def load_class_names(self):
        """Load class names"""
        try:
            with open('class_names.txt', 'r') as f:
                return [line.strip() for line in f.readlines()]
        except OSError:
            return [f"class_{i}" for i in range(80)]  # COCO default 80 classes

    def preprocess_image(self, image):
        """Image preprocessing"""
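        # Resize to the model's input size
        resized = cv2.resize(image, (self.input_width, self.input_height))

        # OpenCV delivers BGR frames; YOLO models are typically trained on RGB
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)

        # Add batch dimension and normalize to [0, 1]
        input_data = np.expand_dims(rgb, axis=0)
        input_data = (input_data / 255.0).astype(np.float32)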

        return input_data

    def postprocess_results(self, outputs, image_shape, conf_threshold=0.5):
        """Post-process detection results"""
        detections = []

        # Parse output (assuming output format is [batch, num_detections, 6])
        # 6 values: x_center, y_center, width, height, confidence, class_id

        height, width = image_shape[:2]

        for output in outputs[0]:  # Take first batch
            confidence = output[4]
            if confidence > conf_threshold:
                x_center, y_center, w, h = output[:4]
                class_id = int(output[5])

                # Convert to bounding box coordinates
                x1 = int((x_center - w/2) * width)
                y1 = int((y_center - h/2) * height)
                x2 = int((x_center + w/2) * width)
                y2 = int((y_center + h/2) * height)

                detections.append({
                    'bbox': [x1, y1, x2, y2],
                    'confidence': confidence,
                    'class_id': class_id,
                    'class_name': self.class_names[class_id] if class_id < len(self.class_names) else 'unknown'
                })

        return detections

    def detect_frame(self, frame):
        """Single frame detection"""
        # Preprocess
        input_data = self.preprocess_image(frame)

        # Inference
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()

        # Get output
        outputs = [self.interpreter.get_tensor(detail['index'])
                  for detail in self.output_details]

        # Post-process
        detections = self.postprocess_results(outputs, frame.shape)

        return detections

    def draw_detections(self, frame, detections):
        """Draw detection results"""
        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            conf = det['confidence']
            class_name = det['class_name']

            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label
            label = f"{class_name}: {conf:.2f}"
            cv2.putText(frame, label, (x1, y1-10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return frame

    def detection_worker(self):
        """Detection worker thread"""
        while True:
            try:
                frame = self.frame_queue.get(timeout=1)
                detections = self.detect_frame(frame)
                self.result_queue.put((frame, detections))
                self.frame_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Detection error: {e}")

    def run_camera_detection(self, camera_id=0):
        """Run camera detection"""
        cap = cv2.VideoCapture(camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
        cap.set(cv2.CAP_PROP_FPS, 15)

        # Start detection thread
        detection_thread = Thread(target=self.detection_worker, daemon=True)
        detection_thread.start()

        fps_counter = 0
        fps = 0.0  # Shown until the first 30-result window has been measured
        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Add frame to queue
            if not self.frame_queue.full():
                self.frame_queue.put(frame.copy())

            # Get detection results
            try:
                result_frame, detections = self.result_queue.get_nowait()
                annotated_frame = self.draw_detections(result_frame, detections)

                # Calculate and display FPS
                fps_counter += 1
                if fps_counter % 30 == 0:
                    end_time = time.time()
                    fps = 30 / (end_time - start_time)
                    start_time = end_time
                    print(f"FPS: {fps:.2f}")

                cv2.putText(annotated_frame, f"FPS: {fps:.1f}",
                           (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

                cv2.imshow('RPi YOLO Detection', annotated_frame)

            except queue.Empty:
                cv2.imshow('RPi YOLO Detection', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    detector = RPiYOLODetector("yolo_model.tflite")
    detector.run_camera_detection()
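
In the capture loop above, a new frame is skipped whenever the frame queue is full, so the worker may end up processing frames that are already stale. One alternative is to discard the oldest queued frame instead. The sketch below shows that pattern with the standard `queue` module; `put_latest` is a hypothetical helper, not part of the detector class.

```python
import queue


def put_latest(q: queue.Queue, item) -> int:
    """Insert item, discarding the oldest entries if the queue is full.

    Returns the number of entries that were dropped.
    """
    dropped = 0
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # Make room by dropping the oldest entry
                dropped += 1
            except queue.Empty:
                pass  # A consumer emptied the queue in the meantime; retry


frames = queue.Queue(maxsize=2)
for frame_id in (1, 2, 3):
    put_latest(frames, frame_id)
print(frames.qsize())
```

After the three calls the queue holds frames 2 and 3. If the program relies on `Queue.join()`, each discarded entry would also need a matching `task_done()` call; the detector above never joins its queues, so that bookkeeping is omitted here.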

12.5 Real-Time Detection System Design

12.5.1 System Architecture Design

[Mermaid diagram: real-time detection system architecture (not rendered)]

12.5.2 Pipeline Processing System

# real_time_system.py - Real-time detection system
import cv2
import numpy as np
import torch
from ultralytics import YOLO
import threading
import queue
import time
from collections import deque
import json

class RealTimeDetectionSystem:
    def __init__(self, model_path, max_queue_size=10, num_workers=2):
        self.model = YOLO(model_path)
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model.to(self.device)

        # Queue management
        self.input_queue = queue.Queue(maxsize=max_queue_size)
        self.output_queue = queue.Queue(maxsize=max_queue_size)

        # Worker threads
        self.workers = []
        self.num_workers = num_workers
        self.running = False

        # Performance monitoring
        self.fps_tracker = deque(maxlen=30)
        self.processing_times = deque(maxlen=100)

        # Result storage
        self.detection_history = deque(maxlen=1000)

    def worker_thread(self):
        """Detection worker thread"""
        while self.running:
            try:
                # Get input data
                frame_data = self.input_queue.get(timeout=1)
                if frame_data is None:  # Stop signal
                    break

                frame, timestamp, frame_id = frame_data

                # Perform detection
                start_time = time.time()
                results = self.model(frame, verbose=False)
                end_time = time.time()

                processing_time = end_time - start_time
                self.processing_times.append(processing_time)

                # Parse results
                detections = []
                for r in results:
                    boxes = r.boxes
                    if boxes is not None:
                        for box in boxes:
                            x1, y1, x2, y2 = box.xyxy[0].tolist()
                            conf = box.conf[0].item()
                            cls = box.cls[0].item()
                            class_name = self.model.names[int(cls)]

                            detections.append({
                                'bbox': [x1, y1, x2, y2],
                                'confidence': conf,
                                'class': class_name,
                                'class_id': int(cls)
                            })

                # Output results
                result_data = {
                    'frame': frame,
                    'detections': detections,
                    'timestamp': timestamp,
                    'frame_id': frame_id,
                    'processing_time': processing_time
                }

                if not self.output_queue.full():
                    self.output_queue.put(result_data)

                # Store history
                self.detection_history.append({
                    'timestamp': timestamp,
                    'frame_id': frame_id,
                    'detection_count': len(detections),
                    'processing_time': processing_time
                })

                self.input_queue.task_done()

            except queue.Empty:
                continue
            except Exception as e:
                print(f"Worker thread error: {e}")

    def start(self):
        """Start system"""
        self.running = True
        for i in range(self.num_workers):
            worker = threading.Thread(target=self.worker_thread, daemon=True)
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Stop system"""
        self.running = False

        # Send stop signal
        for _ in range(self.num_workers):
            if not self.input_queue.full():
                self.input_queue.put(None)

        # Wait for worker threads to finish
        for worker in self.workers:
            worker.join(timeout=2)

    def add_frame(self, frame, timestamp=None, frame_id=None):
        """Add frame to processing queue"""
        if timestamp is None:
            timestamp = time.time()
        if frame_id is None:
            frame_id = int(timestamp * 1000)

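        # put_nowait avoids the race between full() and put();
        # the frame is dropped when the queue is already full
        try:
            self.input_queue.put_nowait((frame, timestamp, frame_id))
            return True
        except queue.Full:
            return False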

    def get_result(self, timeout=0.1):
        """Get detection result"""
        try:
            return self.output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_statistics(self):
        """Get system statistics"""
        if not self.processing_times:
            return {}

        return {
            'avg_processing_time': np.mean(self.processing_times),
            'max_processing_time': np.max(self.processing_times),
            'min_processing_time': np.min(self.processing_times),
            # fps_tracker stores FPS samples, so report the most recent one
            'current_fps': self.fps_tracker[-1] if self.fps_tracker else 0,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
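            # Sum per-frame counts; len() alone would count frames, not detections
            'total_detections': sum(h['detection_count'] for h in self.detection_history)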
        }

    def run_camera_detection(self, camera_id=0, display=True):
        """Run camera detection"""
        cap = cv2.VideoCapture(camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        self.start()

        frame_count = 0
        fps_start_time = time.time()

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1

                # Add frame to processing queue
                self.add_frame(frame)

                # Get detection results
                result = self.get_result()
                if result:
                    if display:
                        # Draw detection results
                        annotated_frame = self.draw_detections(
                            result['frame'],
                            result['detections']
                        )

                        # Display statistics
                        stats = self.get_statistics()
                        info_text = f"FPS: {stats.get('current_fps', 0):.1f} | " \
                                   f"Proc: {stats.get('avg_processing_time', 0):.3f}s"

                        cv2.putText(annotated_frame, info_text, (10, 30),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                        cv2.imshow('Real-time Detection', annotated_frame)

                # Calculate FPS
                if frame_count % 30 == 0:
                    fps_end_time = time.time()
                    fps = 30 / (fps_end_time - fps_start_time)
                    self.fps_tracker.append(fps)
                    fps_start_time = fps_end_time

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            self.stop()
            cap.release()
            cv2.destroyAllWindows()

    def draw_detections(self, frame, detections):
        """Draw detection results"""
        for det in detections:
            x1, y1, x2, y2 = map(int, det['bbox'])
            conf = det['confidence']
            class_name = det['class']

            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label
            label = f"{class_name}: {conf:.2f}"
            cv2.putText(frame, label, (x1, y1-10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return frame

    def export_statistics(self, filename):
        """Export statistics"""
        stats = {
            'system_stats': self.get_statistics(),
            'detection_history': list(self.detection_history),
            'performance_data': {
                'processing_times': list(self.processing_times),
                'fps_history': list(self.fps_tracker)
            }
        }

        with open(filename, 'w') as f:
            json.dump(stats, f, indent=2)

if __name__ == "__main__":
    system = RealTimeDetectionSystem("yolov8n.pt", num_workers=2)
    system.run_camera_detection(camera_id=0)
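
`add_frame` above rejects a new frame when the input queue is full, which keeps the oldest frames and can let displayed results lag behind the camera. For live video, one alternative is to discard the oldest queued frame instead. The following is a minimal sketch of that policy and is not part of the `RealTimeDetectionSystem` class; `put_latest` is a hypothetical helper name.

```python
import queue


def put_latest(q: queue.Queue, item) -> bool:
    """Put item into q; if q is full, discard the oldest entry first.

    Returns True if an older item was dropped to make room.
    (Hypothetical helper, shown only to illustrate a drop-oldest policy.)
    """
    dropped = False
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()   # discard the oldest queued item
                q.task_done()    # keep the unfinished-task count balanced for q.join()
                dropped = True
            except queue.Empty:
                # A consumer emptied the queue in the meantime; retry the put
                pass
```

Calling `put_latest(self.input_queue, (frame, timestamp, frame_id))` inside `add_frame` would be one way to wire this in, at the cost of silently skipping frames under load.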

12.6 Production Environment Monitoring

12.6.1 Monitoring Metrics Design

12.6.2 Monitoring System Implementation

# monitoring.py - Production environment monitoring system
import time
import psutil
import GPUtil
import logging
import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from collections import defaultdict, deque
import threading
import sqlite3

import numpy as np  # used by DetectionMonitor.get_detection_summary

@dataclass
class MetricRecord:
    timestamp: float
    metric_name: str
    value: float
    tags: Optional[Dict[str, str]] = None

class PerformanceMonitor:
    def __init__(self, db_path="metrics.db"):
        self.db_path = db_path
        self.metrics_buffer = deque(maxlen=10000)
        self.running = False
        self.monitor_thread = None

        # Initialize database
        self.init_db()

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('detection_system.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def init_db(self):
        """Initialize monitoring database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL,
                metric_name TEXT,
                value REAL,
                tags TEXT
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL,
                alert_type TEXT,
                message TEXT,
                severity TEXT
            )
        ''')

        conn.commit()
        conn.close()

    def record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record metric"""
        record = MetricRecord(
            timestamp=time.time(),
            metric_name=name,
            value=value,
            tags=tags or {}
        )
        self.metrics_buffer.append(record)

    def start_monitoring(self, interval=5):
        """Start monitoring"""
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()

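    def stop_monitoring(self):
        """Stop monitoring and flush any buffered metrics"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        # Persist metrics recorded since the last loop iteration
        self._persist_metrics()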

    def _monitor_loop(self, interval):
        """Monitoring loop"""
        while self.running:
            try:
                # Collect system metrics
                self._collect_system_metrics()

                # Persist metrics
                self._persist_metrics()

                # Check alerts
                self._check_alerts()

            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")

            # Sleep outside the try block so a failing iteration
            # cannot turn into a busy loop
            time.sleep(interval)

    def _collect_system_metrics(self):
        """Collect system metrics"""
        # CPU metrics
        cpu_percent = psutil.cpu_percent(interval=1)
        self.record_metric("system.cpu.usage", cpu_percent, {"unit": "percent"})

        # Memory metrics
        memory = psutil.virtual_memory()
        self.record_metric("system.memory.usage", memory.percent, {"unit": "percent"})
        self.record_metric("system.memory.available", memory.available, {"unit": "bytes"})

        # GPU metrics
        try:
            gpus = GPUtil.getGPUs()
            for i, gpu in enumerate(gpus):
                self.record_metric("system.gpu.usage", gpu.load * 100,
                                 {"gpu_id": str(i), "unit": "percent"})
                self.record_metric("system.gpu.memory", gpu.memoryUtil * 100,
                                 {"gpu_id": str(i), "unit": "percent"})
                self.record_metric("system.gpu.temperature", gpu.temperature,
                                 {"gpu_id": str(i), "unit": "celsius"})
        except Exception as e:
            self.logger.warning(f"GPU monitoring failed: {e}")

        # Disk metrics
        disk = psutil.disk_usage('/')
        self.record_metric("system.disk.usage", disk.percent, {"unit": "percent"})

    def _persist_metrics(self):
        """Persist metrics to database"""
        if not self.metrics_buffer:
            return

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Batch insert metrics
        metrics_to_insert = []
        while self.metrics_buffer:
            try:
                record = self.metrics_buffer.popleft()
                metrics_to_insert.append((
                    record.timestamp,
                    record.metric_name,
                    record.value,
                    json.dumps(record.tags)
                ))
            except IndexError:
                break

        if metrics_to_insert:
            cursor.executemany(
                'INSERT INTO metrics (timestamp, metric_name, value, tags) VALUES (?, ?, ?, ?)',
                metrics_to_insert
            )
            conn.commit()

        conn.close()

    def _check_alerts(self):
        """Check alert conditions"""
        # Get recent metrics
        recent_metrics = self.get_recent_metrics(minutes=5)

        # CPU usage alert
        cpu_metrics = [m for m in recent_metrics if m['metric_name'] == 'system.cpu.usage']
        if cpu_metrics:
            avg_cpu = sum(m['value'] for m in cpu_metrics) / len(cpu_metrics)
            if avg_cpu > 80:
                self._send_alert("HIGH_CPU_USAGE", f"CPU usage: {avg_cpu:.1f}%", "warning")

        # GPU memory alert
        gpu_memory_metrics = [m for m in recent_metrics if m['metric_name'] == 'system.gpu.memory']
        if gpu_memory_metrics:
            max_gpu_memory = max(m['value'] for m in gpu_memory_metrics)
            if max_gpu_memory > 90:
                self._send_alert("HIGH_GPU_MEMORY", f"GPU memory: {max_gpu_memory:.1f}%", "warning")

    def _send_alert(self, alert_type: str, message: str, severity: str):
        """Send alert"""
        self.logger.warning(f"ALERT [{severity.upper()}] {alert_type}: {message}")

        # Store alert to database
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            'INSERT INTO alerts (timestamp, alert_type, message, severity) VALUES (?, ?, ?, ?)',
            (time.time(), alert_type, message, severity)
        )
        conn.commit()
        conn.close()

    def get_recent_metrics(self, minutes=10):
        """Get recent metrics"""
        since_timestamp = time.time() - (minutes * 60)

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            'SELECT timestamp, metric_name, value, tags FROM metrics WHERE timestamp > ? ORDER BY timestamp DESC',
            (since_timestamp,)
        )

        metrics = []
        for row in cursor.fetchall():
            metrics.append({
                'timestamp': row[0],
                'metric_name': row[1],
                'value': row[2],
                'tags': json.loads(row[3])
            })

        conn.close()
        return metrics

    def get_performance_summary(self, hours=24):
        """Get performance summary"""
        since_timestamp = time.time() - (hours * 3600)

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Get statistics for various metrics
        cursor.execute('''
            SELECT metric_name,
                   AVG(value) as avg_value,
                   MIN(value) as min_value,
                   MAX(value) as max_value,
                   COUNT(*) as count
            FROM metrics
            WHERE timestamp > ?
            GROUP BY metric_name
        ''', (since_timestamp,))

        summary = {}
        for row in cursor.fetchall():
            summary[row[0]] = {
                'avg': row[1],
                'min': row[2],
                'max': row[3],
                'count': row[4]
            }

        conn.close()
        return summary

class DetectionMonitor(PerformanceMonitor):
    """Detection system specific monitor"""

    def __init__(self, db_path="detection_metrics.db"):
        super().__init__(db_path)
        self.detection_stats = defaultdict(int)
        self.confidence_history = deque(maxlen=1000)

    def record_detection(self, detections: List[Dict], processing_time: float):
        """Record detection results"""
        # Record processing time
        self.record_metric("detection.processing_time", processing_time, {"unit": "seconds"})

        # Record detection count
        detection_count = len(detections)
        self.record_metric("detection.count", detection_count, {"unit": "objects"})

        # Record confidence statistics
        if detections:
            confidences = [det['confidence'] for det in detections]
            avg_confidence = sum(confidences) / len(confidences)
            max_confidence = max(confidences)
            min_confidence = min(confidences)

            self.record_metric("detection.confidence.avg", avg_confidence)
            self.record_metric("detection.confidence.max", max_confidence)
            self.record_metric("detection.confidence.min", min_confidence)

            # Store confidence history
            self.confidence_history.extend(confidences)

        # Record class statistics
        class_counts = defaultdict(int)
        for det in detections:
            class_name = det['class']
            class_counts[class_name] += 1
            self.detection_stats[class_name] += 1

        for class_name, count in class_counts.items():
            self.record_metric("detection.class_count", count, {"class": class_name})

    def get_detection_summary(self):
        """Get detection summary"""
        summary = self.get_performance_summary()

        # Add detection-specific statistics
        summary['detection_stats'] = dict(self.detection_stats)

        if self.confidence_history:
            summary['confidence_distribution'] = {
                'mean': sum(self.confidence_history) / len(self.confidence_history),
                'std': np.std(list(self.confidence_history)),
                'min': min(self.confidence_history),
                'max': max(self.confidence_history)
            }

        return summary

# Usage example
if __name__ == "__main__":
    monitor = DetectionMonitor()
    monitor.start_monitoring(interval=5)

    # Simulate detection results
    import numpy as np

    for i in range(100):
        # Simulate detection results
        num_detections = np.random.randint(0, 10)
        detections = []
        for j in range(num_detections):
            detections.append({
                'bbox': [100, 100, 200, 200],
                'confidence': np.random.uniform(0.5, 1.0),
                'class': np.random.choice(['person', 'car', 'bicycle'])
            })

        processing_time = np.random.uniform(0.01, 0.1)
        monitor.record_detection(detections, processing_time)

        time.sleep(0.1)

    # Get summary
    summary = monitor.get_detection_summary()
    print(json.dumps(summary, indent=2))

    monitor.stop_monitoring()
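
`_check_alerts` above runs on every monitoring cycle, so a sustained threshold breach produces a new alert every `interval` seconds. A cooldown per alert type is a common way to reduce that noise. The sketch below shows one possible approach; `AlertThrottle`, its `cooldown_seconds` parameter, and the injectable `clock` are illustrative names and are not part of the original monitor.

```python
import time
from typing import Callable, Dict


class AlertThrottle:
    """Suppress repeats of the same alert type within a cooldown window.

    Hypothetical helper for illustration; not part of PerformanceMonitor.
    """

    def __init__(self, cooldown_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock                      # injectable so tests can fake time
        self._last_sent: Dict[str, float] = {}   # alert_type -> time of last alert

    def should_send(self, alert_type: str) -> bool:
        """Return True if this alert type is outside its cooldown window."""
        now = self._clock()
        last = self._last_sent.get(alert_type)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_sent[alert_type] = now
        return True
```

In `_send_alert`, an early `if not self.throttle.should_send(alert_type): return` would suppress repeats, assuming a `self.throttle = AlertThrottle()` attribute is created in `__init__`.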

Chapter Summary

Deploying YOLO in practice is the critical step in turning theory into working systems. This chapter covered:

  1. Deployment Architecture Design: Selection and design principles for three main deployment modes: cloud, edge, and mobile
  2. Server-Side Deployment: Modern deployment solutions including Docker containerization, FastAPI services, and Kubernetes clusters
  3. Mobile Deployment: Specific implementation methods for iOS Core ML and Android TensorFlow Lite
  4. Edge Device Deployment: Optimized deployment for edge computing devices such as Jetson Nano and Raspberry Pi
  5. Real-Time Detection Systems: High-performance pipeline processing and multi-threading optimization techniques
  6. Production Environment Monitoring: Comprehensive performance monitoring, resource monitoring, and business monitoring systems

These deployment techniques and monitoring solutions can help us:

  • Build stable and reliable production-grade detection systems
  • Meet performance and resource requirements for different scenarios
  • Achieve system observability and maintainability
  • Ensure long-term stable operation and continuous optimization

In the next chapter, we will examine YOLO’s application patterns and key technical points across different industries through concrete case studies.