[Object Detection Algorithm Implementation Series] Implementing Faster R-CNN in Keras (Part 2)


[Object Detection Algorithm Implementation Series] Implementing Faster R-CNN in Keras (Part 1)

In the previous article we handled the preparatory work of parsing and preprocessing the data; this time we build the network modules.

I. Building the Shared Network Module

We use the convolutional blocks of the VGG16 model (with the last pooling layer removed) as the shared network for extracting the feature map. The code is as follows:

from keras.layers import Input, Conv2D, MaxPool2D, Flatten, Dense, TimeDistributed
from keras import backend as K
from keras_faster_rcnn import RoiPoolingConv


def base_net_vgg(input_tensor):
    if input_tensor is None:
        input_tensor = Input(shape=(None,None,3))
    else:
        if not K.is_keras_tensor(input_tensor):
            input_tensor = Input(tensor=input_tensor, shape=(None,None,3))

    # Build the base model (the VGG16 conv blocks) up to block5_conv3,
    # which extracts the feature map

    # Block 1
    X = Conv2D(filters=64, kernel_size=(3,3), activation="relu",
               padding="same", name="block1_conv1")(input_tensor)
    X = Conv2D(filters=64, kernel_size=(3, 3), activation="relu",
               padding="same", name="block1_conv2")(X)
    X = MaxPool2D(pool_size=(2,2), strides=(2,2), name="block1_pool")(X)

    # Block 2
    X = Conv2D(filters=128, kernel_size=(3, 3), activation="relu",
               padding="same", name="block2_conv1")(X)    X = Conv2D(filters=128, kernel_size=(3, 3), activation="relu",
               padding="same", name="block2_conv2")(X)
    X = MaxPool2D(pool_size=(2, 2), strides=(2, 2), name="block2_pool")(X)

    # Block 3
    X = Conv2D(filters=256, kernel_size=(3, 3), activation="relu",
               padding="same", name="block3_conv1")(X)    X = Conv2D(filters=256, kernel_size=(3, 3), activation="relu",
               padding="same", name="block3_conv2")(X)
    X = Conv2D(filters=256, kernel_size=(3, 3), activation="relu",
               padding="same", name="block3_conv3")(X)
    X = MaxPool2D(pool_size=(2, 2), strides=(2, 2), name="block3_pool")(X)

    # Block 4
    X = Conv2D(filters=512, kernel_size=(3, 3), activation="relu",
               padding="same", name="block4_conv1")(X)    X = Conv2D(filters=512, kernel_size=(3, 3), activation="relu",
               padding="same", name="block4_conv2")(X)
    X = Conv2D(filters=512, kernel_size=(3, 3), activation="relu",
               padding="same", name="block4_conv3")(X)
    X = MaxPool2D(pool_size=(2, 2), strides=(2, 2), name="block4_pool")(X)

    # Block 5
    X = Conv2D(filters=512, kernel_size=(3, 3), activation="relu",
               padding="same", name="block5_conv1")(X)    X = Conv2D(filters=512, kernel_size=(3, 3), activation="relu",
               padding="same", name="block5_conv2")(X)
    X = Conv2D(filters=512, kernel_size=(3, 3), activation="relu",
               padding="same", name="block5_conv3")(X)
    return X
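
With four 2×2 pooling layers, the spatial downsampling factor of this network is 16. As a quick sanity check, here is a minimal sketch (the 600×800 input size is just an illustrative assumption) that wraps base_net_vgg in a Model and prints the feature-map shape:

from keras.models import Model

img_input = Input(shape=(600, 800, 3))  # illustrative fixed input size
shared = base_net_vgg(img_input)
model = Model(img_input, shared)
print(model.output_shape)  # (None, 37, 50, 512): floor(600/16)=37, 800/16=50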

II. Building the RPN Network

The RPN is straightforward to construct; the code is below. For the underlying theory, see the earlier article [Object Detection Algorithm Series] IV. The Faster R-CNN Algorithm.

def rpn_net(shared_layers, num_anchors):
    '''
    The RPN network
    :param shared_layers: output of the shared layers, used as the RPN input
                          (i.e. the feature map extracted by the VGG conv blocks)
    :param num_anchors: number of anchors at each feature-map position (9 here)
    :return:
    [X_class, X_regr, shared_layers]: the classification output (binary, via sigmoid),
                                      the regression output, and the shared layers
    '''
    X = Conv2D(512, (3,3), padding="same", activation="relu",
               kernel_initializer="normal", name="rpn_conv1")(shared_layers)
    # Multi-task heads: one for classification, one for regression
    X_class = Conv2D(num_anchors, (1,1), activation="sigmoid",
                     kernel_initializer="uniform", name="rpn_out_class")(X)
    X_regr = Conv2D(num_anchors*4, (1,1), activation="linear",
                    kernel_initializer="zero", name="rpn_out_regress")(X)
    return [X_class, X_regr, shared_layers]
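
As a usage sketch (assuming the common Faster R-CNN setting of 3 anchor scales × 3 aspect ratios, i.e. 9 anchors per position), the RPN is attached to the shared layers like this:

from keras.models import Model

num_anchors = 3 * 3  # 3 scales x 3 aspect ratios (assumed setting)
img_input = Input(shape=(None, None, 3))
shared_layers = base_net_vgg(img_input)
rpn = rpn_net(shared_layers, num_anchors)
model_rpn = Model(img_input, rpn[:2])  # classification and regression outputs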

III. A Custom ROI Pooling Layer

Keras does not ship a ready-made ROI pooling layer, so we define our own: the RoiPoolingConv class subclasses Layer and overrides the relevant methods to implement the ROI pooling logic. The code is as follows:

'''
Custom ROI pooling layer
'''

from keras.engine.topology import Layer
import keras.backend as K
import tensorflow as tf
import numpy as np

class RoiPoolingConv(Layer):
    '''
    Custom ROI pooling layer
    pool_size: fixed spatial size each ROI is pooled to (7 here)
    num_rois: number of ROIs processed per image
    '''
    def __init__(self, pool_size, num_rois, **kwargs):

        self.pool_size = pool_size
        self.num_rois = num_rois
        self.dim_ordering = "tf"

        super(RoiPoolingConv, self).__init__(**kwargs)

    def build(self, input_shape):
        # Number of channels of the input feature map
        self.nb_channels = input_shape[0][3]


    def compute_output_shape(self, input_shape):
        '''
        Declare the ROI pooling layer's output shape
        :param input_shape:
        :return:
        '''
        # Five output dimensions: [number of samples (images) per batch,
        #                          number of ROIs per sample,
        #                          pooled height, pooled width, number of channels]
        return None, self.num_rois, self.pool_size, self.pool_size, self.nb_channels

    def call(self, x, mask=None):
        '''
        The actual ROI pooling logic
        :param x:
        :param mask:
        :return:
        '''
        # x is the layer's input: [feature_map, rois]
        assert(len(x) == 2)

        feature_map = x[0]  # the feature map
        rois = x[1]  # all input ROIs, shape=(batchsize, None, 4); the last axis holds each
                     # ROI's coordinates on the feature map (top-left x, y, width, height)

        input_shape = K.shape(feature_map)
        roi_out_put = []
        for roi_index in range(self.num_rois):
            # ROI coordinates (x, y, w, h) on the feature map
            x = rois[0, roi_index, 0]
            y = rois[0, roi_index, 1]
            w = rois[0, roi_index, 2]
            h = rois[0, roi_index, 3]

            x = K.cast(x, 'int32')
            y = K.cast(y, 'int32')
            w = K.cast(w, 'int32')
            h = K.cast(h, 'int32')
            # Crop the ROI out of the feature map and resize it to the fixed pooled
            # size (the resize interpolation stands in for max-pooled bins here)
            one_roi_out = tf.image.resize_images(feature_map[:, y:y+h, x:x+w, :], (self.pool_size, self.pool_size))
            roi_out_put.append(one_roi_out)

        # Stack all pooled ROIs; note that this reshape assumes a batch size of 1
        roi_out_put = tf.reshape(roi_out_put, (self.num_rois, self.pool_size, self.pool_size, self.nb_channels))
        roi_out_put = tf.expand_dims(roi_out_put, axis=0)
        return roi_out_put

Let's test the RoiPoolingConv layer we just defined:

if __name__ == '__main__':
    batch_size = 1  # the layer's reshape logic assumes one image per batch
    img_height = 200
    img_width = 100
    n_channels = 1
    n_rois = 2
    pooled_size = 7
    feature_maps_shape = (batch_size, img_height, img_width, n_channels)
    feature_maps_tf = tf.placeholder(tf.float32, shape=feature_maps_shape)
    feature_maps_np = np.ones(feature_maps_tf.shape, dtype='float32')
    print(f"feature_maps_np.shape = {feature_maps_np.shape}")
    roiss_tf = tf.placeholder(tf.float32, shape=(batch_size, n_rois, 4))
    roiss_np = np.asarray([[[50, 40, 30, 90], [0, 0, 100, 200]]], dtype='float32')
    print(f"roiss_np.shape = {roiss_np.shape}")
    # Create the ROI pooling layer
    roi_layer = RoiPoolingConv(pooled_size, 2)
    pooled_features = roi_layer([feature_maps_tf, roiss_tf])
    print(f"output shape of layer call = {pooled_features.shape}")
    # Run tensorflow session
    with tf.Session() as session:
        result = session.run(pooled_features,
                             feed_dict={feature_maps_tf: feature_maps_np,
                                        roiss_tf: roiss_np})

    print(f"result.shape = {result.shape}")

IV. The Final Detection Network (ROI Pooling Layer and Fully Connected Layers)

def roi_classifier(shared_layers, input_rois, num_rois, nb_classes=21):
    '''
    The final detection network (ROI pooling layer plus fully connected layers),
    which performs the final fine-grained classification and box regression
    :param shared_layers: the feature-extraction base network (the VGG conv blocks)
    :param input_rois: the ROI input, shape=(None, 4)
    :param num_rois: the number of ROIs
    :param nb_classes: the total number of classes to detect, including the background class
    :return: [out_class, out_regr]: the final classification and regression outputs
    '''
    # ROI pooling layer
    pooling_regions = 7
    roi_pool_out = RoiPoolingConv.RoiPoolingConv(pooling_regions, num_rois)([shared_layers, input_rois])

    # Fully connected layers
    out = TimeDistributed(Flatten(name="flatten"))(roi_pool_out)
    out = TimeDistributed(Dense(4096, activation="relu", name="fc1"))(out)
    out = TimeDistributed(Dense(4096, activation="relu", name="fc2"))(out)

    out_class = TimeDistributed(Dense(nb_classes, activation="softmax", kernel_initializer='zero'),
                                name='dense_class_{}'.format(nb_classes))(out)
    out_regr = TimeDistributed(Dense(4 * (nb_classes-1), activation="linear", kernel_initializer='zero'),
                               name='dense_regress_{}'.format(nb_classes))(out)
    return [out_class, out_regr]
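
A minimal wiring sketch for this head (the ROI input shape, the value num_rois=32, and the model name are illustrative assumptions; the actual connection between the RPN proposals and this head comes in the next article):

from keras.models import Model

num_rois = 32  # number of ROIs fed through the head per image (illustrative)
img_input = Input(shape=(None, None, 3))
roi_input = Input(shape=(num_rois, 4))
shared_layers = base_net_vgg(img_input)
classifier = roi_classifier(shared_layers, roi_input, num_rois, nb_classes=21)
model_classifier = Model([img_input, roi_input], classifier)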

V. Custom Loss Functions
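
The loss functions below reference a few module-level names that the snippets assume are already in scope: the balancing weights lambda_rpn_regr, lambda_rpn_class, lambda_cls_regr, lambda_cls_class, a small epsilon guarding against division by zero, and categorical_crossentropy. A typical setup might look like this (the weight values are assumed defaults rather than something fixed by the algorithm; the keras.objectives import matches the older Keras used here, newer versions expose it as keras.losses):

from keras.objectives import categorical_crossentropy

# Balancing weights for the four losses (assumed defaults; tune as needed)
lambda_rpn_regr = 1.0
lambda_rpn_class = 1.0
lambda_cls_regr = 1.0
lambda_cls_class = 1.0
# Small constant to avoid division by zero in the normalizers
epsilon = 1e-4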

1. The loss function for the RPN regression layer

def rpn_regr_loss(num_anchors):
    '''
    Computes the RPN regression loss
    :param num_anchors:
    :return:
    '''
    def rpn_loss_regr_fixed_num(y_true, y_pred):
        '''
        The actual computation of the RPN regression loss
        :param y_true:
            the label Y built earlier for the RPN regression layer,
            shape=(batch_size, height, width, num_anchors*4*2).
            In the last axis of y_true, the first 4*num_anchors entries mark
            whether each anchor is a positive sample, and the last 4*num_anchors
            entries hold the actual regression targets, so the last axis has
            num_anchors*4*2 entries in total.
        :param y_pred:
            the output of the RPN regression layer for sample X,
            shape=(batch_size, height, width, num_anchors*4)
        :return:
        '''
        # Smooth L1 loss over the positive anchors: 0.5*x^2 where |x| <= 1,
        # and |x| - 0.5 otherwise
        x = y_true[:, :, :, 4 * num_anchors:] - y_pred
        x_abs = K.abs(x)
        x_bool = K.cast(K.less_equal(x_abs, 1.0), tf.float32)

        return lambda_rpn_regr * K.sum(
            y_true[:, :, :, :4 * num_anchors] * (x_bool * (0.5 * x * x) + (1 - x_bool) * (x_abs - 0.5))) / K.sum(
            epsilon + y_true[:, :, :, :4 * num_anchors])


    return rpn_loss_regr_fixed_num
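
To make the y_true layout and the smooth L1 behaviour concrete, here is a tiny check on toy data (hypothetical shapes: num_anchors=1 on a 1×1 feature map with one positive anchor; it assumes the constants defined above are in scope):

import numpy as np

# Mask [1,1,1,1] marks the anchor as positive; the targets are [0.5, 0, 0, 0]
y_true = np.array([[[[1., 1., 1., 1., 0.5, 0., 0., 0.]]]])
y_pred = np.array([[[[0., 0., 0., 0.]]]])
loss_fn = rpn_regr_loss(num_anchors=1)
# |x| = 0.5 <= 1, so smooth L1 gives 0.5 * 0.5^2 = 0.125; normalizing by the
# mask sum (4) yields roughly 0.03125
print(K.eval(loss_fn(K.variable(y_true), K.variable(y_pred))))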

2. The loss function for the RPN classification layer

def rpn_cls_loss(num_anchors):
    '''
    Computes the RPN classification loss
    :param num_anchors:
    :return:
    '''
    def rpn_loss_cls_fixed_num(y_true, y_pred):
        # The last axis of y_true has 2*num_anchors entries: the first num_anchors mark
        # whether the anchor is used at all (discarded anchors are masked out of training),
        # and the last num_anchors hold the actual positive/negative labels
        return lambda_rpn_class * K.sum(
            y_true[:, :, :, :num_anchors] * K.binary_crossentropy(y_pred[:, :, :, :],y_true[:, :, :,num_anchors:])) \
               / K.sum(epsilon + y_true[:, :, :, :num_anchors])
    return rpn_loss_cls_fixed_num

3. The loss function for the final regression layer of the whole network

def final_regr_loss(num_classes):
    '''
    Computes the regression loss of the final detection network
    (smooth L1, the same form as the RPN regression loss)
    :param num_classes: the number of object classes, excluding background
    :return:
    '''
    def final_loss_regr_fixed_num(y_true, y_pred):
        # The last axis of y_true has num_classes*4*2 entries: the first 4*num_classes
        # mark which class's box targets are valid for this ROI, and the last
        # 4*num_classes hold the actual regression targets
        x = y_true[:, :, 4 * num_classes:] - y_pred
        x_abs = K.abs(x)
        x_bool = K.cast(K.less_equal(x_abs, 1.0), 'float32')

        return lambda_cls_regr * K.sum(
            y_true[:, :, :4 * num_classes] * (x_bool * (0.5 * x * x) + (1 - x_bool) * (x_abs - 0.5))) / K.sum(
            epsilon + y_true[:, :, :4 * num_classes])
    return final_loss_regr_fixed_num

4. The loss function for the final classification layer of the whole network

def final_cls_loss(y_true, y_pred):
    '''
    Computes the classification loss of the final detection network, using the
    standard multi-class cross-entropy that pairs with softmax
    :param y_true:
    :param y_pred:
    :return:
    '''
    return lambda_cls_class * K.mean(categorical_crossentropy(y_true[0, :, :], y_pred[0, :, :]))
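
As a usage sketch (assuming model_rpn and model_classifier are wired up as in the earlier sketches with nb_classes=21; the optimizer and learning rate are illustrative), these losses plug into compile as custom loss functions:

from keras.optimizers import Adam

model_rpn.compile(optimizer=Adam(lr=1e-5),
                  loss=[rpn_cls_loss(num_anchors), rpn_regr_loss(num_anchors)])
model_classifier.compile(optimizer=Adam(lr=1e-5),
                         loss=[final_cls_loss, final_regr_loss(20)])  # 20 = 21 classes minus background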

At this point, all of Faster R-CNN's individual modules are in place. Next time, we will build the connection between the RPN network and the ROI pooling layer.

To be continued.


For the complete code of this chapter, plus a Baidu Netdisk download of the VOC2012 dataset, follow my WeChat official account AI计算机视觉工坊 and reply 【代码】 (code) or 【数据集】 (dataset). The account publishes articles on machine learning, deep learning, computer vision, and related topics from time to time; you are welcome to learn and exchange ideas with me.