123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- from quantization import *
- from yolo import *
- import numpy as np
- import math, random
- random.seed(0)
- import cv2, sys, os, argparse, glob, shutil
- from get_map import letterbox, scale_boxes
- from yolo import non_max_suppression, plot_one_box, yolov8layer
- from config import names, net_shape, bin_path, output_names, fl, model_type
# Set the GLOG minimum log level for the inference backend.
os.environ['GLOG_minloglevel'] = '3'

# Project root directory; must be supplied through the environment
# (a missing variable raises KeyError here).
PROJECT_NAME = os.environ['PROJECT_NAME']

# Quantized model artifacts.
qb_file = f'{PROJECT_NAME}/model_quantization/checkpoint_quan.qb'
qk_file = f'{PROJECT_NAME}/model_quantization/checkpoint_quan.qk'

# Letterboxed input image written by letterbox_resize(); the single-image
# result is also saved to this path by the script entry point.
infer_img = f'{PROJECT_NAME}/ezb_640x640.png'

# Output file of video inference.
video_save_path = f'{PROJECT_NAME}/video_result.mp4'

# Shell command executed by Ezb_Detector.
sh_commad = 'sh sh/ezb_inference.sh'

# One random 3-component color per class name (reproducible because the
# random module is seeded earlier in this file).
color_arr = [[random.randint(0, 255) for _channel in range(3)] for _name in names]
def parse_opt():
    """Parse the command-line options of the inference script.

    Unknown arguments are tolerated and dropped (``parse_known_args``).

    Returns:
        argparse.Namespace with ``path``, ``type``, ``iou``, ``conf``,
        ``video``, ``video_batch_size`` and ``device``.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    arg_specs = (
        ('--path', dict(type=str, required=True, help='image_path/image_folder_path/video_path')),
        ('--type', dict(type=str, choices=['caffe', 'qkqb', 'ezb', 'all'], required=True, help='caffe, qkqb, ezb')),
        ('--iou', dict(type=float, default=0.45, help='iou threshold')),
        ('--conf', dict(type=float, default=0.25, help='conf threshold')),
        ('--video', dict(action='store_true', help='video inference')),
        ('--video_batch_size', dict(type=int, default=32, help='video batch size')),
        ('--device', dict(type=int, default=1, help='device')),
    )

    parser = argparse.ArgumentParser()
    for flag, kwargs in arg_specs:
        parser.add_argument(flag, **kwargs)

    known, _unknown = parser.parse_known_args()
    return known
def letterbox_resize(image_path):
    """Letterbox an image to the network input shape and save it to ``infer_img``.

    The file is read with ``np.fromfile`` + ``cv2.imdecode``, letterboxed to
    ``net_shape`` with ``auto=False`` and written to ``infer_img``.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The decoded image as returned by ``cv2.imdecode`` (not letterboxed).

    Raises:
        ValueError: If the file content cannot be decoded as an image.
    """
    ori_img = cv2.imdecode(np.fromfile(image_path, np.uint8), cv2.IMREAD_COLOR)
    if ori_img is None:
        # imdecode reports failure by returning None; stop here with a clear
        # message instead of an AttributeError on `.shape` below.
        raise ValueError(f'failed to decode image: {image_path}')

    ori_shape = ori_img.shape
    img, ratio, (dw, dh) = letterbox(ori_img, new_shape=net_shape, auto=False)
    cv2.imwrite(infer_img, img)
    print(
        f'image_path:{image_path} ori_shape:{ori_shape} '
        f'resize_shape:(w:{int(ratio[1] * ori_shape[1])}, h:{int(ratio[0] * ori_shape[0])}) '
        f'padding:(dw:{dw}, dh:{dh}) save_success -> {infer_img}.'
    )
    return ori_img
class CustomDataset(BaseDataset):
    """Dataset that letterboxes images to ``net_shape`` for the network.

    Args:
        image_list: Sequence of decoded images, or of image file paths when
            ``is_path_arr`` is true.
        is_path_arr: Whether ``image_list`` holds file paths rather than
            already-decoded images.
    """

    def __init__(self, image_list, is_path_arr=False):
        super().__init__()
        self.image_list = image_list
        self.is_path_arr = is_path_arr

    def __getitem__(self, item):
        """Return entry ``item`` as float32, divided by 256, axes reordered to [2, 0, 1]."""
        entry = self.image_list[item]
        if self.is_path_arr:
            # Entries are file paths: decode the image from disk first.
            frame = cv2.imdecode(np.fromfile(entry, np.uint8), cv2.IMREAD_COLOR)
        else:
            frame = entry

        boxed, _ratio, (_dw, _dh) = letterbox(frame, new_shape=net_shape, auto=False)
        scaled = np.array(boxed.astype(np.float32) / 256)
        return np.transpose(scaled, [2, 0, 1])

    def __len__(self):
        """Return the number of entries in ``image_list``."""
        return len(self.image_list)
-
class Base_Detector:
    """Shared base of the detectors: keeps the options and draws detections.

    Args:
        opt: Parsed options; ``opt.conf`` and ``opt.iou`` are read during
            post-processing.
        im_shape: Network input shape used when rescaling boxes.
    """

    def __init__(self, opt, im_shape=net_shape) -> None:
        self.opt = opt
        self.im_shape = im_shape

    def __call__(self, image):
        """Placeholder; subclasses implement the actual inference."""
        return None

    def post_processing(self, ori_image, result):
        """Decode raw network outputs and draw the detections on ``ori_image``.

        The outputs go through ``yolov8layer`` and ``non_max_suppression``;
        the surviving boxes are rescaled from ``im_shape`` to the shape of
        ``ori_image`` and plotted in place.

        Returns:
            ``ori_image`` with the boxes drawn on it.
        """
        decoded = yolov8layer(result)
        detections = non_max_suppression(
            [torch.from_numpy(decoded)],
            conf_thres=self.opt.conf,
            iou_thres=self.opt.iou,
        )[0]
        detections[:, :4] = scale_boxes(self.im_shape, detections[:, :4], ori_image.shape[:2])

        for *corners, conf, cls in reversed(detections):
            left, top, right, bottom = corners
            class_id = int(cls)
            plot_one_box(
                [int(left), int(top), int(right), int(bottom)],
                ori_image,
                label=f'{names[class_id]} {float(conf):.2f}',
                color=color_arr[class_id],
            )
        return ori_image

    def inference(self, image=None):
        """Placeholder hook; does nothing in the base class."""
        return None
class Caffe_Detector(Base_Detector):
    """Single-image detector that calls ``Net.src_forward`` on the prototxt/caffemodel pair."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        model_dir = f'{PROJECT_NAME}/model_caffe'
        self.prototxt_file = f'{model_dir}/model_0.prototxt'
        self.caffemodel_file = f'{model_dir}/model_0.caffemodel'

    def __call__(self, path):
        """Run detection on the image file at ``path`` and return the annotated image."""
        image = cv2.imdecode(np.fromfile(path, np.uint8), cv2.IMREAD_COLOR)
        dataset = CustomDataset([image])
        pred = self.net.src_forward(self.prototxt_file, self.caffemodel_file, dataset, 1)
        # Copy the three output blobs before handing them to post-processing.
        heads = [pred[key].copy() for key in ('output0', 'output1', 'output2')]
        return self.post_processing(image, heads)
class Caffe_Batch_Detector(Base_Detector):
    """Batch detector that calls ``Net.src_forward`` on a list of image paths."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        model_dir = f'{PROJECT_NAME}/model_caffe'
        self.prototxt_file = f'{model_dir}/model_0.prototxt'
        self.caffemodel_file = f'{model_dir}/model_0.caffemodel'

    def __call__(self, path):
        """Run detection on every image path in ``path``; return the annotated images."""
        dataset = CustomDataset(path, is_path_arr=True)
        pred = self.net.src_forward(self.prototxt_file, self.caffemodel_file, dataset, len(path))
        heads = [pred[key].copy() for key in ('output0', 'output1', 'output2')]
        return self.post_processing(path, heads)

    def post_processing(self, image_path, result):
        """Decode the batched outputs and draw detections on each source image.

        Args:
            image_path: Image file paths, in the same order as the batch.
            result: Raw network outputs for the whole batch.

        Returns:
            List of annotated images, one per NMS result.
        """
        batch_dets = non_max_suppression(
            yolov8layer(result),
            conf_thres=self.opt.conf,
            iou_thres=self.opt.iou,
        )

        annotated = []
        for idx, dets in enumerate(batch_dets):
            # Re-read the source image so boxes are drawn at original resolution.
            canvas = cv2.imdecode(np.fromfile(image_path[idx], np.uint8), cv2.IMREAD_COLOR)
            dets[:, :4] = scale_boxes(self.im_shape, dets[:, :4], canvas.shape[:2])
            for *corners, conf, cls in reversed(dets):
                left, top, right, bottom = corners
                class_id = int(cls)
                plot_one_box(
                    [int(left), int(top), int(right), int(bottom)],
                    canvas,
                    label=f'{names[class_id]} {float(conf):.2f}',
                    color=color_arr[class_id],
                )
            annotated.append(canvas.copy())
        return annotated
class Qkqb_Detector(Base_Detector):
    """Single-image detector that calls ``Net.easy_forward`` on the .qk/.qb pair."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        checkpoint = f'{PROJECT_NAME}/model_quantization/checkpoint_quan'
        self.qk_file = f'{checkpoint}.qk'
        self.qb_file = f'{checkpoint}.qb'

    def __call__(self, path):
        """Run detection on the image file at ``path`` and return the annotated image."""
        image = cv2.imdecode(np.fromfile(path, np.uint8), cv2.IMREAD_COLOR)
        pred = self.net.easy_forward(self.qk_file, self.qb_file, CustomDataset([image]), 1)
        # The first three configured output names select the blobs to decode.
        heads = [pred[output_names[i]].copy() for i in range(3)]
        return self.post_processing(image, heads)
class Qkqb_Batch_Detector(Base_Detector):
    """Batch detector that calls ``Net.easy_forward`` on a list of image paths."""

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)
        self.net = Net(opt.device)
        checkpoint = f'{PROJECT_NAME}/model_quantization/checkpoint_quan'
        self.qk_file = f'{checkpoint}.qk'
        self.qb_file = f'{checkpoint}.qb'

    def __call__(self, path):
        """Run detection on every image path in ``path``; return the annotated images."""
        dataset = CustomDataset(path, is_path_arr=True)
        pred = self.net.easy_forward(self.qk_file, self.qb_file, dataset, len(path))
        heads = [pred[output_names[i]].copy() for i in range(3)]
        return self.post_processing(path, heads)

    def post_processing(self, image_path, result):
        """Decode the batched outputs and draw detections on each source image.

        Args:
            image_path: Image file paths, in the same order as the batch.
            result: Raw network outputs for the whole batch.

        Returns:
            List of annotated images, one per NMS result.
        """
        batch_dets = non_max_suppression(
            yolov8layer(result),
            conf_thres=self.opt.conf,
            iou_thres=self.opt.iou,
        )

        image_result = []
        for idx, dets in enumerate(batch_dets):
            # Boxes are drawn on a fresh decode of the original file.
            drawn = cv2.imdecode(np.fromfile(image_path[idx], np.uint8), cv2.IMREAD_COLOR)
            dets[:, :4] = scale_boxes(self.im_shape, dets[:, :4], drawn.shape[:2])
            for *corners, conf, cls in reversed(dets):
                left, top, right, bottom = corners
                label = f'{names[int(cls)]} {float(conf):.2f}'
                box = [int(left), int(top), int(right), int(bottom)]
                plot_one_box(box, drawn, label=label, color=color_arr[int(cls)])
            image_result.append(drawn.copy())
        return image_result
class Ezb_Detector(Base_Detector):
    """Detector that runs inference through the external ``sh_commad`` script.

    The input image is letterboxed and saved by ``letterbox_resize``, the
    shell command is executed, and the binary outputs it leaves at
    ``PROJECT_NAME + bin_path[i]`` are decoded and post-processed.
    """

    def __init__(self, opt, im_shape=net_shape) -> None:
        super().__init__(opt, im_shape)

    def __call__(self, path):
        """Run detection on the image file at ``path`` and return the annotated image."""
        image = letterbox_resize(path)
        status = os.system(sh_commad)
        if status != 0:
            # The exit status used to be ignored, so a failed run silently
            # produced detections from whatever bins were already on disk.
            print(
                f'warning: `{sh_commad}` exited with status {status}; '
                'the output bins read below may be stale or missing.',
                file=sys.stderr,
            )
        return self.post_processing(image, self.get_single_result_yolov8())

    def get_single_result_yolov8(self):
        """Load the three output bins and convert them to de-quantized arrays.

        Each bin is read as int8 when ``Helper.get_quantize_out_bw`` reports
        8 for that output and as int16 otherwise, converted with
        ``Helper.hw_data_to_caffe_int_data`` and divided by ``2 ** fl[i]``.

        Returns:
            List of the three converted outputs, in ``output_names`` order.
        """
        shapes = Helper.get_caffe_output_shapes(qk_file)
        outputs = []
        for i in range(3):
            name = output_names[i]
            dtype = np.int8 if Helper.get_quantize_out_bw(qk_file, name) == 8 else np.int16
            raw = np.fromfile(PROJECT_NAME + bin_path[i], dtype=dtype)
            converted = Helper.hw_data_to_caffe_int_data(raw, shapes[name])
            outputs.append(converted / math.pow(2, fl[i]))
        return outputs
-
# Detector classes that own a `net` which must be released at the end.
_NET_BACKED_DETECTORS = (Caffe_Detector, Caffe_Batch_Detector, Qkqb_Detector, Qkqb_Batch_Detector)


def _check_options(opt):
    """Reject option combinations that cannot run, before any side effects."""
    is_file = os.path.isfile(opt.path)
    if not is_file and not os.path.isdir(opt.path):
        raise SystemExit(f'path not found: {opt.path}')
    if not opt.video:
        return
    batch_capable = opt.type in ('caffe', 'qkqb')
    if is_file and not batch_capable:
        # Only the batch detectors accept a list of frame paths.
        raise SystemExit('--video is only supported with --type caffe or --type qkqb')
    if batch_capable and not is_file:
        # A batch detector would be handed single image paths from the folder loop.
        raise SystemExit('--video requires --path to point to a video file')


def _build_model(opt):
    """Select the detector(s) according to ``--type`` and ``--video``."""
    if opt.type == 'all':
        return [Caffe_Detector(opt), Qkqb_Detector(opt), Ezb_Detector(opt)]
    if opt.type == 'caffe':
        return Caffe_Batch_Detector(opt) if opt.video else Caffe_Detector(opt)
    if opt.type == 'qkqb':
        return Qkqb_Batch_Detector(opt) if opt.video else Qkqb_Detector(opt)
    return Ezb_Detector(opt)


def _reset_dir(path):
    """Delete ``path`` if it exists and recreate it empty."""
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)


def _infer_image(model, path):
    """Run one detector, or all of them side by side, on a single image path."""
    if isinstance(model, list):
        return np.concatenate([m(path) for m in model], axis=1)
    return model(path)


def _run_video(opt, model):
    """Run batched inference over a video, saving the result video and each frame."""
    import time  # function-scope import: the file has no explicit top-level `import time`

    cap = cv2.VideoCapture(opt.path)
    if not cap.isOpened():
        cap.release()
        raise SystemExit(f'failed to open video: {opt.path}')

    frame_dir = f'{PROJECT_NAME}/inference_video_save/'
    temp_dir = f'{PROJECT_NAME}/video_temp/'
    timings, batch_paths = [], []
    count = 0
    out = None

    def flush_batch():
        """Run the pending batch and append its frames to the outputs."""
        nonlocal count
        since = time.time()
        frames = model(batch_paths)
        timings.append(time.time() - since)
        for frame in frames:
            count += 1
            frame_path = f'{frame_dir}{count}.jpg'
            out.write(frame)
            cv2.imwrite(frame_path, frame)
            # Log the path that was actually written (previously off by one).
            print(f'video:{count}/{video_count}, img shape:{frame.shape}, using time:{timings[-1]:.4f}s, mean time:{np.mean(timings):.4f}s, save frame in {frame_path}')
        batch_paths.clear()
        _reset_dir(temp_dir)

    try:
        size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        video_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Keep the source frame rate; fall back to 25 when it is unavailable.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 25.0
        out = cv2.VideoWriter(video_save_path, cv2.VideoWriter_fourcc(*'XVID'), fps, size)

        _reset_dir(frame_dir)
        _reset_dir(temp_dir)

        while True:
            ok, frame = cap.read()
            if not ok or frame is None:
                break
            # Frames are staged on disk because the batch detectors take file paths.
            temp_path = f'{temp_dir}{len(batch_paths)}.jpg'
            cv2.imwrite(temp_path, frame)
            batch_paths.append(temp_path)
            if len(batch_paths) == opt.video_batch_size:
                flush_batch()
        # Process the final, partially filled batch.
        if batch_paths:
            flush_batch()
    finally:
        cap.release()
        if out is not None:
            out.release()
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
    print('done.')


def _run_folder(opt, model):
    """Run inference on every entry of the folder ``opt.path``."""
    folder_save_path = f'{PROJECT_NAME}/inference_img_save/'
    _reset_dir(folder_save_path)
    # Sorted so that the numbering of the saved results is deterministic.
    image_listdir = sorted(glob.glob(f'{opt.path}/*'))
    for idx, img_path in enumerate(image_listdir):
        image = _infer_image(model, img_path)
        cv2.imwrite(f'{folder_save_path + str(idx + 1)}.jpg', image)
        print('-'*20 + f'{idx + 1}/{len(image_listdir)} inference success. save image in {folder_save_path + str(idx + 1)}.jpg' + '-'*20)


def _release_models(model):
    """Release the network of every detector that owns one."""
    for detector in (model if isinstance(model, list) else [model]):
        if isinstance(detector, _NET_BACKED_DETECTORS):
            detector.net.release()


if __name__ == '__main__':
    opt = parse_opt()
    _check_options(opt)
    # Choose the model(s) according to --type.
    model = _build_model(opt)
    try:
        # Is the given path a single file?
        if os.path.isfile(opt.path):
            if opt.video:
                # Video inference.
                _run_video(opt, model)
            else:
                # Single-image inference (one model, or all models side by side).
                image = _infer_image(model, opt.path)
                cv2.imwrite(infer_img, image)
                print(f'inference success. save image in {infer_img}')
        else:
            # Folder inference.
            _run_folder(opt, model)
    finally:
        # Release the networks even when inference fails.
        _release_models(model)
|