"""Run YOLOv8 detection on an image, a folder of images, or a video.

The back-end is selected with ``--type`` (``caffe``, ``qkqb``, ``ezb`` or
``all``).  Results are drawn onto the original images and written below the
directory named by the ``PROJECT_NAME`` environment variable.
"""
from quantization import *
from yolo import *
import numpy as np
import math
import random
random.seed(0)  # fixed seed so the per-class colours are reproducible
import cv2
import sys
import os
import argparse
import glob
import shutil
# `time` and `torch` are used below; import them explicitly instead of relying
# on the star imports above to leak them into this namespace.
import time
import torch
from get_map import letterbox, scale_boxes
from yolo import non_max_suppression, plot_one_box, yolov8layer
from config import names, net_shape, bin_path, output_names, fl, model_type

os.environ['GLOG_minloglevel'] = '3'
PROJECT_NAME = os.environ['PROJECT_NAME']
qb_file = PROJECT_NAME+'/model_quantization/checkpoint_quan.qb'
qk_file = PROJECT_NAME+'/model_quantization/checkpoint_quan.qk'
infer_img = PROJECT_NAME+'/ezb_640x640.png'
video_save_path = PROJECT_NAME+'/video_result.mp4'
sh_commad = 'sh sh/ezb_inference.sh'
# One random colour per class name.
color_arr = [[random.randint(0, 255) for _ in range(3)] for i in names]

# Detector types that accept a list of image paths (required for --video).
_VIDEO_TYPES = ('caffe', 'qkqb')


def parse_opt():
    """Parse the command-line options; unknown arguments are ignored."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--path', type=str, required=True, help='image_path/image_folder_path/video_path')
    parser.add_argument('--type', type=str, choices=['caffe', 'qkqb', 'ezb', 'all'], required=True, help='caffe, qkqb, ezb')
    parser.add_argument('--iou', type=float, default=0.45, help='iou threshold')
    parser.add_argument('--conf', type=float, default=0.25, help='conf threshold')
    parser.add_argument('--video', action='store_true', help='video inference')
    parser.add_argument('--video_batch_size', type=int, default=32, help='video batch size')
    parser.add_argument('--device', type=int, default=1, help='device')
    opt = parser.parse_known_args()[0]
    return opt


def _read_image(path):
    """Decode the image file at *path* with ``cv2.IMREAD_COLOR``.

    The file is read through ``np.fromfile`` and decoded from memory rather
    than opened by OpenCV directly.
    """
    return cv2.imdecode(np.fromfile(path, np.uint8), cv2.IMREAD_COLOR)


def letterbox_resize(image_path):
    """Letterbox the image at *image_path* to ``net_shape`` and save it to ``infer_img``.

    Returns the original (un-resized) image.
    """
    ori_img = _read_image(image_path)
    ori_shape = ori_img.shape
    img, ratio, (dw, dh) = letterbox(ori_img, new_shape=net_shape, auto=False)
    cv2.imwrite(infer_img, img)
    print(f'image_path:{image_path} ori_shape:{ori_shape} resize_shape:(w:{int(ratio[1] * ori_shape[1])}, h:{int(ratio[0] * ori_shape[0])}) padding:(dw:{dw}, dh:{dh}) save_success -> {infer_img}.')
    return ori_img


class CustomDataset(BaseDataset):
    """Dataset that yields letterboxed, channel-first float32 images.

    ``image_list`` holds decoded images, or file paths when ``is_path_arr``
    is true.
    """

    def __init__(self, image_list, is_path_arr=False):
        super().__init__()
        self.image_list = image_list
        self.is_path_arr = is_path_arr

    def __getitem__(self, item):
        entry = self.image_list[item]
        image = _read_image(entry) if self.is_path_arr else entry
        image, _ratio, _padding = letterbox(image, new_shape=net_shape, auto=False)
        # Scale by 1/256 (not 1/255) exactly as the original pipeline does.
        # NOTE(review): presumably matches the quantization tool's expected
        # input range -- confirm before changing.
        scaled = np.array(image.astype(np.float32) / 256)
        # Move the last axis first (HWC -> CHW for a decoded colour image).
        return np.transpose(scaled, [2, 0, 1])

    def __len__(self):
        return len(self.image_list)


def _draw_detections(image, pred, im_shape):
    """Rescale the boxes in *pred* to *image* and draw them in place.

    The first four columns of *pred* are rescaled with ``scale_boxes`` from
    ``im_shape`` to the image's own height/width; each row is then unpacked
    as ``x1, y1, x2, y2, conf, cls`` and drawn with ``plot_one_box``.
    Returns *image*.
    """
    pred[:, :4] = scale_boxes(im_shape, pred[:, :4], image.shape[:2])
    for *xyxy, conf, cls in reversed(pred):
        x1, y1, x2, y2 = xyxy
        plot_one_box([int(x1), int(y1), int(x2), int(y2)], image, label=f'{names[int(cls)]} {float(conf):.2f}', color=color_arr[int(cls)])
    return image


class Base_Detector:
    """Common state and post-processing shared by all detectors."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        self.opt = opt
        self.im_shape = im_shape

    def __call__(self, image):
        pass

    def post_processing(self, ori_image, result):
        """Decode the raw outputs for one image, run NMS and draw the boxes.

        Returns ``ori_image`` with the detections drawn on it.
        """
        result = yolov8layer(result)
        pred = non_max_suppression([torch.from_numpy(result)], conf_thres=self.opt.conf, iou_thres=self.opt.iou)[0]
        return _draw_detections(ori_image, pred, self.im_shape)

    def _post_processing_batch(self, image_path, result):
        """Batch post-processing shared by the ``*_Batch_Detector`` classes.

        *image_path* is a list of image files; entry ``i`` is re-read from
        disk and annotated with the ``i``-th NMS result.  Returns the list
        of annotated images.
        """
        pred = yolov8layer(result)
        pred = non_max_suppression(pred, conf_thres=self.opt.conf, iou_thres=self.opt.iou)
        image_result = []
        for idx, detections in enumerate(pred):
            ori_image = _read_image(image_path[idx])
            _draw_detections(ori_image, detections, self.im_shape)
            image_result.append(ori_image.copy())
        return image_result

    def inference(self, image=None):
        pass


class Caffe_Detector(Base_Detector):
    """Single-image detector running the Caffe prototxt/caffemodel pair."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        self.prototxt_file = f'{PROJECT_NAME}/model_caffe/model_0.prototxt'
        self.caffemodel_file = f'{PROJECT_NAME}/model_caffe/model_0.caffemodel'

    def __call__(self, path):
        image = _read_image(path)
        pred = self.net.src_forward(self.prototxt_file, self.caffemodel_file, CustomDataset([image]), 1)
        result = self.post_processing(image, [pred['output0'].copy(), pred['output1'].copy(), pred['output2'].copy()])
        return result


class Caffe_Batch_Detector(Base_Detector):
    """Caffe detector that processes a list of image paths in one forward call."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        self.prototxt_file = f'{PROJECT_NAME}/model_caffe/model_0.prototxt'
        self.caffemodel_file = f'{PROJECT_NAME}/model_caffe/model_0.caffemodel'

    def __call__(self, path):
        pred = self.net.src_forward(self.prototxt_file, self.caffemodel_file, CustomDataset(path, is_path_arr=True), len(path))
        result = self.post_processing(path, [pred['output0'].copy(), pred['output1'].copy(), pred['output2'].copy()])
        return result

    def post_processing(self, image_path, result):
        """Annotate every image in *image_path*; returns a list of images."""
        return self._post_processing_batch(image_path, result)


class Qkqb_Detector(Base_Detector):
    """Single-image detector running the quantized qk/qb checkpoint."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        self.qk_file = f'{PROJECT_NAME}/model_quantization/checkpoint_quan.qk'
        self.qb_file = f'{PROJECT_NAME}/model_quantization/checkpoint_quan.qb'

    def __call__(self, path):
        image = _read_image(path)
        pred = self.net.easy_forward(self.qk_file, self.qb_file, CustomDataset([image]), 1)
        result = self.post_processing(image, [pred[output_names[0]].copy(), pred[output_names[1]].copy(), pred[output_names[2]].copy()])
        return result


class Qkqb_Batch_Detector(Base_Detector):
    """qk/qb detector that processes a list of image paths in one forward call."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        self.qk_file = f'{PROJECT_NAME}/model_quantization/checkpoint_quan.qk'
        self.qb_file = f'{PROJECT_NAME}/model_quantization/checkpoint_quan.qb'

    def __call__(self, path):
        pred = self.net.easy_forward(self.qk_file, self.qb_file, CustomDataset(path, is_path_arr=True), len(path))
        result = self.post_processing(path, [pred[output_names[0]].copy(), pred[output_names[1]].copy(), pred[output_names[2]].copy()])
        return result

    def post_processing(self, image_path, result):
        """Annotate every image in *image_path*; returns a list of images."""
        return self._post_processing_batch(image_path, result)


class Ezb_Detector(Base_Detector):
    """Detector that runs the external ezb inference script and reads its output files."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)

    def __call__(self, path):
        # The shell script is expected to consume the letterboxed image that
        # letterbox_resize() just wrote to `infer_img` -- TODO confirm.
        image = letterbox_resize(path)
        status = os.system(sh_commad)
        if status != 0:
            # Previously ignored silently; the output files read below may be
            # stale or missing when the script fails.
            print(f'warning: "{sh_commad}" exited with status {status}')
        result = self.post_processing(image, self.get_single_result_yolov8())
        return result

    def get_single_result_yolov8(self):
        """Load the first three output tensors written to ``bin_path``.

        Each file is read as int8 when ``Helper.get_quantize_out_bw`` reports
        8 for that output and as int16 otherwise, converted with
        ``Helper.hw_data_to_caffe_int_data`` and divided by ``2 ** fl[i]``.
        """
        shapes = Helper.get_caffe_output_shapes(qk_file)
        outputs = []
        for idx in range(3):
            out_name = output_names[idx]
            dtype = np.int8 if Helper.get_quantize_out_bw(qk_file, out_name) == 8 else np.int16
            sim_res = np.fromfile(PROJECT_NAME+bin_path[idx], dtype=dtype)
            outputs.append(Helper.hw_data_to_caffe_int_data(sim_res, shapes[out_name]) / math.pow(2, fl[idx]))
        return outputs


def _build_model(opt):
    """Instantiate the detector (or list of detectors for ``all``) chosen by ``opt``."""
    if opt.type == 'all':
        return [Caffe_Detector(opt), Qkqb_Detector(opt), Ezb_Detector(opt)]
    if opt.type == 'caffe':
        return Caffe_Batch_Detector(opt) if opt.video else Caffe_Detector(opt)
    if opt.type == 'qkqb':
        return Qkqb_Batch_Detector(opt) if opt.video else Qkqb_Detector(opt)
    return Ezb_Detector(opt)


def _release_model(model):
    """Release the ``net`` of every detector that owns one."""
    owners = (Caffe_Detector, Caffe_Batch_Detector, Qkqb_Detector, Qkqb_Batch_Detector)
    for detector in (model if isinstance(model, list) else [model]):
        if isinstance(detector, owners):
            detector.net.release()


def _infer(model, path):
    """Run one image through *model*.

    When *model* is a list, the results of all detectors are concatenated
    along axis 1.
    """
    if isinstance(model, list):
        return np.concatenate([m(path) for m in model], axis=1)
    return model(path)


def _reset_dir(path):
    """Delete *path* if it exists and recreate it empty."""
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)


def _write_batch(model, batch_paths, writer, output_dir, saved, total, timings):
    """Run *model* on one batch of frame files and store the annotated frames.

    Each frame is appended to *writer* and saved as ``<output_dir><n>.jpg``.
    Returns the updated number of saved frames.
    """
    since = time.time()
    frames = model(batch_paths)
    timings.append(time.time() - since)
    for frame in frames:
        saved += 1
        frame_file = f'{output_dir}{saved}.jpg'
        writer.write(frame)
        cv2.imwrite(frame_file, frame)
        # Report the file that was actually written (the message used to be
        # off by one because the counter was incremented before formatting).
        print(f'video:{saved}/{total}, img shape:{frame.shape}, using time:{timings[-1]:.4f}s, mean time:{np.mean(timings):.4f}s, save frame in {frame_file}')
    return saved


def _run_video(model, opt):
    """Video inference: dump frames to a temp folder and process them in batches."""
    output_dir = f'{PROJECT_NAME}/inference_video_save/'
    temp_dir = f'{PROJECT_NAME}/video_temp/'
    timings, batch_paths = [], []
    saved = 0

    cap = cv2.VideoCapture(opt.path)
    size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    writer = cv2.VideoWriter(video_save_path, cv2.VideoWriter_fourcc(*'XVID'), 25.0, size)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    try:
        _reset_dir(output_dir)
        _reset_dir(temp_dir)
        if cap.isOpened():
            while True:
                _, frame = cap.read()
                if frame is None:
                    break
                # The temp folder is emptied after every batch, so the index
                # inside the current batch is a unique file name.
                frame_path = temp_dir + f'{len(batch_paths)}.jpg'
                cv2.imwrite(frame_path, frame)
                batch_paths.append(frame_path)
                if len(batch_paths) == opt.video_batch_size:
                    saved = _write_batch(model, batch_paths, writer, output_dir, saved, total, timings)
                    batch_paths = []
                    _reset_dir(temp_dir)
        # Flush the last, partially filled batch.
        if batch_paths:
            saved = _write_batch(model, batch_paths, writer, output_dir, saved, total, timings)
    finally:
        # Release the capture and writer and drop the temp frames even when
        # inference fails part-way through.
        cap.release()
        writer.release()
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
    print('done.')


def _run_folder(model, opt):
    """Folder inference: run every entry of ``opt.path`` and save numbered results."""
    output_dir = f'{PROJECT_NAME}/inference_img_save/'
    _reset_dir(output_dir)
    image_listdir = glob.glob(f'{opt.path}/*')
    for idx, img_path in enumerate(image_listdir):
        image = _infer(model, img_path)
        cv2.imwrite(f'{output_dir + str(idx + 1)}.jpg', image)
        print('-'*20 + f'{idx + 1}/{len(image_listdir)} inference success. save image in {output_dir + str(idx + 1)}.jpg' + '-'*20)


def main():
    """Script entry point: parse options, build the model(s) and dispatch on the input kind."""
    opt = parse_opt()

    if opt.video:
        # Only the batch detectors accept a list of frame paths, and they
        # only make sense for a single video file.
        if opt.type not in _VIDEO_TYPES:
            raise SystemExit('--video is only supported with --type caffe or --type qkqb')
        if not os.path.isfile(opt.path):
            raise SystemExit('--video requires --path to point to a video file')

    # Select the model according to --type.
    model = _build_model(opt)
    try:
        if os.path.isfile(opt.path):
            if opt.video:
                _run_video(model, opt)
            else:
                # Single image: one model, or all models side by side.
                image = _infer(model, opt.path)
                cv2.imwrite(infer_img, image)
                print(f'inference success. save image in {infer_img}')
        else:
            _run_folder(model, opt)
    finally:
        _release_model(model)


if __name__ == '__main__':
    main()