# Imports

In [None]:
import numpy as np
import tensorflow as tf

In [None]:
# Ask TensorFlow to grow GPU memory on demand for every visible GPU.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    # the setting could not be applied; report it and carry on with the defaults
    print(e)

# Model

In [None]:
def get_name(prefix: str | None = None, suffix: str | None = None, separator: str = '.') -> str | None:
    return prefix and prefix + separator + suffix or suffix or None

In [None]:
def get_conv_bn(
    x: tf.Tensor,
    channels: int,
    kernel_size: int,
    padding: str,
    strides: int,
    activation: str | tf.keras.layers.Activation | None = 'silu',
    use_bias: bool = True,
    name: str | None = None
) -> tf.Tensor:
    """Builds the Conv2D -> BatchNormalization -> (optional) activation stack on top of `x`

    Arguments:
        x: input tensor
        channels: number of filters of the convolution
        kernel_size: kernel size of the convolution
        padding: padding mode of the convolution, 'same' or 'valid'
        strides: stride of the convolution
        activation: activation applied after batch normalization, None to skip it
        use_bias: whether the convolution has a bias term
        name: name prefix for the created layers ('conv', 'bn' and 'activation' are appended)

    Returns:
        The output tensor of the last created layer"""

    conv = tf.keras.layers.Conv2D(
        filters=channels,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
        use_bias=use_bias,
        name=get_name(name, 'conv'),
    )
    features = conv(x)

    batch_norm = tf.keras.layers.BatchNormalization(name=get_name(name, 'bn'))
    features = batch_norm(features)

    if activation is None:
        return features

    return tf.keras.layers.Activation(activation, name=get_name(name, 'activation'))(features)

In [None]:
def get_bottleneck(
    x: tf.Tensor,
    has_skip_connection: bool,
    activation: str | tf.keras.layers.Activation | None = 'silu',
    name: str | None = None
) -> tf.Tensor:
    """Builds a bottleneck block (see YOLOv5 architecture for details)

    A 1x1 convolution followed by a 3x3 convolution, both keeping the number of
    channels of `x`; the input is optionally added back to the result.

    Arguments:
        x: input tensor
        has_skip_connection: whether to add `x` to the output of the convolutions
        activation: activation used by the internal convolutions
        name: name prefix for the created layers

    Returns:
        The output tensor of the block"""

    width = x.shape[-1]

    branch = get_conv_bn(
        x, width, kernel_size=1, padding='valid', strides=1,
        activation=activation, name=get_name(name, 'conv1'),
    )
    branch = get_conv_bn(
        branch, width, kernel_size=3, padding='same', strides=1,
        activation=activation, name=get_name(name, 'conv2'),
    )

    if not has_skip_connection:
        return branch

    return tf.keras.layers.Add(name=get_name(name, 'skip_connection'))([branch, x])

In [None]:
def get_C3_block(
    x: tf.Tensor,
    size: int,
    half_size: bool,
    has_skip_connection: bool,
    activation: str | tf.keras.layers.Activation | None = 'silu',
    name: str | None = None
) -> tf.Tensor:
    """Builds a C3 block (see YOLOv5 architecture for details)

    The input is projected by two parallel 1x1 convolutions to half of its
    channels; one projection goes through `size` bottlenecks, then both are
    concatenated and fused by a final 1x1 convolution.

    Arguments:
        x: input tensor
        size: number of bottlenecks
        half_size: whether the block outputs half of the input channels
            (otherwise twice the halved channel count)
        has_skip_connection: whether the bottlenecks use skip connections
        activation: activation used by the internal convolutions
        name: name prefix for the created layers

    Returns:
        The output tensor of the block"""

    hidden = x.shape[-1] // 2

    shortcut = get_conv_bn(
        x, hidden, kernel_size=1, padding='valid', strides=1,
        activation=activation, name=get_name(name, 'conv1'),
    )
    deep = get_conv_bn(
        x, hidden, kernel_size=1, padding='valid', strides=1,
        activation=activation, name=get_name(name, 'conv2'),
    )

    for index in range(1, size + 1):
        deep = get_bottleneck(
            deep, has_skip_connection,
            activation=activation, name=get_name(name, f'bottleneck_{index}'),
        )

    merged = tf.keras.layers.Concatenate(name=get_name(name, 'concatenate'))([deep, shortcut])

    out_channels = hidden if half_size else hidden * 2

    return get_conv_bn(
        merged, out_channels, kernel_size=1, padding='valid', strides=1,
        activation=activation, name=get_name(name, 'out_conv'),
    )

In [None]:
def get_yolo_backbone(
    channels: int,
    block_sizes: list[int],
    pyramid_sizes: list[int],
    activation: str | tf.keras.layers.Activation | None = 'silu',
    name: str | None = None
) -> tf.keras.Model:
    """Builds YOLOv5 style model.

    Arguments:
        channels: number of channels for the first convolution
        block_sizes: number of bottleneck layers in each down block
        pyramid_sizes: number of bottleneck layers in each pyramid level,
            must be shorter than `block_sizes`
        activation: activation to apply in internal convolutions
        name: name of the model

    Returns:
        The model. It takes uint8 images of shape (batch, height, width, 3) and
        outputs `len(pyramid_sizes) + 1` feature maps ordered from the highest
        to the lowest spatial resolution

    Raises:
        ValueError: if `pyramid_sizes` is not shorter than `block_sizes`"""

    # every up-pyramid level is merged with an earlier down block, so the down path must be longer
    if len(pyramid_sizes) > 0 and len(pyramid_sizes) >= len(block_sizes):
        raise ValueError(
            f'pyramid_sizes must be shorter than block_sizes, '
            f'got {len(pyramid_sizes)} pyramid levels for {len(block_sizes)} blocks'
        )

    inputs = tf.keras.layers.Input((None, None, 3), dtype=tf.uint8, name=get_name(name, 'input'))
    # map pixel values from [0, 255] to [-1, 1]
    x = tf.keras.layers.Rescaling(scale=1. / 127.5, offset=-1, name=get_name(name, 'rescaling'))(inputs)

    x = get_conv_bn(x, channels, kernel_size=6, padding='same', strides=2, activation=activation, name=get_name(name, 'stem'))

    # down path: every block halves the resolution and doubles the channels
    levels = []
    for i, block_size in enumerate(block_sizes):
        channels *= 2
        block_name = get_name(name, f'block_{i + 1}')

        x = get_conv_bn(x, channels, kernel_size=3, padding='same', strides=2, activation=activation, name=get_name(block_name, 'stem'))
        x = get_C3_block(x, block_size, has_skip_connection=True, half_size=False, activation=activation, name=get_name(block_name, 'C3'))
        levels.append(x)

    # SPPF: three chained max poolings concatenated with their input
    sppf_name = get_name(name, 'sppf')
    x = get_conv_bn(x, channels // 2, kernel_size=1, padding='same', strides=1, activation=activation, name=get_name(sppf_name, 'in_conv'))

    max_pools = [x]
    for i in range(3):
        x = tf.keras.layers.MaxPooling2D(pool_size=5, strides=1, padding='same', name=get_name(sppf_name, f'max_pool_{i + 1}'))(x)
        max_pools.append(x)

    x = tf.keras.layers.Concatenate(name=get_name(sppf_name, 'concatenate'))(max_pools)
    x = get_conv_bn(x, channels, kernel_size=1, padding='valid', strides=1, activation=activation, name=get_name(sppf_name, 'out_conv'))

    # up pyramid: upsample and merge with the down-path level of matching resolution
    pyramid_levels = [x]
    pyramid_name = get_name(name, 'up_pyramid')

    for i, pyramid_size in enumerate(pyramid_sizes):
        block_name = get_name(pyramid_name, f'block_{i + 1}')

        y = levels[-i - 2]
        # the channel-reduced tensor replaces the stored level: the down pyramid concatenates with it later
        x = pyramid_levels[-1] = get_conv_bn(pyramid_levels[-1], y.shape[-1], kernel_size=1, padding='valid', strides=1, activation=activation, name=get_name(block_name, 'conv'))

        x = tf.keras.layers.UpSampling2D(size=2, interpolation='nearest', name=get_name(block_name, 'upsampling'))(x)
        x = tf.keras.layers.Concatenate(name=get_name(block_name, 'concatenate'))([x, y])

        x = get_C3_block(x, pyramid_size, has_skip_connection=False, half_size=True, activation=activation, name=get_name(block_name, 'C3'))

        pyramid_levels.append(x)

    # down pyramid: downsample the previous output and merge with the up-pyramid level of matching resolution
    outputs = [pyramid_levels[-1]]
    pyramid_name = get_name(name, 'down_pyramid')

    for i, pyramid_size in enumerate(reversed(pyramid_sizes)):
        level_channels = outputs[-1].shape[-1]
        block_name = get_name(pyramid_name, f'block_{i + 1}')

        x = get_conv_bn(outputs[-1], level_channels, kernel_size=3, padding='same', strides=2, activation=activation, name=get_name(block_name, 'conv'))

        x = tf.keras.layers.Concatenate(name=get_name(block_name, 'concatenate'))([x, pyramid_levels[-i - 2]])

        x = get_C3_block(x, pyramid_size, has_skip_connection=False, half_size=False, activation=activation, name=get_name(block_name, 'C3'))

        outputs.append(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs, name=name)

# Head

In [None]:
def depthwise_separable_conv(
    x: tf.Tensor,
    channels: int,
    kernel_size: int,
    strides: int,
    activation: str | tf.keras.layers.Activation | None = 'silu',
    name: str | None = None
) -> tf.Tensor:
    """Applies depthwise separable convolution.

    A depthwise convolution (with batch normalization and optional activation)
    followed by a pointwise 1x1 convolution block.

    Arguments:
        x: tensor to apply the operation to
        channels: number of output channels
        kernel_size: kernel size for the depthwise convolution
        strides: stride size of the depthwise convolution
        activation: activation to apply in internal convolutions
        name: name of the operation

    Returns:
        The resulting tensor"""

    x = tf.keras.layers.DepthwiseConv2D(kernel_size, use_bias=False, strides=strides, padding='same', name=get_name(name, 'depthwise_conv'))(x)
    x = tf.keras.layers.BatchNormalization(name=get_name(name, 'depthwise_batchnorm'))(x)

    if activation is not None:
        x = tf.keras.layers.Activation(activation=activation, name=get_name(name, 'depthwise_activation'))(x)

    # the stride is already applied by the depthwise convolution; striding the
    # 1x1 convolution as well would downsample a second time
    x = get_conv_bn(x, channels, kernel_size=1, use_bias=False, strides=1, padding='valid', activation=activation, name=get_name(name, 'pointwise'))

    return x

In [None]:
def depthwise_separable_convs(
    x: tf.Tensor,
    channels: int,
    out_channels: int,
    n_convs: int,
    activation: str | tf.keras.layers.Activation | None = 'silu',
    name: str | None = None
):
    """Stacks depthwise separable convolutions and finishes with a regular 3x3 convolution.

    Arguments:
        x: input tensor
        channels: number of channels produced by every depthwise separable convolution
        out_channels: number of channels produced by the final convolution
        n_convs: number of depthwise separable convolutions
        activation: activation used by the depthwise separable convolutions
        name: name prefix for the created layers

    Returns:
        The output tensor of the final convolution (no normalization or activation is applied to it)"""

    features = x
    for index in range(n_convs):
        features = depthwise_separable_conv(
            features,
            channels,
            kernel_size=3,
            strides=1,
            activation=activation,
            name=get_name(name, f'conv_{index}'),
        )

    out_conv = tf.keras.layers.Conv2D(
        filters=out_channels,
        kernel_size=3,
        padding='same',
        name=get_name(name, 'out_conv'),
    )

    return out_conv(features)

In [None]:
class DFLInitializer(tf.keras.initializers.Initializer):
    """Initializer that fills a kernel with the indices 0, 1, ..., shape[2] - 1.

    The values are laid out along the third axis of a (1, 1, shape[2], 1) tensor."""

    def __call__(self, shape, dtype=None, **kwargs) -> tf.Tensor:
        """Returns the index kernel for the requested `shape`; only `shape[2]` is used."""
        indices = tf.range(shape[2], dtype=dtype)
        return tf.reshape(indices, (1, 1, -1, 1))

In [None]:
def yolo_head(
    x: list[tf.Tensor],
    n_anchors_per_pixel: int,
    n_convs: int,
    n_reg: int,
    n_classes: int,
    activation: str | tf.keras.layers.Activation | None = 'silu',
    name: str | None = None,
) -> list[tuple[tf.Tensor, tf.Tensor]]:
    """Applies YOLO detection head.

    Arguments:
        x: list of outputs from backbone levels
        n_anchors_per_pixel: number of anchors per output pixel
        n_convs: number of convolutions for each output
        n_reg: number of DFL points
        n_classes: number of detected classes
        activation: activation to apply in internal convolutions
        name: name of the operation

    Returns:
        list of pairs (predicted boxes, predicted classes), one pair per level.
        Boxes have shape (batch, anchors, 4) and classes (batch, anchors, n_classes),
        where anchors = height * width * n_anchors_per_pixel of the level"""

    outputs = []

    for i, y in enumerate(x):
        boxes = depthwise_separable_convs(y, max(16, y.shape[-1] // n_reg, n_reg * 4), n_anchors_per_pixel * n_reg * 4, n_convs, activation, get_name(name, f'prediction_box_{i}'))
        boxes = tf.keras.layers.Lambda(lambda t: tf.reshape(t, (tf.shape(t)[0], -1, 4, n_reg)), name=get_name(name, f'reshape_{i}'))(boxes)
        # distribution over the n_reg points for each of the 4 box sides
        boxes = tf.keras.layers.Softmax(name=get_name(name, f'softmax_{i}'))(boxes)

        # DFL: the frozen 1x1 convolution weights every point by its index, i.e. takes the expectation of the distribution
        boxes = tf.keras.layers.Conv2D(1, 1, kernel_initializer=DFLInitializer(), use_bias=False, padding='valid', name=get_name(name, f'integrate_{i}'), trainable=False)(boxes)
        boxes = tf.keras.layers.Lambda(lambda t: tf.reshape(t, (tf.shape(t)[0], -1, 4)), name=get_name(name, f'boxes_{i}_flatten'))(boxes)

        # one class vector per anchor (not per pixel), so the flattened classes line up with the flattened boxes
        classes = depthwise_separable_convs(y, max(y.shape[-1], min(n_classes, 128)), n_anchors_per_pixel * n_classes, n_convs, activation, get_name(name, f'prediction_class_{i}'))
        classes = tf.keras.layers.Lambda(lambda t: tf.reshape(t, (tf.shape(t)[0], -1, n_classes)), name=get_name(name, f'classes_{i}_flatten'))(classes)

        outputs.append((boxes, classes))

    return outputs

# Build model

You can use a smaller model to make training easier.

In [None]:
# Backbone with four down blocks and two pyramid levels.
backbone = get_yolo_backbone(
    channels=64,
    block_sizes=[3, 6, 9, 3],
    pyramid_sizes=[3, 3],
    name='detection_backbone',
)

In [None]:
# Detection head applied to every backbone output level.
predictions = yolo_head(
    backbone.outputs,
    n_anchors_per_pixel=2,
    n_convs=2,
    n_reg=8,
    n_classes=1,  # don't forget to change the number of classes
    activation='silu',
    name='detection_head',
)

In [None]:
# Full detector: backbone inputs mapped to the per-level (boxes, classes) predictions of the head.
model = tf.keras.Model(inputs=backbone.inputs, outputs=predictions, name='detection_model')

In [None]:
# Print the layer-by-layer overview of the assembled detector.
model.summary()

# Dataset

In [None]:
def iou(a: tf.Tensor, b: tf.Tensor) -> tf.Tensor:
    """Computes IoU (intersection over union) between boxes

    The last axis of both inputs holds the box corners in xyxy order; the
    remaining axes are combined elementwise."""

    left = tf.maximum(a[..., 0], b[..., 0])
    top = tf.maximum(a[..., 1], b[..., 1])
    right = tf.minimum(a[..., 2], b[..., 2])
    bottom = tf.minimum(a[..., 3], b[..., 3])

    # clamp at zero so that disjoint boxes get an empty intersection
    overlap_width = tf.maximum(right - left, 0)
    overlap_height = tf.maximum(bottom - top, 0)
    intersection = overlap_width * overlap_height

    area_a = (a[..., 2] - a[..., 0]) * (a[..., 3] - a[..., 1])
    area_b = (b[..., 2] - b[..., 0]) * (b[..., 3] - b[..., 1])
    union = area_a + area_b - intersection

    # small constant keeps the division defined when the union is zero
    return intersection / (union + 1e-10)

In [None]:
def atss(
    boxes: tf.Tensor | np.ndarray,
    labels: tf.Tensor | np.ndarray,
    anchors: list[tf.Tensor] | list[np.ndarray],
    k: int = 9
) -> tuple[list[tf.Tensor], list[tf.Tensor]]:
    """Adaptive Training Sample Selection

    For every ground-truth box the `k` closest anchors (by center distance) of
    each level are taken as candidates; a candidate is kept when its IoU with
    the box reaches mean + std of the candidates' IoUs and its center lies
    strictly inside the box.

    Arguments:
        boxes: 2D array of ground-truth boxes specifications (N, 4) in xyxy format
        labels: 1D array of box labels (N, )
        anchors: list of 2D array anchor specifications (M, 4) in xyxy format, one array per level
        k: number of closest anchors in each level for ground-truth boxes

    Returns:
        offsets: list of 2D arrays of offsets assigned to each anchor, one array per level
        labels: list of 1D arrays of labels assigned to each anchor, one array per level

        Anchors that were not selected for any box keep zeros in both outputs.
    """
    
    boxes = tf.cast(boxes, tf.float32)
    anchors = [tf.cast(anchor, tf.float32) for anchor in anchors]
    
    # centers of the boxes (N, 2) and of the anchors of every level (M, 2)
    boxes_centers = tf.stack([boxes[:, 2] + boxes[:, 0], boxes[:, 3] + boxes[:, 1]], axis=-1) / 2
    anchors_centers = [
        tf.stack([anchor[:, 2] + anchor[:, 0], anchor[:, 3] + anchor[:, 1]], axis=-1) / 2 
        for anchor in anchors
    ]

    # per level: (N, M) matrix of distances between box centers and anchor centers
    distances_between_centers = [
        tf.norm(tf.expand_dims(boxes_centers, axis=1) - tf.expand_dims(anchor_centers, axis=0), axis=-1) 
        for anchor_centers in anchors_centers
    ]

    # top_k of the negated distances picks the k closest anchors; (N, k) indices per level
    min_indices = [tf.math.top_k(-i, k=k, sorted=False)[1] for i in distances_between_centers]

    # candidate anchors of all levels side by side: (N, levels * k, 4)
    selected_anchors = tf.concat([
        tf.reshape(tf.gather(anchor, tf.reshape(i, (-1, ))), (-1, k, 4)) 
        for i, anchor in zip(min_indices, anchors)
    ], axis=1)

    # candidate centers repeated as (cx, cy, cx, cy) to line up with the xyxy box layout
    selected_anchor_centers = tf.concat([
        tf.tile(tf.reshape(tf.gather(centers, tf.reshape(i, (-1, ))), (-1, k, 2)), (1, 1, 2))
        for i, centers in zip(min_indices, anchors_centers)
    ], axis=1)

    # not too efficient, but should be easier to understand

    # distances from the candidate center to the left/top/right/bottom side of the box,
    # all four are positive only when the center is strictly inside the box
    expanded_boxes = tf.expand_dims(boxes, axis=1)
    l = selected_anchor_centers[..., 0] - expanded_boxes[..., 0]
    t = selected_anchor_centers[..., 1] - expanded_boxes[..., 1]
    r = expanded_boxes[..., 2] - selected_anchor_centers[..., 2]
    b = expanded_boxes[..., 3] - selected_anchor_centers[..., 3]

    is_in_box = tf.math.reduce_min(tf.stack([l, t, r, b], axis=-1), axis=-1) > 0
    
    # per-box adaptive IoU threshold computed over the candidates of that box
    ious = iou(selected_anchors, tf.expand_dims(boxes, axis=1))
    mean = tf.math.reduce_mean(ious, axis=-1)
    std  = tf.math.reduce_std(ious, axis=-1)

    threshold = mean + std

    mask = tf.math.logical_and(ious >= tf.expand_dims(threshold, axis=-1), is_in_box)

    # every candidate carries the label of its ground-truth box: (N, levels * k)
    labels = tf.tile(tf.expand_dims(labels, axis=-1), (1, len(anchors) * k))

    # normalize offsets, so it doesn't depend on the anchor size
    
    # box corners relative to the anchor center, divided by (w, h, w, h) of the anchor
    offsets = expanded_boxes - selected_anchor_centers
    offsets = offsets / tf.tile(tf.stack([selected_anchors[..., 2] - selected_anchors[..., 0], selected_anchors[..., 3] - selected_anchors[..., 1]], axis=-1), (1, 1, 2))

    # flatten everything for easier assignment
    
    anchors_per_level = [tf.shape(i)[0] for i in anchors]
    total_anchros = tf.math.reduce_sum(anchors_per_level)

    # adjust indices to index the flattened anchors
    min_indices = tf.stack(min_indices, axis=1) + tf.reshape(tf.math.cumsum(anchors_per_level, exclusive=True), (1, -1, 1))

    # keep only the candidates that passed the selection
    mask = tf.reshape(mask, (-1, ))
    labels = tf.reshape(labels, (-1, ))[mask]
    offsets = tf.reshape(offsets, (-1, 4))[mask]
    min_indices = tf.reshape(min_indices, (-1, 1))[mask]

    # NOTE(review): tf.scatter_nd accumulates updates that share an index, so an anchor selected
    # for several ground-truth boxes would get summed offsets and labels - confirm whether
    # such conflicts have to be resolved (e.g. by keeping the box with the highest IoU)
    offsets = tf.split(tf.scatter_nd(min_indices, offsets, (total_anchros, 4)), anchors_per_level)
    labels = tf.split(tf.scatter_nd(min_indices, labels, (total_anchros,)), anchors_per_level)

    return offsets, labels

In [None]:
def make_anchors_model(size: int, scales: list[float], stride: int, name: str | None = None) -> tf.keras.Model:
    """Builds a model that generates anchor specifications for a given output size

    The model takes a vector of two values (height, width) of a feature map and
    returns the anchors in xyxy format as a 2D tensor with 4 columns; anchors
    of all scales that belong to the same position are adjacent.

    Arguments:
        size: base side length of the (square) anchors
        scales: multipliers applied to `size`, one anchor per scale and position
        stride: distance between neighbouring anchor positions
        name: name of the model

    Returns:
        The model"""

    inputs = tf.keras.layers.Input(shape=(), batch_size=2, name=get_name(name, 'input'))

    # positions along each axis: index * stride
    shift_x = tf.keras.layers.Lambda(
        lambda hw: tf.range(hw[1], dtype=tf.int32) * stride,
        name=get_name(name, 'center_x'),
    )(inputs)
    shift_y = tf.keras.layers.Lambda(
        lambda hw: tf.range(hw[0], dtype=tf.int32) * stride,
        name=get_name(name, 'center_y'),
    )(inputs)

    per_scale_anchors = []
    for i, scale in enumerate(scales):
        # shift from the position to the top-left corner and side length of the anchor
        corner_shift = int(-size // 2 * scale)
        side = int(size * scale)

        coords_x = tf.keras.layers.Lambda(
            lambda pair: pair[0] + pair[1],
            name=get_name(name, f'{i}/coords_x'),
        )([shift_x, corner_shift])
        coords_y = tf.keras.layers.Lambda(
            lambda pair: pair[0] + pair[1],
            name=get_name(name, f'{i}/coords_y'),
        )([shift_y, corner_shift])

        grid_x, grid_y = tf.keras.layers.Lambda(
            lambda pair: tf.meshgrid(pair[0], pair[1]),
            name=get_name(name, f'{i}/meshgrid'),
        )([coords_x, coords_y])

        corners = tf.keras.layers.Lambda(
            lambda args: tf.stack([args[0], args[1], args[0] + args[2], args[1] + args[2]], axis=-1),
            name=get_name(name, f'{i}/result'),
        )([grid_x, grid_y, side])

        flat_corners = tf.keras.layers.Lambda(
            lambda grid: tf.reshape(grid, (-1, 4)),
            name=get_name(name, f'{i}/flatten'),
        )(corners)

        per_scale_anchors.append(flat_corners)

    # interleave the scales so that all anchors of one position follow each other
    outputs = tf.keras.layers.Lambda(
        lambda parts: tf.reshape(tf.concat(parts, axis=1), (-1, 4)),
        name=get_name(name, 'output'),
    )(per_scale_anchors)

    return tf.keras.Model(inputs=inputs, outputs=outputs, name=name)

In [None]:
# One anchor generator per output level, built from (anchor size, stride) pairs; adjust if need be.
anchors_models = [
    make_anchors_model(anchor_size, [1, 2], level_stride)
    for anchor_size, level_stride in zip([16, 64, 256], [8, 16, 32])
]

In [None]:
# Anchors of every level for 640x640 inputs: the generators are fed the (height, width) of each backbone output.
anchors = [
    anchors_model(np.array(level_output.shape[1:3]))
    for anchors_model, level_output in zip(anchors_models, backbone(np.zeros((1, 640, 640, 3))))
]

Build your dataset. Images should be cropped and/or resized to a common size (don't forget to adjust the boxes too; can some augmentations be applied?). Keep in mind that the ground-truth values for each of the output levels are the classes and the corner offsets relative to the anchor center, normalized by the anchor size.

# Losses

Implement the losses from the paper https://arxiv.org/pdf/2006.04388.pdf. Since TensorFlow doesn't allow applying one loss to multiple outputs, implement the losses as layers and use the model output as the loss.

Loss between predicted and true boxes

In [None]:
class CIouLoss(tf.keras.layers.Layer):
    """Loss between predicted and true boxes, implemented as a layer.

    The body is a template that still has to be filled in."""

    def __init__(self, eps: float = 1e-6, name: str | None = None):
        """Creates the layer.

        Arguments:
            eps: presumably a small constant for numerical stability - TODO confirm once implemented
            name: name of the layer"""
        super().__init__(name=name)
        ...

    def call(self, y_true: tf.Tensor, y_pred: tf.Tensor) -> tf.Tensor:
        """Computes the loss between the true boxes `y_true` and the predicted boxes `y_pred` (not implemented yet)."""
        ...

    def get_config(self):
        """Returns the configuration of the layer (not implemented yet)."""
        ...

Loss between predicted and true labels

In [None]:
class FocalLoss(tf.keras.layers.Layer):
    """Loss between predicted and true labels, implemented as a layer.

    The body is a template that still has to be filled in."""
    
    def __init__(self, gamma: float = 2, name: str | None = None):
        """Creates the layer.

        Arguments:
            gamma: presumably the focusing exponent of the focal loss - TODO confirm once implemented
            name: name of the layer"""
        super().__init__(name=name)
        ...
        
    def call(self, y_true: tf.Tensor, y_pred: tf.Tensor) -> tf.Tensor:
        """Computes the loss between the true labels `y_true` and the predicted labels `y_pred` (not implemented yet)."""
        ...
    
    def get_config(self):
        """Returns the configuration of the layer (not implemented yet)."""
        ...

Loss between predicted distance decomposition and actual (see article for details)

In [None]:
class DistributionFocalLoss(tf.keras.layers.Layer):
    """Loss between the predicted distance decomposition and the actual one, implemented as a layer.

    The body is a template that still has to be filled in."""
    
    def __init__(self, name: str | None = None):
        """Creates the layer.

        Arguments:
            name: name of the layer"""
        super().__init__(name=name)
        ...
        
    def call(self, y_true: tf.Tensor, y_pred: tf.Tensor) -> tf.Tensor:
        """Computes the loss between the target `y_true` and the prediction `y_pred` (not implemented yet)."""
        ...
    
    def get_config(self):
        """Returns the configuration of the layer (not implemented yet)."""
        ...

Combined loss. The class loss is applied to every anchor. The rest of the losses are applied only to non-background anchors.

In [None]:
class DetectionLoss(tf.keras.layers.Layer):
    """Combined detection loss, implemented as a layer.

    The body is a template that still has to be filled in."""

    def __init__(
        self,
        anchors: list[np.ndarray | tf.Tensor],
        strides: list[int],
        cls_weight: float,
        box_weight: float,
        dfl_weight: float,
        cls_loss: tf.keras.layers.Layer,
        box_loss: tf.keras.layers.Layer,
        dfl_loss: tf.keras.layers.Layer,
        name: str | None = None,
    ):
        """Creates the layer.

        Arguments:
            anchors: anchor specifications, presumably one array per output level - TODO confirm
            strides: presumably the stride of every output level - TODO confirm
            cls_weight: weight of the class loss
            box_weight: weight of the box loss
            dfl_weight: weight of the distribution focal loss
            cls_loss: layer computing the class loss
            box_loss: layer computing the box loss
            dfl_loss: layer computing the distribution focal loss
            name: name of the layer"""
        super().__init__(name=name)
        ...

    def get_config(self):
        """Returns the configuration of the layer (not implemented yet)."""
        ...

    def call(self, y_true: list[tuple[tf.Tensor, tf.Tensor]], y_pred: list[tuple[tf.Tensor, tf.Tensor]]) -> tf.Tensor:
        """Computes the combined loss from lists of per-level tensor pairs (not implemented yet)."""
        ...

# Training

# Testing

Don't forget to use NMS during prediction [link](https://www.tensorflow.org/api_docs/python/tf/image/non_max_suppression_with_scores)

In [None]:
def predict(image: np.ndarray, detection_threshold: float, iou_threshold: float) -> tuple[np.ndarray, np.ndarray]:
    """Predicts boxes for the image (not implemented yet).

    Arguments:
        image: image to run the detection on
        detection_threshold: presumably the minimal score of a kept detection - TODO confirm once implemented
        iou_threshold: presumably the IoU threshold used by non-maximum suppression - TODO confirm once implemented

    Returns:
        predicted boxes and their respective scores"""
    ...

Plot some detections on images not present in the dataset