import torch
import torchvision
import time
import cv2
import random
random.seed(0)
import tqdm
import os
import numpy as np
from config import *  # provides net_shape and strides, used by make_anchor below
from typing import Callable
from multiprocessing import Pool


def parallelise(function: Callable, data, chunksize=100, verbose=True, num_workers=os.cpu_count()):
    num_workers = max(num_workers or 1, 1)  # Pool needs at least 1 worker; os.cpu_count() can return None.
    pool = Pool(processes=num_workers)
    results = list(
        tqdm.tqdm(pool.imap(function, data, chunksize), total=len(data), disable=not verbose)
    )
    pool.close()
    pool.join()
    return results


def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (dw, dh)


def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y


def box_iou(box1, box2, eps=1e-7):
    """
    Calculate intersection-over-union (IoU) of boxes. Both sets of boxes are expected to be
    in (x1, y1, x2, y2) format. Based on
    https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py

    Args:
        box1 (torch.Tensor): A tensor of shape (N, 4) representing N bounding boxes.
        box2 (torch.Tensor): A tensor of shape (M, 4) representing M bounding boxes.
        eps (float, optional): A small value to avoid division by zero. Defaults to 1e-7.

    Returns:
        (torch.Tensor): An NxM tensor containing the pairwise IoU values for every element
            in box1 and box2.
    """
    # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2)
    (a1, a2), (b1, b2) = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2)
    inter = (torch.min(a2, b2) - torch.max(a1, b1)).clamp_(0).prod(2)

    # IoU = inter / (area1 + area2 - inter)
    return inter / ((a2 - a1).prod(2) + (b2 - b1).prod(2) - inter + eps)
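
# Sanity-check sketch (not part of the pipeline): tiny hand-worked examples for the box
# helpers above. `_check_box_utils` is an illustrative name, not an existing API.
def _check_box_utils():
    b = np.array([[50.0, 50.0, 20.0, 40.0]])  # one box as (cx, cy, w, h)
    assert np.allclose(xywh2xyxy(b), [[40.0, 30.0, 60.0, 70.0]])  # corners worked out by hand
    iou = box_iou(torch.tensor([[0.0, 0.0, 2.0, 2.0]]),
                  torch.tensor([[1.0, 1.0, 3.0, 3.0]]))
    assert torch.allclose(iou, torch.tensor([[1.0 / 7.0]]))  # inter = 1, union = 4 + 4 - 1 = 7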

def non_max_suppression(
        prediction,
        conf_thres=0.25,
        iou_thres=0.45,
        classes=None,
        agnostic=False,
        multi_label=False,
        labels=(),
        max_det=300,
        nc=0,  # number of classes (optional)
        max_time_img=0.05,
        max_nms=30000,
        max_wh=7680,
):
    """
    Perform non-maximum suppression (NMS) on a set of boxes, with support for masks and
    multiple labels per box.

    Arguments:
        prediction (torch.Tensor): A tensor of shape (batch_size, num_classes + 4 + num_masks, num_boxes)
            containing the predicted boxes, classes, and masks. The tensor should be in the format
            output by a model, such as YOLO.
        conf_thres (float): The confidence threshold below which boxes will be filtered out.
            Valid values are between 0.0 and 1.0.
        iou_thres (float): The IoU threshold below which boxes will be filtered out during NMS.
            Valid values are between 0.0 and 1.0.
        classes (List[int]): A list of class indices to consider. If None, all classes will be considered.
        agnostic (bool): If True, the model is agnostic to the number of classes, and all
            classes will be considered as one.
        multi_label (bool): If True, each box may have multiple labels.
        labels (List[List[Union[int, float, torch.Tensor]]]): A list of lists, where each inner
            list contains the apriori labels for a given image. The list should be in the format
            output by a dataloader, with each label being a tuple of (class_index, x1, y1, x2, y2).
        max_det (int): The maximum number of boxes to keep after NMS.
        nc (int, optional): The number of classes output by the model. Any indices after this
            will be considered masks.
        max_time_img (float): The maximum time (seconds) for processing one image.
        max_nms (int): The maximum number of boxes into torchvision.ops.nms().
        max_wh (int): The maximum box width and height in pixels.

    Returns:
        (List[torch.Tensor]): A list of length batch_size, where each element is a tensor of
            shape (num_boxes, 6 + num_masks) containing the kept boxes, with columns
            (x1, y1, x2, y2, confidence, class, mask1, mask2, ...).
    """
    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
    if isinstance(prediction, (list, tuple)):  # YOLOv8 model in validation mode, output = (inference_out, loss_out)
        prediction = prediction[0]  # select only inference output

    device = prediction.device
    mps = 'mps' in device.type  # Apple MPS
    if mps:  # MPS not fully supported yet, convert tensors to CPU before NMS
        prediction = prediction.cpu()
    bs = prediction.shape[0]  # batch size
    nc = nc or (prediction.shape[1] - 4)  # number of classes
    nm = prediction.shape[1] - nc - 4  # number of masks
    mi = 4 + nc  # mask start index
    xc = prediction[:, 4:mi].amax(1) > conf_thres  # candidates

    # Settings
    # min_wh = 2  # (pixels) minimum box width and height
    time_limit = 0.5 + max_time_img * bs  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[:, 2:4] < min_wh) | (x[:, 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x.transpose(0, -1)[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 4), device=x.device)  # 4 box columns, not 5: YOLOv8 has no objectness
            v[:, :4] = lb[:, 1:5]  # box
            v[range(len(lb)), lb[:, 0].long() + 4] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Detections matrix nx6 (xyxy, conf, cls)
        box, cls, mask = x.split((4, nc, nm), 1)
        box = xywh2xyxy(box)  # (center_x, center_y, width, height) to (x1, y1, x2, y2)
        if multi_label:
            i, j = (cls > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, 4 + j, None], j[:, None].float(), mask[i]), 1)
        else:  # best class only
            conf, j = cls.max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence and remove excess boxes

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        i = i[:max_det]  # limit detections
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # Update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)
        if (time.time() - t) > time_limit:
            break  # time limit exceeded (time_limit was computed above but never checked)

    return output
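
# Usage sketch (illustrative): run NMS on a random YOLOv8-style output tensor just to show
# the expected in/out shapes; a real `pred` would come from a model, and
# `_check_nms_shapes` is an illustrative name, not an existing API.
def _check_nms_shapes(nc=80, num_anchors=8400):
    pred = torch.rand((1, 4 + nc, num_anchors))  # (batch, 4 + nc, anchors), no mask channels
    dets = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45)
    # dets is a list of length batch; each entry is (num_kept, 6) with columns
    # (x1, y1, x2, y2, confidence, class)
    return dets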

def plot_one_box(x, img, color=None, label=None, line_thickness=3):
    # Plots one bounding box (xyxy) on image img, with an optional filled label tag
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
    color = color or [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
        c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
        cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filled label background
        cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)


def softmax(z):
    # Numerically stable softmax over a 1-D vector of logits
    e_z = np.exp(z - np.max(z))
    return e_z / e_z.sum(axis=0)


def dfl_conv(z):
    # Expectation over DFL bins: probabilities weighted by bin indices 0..c1-1
    # (the numpy equivalent of the 1x1 conv with frozen arange weights in the DFL head)
    weights = np.expand_dims(np.arange(z.shape[-1]), axis=0)
    z = z * weights
    return np.sum(z, axis=-1)


def dfl(res):
    # Distribution Focal Loss decoding: turn (4 * c1, num_anchors) bin logits into
    # 4 box-side distances per anchor via softmax + expectation over the c1 bins
    res = np.expand_dims(res, axis=0)
    b, c, a = res.shape
    c1 = c // 4
    res = res.reshape((b, 4, c1, a))
    res = np.transpose(res, axes=[0, 3, 1, 2])  # (b, a, 4, c1)
    # res = parallelise(softmax, res.reshape((-1, c1)))
    # res = parallelise(dfl_conv, res)
    res = np.stack([softmax(i) for i in res.reshape((-1, c1))])
    res = dfl_conv(res)
    return np.transpose(res.reshape((b, a, 4)), axes=[0, 2, 1])  # (b, 4, a)


def make_anchor(input_shape=net_shape, grid_cell_offset=0.5):
    # Build cell-centre anchor points and per-anchor strides for each detection scale
    anchor_points, stride = [], []
    for i in strides:
        h, w = input_shape[0] // i, input_shape[1] // i
        sx = np.arange(w) + grid_cell_offset
        sy = np.arange(h) + grid_cell_offset
        sx, sy = np.meshgrid(sx, sy)
        anchor_points.append(np.stack((sx, sy), -1).reshape((-1, 2)))
        stride.append(np.full((h * w, 1), i))
    return np.transpose(np.concatenate(anchor_points), axes=[1, 0]), np.transpose(np.concatenate(stride), axes=[1, 0])


def yolov8layer(res, c1=16):
    # Decode raw YOLOv8 head outputs (list of feature maps) into (n, 4 + nc, num_anchors),
    # with boxes as (cx, cy, w, h) in input-image pixels
    n, c = res[0].shape[:2]
    res = np.concatenate([xi.reshape((n, c, -1)) for xi in res], 2)
    res_dfl = parallelise(dfl, res[:, :4 * c1])  # DFL-decode the first 4 * c1 channels per image
    res = np.concatenate([np.concatenate(res_dfl, axis=0), res[:, 4 * c1:]], axis=1)
    anchor_points, stride = make_anchor()
    x1y1 = anchor_points - res[:, :2]
    x2y2 = anchor_points + res[:, 2:4]
    # x1y1x2y2 -> xywh
    c_xy = (x1y1 + x2y2) / 2
    wh = x2y2 - x1y1
    res[:, :4] = np.concatenate((c_xy, wh), axis=1) * stride
    return res
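
# End-to-end shape check (illustrative only): random feature maps stand in for a real
# backbone's detection heads, so the "detections" are meaningless; the point is to
# exercise yolov8layer -> non_max_suppression -> plot_one_box with realistic shapes.
# `nc = 80` (COCO) and `reg_max = 16` are assumptions matching the c1=16 default above.
if __name__ == '__main__':
    nc = 80  # assumed class count
    reg_max = 16  # DFL bins per box side, matching c1=16 in yolov8layer
    feats = [np.random.rand(1, 4 * reg_max + nc, net_shape[0] // s, net_shape[1] // s).astype(np.float32)
             for s in strides]
    pred = yolov8layer(feats)  # (1, 4 + nc, num_anchors), boxes as xywh in input pixels
    dets = non_max_suppression(torch.from_numpy(pred).float())[0]  # (num_kept, 6)
    canvas = np.full((net_shape[0], net_shape[1], 3), 114, dtype=np.uint8)  # blank letterbox-grey image
    for *xyxy, conf, cls_id in dets[:10].tolist():  # draw at most 10 boxes
        plot_one_box(xyxy, canvas, label=f'{int(cls_id)} {conf:.2f}')
    cv2.imwrite('demo_out.jpg', canvas)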