Skip to content

Detection

Loss

yolo.tasks.detection.loss

logger = logging.getLogger('yolo') module-attribute

Config dataclass

Source code in yolo/config/config.py
@dataclass
class Config:
    """Top-level run configuration aggregating task, dataset, model, and trainer settings."""

    # Concrete task config selects the mode: training, inference, or validation.
    task: Union[TrainConfig, InferenceConfig, ValidationConfig]
    dataset: DatasetConfig
    model: ModelConfig
    name: str  # run name

    trainer: TrainerConfig

    image_size: List[int]  # input image size; presumably [W, H] — TODO confirm against usage

    out_path: str  # output directory for run artifacts
    exist_ok: bool  # whether an existing out_path may be reused

    lucky_number: int  # NOTE(review): looks like a random seed — confirm at call sites
    use_wandb: bool
    use_tensorboard: bool

    task_type: str
    weight: Optional[str]  # presumably a path to pretrained weights — TODO confirm

LossConfig dataclass

Source code in yolo/config/schemas/training.py
@dataclass
class LossConfig:
    """Configuration of the detection loss."""

    # Per-term loss weights keyed by loss name, e.g. "BoxLoss"/"DFLoss"/"BCELoss"
    # (read by DualLoss via loss_cfg.objective[...]).
    objective: Dict[str, int]
    # Auxiliary-head loss rate; used as a multiplicative factor in DualLoss.
    aux: Union[bool, float]
    matcher: MatcherConfig  # options forwarded to BoxMatcher (iou, topk, factor, ...)

BoxMatcher

Source code in yolo/tasks/detection/postprocess.py
class BoxMatcher:
    """Assigns each ground-truth target to the most suitable anchor(s) for loss computation."""

    def __init__(self, cfg: MatcherConfig, class_num: int, vec2box, reg_max: int) -> None:
        self.class_num = class_num
        self.vec2box = vec2box
        self.reg_max = reg_max
        # Copy every matcher option (e.g. iou, topk, factor) onto the instance.
        for attr_name in cfg:
            setattr(self, attr_name, cfg[attr_name])

    def get_valid_matrix(self, target_bbox: Tensor) -> Tensor:
        """
        Get a boolean mask that indicates whether each target bounding box overlaps with each anchor
        and is able to correctly predict it with the available reg_max value.

        Args:
            target_bbox (Tensor): The bounding box of each target, shape ``[batch, targets, 4]``.

        Returns:
            Tensor: Boolean mask of shape ``[batch, targets, anchors]`` — ``True`` where the
            target overlaps an anchor and the anchor can predict the target within ``reg_max``.
        """
        x_min, y_min, x_max, y_max = target_bbox[:, :, None].unbind(3)
        anchors = self.vec2box.anchor_grid[None, None]  # add a axis at first, second dimension
        anchors_x, anchors_y = anchors.unbind(dim=3)
        x_min_dist, x_max_dist = anchors_x - x_min, x_max - anchors_x
        y_min_dist, y_max_dist = anchors_y - y_min, y_max - anchors_y
        targets_dist = torch.stack((x_min_dist, y_min_dist, x_max_dist, y_max_dist), dim=-1)
        targets_dist /= self.vec2box.scaler[None, None, :, None]  # (1, 1, anchors, 1)
        min_reg_dist, max_reg_dist = targets_dist.amin(dim=-1), targets_dist.amax(dim=-1)
        # Anchor centre lies inside the box, and all edge distances fit in the DFL bins.
        target_on_anchor = min_reg_dist >= 0
        target_in_reg_max = max_reg_dist <= self.reg_max - 1.01
        return target_on_anchor & target_in_reg_max

    def get_cls_matrix(self, predict_cls: Tensor, target_cls: Tensor) -> Tensor:
        """
        Get the (predicted class' probabilities) corresponding to the target classes across all anchors

        Args:
            predict_cls (Tensor): Predicted class probabilities, shape ``[batch, anchors, classes]``.
            target_cls (Tensor): Ground-truth class indices, shape ``[batch, targets]``.

        Returns:
            Tensor: Class probabilities gathered for each target, shape ``[batch, targets, anchors]``.
        """
        predict_cls = predict_cls.transpose(1, 2)
        target_cls = target_cls.expand(-1, -1, predict_cls.size(2))
        cls_probabilities = torch.gather(predict_cls, 1, target_cls)
        return cls_probabilities

    def get_iou_matrix(self, predict_bbox, target_bbox) -> Tensor:
        """
        Get the IoU between each target bounding box and each predicted bounding box.

        Args:
            predict_bbox (Tensor): Predicted boxes in ``[x1, y1, x2, y2]`` format, shape ``[batch, predicts, 4]``.
            target_bbox (Tensor): Ground-truth boxes in ``[x1, y1, x2, y2]`` format, shape ``[batch, targets, 4]``.

        Returns:
            Tensor: IoU scores of shape ``[batch, targets, predicts]``.
        """
        return calculate_iou(target_bbox, predict_bbox, self.iou).clamp(0, 1)

    def filter_topk(self, target_matrix: Tensor, grid_mask: Tensor, topk: int = 10) -> Tuple[Tensor, Tensor]:
        """
        Keep, for every target, only its top-k highest-suitability anchors.

        Args:
            target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
            grid_mask (Tensor): Validity mask, shape ``[batch, targets, anchors]``.
            topk (int): Number of top scores to retain per target.

        Returns:
            Tuple[Tensor, Tensor]: ``(topk_targets, topk_mask)`` both of shape
            ``[batch, targets, anchors]``.
        """
        masked_target_matrix = grid_mask * target_matrix
        values, indices = masked_target_matrix.topk(topk, dim=-1)
        topk_targets = torch.zeros_like(target_matrix, device=target_matrix.device)
        topk_targets.scatter_(dim=-1, index=indices, src=values)
        topk_mask = topk_targets > 0
        return topk_targets, topk_mask

    def ensure_one_anchor(self, target_matrix: Tensor, topk_mask: Tensor) -> Tensor:
        """
        Ensures each valid target gets at least one anchor matched based on the unmasked target matrix,
        which enables an otherwise invalid match. This enables too small or too large targets to be
        learned as well, even if they can't be predicted perfectly.

        Args:
            target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
            topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

        Returns:
            Tensor: Updated top-k mask of shape ``[batch, targets, anchors]``.
        """
        values, indices = target_matrix.max(dim=-1)
        best_anchor_mask = torch.zeros_like(target_matrix, dtype=torch.bool)
        # src=~zeros is all-True; scatter marks each target's single best anchor.
        best_anchor_mask.scatter_(-1, index=indices[..., None], src=~best_anchor_mask)
        matched_anchor_num = torch.sum(topk_mask, dim=-1)
        # Only rescue targets that have no anchor yet but a positive best score.
        target_without_anchor = (matched_anchor_num == 0) & (values > 0)
        topk_mask = torch.where(target_without_anchor[..., None], best_anchor_mask, topk_mask)
        return topk_mask

    def filter_duplicates(self, iou_mat: Tensor, topk_mask: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
        """
        Filter the maximum suitability target index of each anchor based on IoU.

        Args:
            iou_mat (Tensor): IoU scores, shape ``[batch, targets, anchors]``.
            topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

        Returns:
            Tuple[Tensor, Tensor, Tensor]: ``(unique_indices, valid_mask, topk_mask)`` with shapes
            ``[batch, anchors, 1]``, ``[batch, anchors]``, and ``[batch, targets, anchors]``.
        """
        duplicates = (topk_mask.sum(1, keepdim=True) > 1).repeat([1, topk_mask.size(1), 1])
        masked_iou_mat = topk_mask * iou_mat
        # For contested anchors keep only the target with the highest IoU.
        best_indices = masked_iou_mat.argmax(1)[:, None, :]
        best_target_mask = torch.zeros_like(duplicates, dtype=torch.bool)
        best_target_mask.scatter_(1, index=best_indices, src=~best_target_mask)
        topk_mask = torch.where(duplicates, best_target_mask, topk_mask)
        unique_indices = topk_mask.to(torch.uint8).argmax(dim=1)
        return unique_indices[..., None], topk_mask.any(dim=1), topk_mask

    def __call__(self, target: Tensor, predict: Tuple[Tensor]) -> Tuple[Tensor, Tensor]:
        """Matches each target to the most suitable anchor.
        1. For each anchor prediction, find the highest suitability targets.
        2. Match target to the best anchor.
        3. Normalize the class probabilities of targets.

        Args:
            target: The ground truth class and bounding box information
                as tensor of size [batch x targets x 5].
            predict: Tuple of predicted class and bounding box tensors.
                Class tensor is of size [batch x anchors x class]
                Bounding box tensor is of size [batch x anchors x 4].

        Returns:
            anchor_matched_targets: Tensor of size [batch x anchors x (class + 4)].
                A tensor assigning each target/gt to the best fitting anchor.
                The class probabilities are normalized.
            valid_mask: Bool tensor of shape [batch x anchors].
                True if a anchor has a target/gt assigned to it.
        """
        predict_cls, predict_bbox = predict

        # return if target has no gt information.
        n_targets = target.shape[1]
        if n_targets == 0:
            device = predict_bbox.device
            align_cls = torch.zeros_like(predict_cls, device=device)
            align_bbox = torch.zeros_like(predict_bbox, device=device)
            valid_mask = torch.zeros(predict_cls.shape[:2], dtype=bool, device=device)
            anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
            return anchor_matched_targets, valid_mask

        target_cls, target_bbox = target.split([1, 4], dim=-1)  # B x N x (C B) -> B x N x C, B x N x B
        target_cls = target_cls.long().clamp(0)

        # get valid matrix (each gt appear in which anchor grid)
        grid_mask = self.get_valid_matrix(target_bbox)

        # get iou matrix (iou with each gt bbox and each predict anchor)
        iou_mat = self.get_iou_matrix(predict_bbox, target_bbox)

        # get cls matrix (cls prob with each gt class and each predict class)
        cls_mat = self.get_cls_matrix(predict_cls.sigmoid(), target_cls)

        target_matrix = (iou_mat ** self.factor["iou"]) * (cls_mat ** self.factor["cls"])

        # choose topk
        topk_targets, topk_mask = self.filter_topk(target_matrix, grid_mask, topk=self.topk)

        # match best anchor to valid targets without valid anchors
        topk_mask = self.ensure_one_anchor(target_matrix, topk_mask)

        # delete one anchor pred assigned to multiple gts
        unique_indices, valid_mask, topk_mask = self.filter_duplicates(iou_mat, topk_mask)

        align_bbox = torch.gather(target_bbox, 1, unique_indices.repeat(1, 1, 4))
        align_cls_indices = torch.gather(target_cls, 1, unique_indices)
        align_cls = torch.zeros_like(align_cls_indices, dtype=torch.bool).repeat(1, 1, self.class_num)
        align_cls.scatter_(-1, index=align_cls_indices, src=~align_cls)

        # normalize class distribution
        iou_mat *= topk_mask
        target_matrix *= topk_mask
        max_target = target_matrix.amax(dim=-1, keepdim=True)
        max_iou = iou_mat.amax(dim=-1, keepdim=True)
        normalize_term = (target_matrix / (max_target + 1e-9)) * max_iou
        normalize_term = normalize_term.permute(0, 2, 1).gather(2, unique_indices)
        align_cls = align_cls * normalize_term * valid_mask[:, :, None]
        anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
        return anchor_matched_targets, valid_mask

get_valid_matrix(target_bbox)

Get a boolean mask that indicates whether each target bounding box overlaps with each anchor and is able to correctly predict it with the available reg_max value.

Parameters:

Name Type Description Default
target_bbox Tensor

The bounding box of each target, shape [batch, targets, 4].

required

Returns:

Name Type Description
Tensor Tensor

Boolean mask of shape [batch, targets, anchors] — True where the

Tensor

target overlaps an anchor and the anchor can predict the target within reg_max.

Source code in yolo/tasks/detection/postprocess.py
def get_valid_matrix(self, target_bbox: Tensor) -> Tensor:
    """
    Build a boolean validity mask over (target, anchor) pairs.

    A pair is valid when the anchor centre lies inside the target box and all four
    edge distances, in stride-normalised units, fit within ``reg_max`` bins.

    Args:
        target_bbox (Tensor): Target boxes, shape ``[batch, targets, 4]``.

    Returns:
        Tensor: Boolean mask of shape ``[batch, targets, anchors]``.
    """
    # Split target corners and broadcast them against the flat anchor grid.
    left, top, right, bottom = target_bbox[:, :, None].unbind(3)
    grid = self.vec2box.anchor_grid[None, None]
    grid_x, grid_y = grid.unbind(dim=3)
    # Signed distances from each anchor centre to the four box edges.
    edge_dist = torch.stack(
        (grid_x - left, grid_y - top, right - grid_x, bottom - grid_y), dim=-1
    )
    edge_dist = edge_dist / self.vec2box.scaler[None, None, :, None]  # (1, 1, anchors, 1)
    inside_box = edge_dist.amin(dim=-1) >= 0
    fits_reg_max = edge_dist.amax(dim=-1) <= self.reg_max - 1.01
    return inside_box & fits_reg_max

get_cls_matrix(predict_cls, target_cls)

Get the (predicted class' probabilities) corresponding to the target classes across all anchors

Parameters:

Name Type Description Default
predict_cls Tensor

Predicted class probabilities, shape [batch, anchors, classes].

required
target_cls Tensor

Ground-truth class indices, shape [batch, targets].

required

Returns:

Name Type Description
Tensor Tensor

Class probabilities gathered for each target, shape [batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def get_cls_matrix(self, predict_cls: Tensor, target_cls: Tensor) -> Tensor:
    """
    Look up, for every target, its predicted class probability at every anchor.

    Args:
        predict_cls (Tensor): Predicted class probabilities, shape ``[batch, anchors, classes]``.
        target_cls (Tensor): Ground-truth class indices, shape ``[batch, targets]``.

    Returns:
        Tensor: Gathered probabilities, shape ``[batch, targets, anchors]``.
    """
    # Move the class axis to dim 1 so we can gather along it per target index.
    probs_by_class = predict_cls.transpose(1, 2)
    n_anchors = probs_by_class.size(2)
    gather_index = target_cls.expand(-1, -1, n_anchors)
    return probs_by_class.gather(1, gather_index)

get_iou_matrix(predict_bbox, target_bbox)

Get the IoU between each target bounding box and each predicted bounding box.

Parameters:

Name Type Description Default
predict_bbox Tensor

Predicted boxes in [x1, y1, x2, y2] format, shape [batch, predicts, 4].

required
target_bbox Tensor

Ground-truth boxes in [x1, y1, x2, y2] format, shape [batch, targets, 4].

required

Returns:

Name Type Description
Tensor Tensor

IoU scores of shape [batch, targets, predicts].

Source code in yolo/tasks/detection/postprocess.py
def get_iou_matrix(self, predict_bbox, target_bbox) -> Tensor:
    """
    Compute the IoU between every target box and every predicted box.

    Args:
        predict_bbox (Tensor): Predicted ``[x1, y1, x2, y2]`` boxes, shape ``[batch, predicts, 4]``.
        target_bbox (Tensor): Ground-truth ``[x1, y1, x2, y2]`` boxes, shape ``[batch, targets, 4]``.

    Returns:
        Tensor: IoU scores clamped to ``[0, 1]``, shape ``[batch, targets, predicts]``.
    """
    # self.iou selects the metric (e.g. "iou"/"ciou") configured on the matcher.
    pairwise_iou = calculate_iou(target_bbox, predict_bbox, self.iou)
    return pairwise_iou.clamp(0, 1)

filter_topk(target_matrix, grid_mask, topk=10)

Filter the top-k suitability of targets for each anchor.

Parameters:

Name Type Description Default
target_matrix Tensor

Suitability scores, shape [batch, targets, anchors].

required
grid_mask Tensor

Validity mask, shape [batch, targets, anchors].

required
topk int

Number of top scores to retain per anchor.

10

Returns:

Type Description
Tensor

Tuple[Tensor, Tensor]: (topk_targets, topk_mask) both of shape

Tensor

[batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def filter_topk(self, target_matrix: Tensor, grid_mask: Tensor, topk: int = 10) -> Tuple[Tensor, Tensor]:
    """
    Keep, for every target, only its ``topk`` best-scoring valid anchors.

    Args:
        target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
        grid_mask (Tensor): Validity mask, shape ``[batch, targets, anchors]``.
        topk (int): Number of anchors to retain per target.

    Returns:
        Tuple[Tensor, Tensor]: ``(topk_targets, topk_mask)``, both ``[batch, targets, anchors]``.
    """
    # Zero out invalid anchors before ranking.
    candidate_scores = target_matrix * grid_mask
    top_values, top_indices = candidate_scores.topk(topk, dim=-1)
    # Scatter the surviving scores back into a dense anchor-sized tensor.
    filtered = torch.zeros_like(target_matrix, device=target_matrix.device)
    filtered.scatter_(dim=-1, index=top_indices, src=top_values)
    return filtered, filtered > 0

ensure_one_anchor(target_matrix, topk_mask)

Ensures each valid target gets at least one anchor matched based on the unmasked target matrix, which enables an otherwise invalid match. This enables too small or too large targets to be learned as well, even if they can't be predicted perfectly.

Parameters:

Name Type Description Default
target_matrix Tensor

Suitability scores, shape [batch, targets, anchors].

required
topk_mask Tensor

Boolean top-k mask, shape [batch, targets, anchors].

required

Returns:

Name Type Description
Tensor Tensor

Updated top-k mask of shape [batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def ensure_one_anchor(self, target_matrix: Tensor, topk_mask: Tensor) -> Tensor:
    """
    Ensures each valid target gets at least one anchor matched based on the unmasked target matrix,
    which enables an otherwise invalid match. This enables too small or too large targets to be
    learned as well, even if they can't be predicted perfectly.

    Args:
        target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
        topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

    Returns:
        Tensor: Updated top-k mask of shape ``[batch, targets, anchors]``.
    """
    values, indices = target_matrix.max(dim=-1)
    best_anchor_mask = torch.zeros_like(target_matrix, dtype=torch.bool)
    # src=~zeros is all-True; scatter marks each target's single best anchor.
    best_anchor_mask.scatter_(-1, index=indices[..., None], src=~best_anchor_mask)
    matched_anchor_num = torch.sum(topk_mask, dim=-1)
    # Only rescue targets that currently have no anchor but a positive best score.
    target_without_anchor = (matched_anchor_num == 0) & (values > 0)
    topk_mask = torch.where(target_without_anchor[..., None], best_anchor_mask, topk_mask)
    return topk_mask

filter_duplicates(iou_mat, topk_mask)

Filter the maximum suitability target index of each anchor based on IoU.

Parameters:

Name Type Description Default
iou_mat Tensor

IoU scores, shape [batch, targets, anchors].

required
topk_mask Tensor

Boolean top-k mask, shape [batch, targets, anchors].

required

Returns:

Type Description
Tensor

Tuple[Tensor, Tensor, Tensor]: (unique_indices, valid_mask, topk_mask) with shapes

Tensor

[batch, anchors, 1], [batch, anchors], and [batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def filter_duplicates(self, iou_mat: Tensor, topk_mask: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
    """
    Resolve anchors claimed by several targets, keeping the highest-IoU target per anchor.

    Args:
        iou_mat (Tensor): IoU scores, shape ``[batch, targets, anchors]``.
        topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

    Returns:
        Tuple[Tensor, Tensor, Tensor]: ``(unique_indices, valid_mask, topk_mask)`` with shapes
        ``[batch, anchors, 1]``, ``[batch, anchors]``, and ``[batch, targets, anchors]``.
    """
    n_targets = topk_mask.size(1)
    # Anchors matched to more than one target.
    contested = (topk_mask.sum(1, keepdim=True) > 1).repeat([1, n_targets, 1])
    # For contested anchors, keep only the target with the highest IoU.
    match_iou = topk_mask * iou_mat
    winner = match_iou.argmax(1)[:, None, :]
    winner_mask = torch.zeros_like(contested, dtype=torch.bool)
    winner_mask.scatter_(1, index=winner, src=~winner_mask)
    resolved = torch.where(contested, winner_mask, topk_mask)
    # argmax over the target axis yields each anchor's assigned target index.
    target_index = resolved.to(torch.uint8).argmax(dim=1)
    return target_index[..., None], resolved.any(dim=1), resolved

__call__(target, predict)

Matches each target to the most suitable anchor. 1. For each anchor prediction, find the highest suitability targets. 2. Match target to the best anchor. 3. Normalize the class probabilities of targets.

Parameters:

Name Type Description Default
target Tensor

The ground truth class and bounding box information as tensor of size [batch x targets x 5].

required
predict Tuple[Tensor]

Tuple of predicted class and bounding box tensors. Class tensor is of size [batch x anchors x class] Bounding box tensor is of size [batch x anchors x 4].

required

Returns:

Name Type Description
anchor_matched_targets Tensor

Tensor of size [batch x anchors x (class + 4)]. A tensor assigning each target/gt to the best fitting anchor. The class probabilities are normalized.

valid_mask Tensor

Bool tensor of shape [batch x anchors]. True if a anchor has a target/gt assigned to it.

Source code in yolo/tasks/detection/postprocess.py
def __call__(self, target: Tensor, predict: Tuple[Tensor]) -> Tuple[Tensor, Tensor]:
    """Matches each target to the most suitable anchor.
    1. For each anchor prediction, find the highest suitability targets.
    2. Match target to the best anchor.
    3. Normalize the class probabilities of targets.

    Args:
        target: The ground truth class and bounding box information
            as tensor of size [batch x targets x 5].
        predict: Tuple of predicted class and bounding box tensors.
            Class tensor is of size [batch x anchors x class]
            Bounding box tensor is of size [batch x anchors x 4].

    Returns:
        anchor_matched_targets: Tensor of size [batch x anchors x (class + 4)].
            A tensor assigning each target/gt to the best fitting anchor.
            The class probabilities are normalized.
        valid_mask: Bool tensor of shape [batch x anchors].
            True if a anchor has a target/gt assigned to it.
    """
    predict_cls, predict_bbox = predict

    # return if target has no gt information.
    n_targets = target.shape[1]
    if n_targets == 0:
        device = predict_bbox.device
        align_cls = torch.zeros_like(predict_cls, device=device)
        align_bbox = torch.zeros_like(predict_bbox, device=device)
        valid_mask = torch.zeros(predict_cls.shape[:2], dtype=bool, device=device)
        anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
        return anchor_matched_targets, valid_mask

    target_cls, target_bbox = target.split([1, 4], dim=-1)  # B x N x (C B) -> B x N x C, B x N x B
    target_cls = target_cls.long().clamp(0)

    # get valid matrix (each gt appear in which anchor grid)
    grid_mask = self.get_valid_matrix(target_bbox)

    # get iou matrix (iou with each gt bbox and each predict anchor)
    iou_mat = self.get_iou_matrix(predict_bbox, target_bbox)

    # get cls matrix (cls prob with each gt class and each predict class)
    cls_mat = self.get_cls_matrix(predict_cls.sigmoid(), target_cls)

    # combined suitability score; self.factor weights come from the matcher config
    target_matrix = (iou_mat ** self.factor["iou"]) * (cls_mat ** self.factor["cls"])

    # choose topk
    topk_targets, topk_mask = self.filter_topk(target_matrix, grid_mask, topk=self.topk)

    # match best anchor to valid targets without valid anchors
    topk_mask = self.ensure_one_anchor(target_matrix, topk_mask)

    # delete one anchor pred assigned to multiple gts
    unique_indices, valid_mask, topk_mask = self.filter_duplicates(iou_mat, topk_mask)

    # gather the matched gt box and one-hot class for every anchor
    align_bbox = torch.gather(target_bbox, 1, unique_indices.repeat(1, 1, 4))
    align_cls_indices = torch.gather(target_cls, 1, unique_indices)
    align_cls = torch.zeros_like(align_cls_indices, dtype=torch.bool).repeat(1, 1, self.class_num)
    align_cls.scatter_(-1, index=align_cls_indices, src=~align_cls)

    # normalize class distribution
    iou_mat *= topk_mask
    target_matrix *= topk_mask
    max_target = target_matrix.amax(dim=-1, keepdim=True)
    max_iou = iou_mat.amax(dim=-1, keepdim=True)
    normalize_term = (target_matrix / (max_target + 1e-9)) * max_iou
    normalize_term = normalize_term.permute(0, 2, 1).gather(2, unique_indices)
    align_cls = align_cls * normalize_term * valid_mask[:, :, None]
    anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
    return anchor_matched_targets, valid_mask

Vec2Box

Source code in yolo/tasks/detection/postprocess.py
class Vec2Box:
    """
    Flattens multi-scale detection-head outputs into per-anchor tensors and decodes
    predicted LTRB distances into absolute ``[x1, y1, x2, y2]`` boxes.
    """

    def __init__(self, model: YOLO, anchor_cfg: AnchorConfig, image_size, device):
        self.device = device

        # Prefer strides declared in the anchor config; otherwise infer them
        # with a dummy forward pass through the model.
        if hasattr(anchor_cfg, "strides"):
            logger.info(f":japanese_not_free_of_charge_button: Found stride of model {anchor_cfg.strides}")
            self.strides = anchor_cfg.strides
        else:
            logger.info(":teddy_bear: Found no stride of model, performed a dummy test for auto-anchor size")
            self.strides = self.create_auto_anchor(model, image_size)

        anchor_grid, scaler = generate_anchors(image_size, self.strides)
        self.image_size = image_size
        self.anchor_grid, self.scaler = anchor_grid.to(device), scaler.to(device)

    def create_auto_anchor(self, model: YOLO, image_size):
        """Infer head strides by running a zero image through the model. image_size is (W, H)."""
        W, H = image_size
        # TODO: need accelerate dummy test
        device = next(model.parameters()).device
        dummy_input = torch.zeros(1, 3, H, W, device=device)
        dummy_output = model(dummy_input)
        strides = []
        for predict_head in dummy_output["Main"]:
            # assumes predict_head[2] is (B, C, h, w), so stride = W // w — TODO confirm
            _, _, *anchor_num = predict_head[2].shape
            strides.append(W // anchor_num[1])
        return strides

    def update(self, image_size):
        """
        image_size: W, H
        Regenerate the anchor grid for a new input size; no-op if unchanged.
        """
        if self.image_size == image_size:
            return
        anchor_grid, scaler = generate_anchors(image_size, self.strides)
        self.image_size = image_size
        self.anchor_grid, self.scaler = anchor_grid.to(self.device), scaler.to(self.device)

    def __call__(self, predicts):
        """Flatten per-layer (cls, anchor, box) maps and decode boxes to absolute xyxy."""
        preds_cls, preds_anc, preds_box = [], [], []
        for layer_output in predicts:
            pred_cls, pred_anc, pred_box = layer_output
            preds_cls.append(rearrange(pred_cls, "B C h w -> B (h w) C"))
            preds_anc.append(rearrange(pred_anc, "B A R h w -> B (h w) R A"))
            preds_box.append(rearrange(pred_box, "B X h w -> B (h w) X"))
        preds_cls = torch.concat(preds_cls, dim=1)
        preds_anc = torch.concat(preds_anc, dim=1)
        preds_box = torch.concat(preds_box, dim=1)

        # Scale normalised LTRB distances back to pixels, then offset from anchor centres.
        pred_LTRB = preds_box * self.scaler.view(1, -1, 1)
        lt, rb = pred_LTRB.chunk(2, dim=-1)
        preds_box = torch.cat([self.anchor_grid - lt, self.anchor_grid + rb], dim=-1)
        return preds_cls, preds_anc, preds_box

update(image_size)

image_size: W, H

Source code in yolo/tasks/detection/postprocess.py
def update(self, image_size):
    """
    Regenerate the anchor grid and scaler for a new input size.

    image_size: W, H — no-op when the size is unchanged.
    """
    if self.image_size == image_size:
        return  # nothing to recompute
    new_grid, new_scaler = generate_anchors(image_size, self.strides)
    self.image_size = image_size
    self.anchor_grid = new_grid.to(self.device)
    self.scaler = new_scaler.to(self.device)

BCELoss

Bases: Module

Source code in yolo/tasks/detection/loss.py
class BCELoss(nn.Module):
    """Classification loss: summed element-wise binary cross-entropy, scaled by ``cls_norm``."""

    def __init__(self) -> None:
        super().__init__()
        # TODO: device should be assigned via config after refactoring
        # TODO: original v9 assigns pos_weight == 1?
        self.bce = BCEWithLogitsLoss(reduction="none")

    def forward(self, predicts_cls: Tensor, targets_cls: Tensor, cls_norm: Tensor) -> Any:
        elementwise = self.bce(predicts_cls, targets_cls)
        return elementwise.sum() / cls_norm

BoxLoss

Bases: Module

Source code in yolo/tasks/detection/loss.py
class BoxLoss(nn.Module):
    """Box regression loss: ``1 - CIoU`` over matched pairs, weighted and normalised."""

    def __init__(self) -> None:
        super().__init__()

    def forward(
        self, predicts_bbox: Tensor, targets_bbox: Tensor, valid_masks: Tensor, box_norm: Tensor, cls_norm: Tensor
    ) -> Any:
        # Select only the anchor slots that carry an assigned target.
        keep = valid_masks[..., None].expand(-1, -1, 4)
        matched_preds = predicts_bbox[keep].view(-1, 4)
        matched_targets = targets_bbox[keep].view(-1, 4)

        # Pairwise CIoU; the diagonal pairs each prediction with its own target.
        matched_iou = calculate_iou(matched_preds, matched_targets, "ciou").diag()
        weighted = (1.0 - matched_iou) * box_norm
        return weighted.sum() / cls_norm

DFLoss

Bases: Module

Source code in yolo/tasks/detection/loss.py
class DFLoss(nn.Module):
    """Distribution Focal Loss over the discretised box-regression bins."""

    def __init__(self, vec2box: Vec2Box, reg_max: int) -> None:
        super().__init__()
        # Anchor centres in stride-normalised units, with a leading batch axis.
        self.anchors_norm = (vec2box.anchor_grid / vec2box.scaler[:, None])[None]
        self.reg_max = reg_max

    def forward(
        self, predicts_anc: Tensor, targets_bbox: Tensor, valid_masks: Tensor, box_norm: Tensor, cls_norm: Tensor
    ) -> Any:
        keep = valid_masks[..., None].expand(-1, -1, 4)
        bbox_lt, bbox_rb = targets_bbox.chunk(2, -1)
        # Distances from anchor centre to the four edges, clamped to representable bins.
        edge_dist = torch.cat(((self.anchors_norm - bbox_lt), (bbox_rb - self.anchors_norm)), -1)
        edge_dist = edge_dist.clamp(0, self.reg_max - 1.01)
        target_vals = edge_dist[keep].view(-1)
        logits = predicts_anc[keep].view(-1, self.reg_max)

        # Each fractional distance is split between its two neighbouring integer bins.
        lower = target_vals.floor()
        upper = lower + 1
        weight_upper = target_vals - lower
        weight_lower = upper - target_vals

        ce_lower = F.cross_entropy(logits, lower.to(torch.long), reduction="none")
        ce_upper = F.cross_entropy(logits, upper.to(torch.long), reduction="none")
        per_edge = ce_lower * weight_lower + ce_upper * weight_upper
        per_box = per_edge.view(-1, 4).mean(-1)
        return (per_box * box_norm).sum() / cls_norm

YOLOLoss

Source code in yolo/tasks/detection/loss.py
class YOLOLoss:
    """Combines BCE classification, CIoU box, and DFL losses for one prediction head."""

    def __init__(self, loss_cfg: LossConfig, vec2box: Vec2Box, class_num: int = 80, reg_max: int = 16) -> None:
        self.class_num = class_num
        self.vec2box = vec2box

        self.cls = BCELoss()
        self.dfl = DFLoss(vec2box, reg_max)
        self.iou = BoxLoss()

        # Assigns ground-truth boxes to anchors before the losses are computed.
        self.matcher = BoxMatcher(loss_cfg.matcher, self.class_num, vec2box, reg_max)

    def separate_anchor(self, anchors):
        """
        Split matched targets into class scores and bounding boxes; boxes are
        rescaled to stride-normalised units via the vec2box scaler.
        """
        anchors_cls, anchors_box = torch.split(anchors, (self.class_num, 4), dim=-1)
        anchors_box = anchors_box / self.vec2box.scaler[None, :, None]
        return anchors_cls, anchors_box

    def __call__(self, predicts: List[Tensor], targets: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
        """Return ``(loss_iou, loss_dfl, loss_cls)`` for one set of head predictions."""
        predicts_cls, predicts_anc, predicts_box = predicts
        # For each predicted targets, assign a best suitable ground truth box.
        align_targets, valid_masks = self.matcher(targets, (predicts_cls.detach(), predicts_box.detach()))

        targets_cls, targets_bbox = self.separate_anchor(align_targets)
        predicts_box = predicts_box / self.vec2box.scaler[None, :, None]

        # cls_norm: total positive class mass (kept >= 1 to avoid division by zero);
        # box_norm: per-matched-anchor class weight reused by the box and DFL losses.
        cls_norm = max(targets_cls.sum(), 1)
        box_norm = targets_cls.sum(-1)[valid_masks]

        ## -- CLS -- ##
        loss_cls = self.cls(predicts_cls, targets_cls, cls_norm)
        ## -- IOU -- ##
        loss_iou = self.iou(predicts_box, targets_bbox, valid_masks, box_norm, cls_norm)
        ## -- DFL -- ##
        loss_dfl = self.dfl(predicts_anc, targets_bbox, valid_masks, box_norm, cls_norm)

        return loss_iou, loss_dfl, loss_cls

separate_anchor(anchors)

separate anchor and bounding box

Source code in yolo/tasks/detection/loss.py
def separate_anchor(self, anchors):
    """
    Split concatenated per-anchor targets into class scores and bounding boxes,
    rescaling the boxes by the vec2box scaler.
    """
    cls_part, box_part = torch.split(anchors, (self.class_num, 4), dim=-1)
    scaled_boxes = box_part / self.vec2box.scaler[None, :, None]
    return cls_part, scaled_boxes

DualLoss

Source code in yolo/tasks/detection/loss.py
class DualLoss:
    """Combine auxiliary- and main-head YOLO losses into one weighted objective."""

    def __init__(self, cfg: Config, vec2box) -> None:
        loss_cfg = cfg.task.loss
        self.loss = YOLOLoss(loss_cfg, vec2box, class_num=cfg.dataset.class_num, reg_max=cfg.model.anchor.reg_max)

        # Weight of the auxiliary head relative to the main head.
        self.aux_rate = loss_cfg.aux

        # Per-objective weights taken from the config.
        self.iou_rate = loss_cfg.objective["BoxLoss"]
        self.dfl_rate = loss_cfg.objective["DFLoss"]
        self.cls_rate = loss_cfg.objective["BCELoss"]

    def __call__(
        self, aux_predicts: List[Tensor], main_predicts: List[Tensor], targets: Tensor
    ) -> Tuple[Tensor, Dict[str, float]]:
        """Return the summed weighted loss and a per-term breakdown for logging."""
        # TODO: Need Refactor this region, make it flexible!
        aux_terms = self.loss(aux_predicts, targets)
        main_terms = self.loss(main_predicts, targets)

        rates = (self.iou_rate, self.dfl_rate, self.cls_rate)
        total_loss = [
            rate * (aux * self.aux_rate + main) for rate, aux, main in zip(rates, aux_terms, main_terms)
        ]
        loss_dict = {
            f"Loss/{name}Loss": value.detach().item() for name, value in zip(["Box", "DFL", "BCE"], total_loss)
        }
        return sum(total_loss), loss_dict

calculate_iou(bbox1, bbox2, metrics='iou')

Source code in yolo/tasks/detection/postprocess.py
def calculate_iou(bbox1, bbox2, metrics="iou") -> Tensor:
    """Compute pairwise IoU, DIoU or CIoU between two sets of ``[x1, y1, x2, y2]`` boxes.

    2-D inputs ``(A, 4)``/``(B, 4)`` yield an ``(A, B)`` matrix; 3-D inputs
    ``(BZ, A, 4)``/``(BZ, B, 4)`` yield ``(BZ, A, B)``. The result is cast back
    to the dtype of ``bbox1``.
    """
    metrics = metrics.lower()
    EPS = 1e-7
    dtype = bbox1.dtype
    bbox1 = bbox1.to(torch.float32)
    bbox2 = bbox2.to(torch.float32)

    # Insert broadcast axes so every box in bbox1 is paired with every box in bbox2.
    if bbox1.ndim == 2 and bbox2.ndim == 2:
        bbox1, bbox2 = bbox1[:, None], bbox2[None, :]
    elif bbox1.ndim == 3 and bbox2.ndim == 3:
        bbox1, bbox2 = bbox1[:, :, None], bbox2[:, None, :]

    x1a, y1a, x2a, y2a = bbox1.unbind(-1)
    x1b, y1b, x2b, y2b = bbox2.unbind(-1)

    # Intersection rectangle, clamped to zero when the boxes do not overlap.
    inter_w = torch.clamp(torch.min(x2a, x2b) - torch.max(x1a, x1b), min=0)
    inter_h = torch.clamp(torch.min(y2a, y2b) - torch.max(y1a, y1b), min=0)
    intersection_area = inter_w * inter_h

    # Union = area1 + area2 - intersection.
    union_area = (x2a - x1a) * (y2a - y1a) + (x2b - x1b) * (y2b - y1b) - intersection_area

    iou = intersection_area / (union_area + EPS)
    if metrics == "iou":
        return iou.to(dtype)

    # Squared distance between the two box centers.
    cent_dis = ((x2a + x1a) / 2 - (x2b + x1b) / 2) ** 2 + ((y2a + y1a) / 2 - (y2b + y1b) / 2) ** 2

    # Squared diagonal of the smallest box enclosing both.
    c_w = torch.max(x2a, x2b) - torch.min(x1a, x1b)
    c_h = torch.max(y2a, y2b) - torch.min(y1a, y1b)
    diag_dis = c_w**2 + c_h**2 + EPS

    diou = iou - (cent_dis / diag_dis)
    if metrics == "diou":
        return diou.to(dtype)

    # CIoU adds an aspect-ratio consistency penalty on top of DIoU.
    arctan = torch.atan((x2a - x1a) / (y2a - y1a + EPS)) - torch.atan((x2b - x1b) / (y2b - y1b + EPS))
    v = (4 / (math.pi**2)) * (arctan**2)
    with torch.no_grad():
        alpha = v / (v - iou + 1 + EPS)
    return (diou - alpha * v).to(dtype)

create_loss_function(cfg, vec2box)

Source code in yolo/tasks/detection/loss.py
def create_loss_function(cfg: Config, vec2box) -> DualLoss:
    """Build the (dual-head) training loss from the experiment config."""
    # TODO: make it flexible, if cfg doesn't contain aux, only use SingleLoss
    dual_loss = DualLoss(cfg, vec2box)
    logger.info(":white_check_mark: Success load loss function")
    return dual_loss

Post-processing

yolo.tasks.detection.postprocess

logger = logging.getLogger('yolo') module-attribute

AnchorConfig dataclass

Source code in yolo/config/schemas/model.py
@dataclass
class AnchorConfig:
    """Anchor/detection-head settings; ``reg_max`` bounds the DFL regression range."""
    strides: List[int]
    reg_max: Optional[int]
    anchor_num: Optional[int]
    anchor: List[List[int]]

MatcherConfig dataclass

Source code in yolo/config/schemas/training.py
@dataclass
class MatcherConfig:
    """BoxMatcher settings: IoU metric name, top-k anchors per target, and score factors."""
    iou: str
    topk: int
    factor: Dict[str, int]

NMSConfig dataclass

Source code in yolo/config/schemas/task.py
@dataclass
class NMSConfig:
    """Non-maximum-suppression thresholds and output cap."""
    min_confidence: float
    min_iou: float
    max_bbox: int

YOLO

Bases: Module

A preliminary YOLO (You Only Look Once) model class still under development.

Parameters:

Name Type Description Default
model_cfg ModelConfig

Configuration for the YOLO model. Expected to define the layers, parameters, and any other relevant configuration details.

required
Source code in yolo/model/builder.py
class YOLO(nn.Module):
    """
    A preliminary YOLO (You Only Look Once) model class still under development.

    Parameters:
        model_cfg: Configuration for the YOLO model. Expected to define the layers,
                   parameters, and any other relevant configuration details.
    """

    def __init__(self, model_cfg: ModelConfig, class_num: int = 80):
        super(YOLO, self).__init__()
        self.num_classes = class_num
        self.layer_map = get_layer_map()  # Get the map Dict[str: Module]
        self.model: List[YOLOLayer] = nn.ModuleList()
        self.reg_max = getattr(model_cfg.anchor, "reg_max", 16)
        self.build_model(model_cfg.model)

    def build_model(self, model_arch: Dict[str, List[Dict[str, Dict[str, Dict]]]]):
        """Instantiate every layer described by ``model_arch`` and wire up their sources."""
        self.layer_index = {}
        # output_dim[0] = 3 is the RGB input; layer indices start at 1.
        output_dim, layer_idx = [3], 1
        logger.info(f":tractor: Building YOLO")
        for arch_name in model_arch:
            if model_arch[arch_name]:
                logger.info(f"  :building_construction:  Building {arch_name}")
            for layer_idx, layer_spec in enumerate(model_arch[arch_name], start=layer_idx):
                layer_type, layer_info = next(iter(layer_spec.items()))
                layer_args = layer_info.get("args", {})

                # Get input source
                source = self.get_source_idx(layer_info.get("source", -1), layer_idx)

                # Find in channels
                if any(module in layer_type for module in ["Conv", "ELAN", "ADown", "AConv", "CBLinear"]):
                    layer_args["in_channels"] = output_dim[source]
                if any(module in layer_type for module in ["Detection", "Segmentation", "Classification"]):
                    if isinstance(source, list):
                        layer_args["in_channels"] = [output_dim[idx] for idx in source]
                    else:
                        layer_args["in_channel"] = output_dim[source]
                    layer_args["num_classes"] = self.num_classes
                    layer_args["reg_max"] = self.reg_max

                # create layers
                layer = self.create_layer(layer_type, source, layer_info, **layer_args)
                self.model.append(layer)

                if layer.tags:
                    if layer.tags in self.layer_index:
                        raise ValueError(f"Duplicate tag '{layer_info['tags']}' found.")
                    self.layer_index[layer.tags] = layer_idx

                out_channels = self.get_out_channels(layer_type, layer_args, output_dim, source)
                output_dim.append(out_channels)
                setattr(layer, "out_c", out_channels)
            layer_idx += 1

    def forward(self, x, external: Optional[Dict] = None, shortcut: Optional[str] = None):
        """Run the layer graph; returns the dict of tagged outputs (early exit at ``shortcut``)."""
        y = {0: x, **(external or {})}
        output = dict()
        for index, layer in enumerate(self.model, start=1):
            if isinstance(layer.source, list):
                model_input = [y[idx] for idx in layer.source]
            else:
                model_input = y[layer.source]

            external_input = {source_name: y[source_name] for source_name in layer.external}

            x = layer(model_input, **external_input)
            y[-1] = x  # key -1 always holds the most recent output
            if layer.usable:
                y[index] = x
            if layer.output:
                output[layer.tags] = x
                if layer.tags == shortcut:
                    return output
        return output

    def get_out_channels(self, layer_type: str, layer_args: dict, output_dim: list, source: Union[int, list]):
        """Infer a layer's output channel count from its args or its source layers."""
        # BUG FIX: layer_args is a dict, so membership is the correct check.
        # The previous hasattr(layer_args, "out_channels") was always False and
        # silently ignored an explicitly configured out_channels.
        if "out_channels" in layer_args:
            return layer_args["out_channels"]
        if layer_type == "CBFuse":
            return output_dim[source[-1]]
        if isinstance(source, int):
            return output_dim[source]
        if isinstance(source, list):
            return sum(output_dim[idx] for idx in source)

    def get_source_idx(self, source: Union[ListConfig, str, int], layer_idx: int):
        """Resolve a config source (tag name, relative or absolute index) to an absolute index."""
        if isinstance(source, ListConfig):
            return [self.get_source_idx(index, layer_idx) for index in source]
        if isinstance(source, str):
            source = self.layer_index[source]
        if source < -1:
            # Relative reference (e.g. -2) becomes absolute.
            source += layer_idx
        if source > 0:  # Using Previous Layer's Output
            self.model[source - 1].usable = True
        return source

    def create_layer(self, layer_type: str, source: Union[int, list], layer_info: Dict, **kwargs) -> YOLOLayer:
        """Instantiate a layer by type name and attach its bookkeeping attributes."""
        if layer_type in self.layer_map:
            layer = self.layer_map[layer_type](**kwargs)
            setattr(layer, "layer_type", layer_type)
            setattr(layer, "source", source)
            setattr(layer, "in_c", kwargs.get("in_channels", None))
            setattr(layer, "output", layer_info.get("output", False))
            setattr(layer, "tags", layer_info.get("tags", None))
            setattr(layer, "external", layer_info.get("external", []))
            setattr(layer, "usable", 0)
            return layer
        else:
            raise ValueError(f"Unsupported layer type: {layer_type}")

    def save_load_weights(self, weights: Union[Path, OrderedDict]):
        """
        Update the model's weights with the provided weights.

        args:
            weights: An OrderedDict containing the new weights.
        """
        if isinstance(weights, Path):
            weights = torch.load(weights, map_location=torch.device("cpu"), weights_only=False)
        if "state_dict" in weights:
            # Strip the Lightning checkpoint prefix so keys match this module.
            weights = {name.removeprefix("model.model."): key for name, key in weights["state_dict"].items()}
        model_state_dict = self.model.state_dict()

        # TODO1: autoload old version weight
        # TODO2: weight transform if num_class difference

        error_dict = {"Mismatch": set(), "Not Found": set()}
        for model_key, model_weight in model_state_dict.items():
            if model_key not in weights:
                error_dict["Not Found"].add(tuple(model_key.split(".")[:-2]))
                continue
            if model_weight.shape != weights[model_key].shape:
                error_dict["Mismatch"].add(tuple(model_key.split(".")[:-2]))
                continue
            model_state_dict[model_key] = weights[model_key]

        # Group problems per layer index and emit one warning line per layer.
        # (Previously the inner dict reused the name error_dict, shadowing the
        # dict being iterated; renamed for clarity — behavior is unchanged.)
        for error_name, error_set in error_dict.items():
            grouped_by_layer = dict()
            for layer_idx, *layer_name in error_set:
                grouped_by_layer.setdefault(layer_idx, []).append(".".join(layer_name))
            for layer_idx, layer_names in grouped_by_layer.items():
                layer_names.sort()
                logger.warning(f":warning: Weight {error_name} for Layer {layer_idx}: {', '.join(layer_names)}")

        self.model.load_state_dict(model_state_dict)

save_load_weights(weights)

Update the model's weights with the provided weights.

Parameters:

Name Type Description Default
weights Union[Path, OrderedDict]

An OrderedDict containing the new weights.

required
Source code in yolo/model/builder.py
def save_load_weights(self, weights: Union[Path, OrderedDict]):
    """
    Update the model's weights with the provided weights.

    args:
        weights: An OrderedDict containing the new weights.
    """
    if isinstance(weights, Path):
        weights = torch.load(weights, map_location=torch.device("cpu"), weights_only=False)
    if "state_dict" in weights:
        # Strip the checkpoint prefix so keys match this module's state dict.
        weights = {name.removeprefix("model.model."): key for name, key in weights["state_dict"].items()}
    model_state_dict = self.model.state_dict()

    # TODO1: autoload old version weight
    # TODO2: weight transform if num_class difference

    # Copy matching weights; collect mismatched/missing keys grouped by layer.
    error_dict = {"Mismatch": set(), "Not Found": set()}
    for model_key, model_weight in model_state_dict.items():
        if model_key not in weights:
            error_dict["Not Found"].add(tuple(model_key.split(".")[:-2]))
            continue
        if model_weight.shape != weights[model_key].shape:
            error_dict["Mismatch"].add(tuple(model_key.split(".")[:-2]))
            continue
        model_state_dict[model_key] = weights[model_key]

    for error_name, error_set in error_dict.items():
        # NOTE(review): rebinding error_dict here shadows the dict being iterated.
        # The iterator keeps a reference to the original dict so iteration is
        # unaffected, but the reuse of the name is confusing.
        error_dict = dict()
        for layer_idx, *layer_name in error_set:
            if layer_idx not in error_dict:
                error_dict[layer_idx] = [".".join(layer_name)]
            else:
                error_dict[layer_idx].append(".".join(layer_name))
        for layer_idx, layer_name in error_dict.items():
            layer_name.sort()
            logger.warning(f":warning: Weight {error_name} for Layer {layer_idx}: {', '.join(layer_name)}")

    self.model.load_state_dict(model_state_dict)

BoxMatcher

Source code in yolo/tasks/detection/postprocess.py
class BoxMatcher:
    """Assigns each ground-truth box to the most suitable anchors for loss computation."""

    def __init__(self, cfg: MatcherConfig, class_num: int, vec2box, reg_max: int) -> None:
        self.class_num = class_num
        self.vec2box = vec2box
        self.reg_max = reg_max
        # Copy matcher settings (iou metric, topk, factor) onto the instance.
        for attr_name in cfg:
            setattr(self, attr_name, cfg[attr_name])

    def get_valid_matrix(self, target_bbox: Tensor) -> Tensor:
        """
        Get a boolean mask that indicates whether each target bounding box overlaps with each anchor
        and is able to correctly predict it with the available reg_max value.

        Args:
            target_bbox (Tensor): The bounding box of each target, shape ``[batch, targets, 4]``.

        Returns:
            Tensor: Boolean mask of shape ``[batch, targets, anchors]`` — ``True`` where the
            target overlaps an anchor and the anchor can predict the target within ``reg_max``.
        """
        x_min, y_min, x_max, y_max = target_bbox[:, :, None].unbind(3)
        anchors = self.vec2box.anchor_grid[None, None]  # add an axis at first, second dimension
        anchors_x, anchors_y = anchors.unbind(dim=3)
        x_min_dist, x_max_dist = anchors_x - x_min, x_max - anchors_x
        y_min_dist, y_max_dist = anchors_y - y_min, y_max - anchors_y
        targets_dist = torch.stack((x_min_dist, y_min_dist, x_max_dist, y_max_dist), dim=-1)
        # Express the distances in stride units so they compare against reg_max.
        targets_dist /= self.vec2box.scaler[None, None, :, None]  # (1, 1, anchors, 1)
        min_reg_dist, max_reg_dist = targets_dist.amin(dim=-1), targets_dist.amax(dim=-1)
        target_on_anchor = min_reg_dist >= 0
        # 1.01 keeps a small margin below the last representable DFL bin.
        target_in_reg_max = max_reg_dist <= self.reg_max - 1.01
        return target_on_anchor & target_in_reg_max

    def get_cls_matrix(self, predict_cls: Tensor, target_cls: Tensor) -> Tensor:
        """
        Get the (predicted class' probabilities) corresponding to the target classes across all anchors

        Args:
            predict_cls (Tensor): Predicted class probabilities, shape ``[batch, anchors, classes]``.
            target_cls (Tensor): Ground-truth class indices, shape ``[batch, targets]``.

        Returns:
            Tensor: Class probabilities gathered for each target, shape ``[batch, targets, anchors]``.
        """
        predict_cls = predict_cls.transpose(1, 2)
        target_cls = target_cls.expand(-1, -1, predict_cls.size(2))
        cls_probabilities = torch.gather(predict_cls, 1, target_cls)
        return cls_probabilities

    def get_iou_matrix(self, predict_bbox, target_bbox) -> Tensor:
        """
        Get the IoU between each target bounding box and each predicted bounding box.

        Args:
            predict_bbox (Tensor): Predicted boxes in ``[x1, y1, x2, y2]`` format, shape ``[batch, predicts, 4]``.
            target_bbox (Tensor): Ground-truth boxes in ``[x1, y1, x2, y2]`` format, shape ``[batch, targets, 4]``.

        Returns:
            Tensor: IoU scores of shape ``[batch, targets, predicts]``.
        """
        return calculate_iou(target_bbox, predict_bbox, self.iou).clamp(0, 1)

    def filter_topk(self, target_matrix: Tensor, grid_mask: Tensor, topk: int = 10) -> Tuple[Tensor, Tensor]:
        """
        Filter the top-k suitability of targets for each anchor.

        Args:
            target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
            grid_mask (Tensor): Validity mask, shape ``[batch, targets, anchors]``.
            topk (int): Number of top scores to retain per anchor.

        Returns:
            Tuple[Tensor, Tensor]: ``(topk_targets, topk_mask)`` both of shape
            ``[batch, targets, anchors]``.
        """
        masked_target_matrix = grid_mask * target_matrix
        values, indices = masked_target_matrix.topk(topk, dim=-1)
        topk_targets = torch.zeros_like(target_matrix, device=target_matrix.device)
        topk_targets.scatter_(dim=-1, index=indices, src=values)
        topk_mask = topk_targets > 0
        return topk_targets, topk_mask

    def ensure_one_anchor(self, target_matrix: Tensor, topk_mask: Tensor) -> Tensor:
        """
        Ensures each valid target gets at least one anchor matched based on the unmasked target matrix,
        which enables an otherwise invalid match. This enables too small or too large targets to be
        learned as well, even if they can't be predicted perfectly.

        Args:
            target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
            topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

        Returns:
            Tensor: Updated top-k mask of shape ``[batch, targets, anchors]``.
        """
        values, indices = target_matrix.max(dim=-1)
        # One-hot mask marking each target's single best anchor.
        best_anchor_mask = torch.zeros_like(target_matrix, dtype=torch.bool)
        best_anchor_mask.scatter_(-1, index=indices[..., None], src=~best_anchor_mask)
        matched_anchor_num = torch.sum(topk_mask, dim=-1)
        target_without_anchor = (matched_anchor_num == 0) & (values > 0)
        topk_mask = torch.where(target_without_anchor[..., None], best_anchor_mask, topk_mask)
        return topk_mask

    def filter_duplicates(self, iou_mat: Tensor, topk_mask: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
        """
        Filter the maximum suitability target index of each anchor based on IoU.

        Args:
            iou_mat (Tensor): IoU scores, shape ``[batch, targets, anchors]``.
            topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

        Returns:
            Tuple[Tensor, Tensor, Tensor]: ``(unique_indices, valid_mask, topk_mask)`` with shapes
            ``[batch, anchors, 1]``, ``[batch, anchors]``, and ``[batch, targets, anchors]``.
        """
        duplicates = (topk_mask.sum(1, keepdim=True) > 1).repeat([1, topk_mask.size(1), 1])
        # For each contested anchor, keep only the target with the highest IoU.
        masked_iou_mat = topk_mask * iou_mat
        best_indices = masked_iou_mat.argmax(1)[:, None, :]
        best_target_mask = torch.zeros_like(duplicates, dtype=torch.bool)
        best_target_mask.scatter_(1, index=best_indices, src=~best_target_mask)
        topk_mask = torch.where(duplicates, best_target_mask, topk_mask)
        unique_indices = topk_mask.to(torch.uint8).argmax(dim=1)
        return unique_indices[..., None], topk_mask.any(dim=1), topk_mask

    def __call__(self, target: Tensor, predict: Tuple[Tensor]) -> Tuple[Tensor, Tensor]:
        """Matches each target to the most suitable anchor.
        1. For each anchor prediction, find the highest suitability targets.
        2. Match target to the best anchor.
        3. Normalize the class probabilities of targets.

        Args:
            target: The ground truth class and bounding box information
                as tensor of size [batch x targets x 5].
            predict: Tuple of predicted class and bounding box tensors.
                Class tensor is of size [batch x anchors x class]
                Bounding box tensor is of size [batch x anchors x 4].

        Returns:
            anchor_matched_targets: Tensor of size [batch x anchors x (class + 4)].
                A tensor assigning each target/gt to the best fitting anchor.
                The class probabilities are normalized.
            valid_mask: Bool tensor of shape [batch x anchors].
                True if an anchor has a target/gt assigned to it.
        """
        predict_cls, predict_bbox = predict

        # return if target has no gt information.
        n_targets = target.shape[1]
        if n_targets == 0:
            device = predict_bbox.device
            align_cls = torch.zeros_like(predict_cls, device=device)
            align_bbox = torch.zeros_like(predict_bbox, device=device)
            valid_mask = torch.zeros(predict_cls.shape[:2], dtype=bool, device=device)
            anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
            return anchor_matched_targets, valid_mask

        target_cls, target_bbox = target.split([1, 4], dim=-1)  # B x N x (C B) -> B x N x C, B x N x B
        target_cls = target_cls.long().clamp(0)

        # get valid matrix (each gt appear in which anchor grid)
        grid_mask = self.get_valid_matrix(target_bbox)

        # get iou matrix (iou with each gt bbox and each predict anchor)
        iou_mat = self.get_iou_matrix(predict_bbox, target_bbox)

        # get cls matrix (cls prob with each gt class and each predict class)
        cls_mat = self.get_cls_matrix(predict_cls.sigmoid(), target_cls)

        target_matrix = (iou_mat ** self.factor["iou"]) * (cls_mat ** self.factor["cls"])

        # choose topk
        topk_targets, topk_mask = self.filter_topk(target_matrix, grid_mask, topk=self.topk)

        # match best anchor to valid targets without valid anchors
        topk_mask = self.ensure_one_anchor(target_matrix, topk_mask)

        # delete one anchor pred assigned to multiple gts
        unique_indices, valid_mask, topk_mask = self.filter_duplicates(iou_mat, topk_mask)

        align_bbox = torch.gather(target_bbox, 1, unique_indices.repeat(1, 1, 4))
        align_cls_indices = torch.gather(target_cls, 1, unique_indices)
        # One-hot encode the gathered class indices.
        align_cls = torch.zeros_like(align_cls_indices, dtype=torch.bool).repeat(1, 1, self.class_num)
        align_cls.scatter_(-1, index=align_cls_indices, src=~align_cls)

        # normalize class distribution
        iou_mat *= topk_mask
        target_matrix *= topk_mask
        max_target = target_matrix.amax(dim=-1, keepdim=True)
        max_iou = iou_mat.amax(dim=-1, keepdim=True)
        normalize_term = (target_matrix / (max_target + 1e-9)) * max_iou
        normalize_term = normalize_term.permute(0, 2, 1).gather(2, unique_indices)
        align_cls = align_cls * normalize_term * valid_mask[:, :, None]
        anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
        return anchor_matched_targets, valid_mask

get_valid_matrix(target_bbox)

Get a boolean mask that indicates whether each target bounding box overlaps with each anchor and is able to correctly predict it with the available reg_max value.

Parameters:

Name Type Description Default
target_bbox Tensor

The bounding box of each target, shape [batch, targets, 4].

required

Returns:

Name Type Description
Tensor Tensor

Boolean mask of shape [batch, targets, anchors] — True where the

Tensor

target overlaps an anchor and the anchor can predict the target within reg_max.

Source code in yolo/tasks/detection/postprocess.py
def get_valid_matrix(self, target_bbox: Tensor) -> Tensor:
    """
    Get a boolean mask that indicates whether each target bounding box overlaps with each anchor
    and is able to correctly predict it with the available reg_max value.

    Args:
        target_bbox (Tensor): The bounding box of each target, shape ``[batch, targets, 4]``.

    Returns:
        Tensor: Boolean mask of shape ``[batch, targets, anchors]`` — ``True`` where the
        target overlaps an anchor and the anchor can predict the target within ``reg_max``.
    """
    x_min, y_min, x_max, y_max = target_bbox[:, :, None].unbind(3)
    anchors = self.vec2box.anchor_grid[None, None]  # add an axis at first, second dimension
    anchors_x, anchors_y = anchors.unbind(dim=3)
    # Distances from each anchor center to the four target box edges.
    x_min_dist, x_max_dist = anchors_x - x_min, x_max - anchors_x
    y_min_dist, y_max_dist = anchors_y - y_min, y_max - anchors_y
    targets_dist = torch.stack((x_min_dist, y_min_dist, x_max_dist, y_max_dist), dim=-1)
    # Express the distances in stride units so they compare against reg_max.
    targets_dist /= self.vec2box.scaler[None, None, :, None]  # (1, 1, anchors, 1)
    min_reg_dist, max_reg_dist = targets_dist.amin(dim=-1), targets_dist.amax(dim=-1)
    target_on_anchor = min_reg_dist >= 0
    # 1.01 keeps a small margin below the last representable DFL bin.
    target_in_reg_max = max_reg_dist <= self.reg_max - 1.01
    return target_on_anchor & target_in_reg_max

get_cls_matrix(predict_cls, target_cls)

Get the (predicted class' probabilities) corresponding to the target classes across all anchors

Parameters:

Name Type Description Default
predict_cls Tensor

Predicted class probabilities, shape [batch, anchors, classes].

required
target_cls Tensor

Ground-truth class indices, shape [batch, targets].

required

Returns:

Name Type Description
Tensor Tensor

Class probabilities gathered for each target, shape [batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def get_cls_matrix(self, predict_cls: Tensor, target_cls: Tensor) -> Tensor:
    """
    Look up, for every target, its class probability at every anchor.

    Args:
        predict_cls (Tensor): Predicted class probabilities, shape ``[batch, anchors, classes]``.
        target_cls (Tensor): Ground-truth class indices, shape ``[batch, targets, 1]``.

    Returns:
        Tensor: Class probabilities gathered per target, shape ``[batch, targets, anchors]``.
    """
    # Reorder to [batch, classes, anchors] so classes sit on the gather axis.
    scores_by_class = predict_cls.transpose(1, 2)
    gather_idx = target_cls.expand(-1, -1, scores_by_class.size(2))
    return scores_by_class.gather(1, gather_idx)

get_iou_matrix(predict_bbox, target_bbox)

Get the IoU between each target bounding box and each predicted bounding box.

Parameters:

Name Type Description Default
predict_bbox Tensor

Predicted boxes in [x1, y1, x2, y2] format, shape [batch, predicts, 4].

required
target_bbox Tensor

Ground-truth boxes in [x1, y1, x2, y2] format, shape [batch, targets, 4].

required

Returns:

Name Type Description
Tensor Tensor

IoU scores of shape [batch, targets, predicts].

Source code in yolo/tasks/detection/postprocess.py
def get_iou_matrix(self, predict_bbox, target_bbox) -> Tensor:
    """
    Compute the IoU of every target box against every predicted box.

    Args:
        predict_bbox (Tensor): Predicted boxes in ``[x1, y1, x2, y2]`` format, shape ``[batch, predicts, 4]``.
        target_bbox (Tensor): Ground-truth boxes in ``[x1, y1, x2, y2]`` format, shape ``[batch, targets, 4]``.

    Returns:
        Tensor: IoU scores of shape ``[batch, targets, predicts]``.
    """
    # self.iou selects the metric (iou/diou/ciou); clamp guards penalty terms.
    iou_scores = calculate_iou(target_bbox, predict_bbox, self.iou)
    return iou_scores.clamp(0, 1)

filter_topk(target_matrix, grid_mask, topk=10)

Filter the top-k suitability of targets for each anchor.

Parameters:

Name Type Description Default
target_matrix Tensor

Suitability scores, shape [batch, targets, anchors].

required
grid_mask Tensor

Validity mask, shape [batch, targets, anchors].

required
topk int

Number of top scores to retain per anchor.

10

Returns:

Type Description
Tensor

Tuple[Tensor, Tensor]: (topk_targets, topk_mask) both of shape

Tensor

[batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def filter_topk(self, target_matrix: Tensor, grid_mask: Tensor, topk: int = 10) -> Tuple[Tensor, Tensor]:
    """
    Keep only the top-k suitability scores along the anchor axis.

    Args:
        target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
        grid_mask (Tensor): Validity mask, shape ``[batch, targets, anchors]``.
        topk (int): Number of top scores to retain per target.

    Returns:
        Tuple[Tensor, Tensor]: ``(topk_targets, topk_mask)`` both of shape
        ``[batch, targets, anchors]``.
    """
    # Zero out scores on anchors the target cannot occupy, then rank the rest.
    candidate_scores = grid_mask * target_matrix
    top_values, top_indices = candidate_scores.topk(topk, dim=-1)
    # Scatter the surviving scores back into a dense, mostly-zero matrix.
    topk_targets = torch.zeros_like(target_matrix, device=target_matrix.device)
    topk_targets.scatter_(dim=-1, index=top_indices, src=top_values)
    return topk_targets, topk_targets > 0

ensure_one_anchor(target_matrix, topk_mask)

Ensures each valid target gets at least one anchor matched based on the unmasked target matrix, which enables an otherwise invalid match. This enables too small or too large targets to be learned as well, even if they can't be predicted perfectly.

Parameters:

Name Type Description Default
target_matrix Tensor

Suitability scores, shape [batch, targets, anchors].

required
topk_mask Tensor

Boolean top-k mask, shape [batch, targets, anchors].

required

Returns:

Name Type Description
Tensor Tensor

Updated top-k mask of shape [batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def ensure_one_anchor(self, target_matrix: Tensor, topk_mask: Tensor) -> Tensor:
    """
    Ensures each valid target gets at least one anchor matched based on the unmasked target matrix,
    which enables an otherwise invalid match. This enables too small or too large targets to be
    learned as well, even if they can't be predicted perfectly.

    Args:
        target_matrix (Tensor): Suitability scores, shape ``[batch, targets, anchors]``.
        topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.
            (BUG FIX: annotation was the lowercase ``tensor`` factory function,
            not the ``Tensor`` type.)

    Returns:
        Tensor: Updated top-k mask of shape ``[batch, targets, anchors]``.
    """
    values, indices = target_matrix.max(dim=-1)
    # One-hot mask marking each target's single best anchor.
    best_anchor_mask = torch.zeros_like(target_matrix, dtype=torch.bool)
    best_anchor_mask.scatter_(-1, index=indices[..., None], src=~best_anchor_mask)
    # Targets with a positive best score but no anchor assigned fall back to
    # their single best anchor; already-matched targets are left untouched.
    matched_anchor_num = torch.sum(topk_mask, dim=-1)
    target_without_anchor = (matched_anchor_num == 0) & (values > 0)
    topk_mask = torch.where(target_without_anchor[..., None], best_anchor_mask, topk_mask)
    return topk_mask

filter_duplicates(iou_mat, topk_mask)

Filter the maximum suitability target index of each anchor based on IoU.

Parameters:

Name Type Description Default
iou_mat Tensor

IoU scores, shape [batch, targets, anchors].

required
topk_mask Tensor

Boolean top-k mask, shape [batch, targets, anchors].

required

Returns:

Type Description
Tensor

Tuple[Tensor, Tensor, Tensor]: (unique_indices, valid_mask, topk_mask) with shapes

Tensor

[batch, anchors, 1], [batch, anchors], and [batch, targets, anchors].

Source code in yolo/tasks/detection/postprocess.py
def filter_duplicates(self, iou_mat: Tensor, topk_mask: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
    """
    Resolve anchors claimed by several targets, keeping the target with the highest IoU.

    Args:
        iou_mat (Tensor): IoU scores, shape ``[batch, targets, anchors]``.
        topk_mask (Tensor): Boolean top-k mask, shape ``[batch, targets, anchors]``.

    Returns:
        Tuple[Tensor, Tensor, Tensor]: ``(unique_indices, valid_mask, topk_mask)`` with shapes
        ``[batch, anchors, 1]``, ``[batch, anchors]``, and ``[batch, targets, anchors]``.
    """
    num_targets = topk_mask.size(1)
    # Anchors claimed by more than one target.
    multi_claimed = (topk_mask.sum(dim=1, keepdim=True) > 1).repeat([1, num_targets, 1])
    # For each anchor, the target index with the highest masked IoU.
    best_target_idx = (topk_mask * iou_mat).argmax(dim=1).unsqueeze(1)
    keep_best = torch.zeros_like(multi_claimed, dtype=torch.bool)
    keep_best.scatter_(1, index=best_target_idx, src=~keep_best)
    # Contested anchors keep only their best target; uncontested ones are unchanged.
    resolved_mask = torch.where(multi_claimed, keep_best, topk_mask)
    # Index of the (now unique) matched target per anchor; 0 for unmatched anchors.
    matched_target_idx = resolved_mask.to(torch.uint8).argmax(dim=1)
    return matched_target_idx[..., None], resolved_mask.any(dim=1), resolved_mask

__call__(target, predict)

Matches each target to the most suitable anchor. 1. For each anchor prediction, find the highest suitability targets. 2. Match target to the best anchor. 3. Normalize the class probabilities of targets.

Parameters:

Name Type Description Default
target Tensor

The ground truth class and bounding box information as tensor of size [batch x targets x 5].

required
predict Tuple[Tensor]

Tuple of predicted class and bounding box tensors. Class tensor is of size [batch x anchors x class] Bounding box tensor is of size [batch x anchors x 4].

required

Returns:

Name Type Description
anchor_matched_targets Tensor

Tensor of size [batch x anchors x (class + 4)]. A tensor assigning each target/gt to the best fitting anchor. The class probabilities are normalized.

valid_mask Tensor

Bool tensor of shape [batch x anchors]. True if an anchor has a target/gt assigned to it.

Source code in yolo/tasks/detection/postprocess.py
def __call__(self, target: Tensor, predict: Tuple[Tensor]) -> Tuple[Tensor, Tensor]:
    """Matches each target to the most suitable anchor.
    1. For each anchor prediction, find the highest suitability targets.
    2. Match target to the best anchor.
    3. Normalize the class probabilities of targets.

    Args:
        target: The ground truth class and bounding box information
            as tensor of size [batch x targets x 5].
        predict: Tuple of predicted class and bounding box tensors.
            Class tensor is of size [batch x anchors x class]
            Bounding box tensor is of size [batch x anchors x 4].

    Returns:
        anchor_matched_targets: Tensor of size [batch x anchors x (class + 4)].
            A tensor assigning each target/gt to the best fitting anchor.
            The class probabilities are normalized.
        valid_mask: Bool tensor of shape [batch x anchors].
            True if an anchor has a target/gt assigned to it.
    """
    predict_cls, predict_bbox = predict

    # return early with all-zero assignments if target has no gt information.
    n_targets = target.shape[1]
    if n_targets == 0:
        device = predict_bbox.device
        align_cls = torch.zeros_like(predict_cls, device=device)
        align_bbox = torch.zeros_like(predict_bbox, device=device)
        valid_mask = torch.zeros(predict_cls.shape[:2], dtype=bool, device=device)
        anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
        return anchor_matched_targets, valid_mask

    target_cls, target_bbox = target.split([1, 4], dim=-1)  # B x N x (C B) -> B x N x C, B x N x B
    # clamp(0) maps padding class ids (e.g. -1) to a valid index before one-hot/gather use
    target_cls = target_cls.long().clamp(0)

    # get valid matrix (each gt appear in which anchor grid)
    grid_mask = self.get_valid_matrix(target_bbox)

    # get iou matrix (iou with each gt bbox and each predict anchor)
    iou_mat = self.get_iou_matrix(predict_bbox, target_bbox)

    # get cls matrix (cls prob with each gt class and each predict class)
    cls_mat = self.get_cls_matrix(predict_cls.sigmoid(), target_cls)

    # suitability score: IoU and class confidence weighted by the configured exponents
    target_matrix = (iou_mat ** self.factor["iou"]) * (cls_mat ** self.factor["cls"])

    # choose topk anchors per target, restricted to anchors inside the target box
    topk_targets, topk_mask = self.filter_topk(target_matrix, grid_mask, topk=self.topk)

    # match best anchor to valid targets without valid anchors
    topk_mask = self.ensure_one_anchor(target_matrix, topk_mask)

    # delete one anchor pred assigned to multiple gts (keep the highest-IoU target)
    unique_indices, valid_mask, topk_mask = self.filter_duplicates(iou_mat, topk_mask)

    # gather the matched gt box and class per anchor; one-hot encode the class
    align_bbox = torch.gather(target_bbox, 1, unique_indices.repeat(1, 1, 4))
    align_cls_indices = torch.gather(target_cls, 1, unique_indices)
    align_cls = torch.zeros_like(align_cls_indices, dtype=torch.bool).repeat(1, 1, self.class_num)
    align_cls.scatter_(-1, index=align_cls_indices, src=~align_cls)

    # normalize class distribution: scale each anchor's class target by its relative
    # suitability times the best IoU achieved for its matched gt
    iou_mat *= topk_mask
    target_matrix *= topk_mask
    max_target = target_matrix.amax(dim=-1, keepdim=True)
    max_iou = iou_mat.amax(dim=-1, keepdim=True)
    normalize_term = (target_matrix / (max_target + 1e-9)) * max_iou
    normalize_term = normalize_term.permute(0, 2, 1).gather(2, unique_indices)
    align_cls = align_cls * normalize_term * valid_mask[:, :, None]
    anchor_matched_targets = torch.cat([align_cls, align_bbox], dim=-1)
    return anchor_matched_targets, valid_mask

Vec2Box

Source code in yolo/tasks/detection/postprocess.py
class Vec2Box:
    """Converts anchor-free head outputs (per-anchor LTRB distances) into absolute xyxy boxes.

    Used for non-v7 models (see ``create_converter``). Precomputes an anchor-center grid
    and per-anchor stride scalers from the image size and the model's strides.
    """

    def __init__(self, model: YOLO, anchor_cfg: AnchorConfig, image_size, device):
        self.device = device

        if hasattr(anchor_cfg, "strides"):
            logger.info(f":japanese_not_free_of_charge_button: Found stride of model {anchor_cfg.strides}")
            self.strides = anchor_cfg.strides
        else:
            # No strides configured: infer them from a dummy forward pass.
            logger.info(":teddy_bear: Found no stride of model, performed a dummy test for auto-anchor size")
            self.strides = self.create_auto_anchor(model, image_size)

        anchor_grid, scaler = generate_anchors(image_size, self.strides)
        self.image_size = image_size
        self.anchor_grid, self.scaler = anchor_grid.to(device), scaler.to(device)

    def create_auto_anchor(self, model: YOLO, image_size):
        """Infer per-head strides by comparing input width to each head's feature-map width."""
        W, H = image_size
        # TODO: need accelerate dummy test
        device = next(model.parameters()).device
        dummy_input = torch.zeros(1, 3, H, W, device=device)
        dummy_output = model(dummy_input)
        strides = []
        for predict_head in dummy_output["Main"]:
            # predict_head[2] is assumed to be the box map, shape (B, C, h, w) — TODO confirm
            _, _, *anchor_num = predict_head[2].shape
            strides.append(W // anchor_num[1])
        return strides

    def update(self, image_size):
        """Regenerate the anchor grid and scalers when the input size changes.

        image_size: W, H
        """
        if self.image_size == image_size:
            return
        anchor_grid, scaler = generate_anchors(image_size, self.strides)
        self.image_size = image_size
        self.anchor_grid, self.scaler = anchor_grid.to(self.device), scaler.to(self.device)

    def __call__(self, predicts):
        """Flatten per-layer (cls, anchor, box) maps and decode LTRB distances into xyxy boxes."""
        preds_cls, preds_anc, preds_box = [], [], []
        for layer_output in predicts:
            pred_cls, pred_anc, pred_box = layer_output
            preds_cls.append(rearrange(pred_cls, "B C h w -> B (h w) C"))
            preds_anc.append(rearrange(pred_anc, "B A R h w -> B (h w) R A"))
            preds_box.append(rearrange(pred_box, "B X h w -> B (h w) X"))
        preds_cls = torch.concat(preds_cls, dim=1)
        preds_anc = torch.concat(preds_anc, dim=1)
        preds_box = torch.concat(preds_box, dim=1)

        # Scale the predicted distances by each anchor's stride, then offset
        # left-top / right-bottom from the anchor center.
        pred_LTRB = preds_box * self.scaler.view(1, -1, 1)
        lt, rb = pred_LTRB.chunk(2, dim=-1)
        preds_box = torch.cat([self.anchor_grid - lt, self.anchor_grid + rb], dim=-1)
        return preds_cls, preds_anc, preds_box

update(image_size)

image_size: W, H

Source code in yolo/tasks/detection/postprocess.py
def update(self, image_size):
    """Regenerate anchors and scalers if the input resolution changed.

    image_size: W, H
    """
    if image_size == self.image_size:
        return
    new_grid, new_scaler = generate_anchors(image_size, self.strides)
    self.image_size = image_size
    self.anchor_grid = new_grid.to(self.device)
    self.scaler = new_scaler.to(self.device)

Anc2Box

Source code in yolo/tasks/detection/postprocess.py
class Anc2Box:
    """Converts anchor-based (v7-style) head outputs into class scores, xyxy boxes and objectness."""

    def __init__(self, model: YOLO, anchor_cfg: AnchorConfig, image_size, device):
        self.device = device

        if hasattr(anchor_cfg, "strides"):
            logger.info(f":japanese_not_free_of_charge_button: Found stride of model {anchor_cfg.strides}")
            self.strides = anchor_cfg.strides
        else:
            # No strides configured: infer them from a dummy forward pass.
            logger.info(":teddy_bear: Found no stride of model, performed a dummy test for auto-anchor size")
            self.strides = self.create_auto_anchor(model, image_size)

        self.head_num = len(anchor_cfg.anchor)
        self.anchor_grids = self.generate_anchors(image_size)
        # Shaped (heads, 1, anchors_per_head, 1, 1, 2) so it broadcasts over (B, h, w).
        self.anchor_scale = tensor(anchor_cfg.anchor, device=device).view(self.head_num, 1, -1, 1, 1, 2)
        self.anchor_num = self.anchor_scale.size(2)
        self.class_num = model.num_classes

    def create_auto_anchor(self, model: YOLO, image_size):
        """Infer per-head strides by comparing input width to each head's feature-map width."""
        W, H = image_size
        dummy_input = torch.zeros(1, 3, H, W, device=self.device)
        dummy_output = model(dummy_input)
        strides = []
        for predict_head in dummy_output["Main"]:
            _, _, *anchor_num = predict_head.shape
            strides.append(W // anchor_num[1])
        return strides

    def generate_anchors(self, image_size: List[int]):
        """Build one (1, 1, H, W, 2) grid of (x, y) cell coordinates per stride level."""
        anchor_grids = []
        for stride in self.strides:
            W, H = image_size[0] // stride, image_size[1] // stride
            anchor_h, anchor_w = torch.meshgrid([torch.arange(H), torch.arange(W)], indexing="ij")
            anchor_grid = torch.stack((anchor_w, anchor_h), 2).view((1, 1, H, W, 2)).float().to(self.device)
            anchor_grids.append(anchor_grid)
        return anchor_grids

    def update(self, image_size):
        """Rebuild the anchor grids for a new input size (W, H)."""
        self.anchor_grids = self.generate_anchors(image_size)

    def __call__(self, predicts: List[Tensor]):
        """Decode each head's raw map into class logits, xyxy boxes and sigmoid objectness."""
        preds_box, preds_cls, preds_cnf = [], [], []
        for layer_idx, predict in enumerate(predicts):
            predict = rearrange(predict, "B (L C) h w -> B L h w C", L=self.anchor_num)
            pred_box, pred_cnf, pred_cls = predict.split((4, 1, self.class_num), dim=-1)
            pred_box = pred_box.sigmoid()
            # xy: 2*sigmoid - 0.5 offset from the cell, scaled to pixels by the stride;
            # wh: (2*sigmoid)^2 times the prior anchor size.
            pred_box[..., 0:2] = (pred_box[..., 0:2] * 2.0 - 0.5 + self.anchor_grids[layer_idx]) * self.strides[
                layer_idx
            ]
            pred_box[..., 2:4] = (pred_box[..., 2:4] * 2) ** 2 * self.anchor_scale[layer_idx]
            preds_box.append(rearrange(pred_box, "B L h w A -> B (L h w) A"))
            preds_cls.append(rearrange(pred_cls, "B L h w C -> B (L h w) C"))
            preds_cnf.append(rearrange(pred_cnf, "B L h w C -> B (L h w) C"))

        preds_box = torch.concat(preds_box, dim=1)
        preds_cls = torch.concat(preds_cls, dim=1)
        preds_cnf = torch.concat(preds_cnf, dim=1)

        # Boxes were decoded as center-x, center-y, width, height; convert to corners.
        preds_box = transform_bbox(preds_box, "xycwh -> xyxy")
        return preds_cls, None, preds_box, preds_cnf.sigmoid()

calculate_iou(bbox1, bbox2, metrics='iou')

Source code in yolo/tasks/detection/postprocess.py
def calculate_iou(bbox1, bbox2, metrics="iou") -> Tensor:
    """
    Compute pairwise IoU, DIoU or CIoU between two sets of xyxy boxes.

    Args:
        bbox1: Boxes of shape ``[A, 4]`` or ``[batch, A, 4]`` in xyxy format.
        bbox2: Boxes of shape ``[B, 4]`` or ``[batch, B, 4]`` in xyxy format.
        metrics: One of ``"iou"``, ``"diou"``, ``"ciou"`` (case-insensitive).

    Returns:
        Tensor: Pairwise scores of shape ``[A, B]`` (or ``[batch, A, B]``),
        cast back to ``bbox1``'s original dtype.

    Raises:
        ValueError: If ``metrics`` is not a supported metric name.
    """
    metrics = metrics.lower()
    # Fix: an unknown metric name previously fell through silently and returned CIoU.
    if metrics not in ("iou", "diou", "ciou"):
        raise ValueError(f"Unsupported IoU metric: {metrics!r}")
    EPS = 1e-7
    dtype = bbox1.dtype
    # Compute in float32 for numerical stability; restore the dtype on return.
    bbox1 = bbox1.to(torch.float32)
    bbox2 = bbox2.to(torch.float32)

    # Expand dimensions so every box in bbox1 is paired with every box in bbox2
    if bbox1.ndim == 2 and bbox2.ndim == 2:
        bbox1 = bbox1.unsqueeze(1)  # (Ax4) -> (Ax1x4)
        bbox2 = bbox2.unsqueeze(0)  # (Bx4) -> (1xBx4)
    elif bbox1.ndim == 3 and bbox2.ndim == 3:
        bbox1 = bbox1.unsqueeze(2)  # (BZxAx4) -> (BZxAx1x4)
        bbox2 = bbox2.unsqueeze(1)  # (BZxBx4) -> (BZx1xBx4)

    # Calculate intersection coordinates
    xmin_inter = torch.max(bbox1[..., 0], bbox2[..., 0])
    ymin_inter = torch.max(bbox1[..., 1], bbox2[..., 1])
    xmax_inter = torch.min(bbox1[..., 2], bbox2[..., 2])
    ymax_inter = torch.min(bbox1[..., 3], bbox2[..., 3])

    # Calculate intersection area (clamped so disjoint boxes contribute zero)
    intersection_area = torch.clamp(xmax_inter - xmin_inter, min=0) * torch.clamp(ymax_inter - ymin_inter, min=0)

    # Calculate area of each bbox
    area_bbox1 = (bbox1[..., 2] - bbox1[..., 0]) * (bbox1[..., 3] - bbox1[..., 1])
    area_bbox2 = (bbox2[..., 2] - bbox2[..., 0]) * (bbox2[..., 3] - bbox2[..., 1])

    # Calculate union area
    union_area = area_bbox1 + area_bbox2 - intersection_area

    # Calculate IoU
    iou = intersection_area / (union_area + EPS)
    if metrics == "iou":
        return iou.to(dtype)

    # Calculate squared centroid distance
    cx1 = (bbox1[..., 2] + bbox1[..., 0]) / 2
    cy1 = (bbox1[..., 3] + bbox1[..., 1]) / 2
    cx2 = (bbox2[..., 2] + bbox2[..., 0]) / 2
    cy2 = (bbox2[..., 3] + bbox2[..., 1]) / 2
    cent_dis = (cx1 - cx2) ** 2 + (cy1 - cy2) ** 2

    # Calculate squared diagonal length of the smallest enclosing box
    c_x = torch.max(bbox1[..., 2], bbox2[..., 2]) - torch.min(bbox1[..., 0], bbox2[..., 0])
    c_y = torch.max(bbox1[..., 3], bbox2[..., 3]) - torch.min(bbox1[..., 1], bbox2[..., 1])
    diag_dis = c_x**2 + c_y**2 + EPS

    diou = iou - (cent_dis / diag_dis)
    if metrics == "diou":
        return diou.to(dtype)

    # Compute aspect ratio penalty term
    arctan = torch.atan((bbox1[..., 2] - bbox1[..., 0]) / (bbox1[..., 3] - bbox1[..., 1] + EPS)) - torch.atan(
        (bbox2[..., 2] - bbox2[..., 0]) / (bbox2[..., 3] - bbox2[..., 1] + EPS)
    )
    v = (4 / (math.pi**2)) * (arctan**2)
    # alpha is a trade-off weight; detached from the graph per the CIoU paper
    with torch.no_grad():
        alpha = v / (v - iou + 1 + EPS)
    # Compute CIoU
    ciou = diou - alpha * v
    return ciou.to(dtype)

transform_bbox(bbox, indicator='xywh -> xyxy')

Source code in yolo/tasks/detection/postprocess.py
def transform_bbox(bbox: Tensor, indicator="xywh -> xyxy"):
    """Convert bounding boxes between layouts.

    Supported layouts: ``xyxy`` (corners), ``xywh`` (top-left + size),
    ``xycwh`` (center + size). ``indicator`` has the form ``"<in> -> <out>"``.
    The result is cast back to the input dtype.
    """
    original_dtype = bbox.dtype
    source, destination = indicator.replace(" ", "").split("->")

    known_formats = {"xyxy", "xywh", "xycwh"}
    if source not in known_formats or destination not in known_formats:
        raise ValueError("Invalid input or output format")

    c0, c1, c2, c3 = bbox.unbind(-1)
    # Normalize every input layout to corner coordinates first.
    if source == "xywh":
        left, top, right, bottom = c0, c1, c0 + c2, c1 + c3
    elif source == "xyxy":
        left, top, right, bottom = c0, c1, c2, c3
    else:  # xycwh
        half_w, half_h = c2 / 2, c3 / 2
        left, top, right, bottom = c0 - half_w, c1 - half_h, c0 + half_w, c1 + half_h

    # Then emit the requested layout from the corners.
    if destination == "xywh":
        components = [left, top, right - left, bottom - top]
    elif destination == "xyxy":
        components = [left, top, right, bottom]
    else:  # xycwh
        components = [(left + right) / 2, (top + bottom) / 2, right - left, bottom - top]

    return torch.stack(components, dim=-1).to(dtype=original_dtype)

generate_anchors(image_size, strides)

Find the anchor maps for each w, h.

Parameters:

Name Type Description Default
image_size List[int]

The image size [W, H] of the augmented input.

required
strides List[int]

The stride for each prediction layer, e.g. [8, 16, 32].

required

Returns:

Type Description
Tensor

Tuple[Tensor, Tensor]: (all_anchors, all_scalers) where all_anchors

Tensor

has shape [HW, 2] and all_scalers has shape [HW].

Source code in yolo/tasks/detection/postprocess.py
def generate_anchors(image_size: List[int], strides: List[int]) -> Tuple[Tensor, Tensor]:
    """
    Find the anchor maps for each w, h.

    Args:
        image_size (List[int]): The image size ``[W, H]`` of the augmented input.
        strides (List[int]): The stride for each prediction layer, e.g. ``[8, 16, 32]``.

    Returns:
        Tuple[Tensor, Tensor]: ``(all_anchors, all_scalers)`` where ``all_anchors``
        has shape ``[HW, 2]`` and ``all_scalers`` has shape ``[HW]``.
    """
    W, H = image_size
    anchors = []
    scaler = []
    for stride in strides:
        anchor_num = W // stride * H // stride
        scaler.append(torch.full((anchor_num,), stride))
        # Anchor points sit at the center of each stride-sized cell.
        shift = stride // 2
        h = torch.arange(0, H, stride) + shift
        w = torch.arange(0, W, stride) + shift
        # Fix: `torch.__version__ >= "2.3.0"` compared version strings lexicographically,
        # which mis-orders versions such as "2.10.0" < "2.3.0". Feature-detect the
        # `indexing` kwarg (added in torch 1.10) instead of parsing the version.
        try:
            anchor_h, anchor_w = torch.meshgrid(h, w, indexing="ij")
        except TypeError:
            anchor_h, anchor_w = torch.meshgrid(h, w)
        anchor = torch.stack([anchor_w.flatten(), anchor_h.flatten()], dim=-1)
        anchors.append(anchor)
    all_anchors = torch.cat(anchors, dim=0)
    all_scalers = torch.cat(scaler, dim=0)
    return all_anchors, all_scalers

create_converter(model_version='v9-c', *args, **kwargs)

Source code in yolo/tasks/detection/postprocess.py
def create_converter(model_version: str = "v9-c", *args, **kwargs) -> Union[Anc2Box, Vec2Box]:
    """Instantiate the box converter matching the model generation.

    v7 models use anchor-based heads (Anc2Box); every other version is
    treated as anchor-free (Vec2Box). Extra arguments are forwarded to
    the chosen converter's constructor.
    """
    converter_cls = Anc2Box if "v7" in model_version else Vec2Box
    return converter_cls(*args, **kwargs)

bbox_nms(cls_dist, bbox, nms_cfg, confidence=None)

Source code in yolo/tasks/detection/postprocess.py
def bbox_nms(cls_dist: Tensor, bbox: Tensor, nms_cfg: NMSConfig, confidence: Optional[Tensor] = None):
    """
    Threshold class scores and run class-aware NMS over batched predictions.

    Args:
        cls_dist (Tensor): Class scores before sigmoid, shape ``[batch, anchors, class]``.
        bbox (Tensor): Boxes, shape ``[batch, anchors, 4]`` — presumably xyxy; verify against caller.
        nms_cfg (NMSConfig): Provides ``min_confidence``, ``min_iou`` and ``max_bbox``.
        confidence (Optional[Tensor]): Optional per-anchor confidence used to scale class scores.

    Returns:
        List of per-image tensors of shape ``[n, 6]``: (class, box coords x4, confidence),
        truncated to ``nms_cfg.max_bbox`` detections per image.
    """
    cls_dist = cls_dist.sigmoid() * (1 if confidence is None else confidence)

    # Keep only (batch, anchor, class) triples whose score clears the threshold.
    batch_idx, valid_grid, valid_cls = torch.where(cls_dist > nms_cfg.min_confidence)
    valid_con = cls_dist[batch_idx, valid_grid, valid_cls]
    valid_box = bbox[batch_idx, valid_grid]

    # Group ids combine batch index and class so NMS never suppresses
    # across different images or different classes.
    nms_idx = batched_nms(valid_box, valid_con, batch_idx + valid_cls * bbox.size(0), nms_cfg.min_iou)
    predicts_nms = []
    for idx in range(cls_dist.size(0)):
        # Select the kept detections belonging to image `idx`.
        instance_idx = nms_idx[idx == batch_idx[nms_idx]]

        predict_nms = torch.cat(
            [valid_cls[instance_idx][:, None], valid_box[instance_idx], valid_con[instance_idx][:, None]], dim=-1
        )

        predicts_nms.append(predict_nms[: nms_cfg.max_bbox])
    return predicts_nms

calculate_map(predictions, ground_truths)

Source code in yolo/tasks/detection/postprocess.py
def calculate_map(predictions, ground_truths) -> Dict[str, Tensor]:
    """Compute mean average precision for one prediction/ground-truth pair of xyxy boxes."""
    evaluator = MeanAveragePrecision(iou_type="bbox", box_format="xyxy")
    return evaluator([to_metrics_format(predictions)], [to_metrics_format(ground_truths)])

to_metrics_format(prediction)

Source code in yolo/tasks/detection/postprocess.py
def to_metrics_format(prediction: Tensor) -> Dict[str, Union[float, Tensor]]:
    """Convert an ``[N, 5]`` or ``[N, 6]`` prediction tensor into the torchmetrics dict layout.

    Columns are (class, x1, y1, x2, y2[, score]); rows whose class is -1
    (padding) are dropped. ``scores`` is included only for 6-column input.
    """
    kept = prediction[prediction[:, 0] != -1]
    formatted = {"boxes": kept[:, 1:5], "labels": kept[:, 0].int()}
    if kept.size(1) == 6:
        formatted["scores"] = kept[:, 5]
    return formatted