Pytorch深度学习框架60天进阶学习计划 - 第40天:工业缺陷检测

今天,我们将深入探讨工业领域中的一个关键应用——工业缺陷检测,并且重点关注如何将高效的目标检测模型EfficientDet部署到实际生产环境中的嵌入式设备上。

工业缺陷检测是计算机视觉在工业4.0时代的重要应用,它能够替代传统的人工检测,提高检测精度和效率,降低成本。而将深度学习模型部署到边缘设备上,则是实现实时、高效检测的关键步骤。

今天的学习要点是:部署EfficientDet到嵌入式设备,实践ONNX-TensorRT转换,测试产线实时检测速度

1. 工业缺陷检测简介

1.1 工业缺陷检测的重要性

工业生产中,产品质量控制至关重要。传统的人工检测方式存在以下问题:

  • 检测效率低,无法满足高速生产线需求
  • 人工检测容易疲劳,导致漏检
  • 对于微小缺陷或复杂产品,人眼难以判断
  • 人工成本高且持续上升

深度学习赋能的自动缺陷检测系统能够解决这些问题,具有以下优势:

  • 7×24小时不间断工作
  • 检测精度高且稳定
  • 可识别复杂或细微的缺陷
  • 长期运行成本低
  • 可追溯性好,便于质量管理

1.2 工业缺陷检测的常见类型

缺陷类型 典型行业 检测难点 适用算法
表面缺陷(划痕、凹陷) 汽车、金属加工、玻璃制造 光照变化、反射 CNN分类、语义分割
结构性缺陷(断裂、错位) 电子元件、PCB板 缺陷小、形状多样 目标检测、实例分割
材质缺陷(杂质、气泡) 纺织、塑料制品 背景复杂、类别模糊 异常检测、GAN
组装缺陷(缺件、错装) 机械装配、电器制造 部件遮挡、姿态变化 目标检测、姿态估计
尺寸缺陷(长度、角度误差) 精密制造、机械零件 需要精确测量 关键点检测、实例分割

1.3 工业场景中的部署挑战

在工业环境中部署深度学习模型面临以下挑战:

  1. 实时性要求:生产线速度快,要求模型推理时间短
  2. 资源受限:嵌入式设备计算能力和内存有限
  3. 稳定性要求:需要7×24小时稳定运行,不能崩溃
  4. 环境适应性:工业环境光照变化、振动干扰等因素多
  5. 集成难度:需要与现有生产线和MES系统无缝集成

2. EfficientDet模型简介

2.1 EfficientDet架构

EfficientDet是由Google Brain团队提出的一系列高效目标检测模型,其核心创新点包括:

  1. 基于EfficientNet的主干网络:利用复合缩放策略平衡网络深度、宽度和分辨率
  2. 双向特征金字塔网络(BiFPN):增强了特征融合能力
  3. 复合缩放:统一缩放所有网络组件(主干网络、特征网络、预测网络)

EfficientDet系列包含D0~D7八个变体,参数量从3.9M到51.9M不等,适合不同的计算资源需求。

2.2 EfficientDet模型性能对比

模型 mAP (COCO) 参数量 FLOPs 图像输入尺寸 推理速度(1080Ti)
EfficientDet-D0 33.8% 3.9M 2.5B 512×512 32ms
EfficientDet-D1 39.6% 6.6M 6.1B 640×640 42ms
EfficientDet-D2 43.0% 8.1M 11B 768×768 53ms
EfficientDet-D3 45.8% 12.0M 25B 896×896 75ms
EfficientDet-D4 49.4% 20.7M 55B 1024×1024 130ms
YOLOv4-tiny 40.2% 6.1M 6.9B 416×416 28ms
SSD MobileNet-V2 22.1% 4.3M 0.8B 320×320 21ms

为工业部署考量,我们通常选择EfficientDet-D0或D1,以平衡精度和速度需求。

3. EfficientDet训练与优化

3.1 搭建工业缺陷检测数据集

首先,我们需要准备一个工业缺陷检测数据集。实际工作中,通常需要为特定产品收集数据并标注。以下是数据准备的代码示例:

import os
import cv2
import numpy as np
import albumentations as A
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch

class DefectDataset(Dataset):
    def __init__(self, image_paths, labels, bbox_list, transforms=None):
        self.image_paths = image_paths
        self.labels = labels
        self.bbox_list = bbox_list  # [x_min, y_min, x_max, y_max, class_id]
        self.transforms = transforms
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        bboxes = self.bbox_list[idx]
        label = self.labels[idx]
        
        if self.transforms:
            transformed = self.transforms(image=image, bboxes=bboxes)
            image = transformed["image"]
            bboxes = transformed["bboxes"]
        
        # Convert to tensors and normalize
        image = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
        
        # Prepare target format required by EfficientDet
        target = {
            "boxes": torch.tensor(bboxes)[:, :4],
            "labels": torch.tensor(bboxes)[:, 4].long(),
            "image_id": torch.tensor([idx])
        }
        
        return image, target

# 数据增强策略,适用于工业缺陷检测
def get_train_transforms():
    return A.Compose([
        A.RandomBrightnessContrast(p=0.5),
        A.GaussNoise(p=0.3),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.Rotate(limit=20, p=0.5),
        A.RandomScale(scale_limit=0.15, p=0.5)
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

def get_valid_transforms():
    return A.Compose([
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

# 假设我们有如下路径和标注数据结构
def load_defect_dataset(dataset_path):
    image_paths = []
    all_bboxes = []
    all_labels = []
    
    # 加载标注文件
    annotation_file = os.path.join(dataset_path, 'annotations.txt')
    with open(annotation_file, 'r') as f:
        lines = f.readlines()
        
    for line in lines:
        parts = line.strip().split()
        image_path = os.path.join(dataset_path, 'images', parts[0])
        image_paths.append(image_path)
        
        num_boxes = int(parts[1])
        boxes = []
        for i in range(num_boxes):
            # 每个框的格式:x_min y_min x_max y_max class_id
            box = list(map(float, parts[2+i*5:7+i*5]))
            if len(box) == 5:  # 确保有5个元素
                boxes.append(box)
        
        all_bboxes.append(boxes)
        all_labels.append(1)  # 假设只有一类缺陷
    
    # 划分训练集和验证集
    train_images, valid_images, train_bboxes, valid_bboxes, train_labels, valid_labels = train_test_split(
        image_paths, all_bboxes, all_labels, test_size=0.2, random_state=42
    )
    
    return (train_images, train_bboxes, train_labels), (valid_images, valid_bboxes, valid_labels)

3.2 训练EfficientDet模型

接下来,我们使用PyTorch训练EfficientDet模型来检测工业缺陷:

import torch
import torchvision

from effdet import get_efficientdet_config, EfficientDet, DetBenchTrain, DetBenchPredict
from effdet.efficientdet import HeadNet

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def train_efficientdet(train_dataset, valid_dataset, num_classes=1, num_epochs=20, batch_size=8):
    # 创建数据加载器
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, 
        num_workers=4, collate_fn=collate_fn
    )
    
    valid_loader = DataLoader(
        valid_dataset, batch_size=batch_size, shuffle=False, 
        num_workers=4, collate_fn=collate_fn
    )
    
    # 加载EfficientDet-D0配置
    config = get_efficientdet_config('efficientdet_d0')
    config.num_classes = num_classes
    config.image_size = (512, 512)  # 调整为适合您数据的大小
    
    # 创建模型
    model = EfficientDet(config, pretrained_backbone=True)
    
    # 更新和替换头部网络
    model.class_net = HeadNet(
        config, num_outputs=config.num_classes
    )
    
    # 创建训练基准
    model = DetBenchTrain(model, config)
    model.to(device)
    
    # 优化器和学习率调度器
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=2
    )
    
    # 训练循环
    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        # 训练阶段
        model.train()
        train_loss = 0
        for images, targets in train_loader:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            
            # 清零梯度
            optimizer.zero_grad()
            
            # 模型前向传播
            loss = model(images, targets)
            
            # 反向传播和优化
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # 验证阶段
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for images, targets in valid_loader:
                images = list(image.to(device) for image in images)
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
                
                loss = model(images, targets)
                val_loss += loss.item()
        
        # 打印每个epoch的损失
        train_loss /= len(train_loader)
        val_loss /= len(valid_loader)
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        
        # 学习率调度
        scheduler.step(val_loss)
        
        # 保存最佳模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_efficientdet_defect.pth')
            
    # 将模型转换为推理模式
    config = get_efficientdet_config('efficientdet_d0')
    config.num_classes = num_classes
    config.image_size = (512, 512)
    
    model = EfficientDet(config, pretrained_backbone=False)
    model.class_net = HeadNet(
        config, num_outputs=config.num_classes
    )
    
    # 加载训练好的权重
    checkpoint = torch.load('best_efficientdet_defect.pth', map_location=device)
    model.load_state_dict(checkpoint)
    
    # 转为推理模式
    inference_model = DetBenchPredict(model)
    inference_model.eval()
    
    # 保存推理模型
    torch.save(inference_model.state_dict(), 'efficientdet_defect_inference.pth')
    
    return inference_model

# 定义数据集收集函数
def collate_fn(batch):
    images, targets = tuple(zip(*batch))
    return images, targets

# 执行训练
# 假设我们已经加载了数据集并创建了Dataset对象
(train_images, train_bboxes, train_labels), (valid_images, valid_bboxes, valid_labels) = load_defect_dataset('path/to/dataset')

train_dataset = DefectDataset(
    train_images, train_labels, train_bboxes, 
    transforms=get_train_transforms()
)

valid_dataset = DefectDataset(
    valid_images, valid_labels, valid_bboxes, 
    transforms=get_valid_transforms()
)

trained_model = train_efficientdet(train_dataset, valid_dataset, num_classes=2)  # 假设有背景和缺陷两类

3.3 模型优化和微调

在工业场景中,我们通常需要对模型进行进一步优化以适应特定的部署环境:

import torch
from effdet import get_efficientdet_config, EfficientDet, DetBenchPredict, DetBenchTrain
from effdet.efficientdet import HeadNet
import timm
from torch.quantization import quantize_dynamic

def optimize_model_for_deployment():
    # 加载训练好的模型
    config = get_efficientdet_config('efficientdet_d0')
    config.num_classes = 2  # 背景+缺陷
    config.image_size = (512, 512)
    
    model = EfficientDet(config, pretrained_backbone=False)
    model.class_net = HeadNet(
        config, num_outputs=config.num_classes
    )
    
    device = torch.device('cpu')  # 用于量化的设备
    checkpoint = torch.load('best_efficientdet_defect.pth', map_location=device)
    model.load_state_dict(checkpoint)
    
    # 转换为推理模式
    model = DetBenchPredict(model)
    model.eval()
    
    # 1. 剪枝 - 移除低重要性的过滤器(示例中只展示流程)
    # 实际中需要使用专门的剪枝工具如torch.nn.utils.prune
    
    # 2. 知识蒸馏 - 使用大模型指导小模型训练(示例流程)
    # 需要完整的训练循环,此处省略
    
    # 3. 动态量化 - 对权重进行量化
    # 注意:对于检测模型,通常只量化部分层以保证精度
    # 此处为示例,完整实现需要更复杂设置
    try:
        quantized_model = quantize_dynamic(
            model, {torch.nn.Linear, torch.nn.Conv2d}, dtype=torch.qint8
        )
        print("模型量化成功")
        torch.save(quantized_model.state_dict(), 'quantized_efficientdet.pth')
    except Exception as e:
        print(f"量化失败,错误信息: {e}")
        print("使用原始模型继续")
        quantized_model = model
    
    return quantized_model

# 调用优化函数
optimized_model = optimize_model_for_deployment()

4. 将模型转换为ONNX格式

ONNX(Open Neural Network Exchange)是一种用于表示深度学习模型的开放格式,它能够在不同框架之间转换模型。将PyTorch模型转换为ONNX是部署到多种平台的重要步骤。

4.1 PyTorch模型转ONNX

import torch
import numpy as np
from effdet import get_efficientdet_config, EfficientDet, DetBenchPredict
from effdet.efficientdet import HeadNet

def convert_to_onnx(model_path, onnx_path, input_size=(512, 512)):
    """
    将EfficientDet模型转换为ONNX格式
    """
    # 加载配置和模型
    config = get_efficientdet_config('efficientdet_d0')
    config.num_classes = 2  # 背景 + 缺陷类别
    config.image_size = input_size
    
    # 创建模型架构
    model = EfficientDet(config, pretrained_backbone=False)
    model.class_net = HeadNet(
        config, num_outputs=config.num_classes
    )
    
    # 创建推理模型
    inference_model = DetBenchPredict(model)
    inference_model.eval()
    
    # 加载训练好的权重
    device = torch.device('cpu')
    checkpoint = torch.load(model_path, map_location=device)
    inference_model.load_state_dict(checkpoint)
    
    # 创建随机输入进行测试
    dummy_input = torch.randn(1, 3, input_size[0], input_size[1], device=device)
    
    # 设置导出选项
    input_names = ["input"]
    output_names = ["scores", "boxes"]
    dynamic_axes = {
        'input': {0: 'batch_size'},
        'scores': {0: 'batch_size'},
        'boxes': {0: 'batch_size'}
    }
    
    # 导出为ONNX
    try:
        torch.onnx.export(
            inference_model,               # 模型
            dummy_input,                   # 模型输入
            onnx_path,                     # 输出文件路径
            export_params=True,            # 保存训练好的参数
            opset_version=11,              # ONNX版本
            do_constant_folding=True,      # 常量折叠优化
            input_names=input_names,       # 输入名称
            output_names=output_names,     # 输出名称
            dynamic_axes=dynamic_axes      # 动态尺寸
        )
        print(f"模型已成功导出为ONNX格式: {onnx_path}")
        return True
    except Exception as e:
        print(f"ONNX导出失败: {e}")
        return False

# 执行转换
convert_to_onnx(
    model_path='efficientdet_defect_inference.pth',
    onnx_path='efficientdet_defect.onnx',
    input_size=(512, 512)
)

4.2 验证和优化ONNX模型

导出ONNX模型后,我们需要验证其正确性并进行进一步优化:

import onnx
import onnxruntime as ort
import numpy as np
import cv2
from PIL import Image
import time

def verify_onnx_model(onnx_path, test_image_path, input_size=(512, 512)):
    """
    验证ONNX模型的正确性并测试推理速度
    """
    # 加载ONNX模型
    onnx_model = onnx.load(onnx_path)
    
    # 检查模型是否格式正确
    onnx.checker.check_model(onnx_model)
    print("ONNX模型格式验证通过")
    
    # 创建推理会话
    session = ort.InferenceSession(onnx_path)
    
    # 准备输入数据
    img = Image.open(test_image_path).convert('RGB')
    img = img.resize(input_size)
    img_array = np.array(img) / 255.0  # 归一化
    img_tensor = np.transpose(img_array, (2, 0, 1)).astype(np.float32)  # CHW格式
    img_tensor = np.expand_dims(img_tensor, axis=0)  # 增加批次维度
    
    # 获取输入和输出名称
    input_name = session.get_inputs()[0].name
    output_names = [output.name for output in session.get_outputs()]
    
    # 测量推理时间
    start_time = time.time()
    outputs = session.run(output_names, {input_name: img_tensor})
    inference_time = time.time() - start_time
    
    print(f"ONNX模型推理时间: {inference_time * 1000:.2f} ms")
    
    # 解析结果
    scores, boxes = outputs
    # 过滤置信度高的检测结果
    detections = []
    for i in range(len(scores[0])):
        if scores[0][i] > 0.5:  # 置信度阈值
            detections.append({
                'score': float(scores[0][i]),
                'box': boxes[0][i].tolist()
            })
    
    print(f"检测到 {len(detections)} 个缺陷")
    for i, det in enumerate(detections):
        print(f"缺陷 {i+1}: 置信度 {det['score']:.4f}, 位置 {det['box']}")
    
    return True

def optimize_onnx_model(input_onnx_path, output_onnx_path):
    """
    使用ONNX Runtime优化ONNX模型
    """
    # 加载原始模型
    model = onnx.load(input_onnx_path)
    
    try:
        # 运行优化器
        from onnxruntime.transformers import optimizer
        optimized_model = optimizer.optimize_model(
            input_onnx_path,
            model_type='detection',
            num_heads=4,
            hidden_size=64
        )
        
        # 保存优化后的模型
        optimized_model.save_model_to_file(output_onnx_path)
        print(f"优化后的ONNX模型已保存到: {output_onnx_path}")
        return True
    except Exception as e:
        print(f"ONNX模型优化失败: {e}")
        print("将使用原始ONNX模型继续")
        onnx.save(model, output_onnx_path)
        return False

# 执行验证和优化
verify_onnx_model(
    onnx_path='efficientdet_defect.onnx',
    test_image_path='path/to/test_image.jpg'
)

optimize_onnx_model(
    input_onnx_path='efficientdet_defect.onnx',
    output_onnx_path='efficientdet_defect_optimized.onnx'
)

5. ONNX转换为TensorRT引擎

TensorRT是NVIDIA提供的高性能深度学习推理优化库,可以显著提高模型在NVIDIA设备上的推理速度。

5.1 ONNX转TensorRT

import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
import time
import os

def build_tensorrt_engine(onnx_path, engine_path, precision='fp16'):
    """
    将ONNX模型转换为TensorRT引擎
    """
    # 初始化TensorRT
    TRT_LOGGER = trt.Logger(trt.Logger.INFO)
    builder = trt.Builder(TRT_LOGGER)
    
    # 创建网络定义
    EXPLICIT_BATCH = 1 << (int)(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
    network = builder.create_network(EXPLICIT_BATCH)
    
    # 创建ONNX解析器
    parser = trt.OnnxParser(network, TRT_LOGGER)
    
    # 解析ONNX模型
    with open(onnx_path, 'rb') as model:
        if not parser.parse(model.read()):
            print("ERROR: Failed to parse the ONNX file.")
            for error in range(parser.num_errors):
                print(parser.get_error(error))
            return False
    
    # 配置构建器
    config = builder.create_builder_config()
    config.max_workspace_size = 1 << 30  # 1GB工作空间
    
    # 设置精度
    if precision == 'fp16' and builder.platform_has_fast_fp16:
        config.set_flag(trt.BuilderFlag.FP16)
        print("Using FP16 precision")
    elif precision == 'int8' and builder.platform_has_fast_int8:
        config.set_flag(trt.BuilderFlag.INT8)
        print("Using INT8 precision")
    else:
        print("Using FP32 precision")
    
    # 构建引擎
    engine = builder.build_engine(network, config)
    
    # 保存引擎
    with open(engine_path, 'wb') as f:
        f.write(engine.serialize())
    
    print(f"TensorRT引擎已保存到: {engine_path}")
    return True

# 执行转换
build_tensorrt_engine(
    onnx_path='efficientdet_defect_optimized.onnx',
    engine_path='efficientdet_defect.trt',
    precision='fp16'  # 使用半精度以加速推理
)

5.2 TensorRT引擎推理

class TensorRTInference:
    def __init__(self, engine_path):
        # 加载TensorRT引擎
        self.logger = trt.Logger(trt.Logger.INFO)
        with open(engine_path, 'rb') as f, trt.Runtime(self.logger) as runtime:
            self.engine = runtime.deserialize_cuda_engine(f.read())
        
        # 创建执行上下文
        self.context = self.engine.create_execution_context()
        
        # 分配内存
        self.inputs = []
        self.outputs = []
        self.bindings = []
        
        for binding in range(self.engine.num_bindings):
            size = trt.volume(self.engine.get_binding_shape(binding)) * self.engine.max_batch_size
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            
            # 分配主机和设备内存
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            
            # 添加到绑定列表
            self.bindings.append(int(device_mem))
            
            if self.engine.binding_is_input(binding):
                self.inputs.append({'host': host_mem, 'device': device_mem, 'binding': binding})
            else:
                self.outputs.append({'host': host_mem, 'device': device_mem, 'binding': binding})
    

    def infer(self, img_array):
        # 准备输入数据
        # 假设img_array已经预处理为正确的形状和类型(NCHW, FP32)
        np.copyto(self.inputs[0]['host'], img_array.ravel())
        
        # 将输入数据从主机内存复制到设备内存
        for inp in self.inputs:
            cuda.memcpy_htod(inp['device'], inp['host'])
        
        # 执行推理
        self.context.execute_v2(bindings=self.bindings)
        
        # 将输出数据从设备内存复制到主机内存
        for out in self.outputs:
            cuda.memcpy_dtoh(out['host'], out['device'])
        
        # 处理输出,通常第一个输出是类别分数,第二个输出是边界框
        scores = self.outputs[0]['host']
        boxes = self.outputs[1]['host']
        
        # 将扁平数组重塑为正确的形状
        # 注意:根据实际模型输出形状进行调整
        # 假设scores形状为[batch_size, num_detections]
        # 假设boxes形状为[batch_size, num_detections, 4]
        scores = scores.reshape(1, -1)  # 调整为正确的形状
        boxes = boxes.reshape(1, -1, 4)  # 调整为正确的形状
        
        return scores, boxes
    
    def process_results(self, scores, boxes, confidence_threshold=0.5):
        """处理推理结果,过滤低置信度的检测并返回高置信度的检测结果"""
        detections = []
        
        # 只处理第一个批次(如果有多个)
        batch_scores = scores[0]
        batch_boxes = boxes[0]
        
        for i in range(len(batch_scores)):
            if batch_scores[i] > confidence_threshold:
                detections.append({
                    'confidence': float(batch_scores[i]),
                    'bbox': batch_boxes[i].tolist()  # [x1, y1, x2, y2]
                })
        
        return detections
    
    def __del__(self):
        """析构函数,释放资源"""
        # 释放设备内存
        for inp in self.inputs:
            inp['device'].free()
        
        for out in self.outputs:
            out['device'].free()

# 使用TensorRT引擎进行推理测试
def test_tensorrt_inference(engine_path, test_image_path, input_size=(512, 512)):
    """测试TensorRT引擎的推理性能"""
    # 加载测试图像
    img = cv2.imread(test_image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, input_size)
    
    # 预处理图像
    img_array = img.astype(np.float32) / 255.0  # 归一化
    img_tensor = np.transpose(img_array, (2, 0, 1))  # HWC -> CHW
    img_tensor = np.expand_dims(img_tensor, axis=0)  # 添加批次维度 NCHW
    
    # 创建TensorRT推理实例
    trt_inference = TensorRTInference(engine_path)
    
    # 测量推理时间
    num_iterations = 100
    total_time = 0
    
    # 预热
    for _ in range(10):
        trt_inference.infer(img_tensor)
    
    # 正式测试
    for _ in range(num_iterations):
        start_time = time.time()
        scores, boxes = trt_inference.infer(img_tensor)
        inference_time = time.time() - start_time
        total_time += inference_time
    
    avg_time = total_time / num_iterations
    fps = 1.0 / avg_time
    
    print(f"TensorRT平均推理时间: {avg_time * 1000:.2f} ms")
    print(f"TensorRT FPS: {fps:.1f}")
    
    # 处理并显示结果
    detections = trt_inference.process_results(scores, boxes, confidence_threshold=0.5)
    print(f"检测到 {len(detections)} 个缺陷")
    
    # 在图像上绘制检测结果
    img_display = img.copy()
    for det in detections:
        box = det['bbox']
        x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
        conf = det['confidence']
        cv2.rectangle(img_display, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(img_display, f"{conf:.2f}", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    
    # 保存结果图像
    cv2.imwrite('detection_result.jpg', cv2.cvtColor(img_display, cv2.COLOR_RGB2BGR))
    print("结果图像已保存为: detection_result.jpg")
    
    return detections, avg_time

# 执行测试
detections, avg_inference_time = test_tensorrt_inference(
    engine_path='efficientdet_defect.trt',
    test_image_path='path/to/test_image.jpg'
)

6. 部署到嵌入式设备

现在我们已经将模型转换为TensorRT格式,接下来将其部署到嵌入式设备上(如NVIDIA Jetson系列)。

6.1 嵌入式部署准备

在部署到嵌入式设备之前,我们需要准备环境和代码:

import os
import sys
import time
import threading
import cv2
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit

# 嵌入式设备上的实时缺陷检测系统
class DefectDetectionSystem:
    def __init__(self, trt_engine_path, camera_id=0, input_size=(512, 512), confidence_threshold=0.5):
        self.input_size = input_size
        self.confidence_threshold = confidence_threshold
        self.camera_id = camera_id
        
        # 初始化TensorRT
        self.trt_inference = TensorRTInference(trt_engine_path)
        
        # 初始化相机
        self.initialize_camera()
        
        # 线程控制
        self.running = False
        self.detection_thread = None
        
        # 性能统计
        self.fps_counter = FPSCounter()
        self.detections = []
    
    def initialize_camera(self):
        """初始化相机"""
        self.cap = cv2.VideoCapture(self.camera_id)
        if not self.cap.isOpened():
            raise Exception(f"无法打开相机 {self.camera_id}")
        
        # 设置相机分辨率
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        print(f"相机初始化成功,ID: {self.camera_id}")
    
    def preprocess_frame(self, frame):
        """预处理相机帧"""
        # 裁剪和缩放
        resized = cv2.resize(frame, self.input_size)
        
        # 转换颜色空间
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        
        # 归一化
        normalized = rgb.astype(np.float32) / 255.0
        
        # 转换为NCHW格式
        nchw = np.transpose(normalized, (2, 0, 1))
        nchw = np.expand_dims(nchw, axis=0)
        
        return nchw, resized
    
    def draw_detections(self, frame, detections):
        """在帧上绘制检测结果"""
        for det in detections:
            box = det['bbox']
            x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
            conf = det['confidence']
            
            # 绘制边界框
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # 绘制置信度
            label = f"{conf:.2f}"
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(frame, (x1, y1-20), (x1+w, y1), (0, 255, 0), -1)
            cv2.putText(frame, label, (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
        
        # 绘制FPS
        cv2.putText(frame, f"FPS: {self.fps_counter.get_fps():.1f}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        # 绘制时间戳
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(frame, timestamp, (10, frame.shape[0]-10), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        
        return frame
    
    def detection_loop(self):
        """检测主循环"""
        while self.running:
            # 捕获帧
            ret, frame = self.cap.read()
            if not ret:
                print("无法读取相机帧")
                time.sleep(0.1)
                continue
            
            # 更新FPS计数器
            self.fps_counter.update()
            
            # 预处理帧
            input_tensor, resized_frame = self.preprocess_frame(frame)
            
            # 执行推理
            scores, boxes = self.trt_inference.infer(input_tensor)
            
            # 处理结果
            self.detections = self.trt_inference.process_results(scores, boxes, self.confidence_threshold)
            
            # 绘制结果
            result_frame = self.draw_detections(resized_frame, self.detections)
            
            # 调整大小以显示
            display_frame = cv2.resize(result_frame, (640, 480))
            
            # 显示结果
            cv2.imshow("Defect Detection", display_frame)
            
            # 检查是否按下'q'键退出
            key = cv2.waitKey(1)
            if key == ord('q'):
                self.running = False
    
    def start(self):
        """启动检测系统"""
        if self.detection_thread is not None and self.detection_thread.is_alive():
            print("检测系统已在运行")
            return
        
        self.running = True
        self.detection_thread = threading.Thread(target=self.detection_loop)
        self.detection_thread.start()
        print("缺陷检测系统已启动")
    
    def stop(self):
        """停止检测系统"""
        self.running = False
        if self.detection_thread is not None:
            self.detection_thread.join()
        
        # 释放资源
        self.cap.release()
        cv2.destroyAllWindows()
        print("缺陷检测系统已停止")

class FPSCounter:
    """FPS计数器,用于测量帧率"""
    def __init__(self, avg_frames=30):
        self.avg_frames = avg_frames
        self.frame_times = []
        self.last_time = time.time()
    
    def update(self):
        """更新帧计数"""
        current_time = time.time()
        self.frame_times.append(current_time - self.last_time)
        self.last_time = current_time
        
        # 保持固定长度的历史记录
        if len(self.frame_times) > self.avg_frames:
            self.frame_times.pop(0)
    
    def get_fps(self):
        """获取当前FPS"""
        if not self.frame_times:
            return 0
        
        # 计算平均帧时间并转换为FPS
        avg_frame_time = sum(self.frame_times) / len(self.frame_times)
        return 1.0 / avg_frame_time if avg_frame_time > 0 else 0

# 主函数
def main():
    # 参数解析
    import argparse
    parser = argparse.ArgumentParser(description='工业缺陷检测系统')
    parser.add_argument('--engine', type=str, required=True, help='TensorRT引擎路径')
    parser.add_argument('--camera', type=int, default=0, help='相机ID')
    parser.add_argument('--threshold', type=float, default=0.5, help='检测置信度阈值')
    args = parser.parse_args()
    
    try:
        # 创建并启动检测系统
        detection_system = DefectDetectionSystem(
            trt_engine_path=args.engine,
            camera_id=args.camera,
            confidence_threshold=args.threshold
        )
        
        detection_system.start()
        
        # 等待用户输入退出
        input("按Enter键停止检测系统...")
        
    except KeyboardInterrupt:
        print("检测到Ctrl+C,正在退出...")
    finally:
        # 确保资源被正确释放
        if 'detection_system' in locals():
            detection_system.stop()

if __name__ == "__main__":
    main()

6.2 嵌入式设备安装脚本

以下是在NVIDIA Jetson设备上安装必要依赖的脚本:

#!/bin/bash
# 这个脚本用于在Jetson设备上安装工业缺陷检测系统所需的依赖

echo "开始安装工业缺陷检测系统依赖..."

# 更新系统包
sudo apt-get update
sudo apt-get upgrade -y

# 安装基本依赖
sudo apt-get install -y \
    python3-pip \
    python3-dev \
    libopencv-dev \
    cmake \
    git

# 安装Python包
pip3 install --upgrade pip
pip3 install \
    numpy \
    pillow \
    pycuda \
    matplotlib \
    onnx \
    onnxruntime

# 创建项目目录
mkdir -p ~/defect_detection/{models,data,logs}

echo "依赖安装完成!"
echo "请手动确认TensorRT已正确安装(通常在JetPack中预装)"
echo "请将TensorRT引擎文件放置在 ~/defect_detection/models/ 目录下"

6.3 产线部署脚本

以下是将系统部署到实际产线的脚本:

#!/bin/bash
# 这个脚本用于在产线启动缺陷检测系统

# 设置变量
MODEL_DIR=~/defect_detection/models
LOG_DIR=~/defect_detection/logs
CAMERA_ID=0
CONFIDENCE=0.6

# 确保日志目录存在
mkdir -p $LOG_DIR

# 获取当前时间戳
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
LOG_FILE=$LOG_DIR/defect_detection_$TIMESTAMP.log

# 检查模型文件是否存在
if [ ! -f "$MODEL_DIR/efficientdet_defect.trt" ]; then
    echo "错误:找不到TensorRT模型文件!" | tee -a $LOG_FILE
    exit 1
fi

# 启动检测系统
echo "启动工业缺陷检测系统..." | tee -a $LOG_FILE
echo "时间:$(date)" | tee -a $LOG_FILE
echo "相机ID:$CAMERA_ID" | tee -a $LOG_FILE
echo "置信度阈值:$CONFIDENCE" | tee -a $LOG_FILE

# 运行Python脚本
python3 defect_detection_system.py \
    --engine $MODEL_DIR/efficientdet_defect.trt \
    --camera $CAMERA_ID \
    --threshold $CONFIDENCE \
    2>&1 | tee -a $LOG_FILE

# 检查退出状态
if [ $? -ne 0 ]; then
    echo "错误:检测系统异常退出!" | tee -a $LOG_FILE
    exit 1
fi

echo "检测系统正常退出" | tee -a $LOG_FILE

7. 产线实时性能测试与优化

在部署到产线后,我们需要进行详细的性能测试和优化。

7.1 性能测试工具

import time
import numpy as np
import matplotlib.pyplot as plt
import csv
import os
from datetime import datetime

class PerformanceTester:
    """产线性能测试工具"""
    def __init__(self, defect_system, test_duration=600, log_dir='./logs'):
        self.defect_system = defect_system
        self.test_duration = test_duration  # 测试时长(秒)
        self.log_dir = log_dir
        
        # 确保日志目录存在
        os.makedirs(log_dir, exist_ok=True)
        
        # 性能指标
        self.inference_times = []
        self.fps_values = []
        self.detection_counts = []
        self.memory_usage = []
        self.gpu_usage = []
        self.timestamps = []
    
    def monitor_performance(self):
        """性能监控主函数"""
        start_time = time.time()
        
        # 启动检测系统
        self.defect_system.start()
        
        # 监控循环
        try:
            while time.time() - start_time < self.test_duration:
                # 记录当前时间戳
                current_time = time.time()
                self.timestamps.append(current_time - start_time)
                
                # 记录推理时间
                if hasattr(self.defect_system.trt_inference, 'last_inference_time'):
                    self.inference_times.append(
                        self.defect_system.trt_inference.last_inference_time * 1000  # 转换为毫秒
                    )
                
                # 记录FPS
                self.fps_values.append(self.defect_system.fps_counter.get_fps())
                
                # 记录检测数量
                self.detection_counts.append(len(self.defect_system.detections))
                
                # 获取内存和GPU使用情况(需要额外工具,此处简化)
                # 在实际应用中,可以使用pynvml库获取GPU使用情况
                self.memory_usage.append(0)  # 占位符
                self.gpu_usage.append(0)  # 占位符
                
                # 每秒采样一次
                time.sleep(1)
                
        except KeyboardInterrupt:
            print("测试被用户中断")
        finally:
            # 停止检测系统
            self.defect_system.stop()
            
            # 保存性能日志
            self.save_results()
    
    def save_results(self):
        """保存测试结果"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # 保存CSV数据
        csv_path = os.path.join(self.log_dir, f'performance_test_{timestamp}.csv')
        with open(csv_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Time(s)', 'Inference_Time(ms)', 'FPS', 'Detection_Count', 'Memory_Usage', 'GPU_Usage'])
            
            for i in range(len(self.timestamps)):
                writer.writerow([
                    self.timestamps[i],
                    self.inference_times[i] if i < len(self.inference_times) else '',
                    self.fps_values[i],
                    self.detection_counts[i],
                    self.memory_usage[i],
                    self.gpu_usage[i]
                ])
        
        print(f"性能数据已保存到: {csv_path}")
        
        # 生成性能报告图
        self.generate_performance_charts(timestamp)
        
        # 计算性能统计
        self.print_performance_statistics()
    
    def generate_performance_charts(self, timestamp):
        """生成性能图表"""
        plt.figure(figsize=(15, 10))
        
        # 推理时间图
        plt.subplot(2, 2, 1)
        plt.plot(self.timestamps, self.inference_times)
        plt.title('Inference Time')
        plt.xlabel('Time (s)')
        plt.ylabel('Inference Time (ms)')
        plt.grid(True)
        
        # FPS图
        plt.subplot(2, 2, 2)
        plt.plot(self.timestamps, self.fps_values)
        plt.title('FPS')
        plt.xlabel('Time (s)')
        plt.ylabel('Frames Per Second')
        plt.grid(True)
        
        # 检测数量图
        plt.subplot(2, 2, 3)
        plt.plot(self.timestamps, self.detection_counts)
        plt.title('Detection Count')
        plt.xlabel('Time (s)')
        plt.ylabel('Number of Detections')
        plt.grid(True)
        
        # 保存图表
        chart_path = os.path.join(self.log_dir, f'performance_chart_{timestamp}.png')
        plt.tight_layout()
        plt.savefig(chart_path)
        plt.close()
        
        print(f"性能图表已保存到: {chart_path}")
    
    def print_performance_statistics(self):
        """打印性能统计信息"""
        if not self.inference_times:
            print("没有收集到推理时间数据")
            return
        
        avg_inference_time = np.mean(self.inference_times)
        max_inference_time = np.max(self.inference_times)
        min_inference_time = np.min(self.inference_times)
        std_inference_time = np.std(self.inference_times)
        
        avg_fps = np.mean(self.fps_values)
        min_fps = np.min(self.fps_values)
        
        print("\n性能统计:")
        print(f"测试持续时间: {len(self.timestamps)} 秒")
        print(f"平均推理时间: {avg_inference_time:.2f} ms")
        print(f"最大推理时间: {max_inference_time:.2f} ms")
        print(f"最小推理时间: {min_inference_time:.2f} ms")
        print(f"推理时间标准差: {std_inference_time:.2f} ms")
        print(f"平均FPS: {avg_fps:.2f}")
        print(f"最小FPS: {min_fps:.2f}")
        
        # 计算实时性评分
        if avg_fps >= 30:
            realtime_grade = "优秀"
        elif avg_fps >= 20:
            realtime_grade = "良好"
        elif avg_fps >= 10:
            realtime_grade = "一般"
        else:
            realtime_grade = "不足"
        
        print(f"实时性评级: {realtime_grade}")

# 使用示例
def run_performance_test():
    # 需要事先创建好缺陷检测系统实例
    defect_system = DefectDetectionSystem(
        trt_engine_path='efficientdet_defect.trt',
        camera_id=0,
        confidence_threshold=0.5
    )
    
    # 创建测试器
    tester = PerformanceTester(
        defect_system=defect_system,
        test_duration=300,  # 5分钟测试
        log_dir='./performance_logs'
    )
    
    # 运行测试
    print("开始性能测试 (5分钟)...")
    tester.monitor_performance()
    print("测试完成!")

if __name__ == "__main__":
    run_performance_test()

7.2 优化策略表

优化方向 具体策略 预期提升 适用条件
模型优化 减小输入分辨率 2-4倍速度提升 缺陷较大或特征明显时
使用更小的模型变体(D0->D0-lite) 30-50%速度提升 检测任务简单
剪枝不重要的卷积通道 20-40%速度提升 性能是首要考虑因素
知识蒸馏到更小的网络 2-3倍速度提升 有足够训练数据和时间
TensorRT优化 使用FP16量化 40-50%速度提升 设备支持FP16
使用INT8量化 2-4倍速度提升 可接受轻微精度损失
优化网络图和层融合 10-30%速度提升 TensorRT推理
动态批处理大小 批处理场景下提升 批量推理
系统优化 优化图像预处理 5-15%总时间减少 CPU瓶颈场景
使用CUDA加速预处理 30-50%预处理提速 预处理是瓶颈
多线程并行处理 20-40%吞吐量提升 多相机或复杂后处理
使用内存映射减少数据拷贝 5-10%延迟减少 数据传输是瓶颈
算法优化 区域感兴趣(ROI)检测 根据场景2-10倍提速 有固定检测区域
跳帧检测 接近线性吞吐量提升 不需要检测每一帧
自适应推理频率 动态优化资源使用 有峰值负载的场景

7.3 模型部署方案比较

部署方案 延迟 吞吐量 开发难度 适用场景
PyTorch直接推理 原型验证、开发调试
ONNX Runtime 跨平台部署、开发过渡
TensorRT (FP32) 中低 中高 需要精确推理的场景
TensorRT (FP16) 大多数通用场景
TensorRT (INT8) 非常低 非常高 高吞吐量、可容忍精度损失
TensorRT + DeepStream 非常低 极高 非常高 多路视频流、超高性能需求
TVM 多种加速器、特殊硬件

清华大学全五版的《DeepSeek教程》完整的文档需要的朋友,关注我私信:deepseek 即可获得。

怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

Logo

脑启社区是一个专注类脑智能领域的开发者社区。欢迎加入社区,共建类脑智能生态。社区为开发者提供了丰富的开源类脑工具软件、类脑算法模型及数据集、类脑知识库、类脑技术培训课程以及类脑应用案例等资源。

更多推荐