import os
import random
import time
from multiprocessing import Pool
from typing import Callable

import cv2
import numpy as np
import torch
import torchvision
import tqdm

from config import *  # expected to provide net_shape and strides, used by make_anchor()

random.seed(0)

def parallelise(function: Callable, data, chunksize=100, verbose=True, num_workers=os.cpu_count()):
    """Map `function` over `data` with a multiprocessing Pool, showing a tqdm progress bar."""
    num_workers = max(num_workers or 1, 1)  # os.cpu_count() may return None; Pool needs at least 1 worker
    pool = Pool(processes=num_workers)
    results = list(
        tqdm.tqdm(pool.imap(function, data, chunksize), total=len(data), disable=not verbose)
    )
    pool.close()
    pool.join()
    return results
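
# A minimal usage sketch for `parallelise` (hypothetical example, not part of the
# pipeline): any picklable top-level callable works; `abs` is just a stand-in.
# On platforms that spawn processes (e.g. Windows), call it from under an
# `if __name__ == '__main__'` guard.
def _parallelise_demo():
    return parallelise(abs, list(range(-100, 100)), chunksize=10, verbose=False)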

def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (dw, dh)
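
# Usage sketch for `letterbox` (assumed standalone example): pad a synthetic
# 480x640 BGR frame onto a stride-aligned canvas and recover the scale/padding
# needed to map detections back to the original image.
def _letterbox_demo():
    frame = np.zeros((480, 640, 3), dtype=np.uint8)  # hypothetical input frame
    padded, ratio, (dw, dh) = letterbox(frame, new_shape=(640, 640), auto=True)
    return padded.shape, ratio, (dw, dh)  # height/width are now multiples of 32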

def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y
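
# Sanity-check sketch for `xywh2xyxy`: a 10x10 box centred at (50, 50) should
# map to corners (45, 45, 55, 55).
def _xywh2xyxy_demo():
    return xywh2xyxy(np.array([[50.0, 50.0, 10.0, 10.0]]))  # -> [[45., 45., 55., 55.]]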

def box_iou(box1, box2, eps=1e-7):
    """
    Calculate intersection-over-union (IoU) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Based on https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py

    Args:
        box1 (torch.Tensor): A tensor of shape (N, 4) representing N bounding boxes.
        box2 (torch.Tensor): A tensor of shape (M, 4) representing M bounding boxes.
        eps (float, optional): A small value to avoid division by zero. Defaults to 1e-7.

    Returns:
        (torch.Tensor): An NxM tensor containing the pairwise IoU values for every element in box1 and box2.
    """
    # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2)
    (a1, a2), (b1, b2) = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2)
    inter = (torch.min(a2, b2) - torch.max(a1, b1)).clamp_(0).prod(2)

    # IoU = inter / (area1 + area2 - inter)
    return inter / ((a2 - a1).prod(2) + (b2 - b1).prod(2) - inter + eps)
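
# Sketch for `box_iou`: identical boxes give IoU ~1 (slightly less due to eps),
# disjoint boxes give 0; the result is the full N x M pairwise matrix.
def _box_iou_demo():
    a = torch.tensor([[0.0, 0.0, 10.0, 10.0]])
    b = torch.tensor([[0.0, 0.0, 10.0, 10.0], [20.0, 20.0, 30.0, 30.0]])
    return box_iou(a, b)  # -> tensor of shape (1, 2), approximately [[1.0, 0.0]]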

def non_max_suppression(
        prediction,
        conf_thres=0.25,
        iou_thres=0.45,
        classes=None,
        agnostic=False,
        multi_label=False,
        labels=(),
        max_det=300,
        nc=0,  # number of classes (optional)
        max_time_img=0.05,
        max_nms=30000,
        max_wh=7680,
):
    """
    Perform non-maximum suppression (NMS) on a set of boxes, with support for masks and multiple labels per box.

    Arguments:
        prediction (torch.Tensor): A tensor of shape (batch_size, num_classes + 4 + num_masks, num_boxes)
            containing the predicted boxes, classes, and masks. The tensor should be in the format
            output by a model, such as YOLO.
        conf_thres (float): The confidence threshold below which boxes will be filtered out.
            Valid values are between 0.0 and 1.0.
        iou_thres (float): The IoU threshold below which boxes will be filtered out during NMS.
            Valid values are between 0.0 and 1.0.
        classes (List[int]): A list of class indices to consider. If None, all classes will be considered.
        agnostic (bool): If True, the model is agnostic to the number of classes, and all
            classes will be considered as one.
        multi_label (bool): If True, each box may have multiple labels.
        labels (List[List[Union[int, float, torch.Tensor]]]): A list of lists, where each inner
            list contains the apriori labels for a given image. The list should be in the format
            output by a dataloader, with each label being a tuple of (class_index, x1, y1, x2, y2).
        max_det (int): The maximum number of boxes to keep after NMS.
        nc (int, optional): The number of classes output by the model. Any indices after this will be considered masks.
        max_time_img (float): The maximum time (seconds) for processing one image.
        max_nms (int): The maximum number of boxes passed into torchvision.ops.nms().
        max_wh (int): The maximum box width and height in pixels.

    Returns:
        (List[torch.Tensor]): A list of length batch_size, where each element is a tensor of
            shape (num_boxes, 6 + num_masks) containing the kept boxes, with columns
            (x1, y1, x2, y2, confidence, class, mask1, mask2, ...).
    """
    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
    if isinstance(prediction, (list, tuple)):  # YOLOv8 model in validation mode, output = (inference_out, loss_out)
        prediction = prediction[0]  # select only inference output

    device = prediction.device
    mps = 'mps' in device.type  # Apple MPS
    if mps:  # MPS not fully supported yet, convert tensors to CPU before NMS
        prediction = prediction.cpu()
    bs = prediction.shape[0]  # batch size
    nc = nc or (prediction.shape[1] - 4)  # number of classes
    nm = prediction.shape[1] - nc - 4  # number of masks
    mi = 4 + nc  # mask start index
    xc = prediction[:, 4:mi].amax(1) > conf_thres  # candidates

    # Settings
    # min_wh = 2  # (pixels) minimum box width and height
    time_limit = 0.5 + max_time_img * bs  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[:, 2:4] < min_wh) | (x[:, 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x.transpose(0, -1)[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 4), device=x.device)  # 4 + nc + nm columns, matching x
            v[:, :4] = lb[:, 1:5]  # box
            v[range(len(lb)), lb[:, 0].long() + 4] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Detections matrix nx6 (xyxy, conf, cls)
        box, cls, mask = x.split((4, nc, nm), 1)
        box = xywh2xyxy(box)  # (center_x, center_y, width, height) to (x1, y1, x2, y2)
        if multi_label:
            i, j = (cls > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, 4 + j, None], j[:, None].float(), mask[i]), 1)
        else:  # best class only
            conf, j = cls.max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence and remove excess boxes

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        i = i[:max_det]  # limit detections
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # Update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)
        if (time.time() - t) > time_limit:
            break  # time limit exceeded; remaining images keep their empty placeholder

    return output
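
# Usage sketch for `non_max_suppression` on a dummy prediction (assumed shapes:
# batch 1, 80 classes, no masks, 8400 candidates, as in a 640x640 YOLOv8 head).
def _nms_demo():
    pred = torch.rand(1, 84, 8400)  # 4 xywh rows + 80 class-score rows per candidate
    pred[:, :4] *= 640  # spread the boxes over a 640x640 canvas
    dets = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, max_det=300)
    return [d.shape for d in dets]  # one (num_kept, 6) tensor per image: xyxy, conf, cls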

def plot_one_box(x, img, color=None, label=None, line_thickness=3):
    # Plots one bounding box on image img
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
    color = color or [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
        c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
        cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filled
        cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
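
# Usage sketch for `plot_one_box` (hypothetical values; draws in place on a BGR
# uint8 image, so pass a copy if the original frame must stay untouched).
def _plot_demo():
    canvas = np.full((480, 640, 3), 114, dtype=np.uint8)
    plot_one_box([100, 100, 300, 240], canvas, color=(0, 255, 0), label='person 0.91')
    return canvas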

def softmax(z):
    # Numerically stable softmax over a 1-D vector (max subtracted before exponentiation)
    e_z = np.exp(z - np.max(z))
    return e_z / e_z.sum(axis=0)

def dfl_conv(z):
    # Expected value of each discrete distribution: weighted sum with bin indices 0..n-1
    # (equivalent to the fixed 1x1 convolution used by the DFL head)
    weights = np.expand_dims(np.arange(z.shape[-1]), axis=0)
    z = z * weights
    return np.sum(z, axis=-1)

def dfl(res):
    # Distribution Focal Loss decode for one image: split the 4*c1 regression
    # channels into 4 box sides of c1 bins each, softmax over the bins, and
    # take the expected bin index per side.
    res = np.expand_dims(res, axis=0)
    b, c, a = res.shape
    c1 = c // 4
    res = res.reshape((b, 4, c1, a))
    res = np.transpose(res, axes=[0, 3, 1, 2])
    # res = parallelise(softmax, res.reshape((-1, c1)))
    # res = parallelise(dfl_conv, res)
    res = np.stack([softmax(i) for i in res.reshape((-1, c1))])
    res = dfl_conv(res)
    return np.transpose(res.reshape((b, a, 4)), axes=[0, 2, 1])
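
# Numeric sketch of the DFL decode: with c1=16 bins per side, a distribution
# sharply peaked at bin 3 for every side should decode to offsets of ~3.0.
def _dfl_demo():
    logits = np.zeros((64, 1))  # 4 sides x 16 bins for a single anchor location
    logits[3::16] = 50.0  # put nearly all probability mass on bin 3 of each side
    return dfl(logits)  # -> shape (1, 4, 1), values ~3.0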

def make_anchor(input_shape=net_shape, grid_cell_offset=0.5):
    # Build anchor centre points and per-anchor strides for every FPN level
    # (net_shape and strides come from config)
    anchor_points, stride = [], []
    for i in strides:
        h, w = input_shape[0] // i, input_shape[1] // i
        sx = np.arange(w) + grid_cell_offset
        sy = np.arange(h) + grid_cell_offset
        sx, sy = np.meshgrid(sx, sy)
        anchor_points.append(np.stack((sx, sy), -1).reshape((-1, 2)))
        stride.append(np.full((h * w, 1), i))
    return np.transpose(np.concatenate(anchor_points), axes=[1, 0]), np.transpose(np.concatenate(stride), axes=[1, 0])
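
# Shape sketch for `make_anchor` (actual sizes depend on net_shape/strides in
# config): with the common YOLOv8 defaults of a 640x640 input and strides
# (8, 16, 32), this yields 8400 anchors as points of shape (2, 8400) and
# per-anchor strides of shape (1, 8400).
def _anchor_demo():
    points, stride = make_anchor()
    return points.shape, stride.shape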

def yolov8layer(res, c1=16):
    # Decode raw YOLOv8 head outputs: DFL on the first 4*c1 channels, then map
    # the per-side distances to xywh boxes in input-image pixels via the anchors.
    n, c = res[0].shape[:2]
    res = np.concatenate([xi.reshape((n, c, -1)) for xi in res], 2)
    res_dfl = parallelise(dfl, res[:, :4 * c1])
    res = np.concatenate([np.concatenate(res_dfl, axis=0), res[:, 4 * c1:]], axis=1)
    anchor_points, stride = make_anchor()
    x1y1 = anchor_points - res[:, :2]
    x2y2 = anchor_points + res[:, 2:4]
    # x1y1x2y2 -> xywh
    c_xy = (x1y1 + x2y2) / 2
    wh = x2y2 - x1y1
    res[:, :4] = np.concatenate((c_xy, wh), axis=1) * stride
    return res
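
# End-to-end sketch for `yolov8layer` on synthetic head outputs, assuming the
# common YOLOv8 defaults net_shape=(640, 640) and strides=(8, 16, 32) in config.
# Each head tensor is (batch, 4*c1 regression bins + nc, h, w); the decoded
# result is (batch, 4 + nc, num_anchors) with xywh boxes in input pixels.
# Uses multiprocessing via parallelise, so run from under `if __name__ == '__main__'`.
def _yolov8layer_demo(nc=80):
    heads = [np.random.rand(1, 4 * 16 + nc, 640 // s, 640 // s).astype(np.float32)
             for s in (8, 16, 32)]
    return yolov8layer(heads).shape  # -> (1, 84, 8400) under the assumptions above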