main.ipynb @master — view markup · raw · history · blame
基于深度学习的目标检测 - 气球¶
1.实验介绍¶
1.1 实验背景¶
Object detection(目标检测)是计算机视觉中非常重要,也是非常具有挑战性的一个任务。本节作业我们将学习如何使用深度学习模型检测出图片中的气球。
1.2 实验要求¶
a) 建立深度神经网络模型,并尽可能将其调到最佳状态 b) 用准确率等指标对模型进行评估
1.3 实验环境¶
可以使用基于 Python 的 OpenCV 库进行图像相关处理,使用 Numpy 库进行相关数值运算,使用pytorch 等框架建立深度学习模型等。
1.4 注意事项¶
- Python 与 Python Package 的使用方式,可在右侧 API文档 中查阅。
- 当右上角的『Python 3』长时间指示为运行中的时候,造成代码无法执行时,可以重新启动 Kernel 解决(左上角『Kernel』-『Restart Kernel』)。
1.5 参考资料¶
OpenCV:https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_tutorials.html
Numpy:https://www.numpy.org/
Pytorch:https://pytorch.org/
2.实验内容¶
2.1 介绍数据集¶
该数据集包含了 380 个气球图片。数据格式是 VOC格式。
数据集导入成功后,其路径为 data_path = "./dataset"
# 导入相关包
import glob, os

# 数据集路径
data_path = "./dataset/train"

# 获取数据名称列表(data_path 下一层目录中的所有 jpg 文件)
img_list = glob.glob(os.path.join(data_path, '*/*.jpg'))

# 打印数据集总量;当数据集为空或路径不存在时跳过示例打印,
# 避免对空列表取下标引发 IndexError
print("数据集总数量:", len(img_list))
if img_list:
    print("数据路径和名称:", img_list[0])
训练数据集总共有 300 张图片,现在随机展示其中的 6 张图片。
import random
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Draw 6 paths at random from img_list and show them in a 2x3 grid.
sampled_paths = random.sample(img_list, 6)
for idx, sample_path in enumerate(sampled_paths):
    # OpenCV reads BGR; convert to RGB so matplotlib renders true colors.
    rgb = cv2.cvtColor(cv2.imread(sample_path), cv2.COLOR_BGR2RGB)
    # Place this image in cell idx+1 of the 2-row, 3-column grid.
    plt.subplot(2, 3, idx + 1)
    plt.imshow(rgb)
    # Hide the axis ticks around each thumbnail.
    plt.axis('off')
- 获取图像形状 img.shape 可以获得图像的形状,返回值是一个包含行数,列数,通道数的元组
# Pick one image at random and report its shape tuple:
# (rows, cols, channels).
path = random.sample(img_list, 1)
# Read (BGR) and convert to RGB in one expression.
img = cv2.cvtColor(cv2.imread(path[0]), cv2.COLOR_BGR2RGB)
print(img.shape)
2.2 数据处理¶
根据上面的介绍和我们数据集的特性,我们主要运用cv2.imread() 读取到图像信息和 xml.etree.ElementTree.ET.parse() 方法读取annotation的信息, 最后将他们封装为一个对象用于后续的训练,这个对象包含im_info:[height, width, im_scales], data: 图片的像素矩阵, gt_boxes: [xmin, ymin, xmax, ymax, cls_index]。我们将数据处理过程封装成为一个函数:
import os
import glob
import cv2
import xml.etree.ElementTree as ET
import numpy as np
import scipy.sparse
import scipy.io as sio
import numpy.random as npr
import time
from config import cfg
# Target size(s) for the shorter image side when rescaling inputs.
SCALES = (600,)
# Upper bound on the longer image side after rescaling.
MAX_SIZE = 1000
# Per-channel pixel mean subtracted from images before feeding the net.
# NOTE(review): _load() converts images to RGB, while cfg.PIXEL_MEANS in
# Faster R-CNN ports is conventionally BGR-ordered — confirm the ordering.
PIXEL_MEANS = cfg.PIXEL_MEANS
class DataLayer(object):
    """Load a VOC-style dataset (JPEGImages/ + Annotations/) and serve
    single-image minibatches for Faster R-CNN training.

    Each call to forward() returns a dict with:
      - 'data':     (1, H, W, 3) float32 blob (mean-subtracted, rescaled)
      - 'gt_boxes': (N, 5) float32 rows [xmin, ymin, xmax, ymax, cls_index]
      - 'im_info':  [height, width, im_scale]
    """

    def __init__(self, rootPath):
        self.xmlPath = os.path.join(rootPath, "Annotations")
        self.imgPath = os.path.join(rootPath, "JPEGImages")
        self.point = 0  # index of the next sample to serve (wraps around)
        self.classes = (
            '__background__',  # always index 0
            'balloon')
        self._class_to_ind = dict(
            list(zip(self.classes, list(range(len(self.classes))))))
        self.images = []
        # NOTE: attribute name keeps the historical typo ("annotaions")
        # so existing callers continue to work.
        self.annotaions = []
        self.filenames = []
        self._load()
        self.length = len(self.images)

    def im_list_to_blob(self, ims):
        """Convert a list of images into a single network input blob.

        Assumes images are already prepared (means subtracted, ...).
        Images are zero-padded on the bottom/right up to the largest
        height/width present in the list.
        """
        max_shape = np.array([im.shape for im in ims]).max(axis=0)
        num_images = len(ims)
        blob = np.zeros((num_images, max_shape[0], max_shape[1], 3),
                        dtype=np.float32)
        for i in range(num_images):
            im = ims[i]
            blob[i, 0:im.shape[0], 0:im.shape[1], :] = im
        return blob

    def prep_im_for_blob(self, im, pixel_means, target_size, max_size):
        """Mean-subtract and rescale an image for use in a blob.

        The shorter side is scaled to target_size unless that would push
        the longer side past max_size.  Returns (image, scale_factor).
        """
        im = im.astype(np.float32, copy=False)
        im -= pixel_means
        im_shape = im.shape
        im_size_min = np.min(im_shape[0:2])
        im_size_max = np.max(im_shape[0:2])
        im_scale = float(target_size) / float(im_size_min)
        # Prevent the biggest axis from being more than max_size
        if np.round(im_scale * im_size_max) > max_size:
            im_scale = float(max_size) / float(im_size_max)
        im = cv2.resize(
            im,
            None,
            None,
            fx=im_scale,
            fy=im_scale,
            interpolation=cv2.INTER_LINEAR)
        return im, im_scale

    def _get_image_blob(self, images, scale_inds):
        """Build an input blob from `images` at the given SCALES indices."""
        num_images = len(images)
        processed_ims = []
        im_scales = []
        for i in range(num_images):
            im = images[i]
            target_size = SCALES[scale_inds[i]]
            im, im_scale = self.prep_im_for_blob(im, PIXEL_MEANS, target_size,
                                                 MAX_SIZE)
            im_scales.append(im_scale)
            processed_ims.append(im)
        # Create a blob to hold the input images
        blob = self.im_list_to_blob(processed_ims)
        return blob, im_scales

    def get_minibatch(self):
        """Construct a single-image minibatch at the current pointer."""
        imageArray = np.asarray([self.images[self.point % self.length]])
        annotationArray = self.annotaions[self.point % self.length]
        num_images = len(imageArray)
        # Sample a random scale for each image in this batch
        random_scale_inds = npr.randint(0, high=len(SCALES), size=num_images)
        im_blob, im_scales = self._get_image_blob(imageArray,
                                                  random_scale_inds)
        assert len(im_scales) == 1, "Single batch only"
        assert len(im_blob) == 1, "Single batch only"
        blobs = {'data': im_blob}
        # gt boxes: (x1, y1, x2, y2, cls), rescaled like the image
        gt_inds = np.where(annotationArray['gt_classes'] != 0)[0]
        gt_boxes = np.empty((len(gt_inds), 5), dtype=np.float32)
        gt_boxes[:, 0:4] = annotationArray['boxes'][gt_inds, :] * im_scales[0]
        gt_boxes[:, 4] = annotationArray['gt_classes'][gt_inds]
        blobs['gt_boxes'] = gt_boxes
        blobs['im_info'] = np.array(
            [im_blob.shape[1], im_blob.shape[2], im_scales[0]],
            dtype=np.float32)  # [height, width, im_scale]
        return blobs

    def forward(self):
        """Return the next minibatch and advance the data pointer."""
        blobs = self.get_minibatch()
        self.point += 1
        return blobs

    def _load(self):
        """Read every image and its annotation into memory."""
        imageArray = []
        annotationArray = []
        for imgP in glob.glob(self.imgPath + "/*.*"):
            img = cv2.imread(imgP)  # OpenCV reads BGR
            if img is None:
                # The glob matches any extension; skip files that are not
                # readable images so images/annotations stay aligned.
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # BGR -> RGB
            imageArray.append(img)
            # Derive the annotation path from the image path.  Using
            # splitext (instead of str.replace on "jpg") stays correct for
            # .jpeg/.png files and for names containing the substring "jpg".
            xmlP = imgP.replace("JPEGImages", "Annotations")
            xmlP = os.path.splitext(xmlP)[0] + ".xml"
            annotation = self._load_pascal_annotation(xmlP)
            annotationArray.append(annotation)
            self.filenames.append(xmlP)
        self.images = imageArray
        self.annotaions = annotationArray

    def _load_pascal_annotation(self, filename):
        """
        Load image and bounding boxes info from XML file in the PASCAL VOC
        format.
        """
        tree = ET.parse(filename)
        objs = tree.findall('object')
        num_objs = len(objs)
        boxes = np.zeros((num_objs, 4), dtype=np.float32)
        gt_classes = np.zeros((num_objs), dtype=np.int32)
        overlaps = np.zeros((num_objs, len(self.classes)), dtype=np.float32)
        # "Seg" area for pascal is just the box area
        seg_areas = np.zeros((num_objs), dtype=np.float32)
        # Load object bounding boxes into a data frame.
        for ix, obj in enumerate(objs):
            bbox = obj.find('bndbox')
            # Make pixel indexes 0-based
            x1 = float(bbox.find('xmin').text) - 1
            y1 = float(bbox.find('ymin').text) - 1
            x2 = float(bbox.find('xmax').text) - 1
            y2 = float(bbox.find('ymax').text) - 1
            cls = self._class_to_ind[obj.find('name').text.lower().strip()]
            boxes[ix, :] = [x1, y1, x2, y2]
            gt_classes[ix] = cls
            overlaps[ix, cls] = 1.0
            seg_areas[ix] = (x2 - x1 + 1) * (y2 - y1 + 1)
        overlaps = scipy.sparse.csr_matrix(overlaps)
        return {
            'boxes': boxes,
            'gt_classes': gt_classes,
            'gt_overlaps': overlaps,
            'flipped': False,
            'seg_areas': seg_areas
        }
if __name__ == '__main__':
    # Smoke test: load the validation split and print the first minibatch
    # (requires ./dataset/val with JPEGImages/ and Annotations/ on disk).
    datalayer = DataLayer("./dataset/val")
    first = datalayer.forward()
    print(first)
# --------------------------------------------------------
# Tensorflow Faster R-CNN
# Licensed under The MIT License [see LICENSE for details]
# Written by Xinlei Chen
# --------------------------------------------------------
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.models as models
from lib.snippets import generate_anchors_pre
from lib.proposal_layer import proposal_layer
from lib.proposal_top_layer import proposal_top_layer
from lib.anchor_target_layer import anchor_target_layer
from lib.proposal_target_layer import proposal_target_layer
from lib.visualization import draw_bounding_boxes
from torchvision.ops import RoIAlign, RoIPool
from config import cfg
import tensorboardX as tb
import cv2
# from scipy.misc import imresize
class Network(nn.Module):
    """Faster R-CNN network skeleton (PyTorch port of tf-faster-rcnn).

    Subclasses supply the backbone via ``_init_head_tail``,
    ``_image_to_head`` and ``_head_to_tail``; this class implements the
    RPN, RoI pooling, the box heads, the losses and the train/test API.
    """

    def __init__(self):
        nn.Module.__init__(self)
        # Per-forward caches; cleared by delete_intermediate_states().
        self._predictions = {}
        self._losses = {}
        self._anchor_targets = {}
        self._proposal_targets = {}
        self._layers = {}
        self._gt_image = None
        # Tensorboard summary buffers.
        self._act_summaries = {}
        self._score_summaries = {}
        self._event_summaries = {}
        self._image_gt_summaries = {}
        self._variables_to_fix = {}
        self._device = 'cuda'
        # self._device = 'cpu'

    def _add_gt_image(self):
        """Reconstruct a displayable version of the current input image."""
        # add back mean
        image = self._image_gt_summaries['image'] + cfg.PIXEL_MEANS
        # image = imresize(image[0], self._im_info[:2] / self._im_info[2])
        # NOTE(review): with dsize=None, fx/fy are scale *factors*; the
        # scipy imresize call this replaced took a target *size* — confirm
        # this produces the intended visualization scale.
        image = cv2.resize(
            image[0], None, None,
            fx=self._im_info[1] / self._im_info[2],
            fy=self._im_info[0] / self._im_info[2])
        # BGR to RGB (opencv uses BGR)
        self._gt_image = image[np.newaxis, :, :, ::-1].copy(order='C')

    def _add_gt_image_summary(self):
        """Return an image summary with ground-truth boxes drawn on it."""
        # use a customized visualization function to visualize the boxes
        self._add_gt_image()
        image = draw_bounding_boxes(\
            self._gt_image, self._image_gt_summaries['gt_boxes'], self._image_gt_summaries['im_info'])
        return tb.summary.image('GROUND_TRUTH',
                                image[0].astype('float32') / 255.0,
                                dataformats='HWC')

    def _add_act_summary(self, key, tensor):
        """Return (histogram, zero-fraction) summaries for an activation.

        Bug fix: the original ``return expr,`` returned a 1-tuple and left
        the zero-fraction scalar summary as unreachable dead code; both
        summaries are now returned, matching the ``summaries += ...``
        consumption in _run_summary_op.
        """
        return (tb.summary.histogram(
                    'ACT/' + key + '/activations',
                    tensor.data.cpu().numpy(),
                    bins='auto'),
                tb.summary.scalar(
                    'ACT/' + key + '/zero_fraction',
                    (tensor.data == 0).float().sum() / tensor.numel()))

    def _add_score_summary(self, key, tensor):
        """Return a histogram summary of a score tensor."""
        return tb.summary.histogram(
            'SCORE/' + key + '/scores', tensor.data.cpu().numpy(), bins='auto')

    def _add_train_summary(self, key, var):
        """Return a histogram summary of a trainable parameter."""
        return tb.summary.histogram(
            'TRAIN/' + key, var.data.cpu().numpy(), bins='auto')

    def _proposal_top_layer(self, rpn_cls_prob, rpn_bbox_pred):
        """Pick the top-scoring RPN proposals (test-time 'top' mode)."""
        rois, rpn_scores = proposal_top_layer(\
            rpn_cls_prob, rpn_bbox_pred, self._im_info,
            self._feat_stride, self._anchors, self._num_anchors)
        return rois, rpn_scores

    def _proposal_layer(self, rpn_cls_prob, rpn_bbox_pred):
        """Convert RPN outputs into scored RoIs (NMS inside proposal_layer)."""
        rois, rpn_scores = proposal_layer(\
            rpn_cls_prob, rpn_bbox_pred, self._im_info, self._mode,
            self._feat_stride, self._anchors, self._num_anchors)
        return rois, rpn_scores

    def _roi_pool_layer(self, bottom, rois):
        """RoI max-pooling at 1/16 feature-map resolution."""
        return RoIPool((cfg.POOLING_SIZE, cfg.POOLING_SIZE),
                       1.0 / 16.0)(bottom, rois)

    def _roi_align_layer(self, bottom, rois):
        """RoIAlign at 1/16 feature-map resolution (sampling_ratio=0)."""
        return RoIAlign((cfg.POOLING_SIZE, cfg.POOLING_SIZE), 1.0 / 16.0,
                        0)(bottom, rois)

    def _anchor_target_layer(self, rpn_cls_score):
        """Compute RPN training targets (numpy side), move them to the
        device and cache them for the RPN losses."""
        rpn_labels, rpn_bbox_targets, rpn_bbox_inside_weights, rpn_bbox_outside_weights = \
            anchor_target_layer(
                rpn_cls_score.data, self._gt_boxes.data.cpu().numpy(), self._im_info, self._feat_stride, self._anchors.data.cpu().numpy(), self._num_anchors)
        rpn_labels = torch.from_numpy(rpn_labels).float().to(
            self._device)  #.set_shape([1, 1, None, None])
        rpn_bbox_targets = torch.from_numpy(rpn_bbox_targets).float().to(
            self._device)  #.set_shape([1, None, None, self._num_anchors * 4])
        rpn_bbox_inside_weights = torch.from_numpy(
            rpn_bbox_inside_weights).float().to(
                self._device)  #.set_shape([1, None, None, self._num_anchors * 4])
        rpn_bbox_outside_weights = torch.from_numpy(
            rpn_bbox_outside_weights).float().to(
                self._device)  #.set_shape([1, None, None, self._num_anchors * 4])
        # Cross-entropy wants integer class labels.
        rpn_labels = rpn_labels.long()
        self._anchor_targets['rpn_labels'] = rpn_labels
        self._anchor_targets['rpn_bbox_targets'] = rpn_bbox_targets
        self._anchor_targets['rpn_bbox_inside_weights'] = rpn_bbox_inside_weights
        self._anchor_targets['rpn_bbox_outside_weights'] = rpn_bbox_outside_weights
        for k in self._anchor_targets.keys():
            self._score_summaries[k] = self._anchor_targets[k]
        return rpn_labels

    def _proposal_target_layer(self, rois, roi_scores):
        """Sample RoIs and compute classification/regression targets."""
        rois, roi_scores, labels, bbox_targets, bbox_inside_weights, bbox_outside_weights = \
            proposal_target_layer(
                rois, roi_scores, self._gt_boxes, self._num_classes)
        self._proposal_targets['rois'] = rois
        self._proposal_targets['labels'] = labels.long()
        self._proposal_targets['bbox_targets'] = bbox_targets
        self._proposal_targets['bbox_inside_weights'] = bbox_inside_weights
        self._proposal_targets['bbox_outside_weights'] = bbox_outside_weights
        for k in self._proposal_targets.keys():
            self._score_summaries[k] = self._proposal_targets[k]
        return rois, roi_scores

    def _anchor_component(self, height, width):
        """Generate all anchors for a feature map of the given size."""
        # height/width are the conv feature-map size, i.e. image size / stride
        #height = int(math.ceil(self._im_info.data[0, 0] / self._feat_stride[0]))
        #width = int(math.ceil(self._im_info.data[0, 1] / self._feat_stride[0]))
        anchors, anchor_length = generate_anchors_pre(\
            height, width,
            self._feat_stride, self._anchor_scales, self._anchor_ratios)
        self._anchors = torch.from_numpy(anchors).to(self._device)
        self._anchor_length = anchor_length

    def _smooth_l1_loss(self,
                        bbox_pred,
                        bbox_targets,
                        bbox_inside_weights,
                        bbox_outside_weights,
                        sigma=1.0,
                        dim=[1]):
        """Smooth-L1 (Huber-style) box regression loss.

        Quadratic for |diff| < 1/sigma^2, linear beyond; summed over the
        axes in `dim`, then averaged over what remains.
        """
        sigma_2 = sigma**2
        box_diff = bbox_pred - bbox_targets
        in_box_diff = bbox_inside_weights * box_diff
        abs_in_box_diff = torch.abs(in_box_diff)
        smoothL1_sign = (abs_in_box_diff < 1. / sigma_2).detach().float()
        in_loss_box = torch.pow(in_box_diff, 2) * (sigma_2 / 2.) * smoothL1_sign \
            + (abs_in_box_diff - (0.5 / sigma_2)) * (1. - smoothL1_sign)
        out_loss_box = bbox_outside_weights * in_loss_box
        loss_box = out_loss_box
        for i in sorted(dim, reverse=True):
            loss_box = loss_box.sum(i)
        loss_box = loss_box.mean()
        return loss_box

    def _add_losses(self, sigma_rpn=3.0):
        """Compute RPN + RCNN losses and cache them in self._losses.

        Returns the total loss (sum of the four components).
        """
        # RPN, class loss (ignore anchors labelled -1)
        rpn_cls_score = self._predictions['rpn_cls_score_reshape'].view(-1, 2)
        rpn_label = self._anchor_targets['rpn_labels'].view(-1)
        rpn_select = (rpn_label.data != -1).nonzero().view(-1)
        rpn_cls_score = rpn_cls_score.index_select(
            0, rpn_select).contiguous().view(-1, 2)
        rpn_label = rpn_label.index_select(0, rpn_select).contiguous().view(-1)
        rpn_cross_entropy = F.cross_entropy(rpn_cls_score, rpn_label)
        # RPN, bbox loss
        rpn_bbox_pred = self._predictions['rpn_bbox_pred']
        rpn_bbox_targets = self._anchor_targets['rpn_bbox_targets']
        rpn_bbox_inside_weights = self._anchor_targets[
            'rpn_bbox_inside_weights']
        rpn_bbox_outside_weights = self._anchor_targets[
            'rpn_bbox_outside_weights']
        rpn_loss_box = self._smooth_l1_loss(
            rpn_bbox_pred,
            rpn_bbox_targets,
            rpn_bbox_inside_weights,
            rpn_bbox_outside_weights,
            sigma=sigma_rpn,
            dim=[1, 2, 3])
        # RCNN, class loss
        cls_score = self._predictions["cls_score"]
        label = self._proposal_targets["labels"].view(-1)
        cross_entropy = F.cross_entropy(
            cls_score.view(-1, self._num_classes), label)
        # RCNN, bbox loss
        bbox_pred = self._predictions['bbox_pred']
        bbox_targets = self._proposal_targets['bbox_targets']
        bbox_inside_weights = self._proposal_targets['bbox_inside_weights']
        bbox_outside_weights = self._proposal_targets['bbox_outside_weights']
        loss_box = self._smooth_l1_loss(
            bbox_pred, bbox_targets, bbox_inside_weights, bbox_outside_weights)
        self._losses['cross_entropy'] = cross_entropy
        self._losses['loss_box'] = loss_box
        self._losses['rpn_cross_entropy'] = rpn_cross_entropy
        self._losses['rpn_loss_box'] = rpn_loss_box
        loss = cross_entropy + loss_box + rpn_cross_entropy + rpn_loss_box
        self._losses['total_loss'] = loss
        for k in self._losses.keys():
            self._event_summaries[k] = self._losses[k]
        return loss

    def _region_proposal(self, net_conv):
        """Run the RPN on the conv features and produce sampled RoIs."""
        rpn = F.relu(self.rpn_net(net_conv))
        self._act_summaries['rpn'] = rpn
        rpn_cls_score = self.rpn_cls_score_net(
            rpn)  # batch * (num_anchors * 2) * h * w
        # change it so that the score has 2 as its channel size
        rpn_cls_score_reshape = rpn_cls_score.view(
            1, 2, -1,
            rpn_cls_score.size()[-1])  # batch * 2 * (num_anchors*h) * w
        rpn_cls_prob_reshape = F.softmax(rpn_cls_score_reshape, dim=1)
        # Move channel to the last dimension, to fit the input of python functions
        rpn_cls_prob = rpn_cls_prob_reshape.view_as(rpn_cls_score).permute(
            0, 2, 3, 1)  # batch * h * w * (num_anchors * 2)
        rpn_cls_score = rpn_cls_score.permute(
            0, 2, 3, 1)  # batch * h * w * (num_anchors * 2)
        rpn_cls_score_reshape = rpn_cls_score_reshape.permute(
            0, 2, 3, 1).contiguous()  # batch * (num_anchors*h) * w * 2
        rpn_cls_pred = torch.max(rpn_cls_score_reshape.view(-1, 2), 1)[1]
        rpn_bbox_pred = self.rpn_bbox_pred_net(rpn)
        rpn_bbox_pred = rpn_bbox_pred.permute(
            0, 2, 3, 1).contiguous()  # batch * h * w * (num_anchors*4)
        if self._mode == 'TRAIN':
            rois, roi_scores = self._proposal_layer(
                rpn_cls_prob, rpn_bbox_pred)  # rois, roi_scores are variable
            rpn_labels = self._anchor_target_layer(rpn_cls_score)
            rois, _ = self._proposal_target_layer(rois, roi_scores)
        else:
            if cfg.TEST.MODE == 'nms':
                rois, _ = self._proposal_layer(rpn_cls_prob, rpn_bbox_pred)
            elif cfg.TEST.MODE == 'top':
                rois, _ = self._proposal_top_layer(rpn_cls_prob, rpn_bbox_pred)
            else:
                raise NotImplementedError
        self._predictions["rpn_cls_score"] = rpn_cls_score
        self._predictions["rpn_cls_score_reshape"] = rpn_cls_score_reshape
        self._predictions["rpn_cls_prob"] = rpn_cls_prob
        self._predictions["rpn_cls_pred"] = rpn_cls_pred
        self._predictions["rpn_bbox_pred"] = rpn_bbox_pred
        self._predictions["rois"] = rois
        return rois

    def _region_classification(self, fc7):
        """Classify each RoI and regress per-class box deltas."""
        cls_score = self.cls_score_net(fc7)
        cls_pred = torch.max(cls_score, 1)[1]
        cls_prob = F.softmax(cls_score, dim=1)
        bbox_pred = self.bbox_pred_net(fc7)
        self._predictions["cls_score"] = cls_score
        self._predictions["cls_pred"] = cls_pred
        self._predictions["cls_prob"] = cls_prob
        self._predictions["bbox_pred"] = bbox_pred
        return cls_prob, bbox_pred

    def _image_to_head(self):
        """Backbone hook: image tensor -> conv feature map (subclass)."""
        raise NotImplementedError

    def _head_to_tail(self, pool5):
        """Backbone hook: pooled RoIs -> fc features (subclass)."""
        raise NotImplementedError

    def create_architecture(self,
                            num_classes,
                            tag=None,
                            anchor_scales=(8, 16, 32),
                            anchor_ratios=(0.5, 1, 2)):
        """Configure anchors/classes and build all trainable modules."""
        self._tag = tag
        self._num_classes = num_classes
        self._anchor_scales = anchor_scales
        self._num_scales = len(anchor_scales)
        self._anchor_ratios = anchor_ratios
        self._num_ratios = len(anchor_ratios)
        # one anchor per (scale, ratio) pair at every feature-map cell
        self._num_anchors = self._num_scales * self._num_ratios
        assert tag is not None
        # Initialize layers
        self._init_modules()

    def _init_modules(self):
        """Build the backbone plus the RPN and RCNN head layers."""
        self._init_head_tail()
        # rpn
        self.rpn_net = nn.Conv2d(
            self._net_conv_channels, cfg.RPN_CHANNELS, [3, 3], padding=1)
        self.rpn_cls_score_net = nn.Conv2d(cfg.RPN_CHANNELS,
                                           self._num_anchors * 2, [1, 1])
        self.rpn_bbox_pred_net = nn.Conv2d(cfg.RPN_CHANNELS,
                                           self._num_anchors * 4, [1, 1])
        self.cls_score_net = nn.Linear(self._fc7_channels, self._num_classes)
        self.bbox_pred_net = nn.Linear(self._fc7_channels,
                                       self._num_classes * 4)
        self.init_weights()

    def _run_summary_op(self, val=False):
        """
        Run the summary operator: feed the placeholders with corresponding
        network outputs (activations).  Buffers are cleared after use.
        """
        summaries = []
        # Add image gt
        summaries.append(self._add_gt_image_summary())
        # Add event_summaries
        for key, var in self._event_summaries.items():
            summaries.append(tb.summary.scalar(key, var.item()))
        self._event_summaries = {}
        if not val:
            # Add score summaries
            for key, var in self._score_summaries.items():
                summaries.append(self._add_score_summary(key, var))
            self._score_summaries = {}
            # Add act summaries
            for key, var in self._act_summaries.items():
                summaries += self._add_act_summary(key, var)
            self._act_summaries = {}
            # Add train summaries
            for k, var in dict(self.named_parameters()).items():
                if var.requires_grad:
                    summaries.append(self._add_train_summary(k, var))
            self._image_gt_summaries = {}
        return summaries

    def _predict(self):
        """Full forward pass: backbone -> RPN -> RoI pooling -> heads."""
        # This is just _build_network in tf-faster-rcnn
        torch.backends.cudnn.benchmark = False
        net_conv = self._image_to_head()
        # build the anchors for the image
        self._anchor_component(net_conv.size(2), net_conv.size(3))
        rois = self._region_proposal(net_conv)
        if cfg.POOLING_MODE == 'align':
            pool5 = self._roi_align_layer(net_conv, rois)
        else:
            pool5 = self._roi_pool_layer(net_conv, rois)
        if self._mode == 'TRAIN':
            torch.backends.cudnn.benchmark = True  # benchmark because now the input size are fixed
        fc7 = self._head_to_tail(pool5)
        cls_prob, bbox_pred = self._region_classification(fc7)
        for k in self._predictions.keys():
            self._score_summaries[k] = self._predictions[k]
        return rois, cls_prob, bbox_pred

    def forward(self, image, im_info, gt_boxes=None, mode='TRAIN'):
        """Run the network on one image.

        image: (1, H, W, 3) numpy array; im_info: [height, width, scale];
        gt_boxes: (N, 5) numpy array or None at test time.  In TRAIN mode
        losses are computed; in TEST mode bbox_pred is un-normalized.
        """
        self._image_gt_summaries['image'] = image
        self._image_gt_summaries['gt_boxes'] = gt_boxes
        self._image_gt_summaries['im_info'] = im_info
        self._image = torch.from_numpy(image.transpose([0, 3, 1,
                                                        2])).to(self._device)
        self._im_info = im_info  # No need to change; actually it can be an list
        self._gt_boxes = torch.from_numpy(gt_boxes).to(
            self._device) if gt_boxes is not None else None
        self._mode = mode
        rois, cls_prob, bbox_pred = self._predict()
        if mode == 'TEST':
            # undo the training-time bbox target normalization
            stds = bbox_pred.data.new(cfg.TRAIN.BBOX_NORMALIZE_STDS).repeat(
                self._num_classes).unsqueeze(0).expand_as(bbox_pred)
            means = bbox_pred.data.new(cfg.TRAIN.BBOX_NORMALIZE_MEANS).repeat(
                self._num_classes).unsqueeze(0).expand_as(bbox_pred)
            self._predictions["bbox_pred"] = bbox_pred.mul(stds).add(means)
        else:
            self._add_losses()  # compute losses

    def init_weights(self):
        """Initialize RPN/RCNN head weights with (truncated) normals."""
        def normal_init(m, mean, stddev, truncated=False):
            """
            weight initalizer: truncated normal and random normal.
            """
            # x is a parameter
            if truncated:
                m.weight.data.normal_().fmod_(2).mul_(stddev).add_(
                    mean)  # not a perfect approximation
            else:
                m.weight.data.normal_(mean, stddev)
            m.bias.data.zero_()

        normal_init(self.rpn_net, 0, 0.01, cfg.TRAIN.TRUNCATED)
        normal_init(self.rpn_cls_score_net, 0, 0.01, cfg.TRAIN.TRUNCATED)
        normal_init(self.rpn_bbox_pred_net, 0, 0.01, cfg.TRAIN.TRUNCATED)
        normal_init(self.cls_score_net, 0, 0.01, cfg.TRAIN.TRUNCATED)
        normal_init(self.bbox_pred_net, 0, 0.001, cfg.TRAIN.TRUNCATED)

    # Extract the head feature maps, for example for vgg16 it is conv5_3
    # only useful during testing mode
    def extract_head(self, image):
        feat = self._layers["head"](torch.from_numpy(
            image.transpose([0, 3, 1, 2])).to(self._device))
        return feat

    # only useful during testing mode
    def test_image(self, image, im_info):
        """Run inference on one image and return numpy results."""
        self.eval()
        with torch.no_grad():
            self.forward(image, im_info, None, mode='TEST')
        cls_score, cls_prob, bbox_pred, rois = self._predictions["cls_score"].data.cpu().numpy(), \
                                               self._predictions['cls_prob'].data.cpu().numpy(), \
                                               self._predictions['bbox_pred'].data.cpu().numpy(), \
                                               self._predictions['rois'].data.cpu().numpy()
        return cls_score, cls_prob, bbox_pred, rois

    def delete_intermediate_states(self):
        """Delete intermediate results to save memory between steps."""
        for d in [
                self._losses, self._predictions, self._anchor_targets,
                self._proposal_targets
        ]:
            for k in list(d):
                del d[k]

    def get_summary(self, blobs):
        """Run a forward pass in eval mode and return validation summaries."""
        self.eval()
        self.forward(blobs['data'], blobs['im_info'], blobs['gt_boxes'])
        self.train()
        summary = self._run_summary_op(True)
        return summary

    def train_step(self, blobs, train_op):
        """One optimization step; returns the individual loss values."""
        self.forward(blobs['data'], blobs['im_info'], blobs['gt_boxes'])
        rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, loss = self._losses["rpn_cross_entropy"].item(), \
                                                               self._losses['rpn_loss_box'].item(), \
                                                               self._losses['cross_entropy'].item(), \
                                                               self._losses['loss_box'].item(), \
                                                               self._losses['total_loss'].item()
        #utils.timer.timer.tic('backward')
        train_op.zero_grad()
        self._losses['total_loss'].backward()
        #utils.timer.timer.toc('backward')
        train_op.step()
        self.delete_intermediate_states()
        return rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, loss

    def train_step_with_summary(self, blobs, train_op):
        """One optimization step that also returns tensorboard summaries."""
        self.forward(blobs['data'], blobs['im_info'], blobs['gt_boxes'])
        rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, loss = self._losses["rpn_cross_entropy"].item(), \
                                                               self._losses['rpn_loss_box'].item(), \
                                                               self._losses['cross_entropy'].item(), \
                                                               self._losses['loss_box'].item(), \
                                                               self._losses['total_loss'].item()
        train_op.zero_grad()
        self._losses['total_loss'].backward()
        train_op.step()
        summary = self._run_summary_op()
        self.delete_intermediate_states()
        return rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, loss, summary

    def train_step_no_return(self, blobs, train_op):
        """One optimization step, discarding the loss values."""
        self.forward(blobs['data'], blobs['im_info'], blobs['gt_boxes'])
        train_op.zero_grad()
        self._losses['total_loss'].backward()
        train_op.step()
        self.delete_intermediate_states()

    def load_state_dict(self, state_dict):
        """
        Because we remove the definition of fc layer in resnet now, it will fail when loading
        the model trained before.
        To provide back compatibility, we overwrite the load_state_dict
        """
        nn.Module.load_state_dict(
            self, {k: v
                   for k, v in state_dict.items() if k in self.state_dict()}
        )
class VGG16(Network):
    """Faster R-CNN with a VGG-16 backbone (conv feature head + fc7 tail)."""

    def __init__(self):
        Network.__init__(self)
        # The conv feature map is 16x smaller than the input image.
        self._feat_stride = [
            16,
        ]
        self._feat_compress = [
            1. / float(self._feat_stride[0]),
        ]
        self._net_conv_channels = 512
        self._fc7_channels = 4096

    def _init_head_tail(self):
        """Build the VGG16 backbone: drop fc8, freeze conv1-conv2 layers."""
        self.vgg = models.vgg16()
        # Keep every classifier layer except the final fc8.
        classifier_layers = list(self.vgg.classifier._modules.values())[:-1]
        self.vgg.classifier = nn.Sequential(*classifier_layers)
        # Freeze all layers before conv3.
        for frozen_idx in range(10):
            for param in self.vgg.features[frozen_idx].parameters():
                param.requires_grad = False
        # The head is every feature layer except the trailing max-pool.
        head_layers = list(self.vgg.features._modules.values())[:-1]
        self._layers['head'] = nn.Sequential(*head_layers)

    def _image_to_head(self):
        """Run the backbone head on the current image tensor."""
        net_conv = self._layers['head'](self._image)
        self._act_summaries['conv'] = net_conv
        return net_conv

    def _head_to_tail(self, pool5):
        """Flatten pooled RoIs and push them through the fc6/fc7 tail."""
        flattened = pool5.view(pool5.size(0), -1)
        return self.vgg.classifier(flattened)

    def load_pretrained_cnn(self, state_dict):
        """Load backbone weights, keeping only keys present in this vgg."""
        own_keys = self.vgg.state_dict()
        self.vgg.load_state_dict(
            {k: v for k, v in state_dict.items() if k in own_keys})
2.3.2 建立一个简单的模型训练并保存¶
from loadData import DataLayer
from model import VGG16, Network
import torch
import tensorboardX as tb
import os
# Directory where model checkpoints are written during training.
weight_output = "./weight_output"
# exist_ok avoids the check-then-create race of the explicit exists() test.
os.makedirs(weight_output, exist_ok=True)
# import pydevd_pycharm
# pydevd_pycharm.settrace('10.214.160.245', port=8011, stdoutToServer=True, stderrToServer=True)
def train():
    """Train the VGG16 Faster R-CNN on ./dataset/train, validating on
    ./dataset/val.

    Writes tensorboard summaries every 100 iterations, prints losses every
    10 iterations and saves a checkpoint to ``weight_output`` every 1000
    iterations.
    """
    # Data layers for the training and validation splits
    data_layer = DataLayer("./dataset/train")
    data_layer_val = DataLayer("./dataset/val")
    # Build the model: 2 classes (background + balloon)
    net = VGG16()
    # Construct the computation graph
    net.create_architecture(2, tag='default', anchor_scales=[8, 16, 32],
                            anchor_ratios=[0.5, 1, 2])
    # Per-parameter optimizer options: biases get a doubled learning rate,
    # everything else the base rate with weight decay.
    lr = 0.0001
    params = []
    for key, value in dict(net.named_parameters()).items():
        if not value.requires_grad:
            continue
        if 'bias' in key:
            params += [{
                'params': [value],
                'lr': lr * 2,
                'weight_decay': 0.0001
            }]
        else:
            params += [{
                'params': [value],
                'lr': lr,
                'weight_decay': getattr(value, 'weight_decay', 0.0001)
            }]
    optimizer = torch.optim.SGD(params, momentum=0.9)
    # Write the train and validation information to tensorboard
    writer = tb.writer.FileWriter("./tensorboard/train")
    valwriter = tb.writer.FileWriter("./tensorboard/val")
    print("model create successfully!")
    # Load weights: the latest checkpoint if one exists, otherwise the
    # ImageNet-pretrained backbone.
    list_dir = os.listdir(weight_output)
    if len(list_dir) == 0:
        net.load_pretrained_cnn(torch.load("./imagenet_weights/vgg16.pth"))
    else:
        net.load_state_dict(torch.load(os.path.join(weight_output, list_dir[-1]),
                                       map_location='cpu'))
    print("model params load successfully!")
    net.train()
    net.to(net._device)
    # Training loop
    max_iter = 5001
    for step in range(max_iter):
        # Fetch exactly one minibatch per iteration.  (The original code
        # called data_layer.forward() a second time in the non-summary
        # branch, silently consuming and skipping a batch each iteration.)
        blobs = data_layer.forward()
        if step % 100 == 0:
            # Train step that also produces tensorboard summaries
            rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, total_loss, summary = \
                net.train_step_with_summary(blobs, optimizer)
            for _sum in summary:
                writer.add_summary(_sum, float(step))
            # Also check the summary on the validation set
            blobs_val = data_layer_val.forward()
            summary_val = net.get_summary(blobs_val)
            for _sum in summary_val:
                valwriter.add_summary(_sum, float(step))
        else:
            # Plain train step without summaries
            rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, total_loss = \
                net.train_step(blobs, optimizer)
        if step % 10 == 0:
            # Display the last image training information
            print('iter: %d / %d, total loss: %.6f\n >>> rpn_loss_cls: %.6f\n '
                  '>>> rpn_loss_box: %.6f\n >>> loss_cls: %.6f\n >>> loss_box: %.6f\n >>> lr: %f' % \
                  (step, max_iter, total_loss, rpn_loss_cls, rpn_loss_box, loss_cls, loss_box, lr))
        if step % 1000 == 0:
            # Periodic checkpoint
            torch.save(net.state_dict(),
                       os.path.join(weight_output, "params" + '{0:0>9}'.format(step) + ".pkl"))
    writer.close()
    valwriter.close()
if __name__ == '__main__':
    # Entry point: run the full training loop on the local dataset.
    # os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    train()
2.3.3 分析模型训练过程以及模型概况¶
整个训练过程分为:
- 使用DataLayer加载训练数据。
- 构建模型VGG16。
- 使用torch.load加载模型参数。
- 设置训练epoch,然后开始训练。
- 记下训练结果,使用torch.save保存模型参数。
2.3.4 加载模型并对模型进行评估¶
目标检测的评估方法主要有两种,一种是ap(average precision),一种是CorLoc(Correct Localization)
本模型中只使用了ap作为评估方法
import xml.etree.ElementTree as ET
import os
import numpy as np
import glob
# classes = (
# '__background__', # always index 0
# 'balloon')
# class_to_ind = dict(
# list(zip(classes, list(range(len(classes))))))
def parse_rec(filename):
    """Parse a PASCAL VOC xml file.

    Returns a list of dicts, one per <object>, with keys:
      'name'      -- class name string
      'difficult' -- 0/1 flag (0 when the tag is absent)
      'bbox'      -- [xmin, ymin, xmax, ymax] as floats
    """
    tree = ET.parse(filename)
    objects = []
    for obj in tree.findall('object'):
        obj_struct = {}
        obj_struct['name'] = obj.find('name').text
        # Some annotation tools omit <difficult>; treat a missing tag as
        # "not difficult" instead of crashing on None.
        diff_node = obj.find('difficult')
        obj_struct['difficult'] = int(diff_node.text) if diff_node is not None else 0
        bbox = obj.find('bndbox')
        obj_struct['bbox'] = [
            float(bbox.find('xmin').text),
            float(bbox.find('ymin').text),
            float(bbox.find('xmax').text),
            float(bbox.find('ymax').text)
        ]
        objects.append(obj_struct)
    return objects
def voc_ap(rec, prec, use_07_metric=False):
    """Compute VOC average precision from recall/precision arrays.

    With use_07_metric=True the VOC07 11-point interpolation is used;
    otherwise AP is the exact area under the precision envelope.
    """
    if use_07_metric:
        # 11-point metric: average the max precision at recall thresholds
        # 0.0, 0.1, ..., 1.0 (zero where no detection reaches the recall).
        ap = 0.
        for thr in np.arange(0., 1.1, 0.1):
            candidates = prec[rec >= thr]
            ap += (np.max(candidates) if candidates.size > 0 else 0.) / 11.
        return ap
    # Exact AP: pad with sentinels, make precision monotonically
    # non-increasing, then integrate over the recall steps.
    mrec = np.concatenate(([0.], rec, [1.]))
    mpre = np.concatenate(([0.], prec, [0.]))
    for idx in range(mpre.size - 1, 0, -1):
        mpre[idx - 1] = np.maximum(mpre[idx - 1], mpre[idx])
    # points where the recall value actually changes
    changed = np.where(mrec[1:] != mrec[:-1])[0]
    # sum of (delta recall) * interpolated precision
    return np.sum((mrec[changed + 1] - mrec[changed]) * mpre[changed + 1])
def voc_eval(pred_values,
             annopath,
             classname,
             ovthresh=0.5,
             use_07_metric=False,
             use_diff=False):
    """rec, prec, ap = voc_eval(pred_values, annopath, classname,
                                [ovthresh], [use_07_metric], [use_diff])

    Top level function that does the PASCAL VOC evaluation.

    pred_values: 2-D sequence of detections, one row per detection:
        [image id (xml path), score for this class, xmin, ymin, xmax, ymax]
    annopath: directory containing the xml annotation files
    classname: category name to evaluate
    ovthresh: IoU threshold for a detection to count as correct (default 0.5)
    use_07_metric: whether to use VOC07's 11-point AP computation
        (default False)
    use_diff: also count objects marked "difficult" as positives
    """
    # Parse every annotation file once.
    recs = {}
    for xmlP in glob.glob(annopath + "/*.*"):
        recs[xmlP] = parse_rec(xmlP)
    # Extract the ground-truth objects of this class, per image.
    class_recs = {}
    npos = 0
    for xmlP in glob.glob(annopath + "/*.*"):
        R = [obj for obj in recs[xmlP] if obj['name'] == classname]
        bbox = np.array([x['bbox'] for x in R])
        # np.bool was removed in NumPy 1.24; use the builtin bool instead.
        if use_diff:
            difficult = np.array([False for x in R]).astype(bool)
        else:
            difficult = np.array([x['difficult'] for x in R]).astype(bool)
        det = [False] * len(R)  # per-GT "already matched" flags
        npos = npos + sum(~difficult)
        class_recs[xmlP] = {
            'bbox': bbox,
            'difficult': difficult,
            'det': det
        }

    # Per-detection image id (same key as class_recs), score and box.
    image_ids = [x[0] for x in pred_values]
    confidence = np.array([float(x[1]) for x in pred_values])
    BB = np.array([[float(z) for z in x[2:]] for x in pred_values])

    nd = len(image_ids)
    tp = np.zeros(nd)
    fp = np.zeros(nd)
    if BB.shape[0] > 0:
        # Sort detections by descending confidence.
        sorted_ind = np.argsort(-confidence)
        BB = BB[sorted_ind, :]
        image_ids = [image_ids[x] for x in sorted_ind]
        # go down dets and mark TPs and FPs
        for d in range(nd):
            R = class_recs[image_ids[d]]
            bb = BB[d, :].astype(float)
            ovmax = -np.inf
            BBGT = R['bbox'].astype(float)
            if BBGT.size > 0:
                # compute overlaps
                # intersection
                ixmin = np.maximum(BBGT[:, 0], bb[0])
                iymin = np.maximum(BBGT[:, 1], bb[1])
                ixmax = np.minimum(BBGT[:, 2], bb[2])
                iymax = np.minimum(BBGT[:, 3], bb[3])
                iw = np.maximum(ixmax - ixmin + 1., 0.)
                ih = np.maximum(iymax - iymin + 1., 0.)
                inters = iw * ih
                # union
                uni = ((bb[2] - bb[0] + 1.) * (bb[3] - bb[1] + 1.) +
                       (BBGT[:, 2] - BBGT[:, 0] + 1.) *
                       (BBGT[:, 3] - BBGT[:, 1] + 1.) - inters)
                overlaps = inters / uni
                ovmax = np.max(overlaps)
                jmax = np.argmax(overlaps)
            if ovmax > ovthresh:
                if not R['difficult'][jmax]:
                    if not R['det'][jmax]:
                        # Best match is an unclaimed, non-difficult GT box.
                        tp[d] = 1.
                        R['det'][jmax] = 1
                    else:
                        # GT box already claimed by a higher-scored detection.
                        fp[d] = 1.
            else:
                fp[d] = 1.

    # Cumulative counts: after cumsum, fp[i]/tp[i] are the numbers of
    # false/true positives among the (i+1) highest-scored detections.
    fp = np.cumsum(fp)
    tp = np.cumsum(tp)
    rec = tp / float(npos)
    # avoid divide by zero in case the first detection matches a difficult
    # ground truth
    prec = tp / np.maximum(tp + fp, np.finfo(np.float64).eps)
    ap = voc_ap(rec, prec, use_07_metric)
    return rec, prec, ap
from loadData import DataLayer
from model import VGG16, Network
import torch
from lib.bbox_transform import bbox_transform_inv
import os
import numpy as np
from torchvision.ops import nms
from config import cfg
import cv2
from inference import _clip_boxes
from voc_eval import voc_eval
weight_output = "./weight_output"  # directory holding trained model checkpoint files
def ap_test():
    """Run the trained detector over the test split and score it with VOC metrics.

    Builds a VGG16-based Faster R-CNN, loads the latest checkpoint from
    ``weight_output`` (falling back to ImageNet-pretrained weights when no
    checkpoint exists), collects all detections for the 'balloon' class, and
    returns ``(rec, prec, ap)`` as produced by ``voc_eval``.
    """
    # One row per detection: [image_name, confidence, x1, y1, x2, y2]
    pred_values = []
    data_layer_test = DataLayer(os.path.join("dataset", "test"))
    net = VGG16()
    # Construct the computation graph: 2 classes (background + balloon)
    net.create_architecture(2, tag='default', anchor_scales=[8, 16, 32], anchor_ratios=[0.5, 1, 2])
    net.eval()
    # if not torch.cuda.is_available():
    net._device = 'cpu'  # forced to CPU regardless of CUDA availability
    net.to(net._device)
    # Load model weights
    list_dir = os.listdir(weight_output)
    if len(list_dir) == 0:
        # No checkpoint yet: start from ImageNet-pretrained VGG16 weights
        net.load_pretrained_cnn(torch.load("./imagenet_weights/vgg16.pth"))
    else:
        # NOTE(review): os.listdir order is arbitrary; list_dir[-1] is assumed
        # to be the newest checkpoint — confirm filenames sort chronologically.
        # net.load_state_dict(torch.load(os.path.join(weight_output, list_dir[-1])))
        net.load_state_dict(torch.load(os.path.join(weight_output, list_dir[-1]), map_location='cpu'))
        print("load model params successfully. ")
    for i in range(data_layer_test.length):
        # forward() presumably yields one image blob per call, advancing an
        # internal cursor — TODO confirm against DataLayer.
        blobs = data_layer_test.forward()
        im_blob = blobs['data']
        im_scale = blobs['im_info'][2]
        _, scores, bbox_pred, rois = net.test_image(blobs['data'], blobs['im_info'])
        # Map RoIs from network-input scale back to original-image coordinates
        boxes = rois[:, 1:5] / im_scale
        # Undo the input resize so img.shape matches the original image size
        img = cv2.resize(
            im_blob[0],
            None,
            None,
            fx=1 / im_scale,
            fy=1 / im_scale,
            interpolation=cv2.INTER_LINEAR)
        # Add the per-channel pixel means back (the network input is mean-subtracted)
        img += cfg.PIXEL_MEANS
        scores = np.reshape(scores, [scores.shape[0], -1])
        bbox_pred = np.reshape(bbox_pred, [bbox_pred.shape[0], -1])
        # Apply bounding-box regression deltas
        box_deltas = bbox_pred
        pred_boxes = bbox_transform_inv(
            torch.from_numpy(boxes), torch.from_numpy(box_deltas)).numpy()
        pred_boxes = _clip_boxes(pred_boxes, img.shape)
        # skip j = 0, because it's the background class
        for j in range(1, 2):
            # Keep detections whose class score clears the 0.5 threshold
            inds = np.where(scores[:, j] > 0.5)[0]
            cls_scores = scores[inds, j]
            cls_boxes = pred_boxes[inds, j * 4:(j + 1) * 4]
            cls_dets = np.hstack((cls_boxes, cls_scores[:, np.newaxis])) \
                .astype(np.float32, copy=False)
            # NMS: suppress overlapping boxes, keeping the highest-scoring ones
            keep = nms(
                torch.from_numpy(cls_boxes), torch.from_numpy(cls_scores),
                cfg.TEST.NMS).numpy() if cls_dets.size > 0 else []
            cls_dets = cls_dets[keep, :]
            height, width, channel = img.shape
            for r in range(len(cls_dets)):
                # Shift to 1-based VOC coordinates, clamped to the image bounds
                left = float(max(cls_dets[r][0] + 1, 0))
                top = float(max(cls_dets[r][1] + 1, 0))
                right = float(min(cls_dets[r][2] + 1, width))
                bottom = float(min(cls_dets[r][3] + 1, height))
                pred_values.append([data_layer_test.filenames[i], cls_dets[r][4], left, top, right, bottom])
    return voc_eval(pred_values, os.path.join("dataset", "test", "Annotations"), "balloon")
if __name__ == '__main__':
# os.environ['CUDA_VISIBLE_DEVICES'] = '0'
rec, prec, ap = ap_test()
print(rec)
print(prec)
print(ap)
score = 0
if ap <= 0:
score = 0
elif ap > 0 and ap <= 0.1:
score = 60
elif ap > 0.1 and ap <= 0.2:
score = 70
elif ap > 0.2 and ap <= 0.3:
score = 80
elif ap > 0.3 and ap <= 0.4:
score = 90
else:
score = 100
print("score: ", score)
2.3.5 加载模型并预测输入数据的结果¶
from loadData import DataLayer
from model import VGG16, Network
import torch
from lib.bbox_transform import bbox_transform_inv
import os
import numpy as np
from torchvision.ops import nms
from config import cfg
import cv2
import matplotlib.pyplot as plt
image_output = "./image_output"  # where annotated prediction images are saved
# exist_ok=True avoids the check-then-create race of os.path.exists + makedirs
os.makedirs(image_output, exist_ok=True)
weight_output = "./weight_output"  # directory holding trained model checkpoint files
def _clip_boxes(boxes, im_shape):
"""Clip boxes to image boundaries."""
# x1 >= 0
boxes[:, 0::4] = np.maximum(boxes[:, 0::4], 0)
# y1 >= 0
boxes[:, 1::4] = np.maximum(boxes[:, 1::4], 0)
# x2 < im_shape[1]
boxes[:, 2::4] = np.minimum(boxes[:, 2::4], im_shape[1] - 1)
# y2 < im_shape[0]
boxes[:, 3::4] = np.minimum(boxes[:, 3::4], im_shape[0] - 1)
return boxes
def inference():
    """Run the detector over the validation split and visualize the results.

    For each validation image: predicts balloon boxes, draws each box and its
    confidence onto the image, shows it with matplotlib, and writes the
    annotated image to ``image_output`` as a zero-padded ``NNNNNN.jpg``.
    """
    data_layer_test = DataLayer("./dataset/val")
    net = VGG16()
    # Construct the computation graph: 2 classes (background + balloon)
    net.create_architecture(2, tag='default', anchor_scales=[8, 16, 32], anchor_ratios=[0.5, 1, 2])
    net.eval()
    # if not torch.cuda.is_available():
    net._device = 'cpu'  # forced to CPU regardless of CUDA availability
    net.to(net._device)
    # Load model weights
    list_dir = os.listdir(weight_output)
    if len(list_dir) == 0:
        # No checkpoint yet: start from ImageNet-pretrained VGG16 weights
        net.load_pretrained_cnn(torch.load("./imagenet_weights/vgg16.pth"))
    else:
        # NOTE(review): list_dir[-1] is assumed to be the newest checkpoint —
        # confirm filenames sort chronologically.
        # net.load_state_dict(torch.load(os.path.join(weight_output, list_dir[-1])))
        net.load_state_dict(torch.load(os.path.join(weight_output, list_dir[-1]), map_location='cpu'))
        print("load model params successfully. ")
    fig = plt.gcf()  # get current figure
    fig.set_size_inches(10, 12)  # 1 inch equals 2.54 cm
    for i in range(data_layer_test.length):
        # forward() presumably yields one image blob per call — TODO confirm
        blobs = data_layer_test.forward()
        im_blob = blobs['data']
        im_scale = blobs['im_info'][2]
        _, scores, bbox_pred, rois = net.test_image(blobs['data'], blobs['im_info'])
        # Map RoIs from network-input scale back to original-image coordinates
        boxes = rois[:, 1:5] / im_scale
        # Undo the input resize so img.shape matches the original image size
        img = cv2.resize(
            im_blob[0],
            None,
            None,
            fx=1/im_scale,
            fy=1/im_scale,
            interpolation=cv2.INTER_LINEAR)
        # Add the per-channel pixel means back (the network input is mean-subtracted)
        img += cfg.PIXEL_MEANS
        scores = np.reshape(scores, [scores.shape[0], -1])
        bbox_pred = np.reshape(bbox_pred, [bbox_pred.shape[0], -1])
        # Apply bounding-box regression deltas
        box_deltas = bbox_pred
        pred_boxes = bbox_transform_inv(
            torch.from_numpy(boxes), torch.from_numpy(box_deltas)).numpy()
        pred_boxes = _clip_boxes(pred_boxes, img.shape)
        # skip j = 0, because it's the background class
        for j in range(1, 2):
            # Keep detections whose class score clears the 0.5 threshold
            inds = np.where(scores[:, j] > 0.5)[0]
            cls_scores = scores[inds, j]
            cls_boxes = pred_boxes[inds, j * 4:(j + 1) * 4]
            cls_dets = np.hstack((cls_boxes, cls_scores[:, np.newaxis])) \
                .astype(np.float32, copy=False)
            # NMS: suppress overlapping boxes, keeping the highest-scoring ones
            keep = nms(
                torch.from_numpy(cls_boxes), torch.from_numpy(cls_scores),
                cfg.TEST.NMS).numpy() if cls_dets.size > 0 else []
            cls_dets = cls_dets[keep, :]
            height, width, channel = img.shape
            for r in range(len(cls_dets)):
                # Draw the predicted box (coordinates clamped to the image)
                left = int(max(cls_dets[r][0], 0))
                top = int(max(cls_dets[r][1], 0))
                right = int(min(cls_dets[r][2], width))
                bottom = int(min(cls_dets[r][3], height))
                cv2.rectangle(img, (left, top), (right, bottom), (0, 255, 0), 1)
                # Filled label background sized to the rendered score text
                text_size, baseline = cv2.getTextSize(str(cls_dets[r][4]), 1, 1, 1)
                cv2.rectangle(img, (left, top - text_size[1] - (baseline * 2)), (left + text_size[0], top),
                              (44, 44, 44), -1)
                cv2.putText(img, str(cls_dets[r][4]), (left, top - baseline), 1,
                            1, (255, 255, 255), 1)
        ax = plt.subplot(1, 1, 1)  # get the (single) subplot to draw into
        show_image = img.astype(np.int32, copy=False)
        ax.imshow(show_image, cmap="binary")
        ax.set_xticks([])
        ax.set_yticks([])
        plt.show()
        # Save to disk; cvtColor assumes img is RGB here — NOTE(review):
        # verify the channel order DataLayer produces (cv2 reads BGR).
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        cv2.imwrite(os.path.join(image_output, '{0:0>6}'.format(i) + ".jpg"), img)
if __name__ == '__main__':
    # os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    inference()
    # NOTE(review): leftover debug print — safe to remove
    print("11")