Chapter 8: Dataset Preparation and Annotation
Learning Objectives
- Understand common object detection datasets (COCO, VOC, Open Images, etc.)
- Master the use of data annotation tools (LabelImg, CVAT, etc.)
- Learn data augmentation techniques
- Familiarize with data format conversion and preprocessing workflows
8.1 Dataset Overview
8.1.1 Common Public Datasets
import os
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import numpy as np
import cv2

class DatasetInfo:
    """Dataset Information Management"""

    def __init__(self):
        self.datasets = {
            "COCO": {
                "description": "Common Objects in Context",
                "classes": 80,
                "train_images": 118287,
                "val_images": 5000,
                "annotation_format": "JSON",
                "download_size": "~20GB",
                "use_case": "General object detection and segmentation"
            },
            "PASCAL VOC": {
                "description": "Visual Object Classes",
                "classes": 20,
                "train_images": 16551,
                "val_images": 4952,
                "annotation_format": "XML",
                "download_size": "~2GB",
                "use_case": "Classic object detection benchmark"
            },
            "Open Images": {
                "description": "Google open-source large-scale dataset",
                "classes": 600,
                "train_images": 1743042,
                "val_images": 41620,
                "annotation_format": "CSV",
                "download_size": "~500GB",
                "use_case": "Large-scale pre-training"
            }
        }

    def print_dataset_info(self):
        """Print dataset information"""
        print("Common Object Detection Datasets:")
        print("=" * 60)
        for name, info in self.datasets.items():
            print(f"\n{name}:")
            print(f"  Description: {info['description']}")
            print(f"  Classes: {info['classes']}")
            print(f"  Training Images: {info['train_images']:,}")
            print(f"  Validation Images: {info['val_images']:,}")
            print(f"  Annotation Format: {info['annotation_format']}")
            print(f"  Dataset Size: {info['download_size']}")
            print(f"  Use Case: {info['use_case']}")

# COCO class definitions (80 classes)
COCO_CLASSES = [
    'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant',
    'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
    'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe',
    'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
    'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
    'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
    'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
    'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop',
    'mouse', 'remote', 'keyboard', 'cell phone', 'microwave',
    'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock',
    'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

# VOC class definitions (20 classes)
VOC_CLASSES = [
    'aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car',
    'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse',
    'motorbike', 'person', 'pottedplant', 'sheep', 'sofa',
    'train', 'tvmonitor'
]

dataset_info = DatasetInfo()
dataset_info.print_dataset_info()
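To get oriented with the COCO annotation format listed above, the sketch below loads an instances JSON and inspects its structure. The file path is an assumption about where the COCO archive was extracted; only the json module already imported above is needed.

# Minimal sketch: inspect a COCO instances file (hypothetical path)
def inspect_coco_json(ann_path: str = 'annotations/instances_val2017.json'):
    with open(ann_path, 'r') as f:
        coco = json.load(f)
    # A COCO instances file has 'images', 'annotations', and 'categories' lists
    print(f"{len(coco['images'])} images, {len(coco['annotations'])} annotations")
    # Each annotation holds an image_id, a category_id, and a [x, y, w, h] bbox
    first = coco['annotations'][0]
    print(first['image_id'], first['category_id'], first['bbox'])

# inspect_coco_json()  # uncomment once the file has been downloaded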
8.2 Data Annotation Tools
8.2.1 LabelImg Usage Guide
# Install LabelImg:
#   pip install labelImg

class LabelImgGuide:
    """LabelImg Usage Guide"""

    def __init__(self):
        self.shortcuts = {
            "Open Image": "Ctrl+O",
            "Save": "Ctrl+S",
            "Create Rectangle": "W",
            "Next Image": "D",
            "Previous Image": "A",
            "Verify Image": "Space",
            "Delete Selected Box": "Delete",
            "Duplicate Box": "Ctrl+D",
            "Undo": "Ctrl+Z"
        }
        self.workflow = [
            "1. Launch LabelImg: labelImg",
            "2. Open image folder",
            "3. Set save directory",
            "4. Select annotation format (YOLO/Pascal VOC)",
            "5. Create classes.txt file",
            "6. Start annotating: Press W to create bounding box",
            "7. Select class and confirm",
            "8. Save and continue to next image",
            "9. Periodically check annotation quality"
        ]

    def print_guide(self):
        """Print usage guide"""
        print("LabelImg Usage Guide:")
        print("=" * 40)
        print("\nWorkflow:")
        for step in self.workflow:
            print(f"  {step}")
        print("\nCommon Shortcuts:")
        for action, key in self.shortcuts.items():
            print(f"  {action}: {key}")
        print("\nNotes:")
        print("  • Ensure bounding boxes tightly fit the object")
        print("  • Avoid missing small objects")
        print("  • Handle occlusion cases carefully")
        print("  • Regularly back up annotation files")

labelimg_guide = LabelImgGuide()
labelimg_guide.print_guide()
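For reference, a YOLO-format label file written by LabelImg holds one object per line: class_id x_center y_center width height, all normalized to [0, 1]. The sketch below writes the classes.txt file mentioned in step 5 of the workflow; the class list is a hypothetical example.

# Write classes.txt so LabelImg maps class names to YOLO class IDs
# (the line index becomes the class_id; these names are placeholders)
my_classes = ['person', 'car', 'dog']
with open('classes.txt', 'w') as f:
    f.write('\n'.join(my_classes))

# A resulting label line for a centered car covering half the image would read:
# 1 0.500000 0.500000 0.500000 0.500000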
8.2.2 Annotation Quality Control
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from collections import Counter

class AnnotationQualityControl:
    """Annotation Quality Control Tool"""

    def __init__(self):
        self.quality_metrics = {}

    def check_annotation_consistency(self, annotation_dir: str):
        """Check annotation consistency"""
        issues = {
            'invalid_bbox': [],
            'missing_class': [],
            'duplicate_bbox': [],
            'size_anomaly': []
        }
        annotation_files = list(Path(annotation_dir).glob('*.txt'))
        for file_path in annotation_files:
            try:
                with open(file_path, 'r') as f:
                    lines = f.readlines()
                bboxes = []
                for line_idx, line in enumerate(lines):
                    parts = line.strip().split()
                    if len(parts) < 5:
                        continue
                    class_id = int(parts[0])
                    x, y, w, h = map(float, parts[1:5])
                    # Check bounding box validity (YOLO values must be normalized)
                    if not (0 <= x <= 1 and 0 <= y <= 1 and 0 < w <= 1 and 0 < h <= 1):
                        issues['invalid_bbox'].append(f"{file_path.name}:{line_idx+1}")
                    # Check size anomalies
                    if w < 0.01 or h < 0.01:  # Too small
                        issues['size_anomaly'].append(f"{file_path.name}:{line_idx+1} - too small")
                    elif w > 0.9 or h > 0.9:  # Too large
                        issues['size_anomaly'].append(f"{file_path.name}:{line_idx+1} - too large")
                    bboxes.append((class_id, x, y, w, h))
                # Check for exactly duplicated bounding boxes
                if len(bboxes) != len(set(bboxes)):
                    issues['duplicate_bbox'].append(file_path.name)
            except Exception as e:
                print(f"Error processing {file_path}: {e}")
        return issues

    def analyze_class_distribution(self, annotation_dir: str):
        """Analyze class distribution"""
        class_counts = Counter()
        bbox_sizes = []
        annotation_files = list(Path(annotation_dir).glob('*.txt'))
        for file_path in annotation_files:
            try:
                with open(file_path, 'r') as f:
                    lines = f.readlines()
                for line in lines:
                    parts = line.strip().split()
                    if len(parts) >= 5:
                        class_id = int(parts[0])
                        w, h = float(parts[3]), float(parts[4])
                        class_counts[class_id] += 1
                        bbox_sizes.append((w, h))
            except Exception as e:
                print(f"Error analyzing {file_path}: {e}")
        return class_counts, bbox_sizes

    def visualize_annotations(self, image_path: str, annotation_path: str,
                              class_names: list = None):
        """Visualize annotations"""
        # Read image
        image = cv2.imread(image_path)
        if image is None:
            print(f"Cannot load image: {image_path}")
            return
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        h, w = image.shape[:2]
        # Read annotations
        annotations = []
        try:
            with open(annotation_path, 'r') as f:
                lines = f.readlines()
            for line in lines:
                parts = line.strip().split()
                if len(parts) >= 5:
                    class_id = int(parts[0])
                    x_center, y_center, width, height = map(float, parts[1:5])
                    # Convert to pixel coordinates
                    x_center *= w
                    y_center *= h
                    width *= w
                    height *= h
                    x1 = x_center - width / 2
                    y1 = y_center - height / 2
                    x2 = x_center + width / 2
                    y2 = y_center + height / 2
                    annotations.append((class_id, x1, y1, x2, y2))
        except Exception as e:
            print(f"Error reading annotations: {e}")
            return
        # Draw image and annotations
        fig, ax = plt.subplots(1, 1, figsize=(12, 8))
        ax.imshow(image)
        colors = plt.cm.tab10(np.linspace(0, 1, 10))
        for class_id, x1, y1, x2, y2 in annotations:
            color = colors[class_id % len(colors)]
            # Draw bounding box
            rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                     linewidth=2, edgecolor=color,
                                     facecolor='none')
            ax.add_patch(rect)
            # Add class label
            if class_names and class_id < len(class_names):
                label = class_names[class_id]
            else:
                label = f"Class {class_id}"
            ax.text(x1, y1 - 5, label, color=color, fontsize=12,
                    bbox=dict(facecolor='white', alpha=0.8))
        ax.set_title(f"Annotations: {Path(image_path).name}")
        ax.axis('off')
        plt.tight_layout()
        plt.show()

    def generate_quality_report(self, annotation_dir: str):
        """Generate quality report"""
        print("Annotation Quality Check Report")
        print("=" * 40)
        # Check consistency issues
        issues = self.check_annotation_consistency(annotation_dir)
        print("\nConsistency Check:")
        for issue_type, issue_list in issues.items():
            print(f"  {issue_type}: {len(issue_list)} issues")
            if issue_list and len(issue_list) <= 5:
                for issue in issue_list:
                    print(f"    - {issue}")
            elif len(issue_list) > 5:
                print(f"    - Showing first 5: {issue_list[:5]}")
        # Analyze class distribution
        class_counts, bbox_sizes = self.analyze_class_distribution(annotation_dir)
        print("\nClass Distribution:")
        for class_id, count in sorted(class_counts.items()):
            print(f"  Class {class_id}: {count} instances")
        if bbox_sizes:
            widths, heights = zip(*bbox_sizes)
            print("\nBounding Box Statistics:")
            print(f"  Average Width: {np.mean(widths):.3f}")
            print(f"  Average Height: {np.mean(heights):.3f}")
            print(f"  Min Area: {min(w * h for w, h in bbox_sizes):.6f}")
            print(f"  Max Area: {max(w * h for w, h in bbox_sizes):.6f}")

# Usage example
qc = AnnotationQualityControl()
# qc.generate_quality_report('./annotations')
# qc.visualize_annotations('./images/sample.jpg', './annotations/sample.txt', COCO_CLASSES)
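Note that the duplicate check above only flags byte-identical boxes; annotators more often leave near-duplicates that differ by a pixel or two. A minimal IoU-based extension is sketched below, operating on the same (class_id, x, y, w, h) tuples that check_annotation_consistency collects; the 0.9 threshold is an assumption to tune per project.

def bbox_iou(box_a, box_b):
    """IoU of two YOLO-format boxes (x_center, y_center, w, h), normalized."""
    def to_corners(b):
        x, y, w, h = b
        return x - w / 2, y - h / 2, x + w / 2, y + h / 2
    ax1, ay1, ax2, ay2 = to_corners(box_a)
    bx1, by1, bx2, by2 = to_corners(box_b)
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    union = box_a[2] * box_a[3] + box_b[2] * box_b[3] - inter
    return inter / union if union > 0 else 0.0

def find_near_duplicates(bboxes, iou_threshold=0.9):
    """Flag same-class box pairs whose IoU exceeds a (hypothetical) threshold."""
    dups = []
    for i in range(len(bboxes)):
        for j in range(i + 1, len(bboxes)):
            if bboxes[i][0] == bboxes[j][0]:  # same class_id
                if bbox_iou(bboxes[i][1:], bboxes[j][1:]) > iou_threshold:
                    dups.append((i, j))
    return dups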
8.3 Data Format Conversion
8.3.1 Common Format Converters
import shutil
import yaml

class FormatConverter:
    """Data Format Converter"""

    def __init__(self):
        self.supported_formats = ['yolo', 'coco', 'voc', 'csv']

    def voc_to_yolo(self, xml_path: str, image_width: int, image_height: int):
        """Convert VOC XML format to YOLO format"""
        tree = ET.parse(xml_path)
        root = tree.getroot()
        yolo_annotations = []
        for obj in root.findall('object'):
            class_name = obj.find('name').text
            # Map class name to ID; skip classes outside the VOC list
            if class_name in VOC_CLASSES:
                class_id = VOC_CLASSES.index(class_name)
            else:
                continue
            bbox = obj.find('bndbox')
            x1 = float(bbox.find('xmin').text)
            y1 = float(bbox.find('ymin').text)
            x2 = float(bbox.find('xmax').text)
            y2 = float(bbox.find('ymax').text)
            # Convert to YOLO format (normalized center coordinates and width/height)
            x_center = (x1 + x2) / 2.0 / image_width
            y_center = (y1 + y2) / 2.0 / image_height
            width = (x2 - x1) / image_width
            height = (y2 - y1) / image_height
            yolo_annotations.append(
                f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}"
            )
        return yolo_annotations

    def yolo_to_voc(self, yolo_line: str, image_width: int, image_height: int,
                    class_names: list):
        """Convert YOLO format to VOC format"""
        parts = yolo_line.strip().split()
        if len(parts) < 5:
            return None
        class_id = int(parts[0])
        x_center = float(parts[1])
        y_center = float(parts[2])
        width = float(parts[3])
        height = float(parts[4])
        # Convert to VOC format absolute coordinates
        x1 = int((x_center - width / 2) * image_width)
        y1 = int((y_center - height / 2) * image_height)
        x2 = int((x_center + width / 2) * image_width)
        y2 = int((y_center + height / 2) * image_height)
        class_name = class_names[class_id] if class_id < len(class_names) else f"class_{class_id}"
        return {
            'class_name': class_name,
            'xmin': max(0, x1),
            'ymin': max(0, y1),
            'xmax': min(image_width, x2),
            'ymax': min(image_height, y2)
        }

    def coco_to_yolo(self, coco_annotation: dict, image_width: int, image_height: int):
        """Convert COCO format to YOLO format"""
        bbox = coco_annotation['bbox']  # [x, y, width, height]
        x, y, w, h = bbox
        # COCO coordinates are top-left based; convert to center coordinates
        x_center = (x + w / 2) / image_width
        y_center = (y + h / 2) / image_height
        width = w / image_width
        height = h / image_height
        class_id = coco_annotation['category_id']
        return f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}"

    def batch_convert(self, input_dir: str, output_dir: str,
                      input_format: str, output_format: str):
        """Batch format conversion"""
        input_dir = Path(input_dir)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        print(f"Converting from {input_format} to {output_format}")
        print(f"Input: {input_dir}")
        print(f"Output: {output_dir}")
        if input_format == 'voc' and output_format == 'yolo':
            self._convert_voc_to_yolo_batch(input_dir, output_dir)
        elif input_format == 'yolo' and output_format == 'voc':
            self._convert_yolo_to_voc_batch(input_dir, output_dir)
        else:
            print(f"Conversion from {input_format} to {output_format} not implemented")

    def _find_image_for(self, input_dir: Path, stem: str):
        """Locate the image file matching an annotation file's stem"""
        for ext in ['.jpg', '.jpeg', '.png', '.bmp']:
            candidate = input_dir / f"{stem}{ext}"
            if candidate.exists():
                return candidate
        return None

    def _convert_voc_to_yolo_batch(self, input_dir: Path, output_dir: Path):
        """Batch VOC to YOLO conversion"""
        xml_files = list(input_dir.glob('*.xml'))
        for xml_file in xml_files:
            try:
                # Get the corresponding image file (needed for normalization)
                img_file = self._find_image_for(input_dir, xml_file.stem)
                if img_file is None:
                    print(f"Image file not found for {xml_file}")
                    continue
                # Read image dimensions
                image = cv2.imread(str(img_file))
                if image is None:
                    continue
                h, w = image.shape[:2]
                # Convert annotations
                yolo_annotations = self.voc_to_yolo(str(xml_file), w, h)
                # Save YOLO format file
                output_file = output_dir / f"{xml_file.stem}.txt"
                with open(output_file, 'w') as f:
                    f.write('\n'.join(yolo_annotations))
                print(f"Converted: {xml_file.name} -> {output_file.name}")
            except Exception as e:
                print(f"Error converting {xml_file}: {e}")

    def _convert_yolo_to_voc_batch(self, input_dir: Path, output_dir: Path):
        """Batch YOLO to VOC conversion (writes a minimal VOC-style XML)"""
        for txt_file in input_dir.glob('*.txt'):
            try:
                img_file = self._find_image_for(input_dir, txt_file.stem)
                if img_file is None:
                    print(f"Image file not found for {txt_file}")
                    continue
                image = cv2.imread(str(img_file))
                if image is None:
                    continue
                h, w = image.shape[:2]
                root = ET.Element('annotation')
                ET.SubElement(root, 'filename').text = img_file.name
                with open(txt_file, 'r') as f:
                    for line in f:
                        obj = self.yolo_to_voc(line, w, h, VOC_CLASSES)
                        if obj is None:
                            continue
                        obj_el = ET.SubElement(root, 'object')
                        ET.SubElement(obj_el, 'name').text = obj['class_name']
                        bndbox = ET.SubElement(obj_el, 'bndbox')
                        for key in ('xmin', 'ymin', 'xmax', 'ymax'):
                            ET.SubElement(bndbox, key).text = str(obj[key])
                ET.ElementTree(root).write(str(output_dir / f"{txt_file.stem}.xml"))
            except Exception as e:
                print(f"Error converting {txt_file}: {e}")

# Dataset Splitter
class DatasetSplitter:
    """Dataset Splitting Tool"""

    def split_dataset(self, image_dir: str, annotation_dir: str,
                      output_dir: str, split_ratios: dict = None):
        """Split dataset into train/val/test subsets"""
        if split_ratios is None:
            split_ratios = {'train': 0.7, 'val': 0.2, 'test': 0.1}
        image_dir = Path(image_dir)
        annotation_dir = Path(annotation_dir)
        output_dir = Path(output_dir)
        # Get all image files
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
        image_files = []
        for ext in image_extensions:
            image_files.extend(image_dir.glob(f'*{ext}'))
            image_files.extend(image_dir.glob(f'*{ext.upper()}'))
        # Keep only images with corresponding annotation files
        valid_images = []
        for img_file in image_files:
            ann_file = annotation_dir / f"{img_file.stem}.txt"
            if ann_file.exists():
                valid_images.append(img_file)
        print(f"Found {len(valid_images)} valid image-annotation pairs")
        # Shuffle data
        np.random.shuffle(valid_images)
        # Calculate split points
        total = len(valid_images)
        train_end = int(total * split_ratios['train'])
        val_end = train_end + int(total * split_ratios['val'])
        splits = {
            'train': valid_images[:train_end],
            'val': valid_images[train_end:val_end],
            'test': valid_images[val_end:]
        }
        # Create directory structure and copy files
        for split_name, files in splits.items():
            if not files:
                continue
            split_img_dir = output_dir / split_name / 'images'
            split_ann_dir = output_dir / split_name / 'labels'
            split_img_dir.mkdir(parents=True, exist_ok=True)
            split_ann_dir.mkdir(parents=True, exist_ok=True)
            for img_file in files:
                ann_file = annotation_dir / f"{img_file.stem}.txt"
                # Copy image and its annotation
                shutil.copy2(img_file, split_img_dir / img_file.name)
                if ann_file.exists():
                    shutil.copy2(ann_file, split_ann_dir / ann_file.name)
            print(f"{split_name}: {len(files)} files")
        # Generate data configuration file
        self._create_dataset_yaml(output_dir, list(splits.keys()))
        print(f"Dataset split completed. Output: {output_dir}")

    def _create_dataset_yaml(self, dataset_dir: Path, splits: list):
        """Create dataset configuration file"""
        config = {
            'path': str(dataset_dir.absolute()),
            'train': 'train/images' if 'train' in splits else None,
            'val': 'val/images' if 'val' in splits else None,
            'test': 'test/images' if 'test' in splits else None,
            'nc': 80,              # Adjust to your dataset
            'names': COCO_CLASSES  # Adjust to your dataset
        }
        # Remove None values
        config = {k: v for k, v in config.items() if v is not None}
        with open(dataset_dir / 'data.yaml', 'w') as f:
            yaml.dump(config, f, default_flow_style=False)

# Usage example
converter = FormatConverter()
splitter = DatasetSplitter()
print("Data format conversion and dataset splitting tools initialized")
8.4 Data Augmentation Techniques
8.4.1 Geometric Transformation Augmentation
import albumentations as A
from albumentations.pytorch import ToTensorV2
import random

class DataAugmentation:
    """Data Augmentation Tool"""

    def __init__(self):
        self.geometric_transforms = A.Compose([
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.1),
            A.RandomRotate90(p=0.5),
            A.Rotate(limit=15, p=0.5),
            A.ShiftScaleRotate(
                shift_limit=0.1,
                scale_limit=0.2,
                rotate_limit=15,
                p=0.5
            ),
            A.Perspective(scale=(0.05, 0.1), p=0.3),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
        self.color_transforms = A.Compose([
            A.RandomBrightnessContrast(p=0.5),
            A.HueSaturationValue(p=0.5),
            A.RGBShift(p=0.3),
            A.RandomGamma(p=0.3),
            A.CLAHE(p=0.2),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
        self.noise_transforms = A.Compose([
            A.GaussNoise(p=0.3),
            A.MotionBlur(p=0.2),
            A.GaussianBlur(p=0.2),
            A.ImageCompression(quality_lower=85, quality_upper=100, p=0.3),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
        self.weather_transforms = A.Compose([
            A.RandomRain(p=0.1),
            A.RandomSnow(p=0.1),
            A.RandomFog(p=0.1),
            A.RandomSunFlare(p=0.1),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))

    def apply_augmentation(self, image: np.ndarray, bboxes: list,
                           class_labels: list, aug_type: str = 'all'):
        """Apply data augmentation"""
        if aug_type == 'geometric':
            transform = self.geometric_transforms
        elif aug_type == 'color':
            transform = self.color_transforms
        elif aug_type == 'noise':
            transform = self.noise_transforms
        elif aug_type == 'weather':
            transform = self.weather_transforms
        elif aug_type == 'all':
            # Randomly select a transformation type
            transforms = [
                self.geometric_transforms,
                self.color_transforms,
                self.noise_transforms,
                self.weather_transforms
            ]
            transform = random.choice(transforms)
        else:
            return image, bboxes, class_labels
        try:
            transformed = transform(
                image=image,
                bboxes=bboxes,
                class_labels=class_labels
            )
            return transformed['image'], transformed['bboxes'], transformed['class_labels']
        except Exception as e:
            print(f"Augmentation error: {e}")
            return image, bboxes, class_labels

    def create_training_transform(self, image_size: int = 640):
        """Create training transformation pipeline"""
        return A.Compose([
            A.LongestMaxSize(max_size=image_size),
            A.PadIfNeeded(min_height=image_size, min_width=image_size,
                          border_mode=cv2.BORDER_CONSTANT, value=0),
            # Geometric transforms
            A.HorizontalFlip(p=0.5),
            A.ShiftScaleRotate(
                shift_limit=0.1,
                scale_limit=0.2,
                rotate_limit=10,
                p=0.5
            ),
            # Color transforms
            A.RandomBrightnessContrast(
                brightness_limit=0.2,
                contrast_limit=0.2,
                p=0.5
            ),
            A.HueSaturationValue(
                hue_shift_limit=10,
                sat_shift_limit=20,
                val_shift_limit=20,
                p=0.5
            ),
            # Noise and blur
            A.OneOf([
                A.GaussNoise(var_limit=(10, 50)),
                A.GaussianBlur(blur_limit=(1, 3)),
                A.MotionBlur(blur_limit=(1, 3)),
            ], p=0.3),
            # Normalization (ImageNet statistics)
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))

    def create_validation_transform(self, image_size: int = 640):
        """Create validation transformation pipeline"""
        return A.Compose([
            A.LongestMaxSize(max_size=image_size),
            A.PadIfNeeded(min_height=image_size, min_width=image_size,
                          border_mode=cv2.BORDER_CONSTANT, value=0),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))

# Mosaic Augmentation Implementation
class MosaicAugmentation:
    """Mosaic Data Augmentation"""

    def __init__(self, image_size: int = 640):
        self.image_size = image_size

    def apply_mosaic(self, images: list, annotations_list: list):
        """Apply Mosaic augmentation"""
        if len(images) != 4 or len(annotations_list) != 4:
            raise ValueError("Mosaic requires exactly 4 images and annotations")
        # Create mosaic canvas
        mosaic_image = np.zeros((self.image_size, self.image_size, 3), dtype=np.uint8)
        mosaic_boxes = []
        mosaic_labels = []
        # Randomly choose the mosaic center point
        xc = random.randint(self.image_size // 4, 3 * self.image_size // 4)
        yc = random.randint(self.image_size // 4, 3 * self.image_size // 4)
        # Four quadrant regions
        positions = [
            (0, 0, xc, yc),                             # Top-left
            (xc, 0, self.image_size, yc),               # Top-right
            (0, yc, xc, self.image_size),               # Bottom-left
            (xc, yc, self.image_size, self.image_size)  # Bottom-right
        ]
        for i, (image, annotations) in enumerate(zip(images, annotations_list)):
            x1a, y1a, x2a, y2a = positions[i]
            # Resize image to fit its quadrant
            h, w = image.shape[:2]
            scale = min((x2a - x1a) / w, (y2a - y1a) / h)
            new_w, new_h = int(w * scale), int(h * scale)
            if new_w > 0 and new_h > 0:
                resized_image = cv2.resize(image, (new_w, new_h))
                # Center the resized image inside its quadrant
                x_offset = (x2a - x1a - new_w) // 2
                y_offset = (y2a - y1a - new_h) // 2
                x1b, y1b = x1a + x_offset, y1a + y_offset
                x2b, y2b = x1b + new_w, y1b + new_h
                # Place image
                mosaic_image[y1b:y2b, x1b:x2b] = resized_image
                # Adjust annotation coordinates
                for annotation in annotations:
                    class_id, x_center, y_center, width, height = annotation
                    # Convert to mosaic pixel coordinates
                    x_center = x1b + x_center * new_w
                    y_center = y1b + y_center * new_h
                    width *= new_w
                    height *= new_h
                    # Normalize to the mosaic image
                    x_center /= self.image_size
                    y_center /= self.image_size
                    width /= self.image_size
                    height /= self.image_size
                    # Keep only boxes whose center lies inside the mosaic
                    if 0 < x_center < 1 and 0 < y_center < 1 and width > 0 and height > 0:
                        mosaic_boxes.append([x_center, y_center, width, height])
                        mosaic_labels.append(class_id)
        return mosaic_image, mosaic_boxes, mosaic_labels

# Usage example
data_aug = DataAugmentation()
mosaic_aug = MosaicAugmentation()
print("Data augmentation tools initialized")
8.5 Data Preprocessing Pipeline
8.5.1 Complete Data Processing Pipeline
class DataProcessingPipeline:
    """Complete Data Processing Pipeline"""

    def __init__(self, config: dict):
        self.config = config
        self.augmentation = DataAugmentation()
        self.format_converter = FormatConverter()
        self.quality_controller = AnnotationQualityControl()

    def process_raw_dataset(self, input_dir: str, output_dir: str):
        """Process raw dataset"""
        print("Starting raw dataset processing...")
        # 1. Check data quality
        print("1. Checking data quality...")
        issues = self.quality_controller.check_annotation_consistency(
            os.path.join(input_dir, 'annotations')
        )
        if any(len(issue_list) > 0 for issue_list in issues.values()):
            print("Data quality issues found, please check:")
            for issue_type, issue_list in issues.items():
                if issue_list:
                    print(f"  {issue_type}: {len(issue_list)} issues")
        # 2. Format conversion (if needed)
        print("2. Checking and converting data format...")
        if self.config.get('convert_format'):
            self.format_converter.batch_convert(
                os.path.join(input_dir, 'annotations'),
                os.path.join(output_dir, 'annotations'),
                self.config['input_format'],
                self.config['output_format']
            )
        # 3. Dataset splitting
        print("3. Splitting dataset...")
        splitter = DatasetSplitter()
        splitter.split_dataset(
            os.path.join(input_dir, 'images'),
            os.path.join(input_dir, 'annotations'),
            output_dir,
            self.config.get('split_ratios', {'train': 0.7, 'val': 0.2, 'test': 0.1})
        )
        # 4. Generate statistics report
        print("4. Generating statistics report...")
        self._generate_dataset_statistics(output_dir)
        print("Data processing complete!")

    def _generate_dataset_statistics(self, dataset_dir: str):
        """Generate dataset statistics report"""
        dataset_dir = Path(dataset_dir)
        stats = {}
        for split in ['train', 'val', 'test']:
            split_dir = dataset_dir / split
            if not split_dir.exists():
                continue
            img_dir = split_dir / 'images'
            label_dir = split_dir / 'labels'
            # Count images
            img_count = len(list(img_dir.glob('*.jpg')) + list(img_dir.glob('*.png')))
            # Count annotations and class distribution
            class_counts = Counter()
            obj_count = 0
            for label_file in label_dir.glob('*.txt'):
                try:
                    with open(label_file, 'r') as f:
                        lines = f.readlines()
                    for line in lines:
                        parts = line.strip().split()
                        if len(parts) >= 5:
                            class_id = int(parts[0])
                            class_counts[class_id] += 1
                            obj_count += 1
                except Exception as e:
                    print(f"Error reading {label_file}: {e}")
            stats[split] = {
                'images': img_count,
                'objects': obj_count,
                'classes': len(class_counts),
                'class_distribution': dict(class_counts)
            }
        # Save statistics report
        report_path = dataset_dir / 'dataset_statistics.json'
        with open(report_path, 'w') as f:
            json.dump(stats, f, indent=2)
        # Print statistics
        print("\nDataset Statistics:")
        print("=" * 40)
        total_images = sum(split_stats['images'] for split_stats in stats.values())
        total_objects = sum(split_stats['objects'] for split_stats in stats.values())
        print(f"Total Images: {total_images:,}")
        print(f"Total Objects: {total_objects:,}")
        for split, split_stats in stats.items():
            print(f"\n{split.upper()} Set:")
            print(f"  Images: {split_stats['images']:,}")
            print(f"  Objects: {split_stats['objects']:,}")
            print(f"  Classes: {split_stats['classes']}")
        print(f"\nStatistics report saved to: {report_path}")

# Configuration example
config = {
    'convert_format': False,
    'input_format': 'voc',
    'output_format': 'yolo',
    'split_ratios': {'train': 0.7, 'val': 0.2, 'test': 0.1},
    'image_size': 640,
    'augmentation_prob': 0.5
}

# Usage example
pipeline = DataProcessingPipeline(config)
print("Data processing pipeline initialized")
8.6 Chapter Summary
8.6.1 Data Preparation Best Practices
def data_preparation_best_practices():
    """Data Preparation Best Practices"""
    best_practices = {
        "Data Quality": [
            "Ensure annotation accuracy and consistency",
            "Handle bounding box overlap and occlusion",
            "Avoid annotation errors and omissions",
            "Regularly conduct quality checks and validation"
        ],
        "Data Balance": [
            "Maintain relatively balanced class distribution",
            "Collect data from various scenes and conditions",
            "Include objects of different scales",
            "Consider the proportion of difficult samples"
        ],
        "Data Augmentation": [
            "Select augmentation strategies suitable for the task",
            "Avoid over-augmentation causing distortion",
            "Maintain annotation accuracy after augmentation",
            "Validate the effectiveness of augmentation"
        ],
        "Format Management": [
            "Use standardized data formats",
            "Maintain a clean directory structure",
            "Establish a version control mechanism",
            "Provide detailed data description documentation"
        ],
        "Processing Workflow": [
            "Build an automated processing pipeline",
            "Implement data validation and testing",
            "Keep backups of original data",
            "Record all processing steps and parameters"
        ]
    }
    print("Data Preparation Best Practices:")
    print("=" * 50)
    for category, practices in best_practices.items():
        print(f"\n{category}:")
        for practice in practices:
            print(f"  • {practice}")
    print("\nData Preparation Checklist:")
    checklist = [
        "□ Data format correct and consistent",
        "□ Annotation quality check passed",
        "□ Class distribution reasonable",
        "□ Dataset splitting completed",
        "□ Augmentation strategy validated",
        "□ Statistics report generated",
        "□ Configuration file created",
        "□ Backup and version control established"
    ]
    for item in checklist:
        print(f"  {item}")

# Run the best practices guide
data_preparation_best_practices()
print("\nDataset preparation and annotation tool usage complete!")
print("Next step: Start model training")
After completing this chapter, you should be able to:
- Understand characteristics and uses of common object detection datasets
- Proficiently use annotation tools like LabelImg
- Implement effective annotation quality control measures
- Perform format conversions between various data formats
- Apply multiple data augmentation techniques
- Build complete data processing pipelines
- Follow data preparation best practices
Data is the foundation of deep learning success, and high-quality dataset preparation is a key factor in successful YOLO model training. Through this chapter, you have mastered the complete workflow from data collection and annotation through processing and augmentation, laying a solid foundation for subsequent model training.
Key Points: Master methods for high-quality dataset preparation, establish complete data processing workflows, and ensure training data quality and diversity.