Building Object Detection with PyTorch
Thanawat Srithongkul
Posted on April 17, 2024
Object detection is one of the core tasks in image processing. An object detection model lets us locate objects in an image and classify them into categories efficiently, whether that means detecting cars on a road, finding faces in a photo, or spotting structures in medical images. Object detection can be applied across daily life and in many industries.
In this post you will learn, step by step, how to build an object detection model in PyTorch, a popular library with excellent support for developing AI models.
- Procedure
Step 1: Import all the required libraries
!pip install xmltodict pycocotools
import copy, datetime, io, math, os, random, sys, time
import cv2, numpy as np, torch, torchvision
import torch.distributed as dist
import torchvision.models.detection.mask_rcnn
import xmltodict
import pycocotools.mask as mask_util
from collections import defaultdict, deque
from contextlib import redirect_stdout
from glob import glob
from matplotlib import pyplot as plt
from PIL import Image
from pycocotools import mask as coco_mask
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from torch import Tensor
from torch.utils.data import random_split, Dataset, DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F, transforms as T
from typing import Dict, Optional, Tuple, List
from google.colab import drive
drive.mount('/content/drive')
Step 2: Load the data and build the DataLoaders used later to train the object detection model with PyTorch
class ObjectDetectionCustomDataset(Dataset):
def __init__(self, root, transformations = None, size = 224, im_files = [".jpg", ".png", ".jpeg"]):
self.root, self.size, self.transformations = root, size, transformations
self.class_names, self.class_counts = {}, 0
self.data_info = sorted(glob(f"{root}/annotations/*.xml"))
        for idx, data_info in enumerate(self.data_info):
            info = xmltodict.parse(open(f"{data_info}", "r").read())
            bbox_info = info["annotation"]["object"]
            # xmltodict returns a list when an image has several objects and a dict when it has one
            if isinstance(bbox_info, dict): bbox_info = [bbox_info]
            for bb_info in bbox_info:
                class_name = bb_info["name"]
                if class_name not in self.class_names: self.class_names[class_name] = self.class_counts; self.class_counts += 1
def __len__(self): return len(self.data_info)
def get_info(self, idx): return xmltodict.parse(open(f"{self.data_info[idx]}","r").read())
def read_im(self, info): return Image.open(f"{self.root}/images/{info['annotation']['filename']}").convert("RGB")
def get_coordinates(self, bbox): return float(bbox["xmin"]), float(bbox["ymin"]), float(bbox["xmax"]), float(bbox["ymax"])
    def get_bboxes(self, info):
        bboxes, class_names = [], []
        bbox_info = info["annotation"]["object"]
        # single-object images come back as a dict; normalize to a list
        if isinstance(bbox_info, dict): bbox_info = [bbox_info]
        for bb_info in bbox_info:
            bboxes.append(self.get_coordinates(bb_info["bndbox"]))
            class_names.append(bb_info["name"])
        return torch.as_tensor(bboxes, dtype = torch.float32), class_names
def get_label(self, class_name): return self.class_names[class_name]
def get_area(self, bboxes): return (bboxes[:, 3] - bboxes[:, 1]) * (bboxes[:, 2] - bboxes[:, 0])
    def create_target(self, bboxes, labels, rasm_id, area, is_crowd):
        # "rasm" means "image" in the original (Uzbek) source; the key torchvision expects is "image_id"
        return {"boxes": bboxes, "labels": labels, "image_id": rasm_id, "area": area, "iscrowd": is_crowd}
def __getitem__(self, idx):
info = self.get_info(idx)
im = self.read_im(info)
bboxes, class_names = self.get_bboxes(info)
labels = torch.tensor([self.get_label(class_name) for class_name in class_names], dtype = torch.int64)
rasm_id = torch.tensor([idx])
area = self.get_area(bboxes)
is_crowd = torch.zeros((len(bboxes), ), dtype = torch.int64)
target = self.create_target(bboxes, labels, rasm_id, area, is_crowd)
if self.transformations is not None: im, target = self.transformations(im, target)
return im, target
def custom_collate_fn(batch): return tuple(zip(*batch))
def get_dls(root, transformations, bs, split = [0.8, 0.1], ns = 4):
ds = ObjectDetectionCustomDataset(root = root, transformations = transformations)
all_len = len(ds); tr_len = int(all_len * split[0]); val_len = int(all_len * split[1])
tr_ds, val_ds, ts_ds = random_split(dataset = ds, lengths = [tr_len, val_len, all_len - tr_len - val_len])
    tr_dl  = DataLoader(tr_ds,  batch_size = bs, collate_fn = custom_collate_fn, shuffle = True,  num_workers = ns)
    val_dl = DataLoader(val_ds, batch_size = bs, collate_fn = custom_collate_fn, shuffle = False, num_workers = ns)
    ts_dl  = DataLoader(ts_ds,  batch_size = 1,  collate_fn = custom_collate_fn, shuffle = False, num_workers = ns)
return tr_dl, val_dl, ts_dl, ds.class_names
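The dataset class above assumes a Pascal VOC-style layout: an images/ folder next to an annotations/ folder holding one XML file per image. As a quick illustration of why __init__ and get_bboxes branch on dict versus list, here is a minimal sketch, using a hypothetical file name and class, of what xmltodict produces for a single-object annotation (multi-object files yield a list under "object"):
import xmltodict

# hypothetical single-object VOC annotation; real files come from {root}/annotations/*.xml
sample = """
<annotation>
  <filename>road_001.jpg</filename>
  <object>
    <name>car</name>
    <bndbox><xmin>48</xmin><ymin>30</ymin><xmax>120</xmax><ymax>95</ymax></bndbox>
  </object>
</annotation>"""
info = xmltodict.parse(sample)
print(type(info["annotation"]["object"]))              # dict, because there is only one object
print(info["annotation"]["object"]["bndbox"]["xmin"])  # 48 (as a string)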
Step 3: Create the transformations and prepare the data for training the object detection model
Image dataset: Download
class Compose:
def __init__(self, transforms):
self.transforms = transforms
def __call__(self, image, target):
for t in self.transforms:
image, target = t(image, target)
return image, target
class PILToTensor(torch.nn.Module):
def forward(
self, image: Tensor, target: Optional[Dict[str, Tensor]] = None
) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
image = F.pil_to_tensor(image)
return image, target
class Normalize(torch.nn.Module):
    def forward(self, image: Tensor, target: Optional[Dict[str, Tensor]] = None, mean: List[float] = [0.485, 0.456, 0.406], std: List[float] = [0.229, 0.224, 0.225]):
        image = F.normalize(tensor = image, mean = mean, std = std)
        return image, target
class ConvertImageDtype(torch.nn.Module):
def __init__(self, dtype: torch.dtype) -> None:
super().__init__()
self.dtype = dtype
def forward(
self, image: Tensor, target: Optional[Dict[str, Tensor]] = None
) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
image = F.convert_image_dtype(image, self.dtype)
return image, target
class RandomHorizontalFlip(T.RandomHorizontalFlip):
def forward(
self, image: Tensor, target: Optional[Dict[str, Tensor]] = None
) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
if torch.rand(1) < self.p:
image = F.hflip(image)
            if target is not None:
                _, _, width = F.get_dimensions(image)
                # flip the bounding boxes along with the image; without this the
                # boxes no longer match the flipped pixels
                target["boxes"][:, [0, 2]] = width - target["boxes"][:, [2, 0]]
                if "masks" in target:
                    target["masks"] = target["masks"].flip(-1)
                if "keypoints" in target:
                    # only needed for keypoint models; _flip_coco_person_keypoints comes
                    # from torchvision's reference scripts and is not defined in this post
                    keypoints = target["keypoints"]
                    keypoints = _flip_coco_person_keypoints(keypoints, width)
                    target["keypoints"] = keypoints
return image, target
def get_transform(train):
transformations = []
transformations.append(PILToTensor())
transformations.append(ConvertImageDtype(torch.float))
transformations.append(Normalize())
if train: transformations.append(RandomHorizontalFlip(0.5))
return Compose(transformations)
tfs = get_transform(train = False)  # set train = True to also apply the random horizontal flip during training
data_path = "/content/drive/My Drive/dataSet/archive"
tr_dl, val_dl, ts_dl, class_names = get_dls(root = data_path, transformations = tfs, bs = 4)
print(len(tr_dl), len(val_dl), len(ts_dl), class_names)
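Because custom_collate_fn simply zips the batch, each iteration yields a tuple of image tensors and a tuple of target dicts rather than stacked tensors (detection images and object counts vary in size). A quick way to sanity-check one batch, assuming the loaders above were created successfully:
ims, targets = next(iter(tr_dl))
print(len(ims), ims[0].shape)      # e.g. 4 torch.Size([3, H, W])
print(targets[0]["boxes"].shape)   # (num_objects, 4), boxes as xmin, ymin, xmax, ymax
print(targets[0]["labels"])        # integer labels that index into class_names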
Step 4: Convert tensors back to images and display 20 sample images
def tensor_2_im(t, t_type = "rgb", inv_trans = False):
    assert t_type in ["rgb", "gray"], "Specify whether the image is RGB or grayscale."
    # undo the earlier normalization: first divide by 1/std, then subtract -mean
    gray_tfs = T.Compose([T.Normalize(mean = [0.], std = [1/0.5]), T.Normalize(mean = [-0.5], std = [1])])
    rgb_tfs = T.Compose([T.Normalize(mean = [0., 0., 0.], std = [1/0.229, 1/0.224, 1/0.225]), T.Normalize(mean = [-0.485, -0.456, -0.406], std = [1., 1., 1.])])
    inv_tfs = gray_tfs if t_type == "gray" else rgb_tfs
    t = inv_tfs(t) if inv_trans else t
    return (t * 255).detach().cpu().permute(1, 2, 0).numpy().astype(np.uint8)
def visualize(data, n_ims, rows, cmap = None):
    assert cmap in ["rgb", "gray"], "Specify whether the image is color or grayscale!"
    if cmap == "rgb": cmap = "viridis"
    plt.figure(figsize = (20, 10))
    indices = [random.randint(0, len(data) - 1) for _ in range(n_ims)]
    for idx, index in enumerate(indices):
        im, target = data[index]
        img = tensor_2_im(im, inv_trans = True)
        bbox_class_names = []
        for i, box in enumerate(target["boxes"]):
            r, g, b = [random.randint(0, 255) for _ in range(3)]
            # boxes are stored as (xmin, ymin, xmax, ymax) corner coordinates
            x1, y1, x2, y2 = [round(c.item()) for c in box]
            bbox_class_names.append(list(class_names.keys())[target["labels"][i].item()])
            cv2.rectangle(img = img, pt1 = (x1, y1), pt2 = (x2, y2), color = (r, g, b), thickness = 2)
        plt.subplot(rows, n_ims // rows, idx + 1)
        plt.imshow(img); plt.title(f"{bbox_class_names}")
        plt.axis("off")
visualize(tr_dl.dataset, 20, 4, "rgb")
Output of the code
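The two chained Normalize calls inside tensor_2_im undo the earlier ImageNet normalization: the first multiplies by the std, the second adds the mean back. A small self-contained check of that identity, on a made-up tensor:
import torch
from torchvision import transforms as T

mu, sigma = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]
x = torch.rand(3, 8, 8)  # made-up image tensor
norm = T.Normalize(mean = mu, std = sigma)
inv = T.Compose([T.Normalize(mean = [0.] * 3, std = [1 / s for s in sigma]),
                 T.Normalize(mean = [-m for m in mu], std = [1.] * 3)])
print(torch.allclose(inv(norm(x)), x, atol = 1e-5))  # True: the two transforms cancel out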
Step 5: Prepare the model for fine-tuning
This step prepares a Faster R-CNN model with a ResNet-50-FPN backbone for fine-tuning, so that its heads match our data and the classes in class_names.
m = fasterrcnn_resnet50_fpn(weights = "DEFAULT")
device, num_classes = "cuda", len(class_names)
# note: torchvision's Faster R-CNN reserves label 0 for the background class; since the
# dataset labels above start at 0, strictly you may want len(class_names) + 1 here and
# to shift the dataset labels up by one
# get number of input features for the classifier
in_features = m.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
m.roi_heads.box_predictor.cls_score = torch.nn.Linear(in_features, num_classes)
m.roi_heads.box_predictor.bbox_pred = torch.nn.Linear(in_features, num_classes * 4)
m.to(device)
optimizer = torch.optim.SGD(m.parameters(), lr = 0.001, momentum = 0.9, weight_decay = 0.0005)
# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 3, gamma = 0.1)
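Swapping cls_score and bbox_pred by hand works because the box predictor's forward pass only uses those two linear layers. torchvision also ships a ready-made replacement head; an equivalent sketch using it:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# replace the whole box predictor in one step; equivalent to swapping the two Linear layers above
in_features = m.roi_heads.box_predictor.cls_score.in_features
m.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)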
Step 6: Create a CocoEvaluator class for evaluating the object detection model
class CocoEvaluator:
def __init__(self, coco_gt, iou_types):
if not isinstance(iou_types, (list, tuple)):
raise TypeError(f"This constructor expects iou_types of type list or tuple, instead got {type(iou_types)}")
coco_gt = copy.deepcopy(coco_gt)
self.coco_gt = coco_gt
self.iou_types = iou_types
self.coco_eval = {}
for iou_type in iou_types:
self.coco_eval[iou_type] = COCOeval(coco_gt, iouType=iou_type)
self.img_ids = []
self.eval_imgs = {k: [] for k in iou_types}
def update(self, predictions):
img_ids = list(np.unique(list(predictions.keys())))
self.img_ids.extend(img_ids)
for iou_type in self.iou_types:
results = self.prepare(predictions, iou_type)
with redirect_stdout(io.StringIO()):
coco_dt = COCO.loadRes(self.coco_gt, results) if results else COCO()
coco_eval = self.coco_eval[iou_type]
coco_eval.cocoDt = coco_dt
coco_eval.params.imgIds = list(img_ids)
img_ids, eval_imgs = evaluate_coco(coco_eval)
self.eval_imgs[iou_type].append(eval_imgs)
def synchronize_between_processes(self):
for iou_type in self.iou_types:
self.eval_imgs[iou_type] = np.concatenate(self.eval_imgs[iou_type], 2)
create_common_coco_eval(self.coco_eval[iou_type], self.img_ids, self.eval_imgs[iou_type])
def accumulate(self):
for coco_eval in self.coco_eval.values():
coco_eval.accumulate()
def summarize(self):
for iou_type, coco_eval in self.coco_eval.items():
print(f"IoU metric: {iou_type}")
coco_eval.summarize()
def prepare(self, predictions, iou_type):
if iou_type == "bbox":
return self.prepare_for_coco_detection(predictions)
if iou_type == "segm":
return self.prepare_for_coco_segmentation(predictions)
if iou_type == "keypoints":
return self.prepare_for_coco_keypoint(predictions)
raise ValueError(f"Unknown iou type {iou_type}")
def prepare_for_coco_detection(self, predictions):
coco_results = []
for original_id, prediction in predictions.items():
if len(prediction) == 0:
continue
boxes = prediction["boxes"]
boxes = convert_to_xywh(boxes).tolist()
scores = prediction["scores"].tolist()
labels = prediction["labels"].tolist()
coco_results.extend(
[
{
"image_id": original_id,
"category_id": labels[k],
"bbox": box,
"score": scores[k],
}
for k, box in enumerate(boxes)
]
)
return coco_results
def prepare_for_coco_segmentation(self, predictions):
coco_results = []
for original_id, prediction in predictions.items():
if len(prediction) == 0:
continue
scores = prediction["scores"]
labels = prediction["labels"]
masks = prediction["masks"]
masks = masks > 0.5
scores = prediction["scores"].tolist()
labels = prediction["labels"].tolist()
rles = [
mask_util.encode(np.array(mask[0, :, :, np.newaxis], dtype=np.uint8, order="F"))[0] for mask in masks
]
for rle in rles:
rle["counts"] = rle["counts"].decode("utf-8")
coco_results.extend(
[
{
"image_id": original_id,
"category_id": labels[k],
"segmentation": rle,
"score": scores[k],
}
for k, rle in enumerate(rles)
]
)
return coco_results
def prepare_for_coco_keypoint(self, predictions):
coco_results = []
for original_id, prediction in predictions.items():
if len(prediction) == 0:
continue
boxes = prediction["boxes"]
boxes = convert_to_xywh(boxes).tolist()
scores = prediction["scores"].tolist()
labels = prediction["labels"].tolist()
keypoints = prediction["keypoints"]
keypoints = keypoints.flatten(start_dim=1).tolist()
coco_results.extend(
[
{
"image_id": original_id,
"category_id": labels[k],
"keypoints": keypoint,
"score": scores[k],
}
for k, keypoint in enumerate(keypoints)
]
)
return coco_results
def convert_to_xywh(boxes):
xmin, ymin, xmax, ymax = boxes.unbind(1)
return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1)
def merge(img_ids, eval_imgs):
all_img_ids = all_gather(img_ids)
all_eval_imgs = all_gather(eval_imgs)
merged_img_ids = []
for p in all_img_ids:
merged_img_ids.extend(p)
merged_eval_imgs = []
for p in all_eval_imgs:
merged_eval_imgs.append(p)
merged_img_ids = np.array(merged_img_ids)
merged_eval_imgs = np.concatenate(merged_eval_imgs, 2)
# keep only unique (and in sorted order) images
merged_img_ids, idx = np.unique(merged_img_ids, return_index=True)
merged_eval_imgs = merged_eval_imgs[..., idx]
return merged_img_ids, merged_eval_imgs
def create_common_coco_eval(coco_eval, img_ids, eval_imgs):
img_ids, eval_imgs = merge(img_ids, eval_imgs)
img_ids = list(img_ids)
eval_imgs = list(eval_imgs.flatten())
coco_eval.evalImgs = eval_imgs
coco_eval.params.imgIds = img_ids
coco_eval._paramsEval = copy.deepcopy(coco_eval.params)
def evaluate_coco(imgs):
with redirect_stdout(io.StringIO()):
imgs.evaluate()
return imgs.params.imgIds, np.asarray(imgs.evalImgs).reshape(-1, len(imgs.params.areaRng), len(imgs.params.imgIds))
def all_gather(data):
"""
Run all_gather on arbitrary picklable data (not necessarily tensors)
Args:
data: any picklable object
Returns:
list[data]: list of data gathered from each rank
"""
world_size = get_world_size()
if world_size == 1:
return [data]
data_list = [None] * world_size
dist.all_gather_object(data_list, data)
return data_list
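The evaluator follows a fixed lifecycle: build the COCO-format ground truth once, feed {image_id: prediction} dicts to update() batch by batch, then synchronize, accumulate, and summarize. The evaluate() function in the next step wraps exactly this flow; here is a condensed, hypothetical standalone sketch (get_coco_api_from_dataset is defined in step 7):
coco_gt = get_coco_api_from_dataset(val_dl.dataset)
evaluator = CocoEvaluator(coco_gt, iou_types = ["bbox"])
m.eval()
with torch.no_grad():
    for images, targets in val_dl:
        outputs = m([im.to(device) for im in images])
        outputs = [{k: v.to("cpu") for k, v in o.items()} for o in outputs]
        evaluator.update({t["image_id"].item(): o for t, o in zip(targets, outputs)})
evaluator.synchronize_between_processes()
evaluator.accumulate()
evaluator.summarize()  # prints the standard COCO AP/AR table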
Step 7: Prepare the functions and classes used to train the object detection model
def convert_to_coco_api(ds):
coco_ds = COCO()
# annotation IDs need to start at 1, not 0, see torchvision issue #1530
ann_id = 1
dataset = {"images": [], "categories": [], "annotations": []}
categories = set()
for img_idx in range(len(ds)):
# find better way to get target
# targets = ds.get_annotations(img_idx)
img, targets = ds[img_idx]
image_id = targets["image_id"].item()
img_dict = {}
img_dict["id"] = image_id
img_dict["height"] = img.shape[-2]
img_dict["width"] = img.shape[-1]
dataset["images"].append(img_dict)
bboxes = targets["boxes"].clone()
bboxes[:, 2:] -= bboxes[:, :2]
bboxes = bboxes.tolist()
labels = targets["labels"].tolist()
areas = targets["area"].tolist()
iscrowd = targets["iscrowd"].tolist()
if "masks" in targets:
masks = targets["masks"]
# make masks Fortran contiguous for coco_mask
masks = masks.permute(0, 2, 1).contiguous().permute(0, 2, 1)
if "keypoints" in targets:
keypoints = targets["keypoints"]
keypoints = keypoints.reshape(keypoints.shape[0], -1).tolist()
num_objs = len(bboxes)
for i in range(num_objs):
ann = {}
ann["image_id"] = image_id
ann["bbox"] = bboxes[i]
ann["category_id"] = labels[i]
categories.add(labels[i])
ann["area"] = areas[i]
ann["iscrowd"] = iscrowd[i]
ann["id"] = ann_id
if "masks" in targets:
ann["segmentation"] = coco_mask.encode(masks[i].numpy())
if "keypoints" in targets:
ann["keypoints"] = keypoints[i]
ann["num_keypoints"] = sum(k != 0 for k in keypoints[i][2::3])
dataset["annotations"].append(ann)
ann_id += 1
dataset["categories"] = [{"id": i} for i in sorted(categories)]
coco_ds.dataset = dataset
coco_ds.createIndex()
return coco_ds
def get_coco_api_from_dataset(dataset):
for _ in range(10):
if isinstance(dataset, torchvision.datasets.CocoDetection):
break
if isinstance(dataset, torch.utils.data.Subset):
dataset = dataset.dataset
if isinstance(dataset, torchvision.datasets.CocoDetection):
return dataset.coco
return convert_to_coco_api(dataset)
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq, scaler = None):
model.train()
metric_logger = MetricLogger(delimiter=" | ")
metric_logger.add_meter("lr", SmoothedValue(window_size=1, fmt="{value:.6f}"))
header = f"{epoch + 1}-epochdagi "
lr_scheduler = None
if epoch == 0:
warmup_factor = 1.0 / 1000
warmup_iters = min(1000, len(data_loader) - 1)
lr_scheduler = torch.optim.lr_scheduler.LinearLR(
optimizer, start_factor=warmup_factor, total_iters=warmup_iters
)
for idx, (images, targets) in enumerate(metric_logger.log_every(data_loader, print_freq, header)):
images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
with torch.cuda.amp.autocast(enabled=scaler is not None):
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
# reduce losses over all GPUs for logging purposes
loss_dict_reduced = reduce_dict(loss_dict)
losses_reduced = sum(loss for loss in loss_dict_reduced.values())
loss_value = losses_reduced.item()
if not math.isfinite(loss_value):
print(f"Loss {loss_value} bo'lgani uchun train jarayonini to'xtatamiz...")
print(loss_dict_reduced)
sys.exit(1)
optimizer.zero_grad()
if scaler is not None:
scaler.scale(losses).backward()
scaler.step(optimizer)
scaler.update()
else:
losses.backward()
optimizer.step()
if lr_scheduler is not None:
lr_scheduler.step()
metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
metric_logger.update(lr=optimizer.param_groups[0]["lr"])
return metric_logger
def _get_iou_types(model):
model_without_ddp = model
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
model_without_ddp = model.module
iou_types = ["bbox"]
if isinstance(model_without_ddp, torchvision.models.detection.MaskRCNN):
iou_types.append("segm")
if isinstance(model_without_ddp, torchvision.models.detection.KeypointRCNN):
iou_types.append("keypoints")
return iou_types
# @torch.inference_mode()
def evaluate(model, data_loader, device):
n_threads = torch.get_num_threads()
# FIXME remove this and make paste_masks_in_image run on the GPU
torch.set_num_threads(1)
cpu_device = torch.device("cpu")
model.eval()
metric_logger = MetricLogger(delimiter=" | ")
header = "Test jarayoni: "
coco = get_coco_api_from_dataset(data_loader.dataset)
iou_types = _get_iou_types(model)
coco_evaluator = CocoEvaluator(coco, iou_types)
for images, targets in metric_logger.log_every(data_loader, 100, header):
images = list(img.to(device) for img in images)
if torch.cuda.is_available(): torch.cuda.synchronize()
model_time = time.time()
outputs = model(images)
outputs = [{k: v.to(cpu_device) for k, v in t.items()} for t in outputs]
model_time = time.time() - model_time
res = {target["image_id"].item(): output for target, output in zip(targets, outputs)}
evaluator_time = time.time()
coco_evaluator.update(res)
evaluator_time = time.time() - evaluator_time
        metric_logger.update(model_time=model_time, evaluator_time=evaluator_time)
# gather the stats from all processes
metric_logger.synchronize_between_processes()
print(f"\nTest jarayoni statistikasi: {metric_logger}\n")
coco_evaluator.synchronize_between_processes()
# accumulate predictions from all images
coco_evaluator.accumulate()
coco_evaluator.summarize()
torch.set_num_threads(n_threads)
return coco_evaluator
class SmoothedValue:
"""Track a series of values and provide access to smoothed values over a
window or the global series average.
"""
def __init__(self, window_size=20, fmt=None):
if fmt is None:
fmt = "{median:.4f} ({global_avg:.4f})"
self.deque = deque(maxlen=window_size)
self.total = 0.0
self.count = 0
self.fmt = fmt
def update(self, value, n=1):
self.deque.append(value)
self.count += n
self.total += value * n
def synchronize_between_processes(self):
"""
Warning: does not synchronize the deque!
"""
if not is_dist_avail_and_initialized():
return
t = torch.tensor([self.count, self.total], dtype=torch.float64, device="cuda")
dist.barrier()
dist.all_reduce(t)
t = t.tolist()
self.count = int(t[0])
self.total = t[1]
@property
def median(self):
d = torch.tensor(list(self.deque))
return d.median().item()
@property
def avg(self):
d = torch.tensor(list(self.deque), dtype=torch.float32)
return d.mean().item()
@property
def global_avg(self):
return self.total / self.count
@property
def max(self):
return max(self.deque)
@property
def value(self):
return self.deque[-1]
def __str__(self):
return self.fmt.format(
median=self.median, avg=self.avg, global_avg=self.global_avg, max=self.max, value=self.value
)
class MetricLogger:
def __init__(self, delimiter=" | "):
self.meters = defaultdict(SmoothedValue)
self.delimiter = delimiter
def update(self, **kwargs):
for k, v in kwargs.items():
if isinstance(v, torch.Tensor):
v = v.item()
assert isinstance(v, (float, int))
self.meters[k].update(v)
def __getattr__(self, attr):
if attr in self.meters:
return self.meters[attr]
if attr in self.__dict__:
return self.__dict__[attr]
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{attr}'")
def __str__(self):
loss_str = []
for name, meter in self.meters.items():
loss_str.append(f"{name}: {str(meter)}")
return self.delimiter.join(loss_str)
def synchronize_between_processes(self):
for meter in self.meters.values():
meter.synchronize_between_processes()
def add_meter(self, name, meter):
self.meters[name] = meter
def log_every(self, iterable, print_freq, header=None):
i = 0
if not header:
header = ""
start_time = time.time()
end = time.time()
iter_time = SmoothedValue(fmt="{avg:.4f}")
data_time = SmoothedValue(fmt="{avg:.4f}")
space_fmt = ":" + str(len(str(len(iterable)))) + "d"
if torch.cuda.is_available():
init_msg = "".join(
[
f"\n{header}",
"{1}ta batchdan {0" + space_fmt +"}-sining natijasi: \n",
]
)
log_msg = self.delimiter.join(
[
"{meters}",
"vaqt -> {time}",
]
)
else:
log_msg = self.delimiter.join(
[header, "[{0" + space_fmt + "}/{1}]", "eta: {eta}", "{meters}", "vaqt: {vaqt}", "data: {data}"]
)
for obj in iterable:
data_time.update(time.time() - end)
yield obj
iter_time.update(time.time() - end)
if i % print_freq == 0 or i == len(iterable) - 1:
eta_seconds = iter_time.global_avg * (len(iterable) - i)
eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))
if torch.cuda.is_available():
print(init_msg.format(i + 1, len(iterable)))
print(
log_msg.format(
meters=str(self),
time=str(iter_time),
)
)
else:
print(
log_msg.format(
i, len(iterable), eta=eta_string, meters=str(self), time=str(iter_time), data=str(data_time)
)
)
i += 1
end = time.time()
total_time = time.time() - start_time
total_time_str = str(datetime.timedelta(seconds=int(total_time)))
print(f"{header} umumiy vaqt: {total_time_str} ({total_time / len(iterable):.4f} s / rasm)")
def reduce_dict(input_dict, average=True):
"""
Args:
input_dict (dict): all the values will be reduced
average (bool): whether to do average or sum
Reduce the values in the dictionary from all processes so that all processes
have the averaged results. Returns a dict with the same fields as
input_dict, after reduction.
"""
world_size = get_world_size()
if world_size < 2:
return input_dict
with torch.inference_mode():
names = []
values = []
# sort the keys so that they are consistent across processes
for k in sorted(input_dict.keys()):
names.append(k)
values.append(input_dict[k])
values = torch.stack(values, dim=0)
dist.all_reduce(values)
if average:
values /= world_size
reduced_dict = {k: v for k, v in zip(names, values)}
return reduced_dict
def get_world_size():
if not is_dist_avail_and_initialized():
return 1
return dist.get_world_size()
def is_dist_avail_and_initialized():
if not dist.is_available():
return False
if not dist.is_initialized():
return False
return True
Step 8: Train the model
model_prefix = "road"
epochs = 15
save_model_path = "saved_models"
for epoch in range(epochs):
# train for one epoch, printing every 10 iterations
train_one_epoch(m, optimizer, tr_dl, device, epoch, print_freq = 10)
# update the learning rate
lr_scheduler.step()
    # evaluate on the validation dataset
evaluate(m, val_dl, device = device)
os.makedirs(f"{save_model_path}", exist_ok = True)
torch.save(m, f"{save_model_path}/{model_prefix}_best_model.pt")
print("Finish!")
Results
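Once training finishes, the saved model can be reloaded for inference. A minimal sketch, assuming a 0.5 confidence threshold and taking one image from the held-out test split:
# newer PyTorch versions may require torch.load(..., weights_only = False) for a fully pickled model
model = torch.load(f"{save_model_path}/{model_prefix}_best_model.pt")
model.eval()
im, _ = ts_dl.dataset[0]  # one already-transformed test image
with torch.no_grad():
    pred = model([im.to(device)])[0]
keep = pred["scores"] > 0.5  # drop low-confidence detections
print(pred["boxes"][keep], pred["labels"][keep], pred["scores"][keep])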
- Summary: Building an object detection model with PyTorch, even training for only 15 epochs as in this example, is an efficient and flexible process. Fine-tuning a model pretrained on the COCO dataset gives us accurate, effective detection of objects in images without training from scratch.