[Convolutional Network Model Series] Implementing ResNet50 (PyTorch + TensorFlow)


I. A Brief Introduction to ResNet

The success of VGGNet showed that increasing a model's depth can improve its representational power: from the 8 layers of AlexNet, to the 16 or 19 layers of VGGNet, to the 22 layers of GoogLeNet. Later, however, it was found that once a deep CNN reaches a certain depth, simply stacking more layers does not improve classification accuracy any further; instead it makes convergence slower and accuracy worse. In the experiments reported in the ResNet paper, a plain 56-layer stacked network performed worse than a 20-layer one on both the training and test sets. The reason is that very deep plain networks are hard to train: they suffer from vanishing and exploding gradients.
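As a toy illustration (not from the original post), one can watch the gradient signal reaching the first layer of a plain, deeply stacked sigmoid network shrink as the depth grows:

import torch
import torch.nn as nn

# Gradient norm at the FIRST layer of a plain stacked network, for growing depth.
for depth in (5, 20, 56):
    layers = []
    for _ in range(depth):
        layers += [nn.Linear(64, 64), nn.Sigmoid()]
    net = nn.Sequential(*layers)
    net(torch.randn(8, 64)).sum().backward()
    # The deeper the stack, the smaller the gradient that reaches the first layer.
    print("depth={:2d}  grad norm={:.3e}".format(depth, net[0].weight.grad.norm().item()))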

ResNet (the residual network) was proposed to address exactly this degradation problem that appears as models get deeper. ResNet introduces the skip connection, which takes the activation of one layer and feeds it directly to a later, possibly much deeper, layer of the network. Using skip connections we can build trainable ResNets as deep as 152 layers.
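In code, a skip connection is nothing more than an addition: the stacked layers learn a residual mapping F(x), and the block outputs F(x) + x. A minimal sketch (illustrative only; the actual bottleneck blocks used by ResNet50 follow in section III):

import torch
import torch.nn as nn

class ResidualUnit(nn.Module):
    """Two conv layers plus an identity shortcut: output = relu(F(x) + x)."""
    def __init__(self, channels):
        super(ResidualUnit, self).__init__()
        self.f = nn.Sequential(
            nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(channels),
        )

    def forward(self, x):
        return torch.relu(self.f(x) + x)  # the shortcut skips the two conv layers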


II. The Structure of ResNet50

ResNet50 is built from two kinds of residual blocks.

In the first kind, the identity block, the input and output have the same shape.

Inside the block, a 1×1 convolution first reduces the channel dimension, a 3×3 convolution follows, and a final 1×1 convolution restores the channel dimension so that the result can be added directly to the input.

In the second kind, the conv block, the input and output shapes differ. It is used where the feature map is downsampled and its channel count changes; the shortcut branch therefore needs its own 1×1 convolution (with the same stride) to project the input to the new shape.
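For a concrete sense of the shapes (a quick sketch, not part of the original code): a stride-2 1×1 convolution on the shortcut reproduces both the spatial downsampling and the channel change of the main path, so the two branches can still be added.

import torch
import torch.nn as nn

x = torch.randn(1, 256, 56, 56)  # output of the previous stage
shortcut = nn.Conv2d(256, 512, kernel_size=1, stride=2, bias=False)  # projection shortcut
print(shortcut(x).shape)  # torch.Size([1, 512, 28, 28]) -- matches the main path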


III. Implementing ResNet50

1. PyTorch Implementation

import torch
import torch.nn as nn
from torchvision.models import resnet50
from torchvision import transforms
from PIL import Image

Layers = [3, 4, 6, 3]
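# (3, 4, 6, 3): bottleneck blocks per stage; 16 blocks x 3 convs, plus conv1 and fc, give ResNet50 its 50 layers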

class Bottleneck(nn.Module):
    def __init__(self, in_channels, filters, stride=1, is_downsample=False):
        super(Bottleneck, self).__init__()
        filter1, filter2, filter3 = filters
        # 1x1 reduce -> 3x3 conv -> 1x1 restore. Note: torchvision's ResNet50 ("v1.5")
        # places the stride on the 3x3 conv, so we do the same to stay compatible
        # with its pretrained weights.
        self.conv1 = nn.Conv2d(in_channels, filter1, kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(filter1)
        self.conv2 = nn.Conv2d(filter1, filter2, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(filter2)
        self.conv3 = nn.Conv2d(filter2, filter3, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(filter3)
        self.relu = nn.ReLU(inplace=True)
        self.is_downsample = is_downsample
        if is_downsample:
            # projection shortcut: 1x1 conv (+ BN) so the shortcut matches the main path's shape
            self.downsample = nn.Sequential(nn.Conv2d(in_channels, filter3, kernel_size=1, stride=stride, bias=False),
                                            nn.BatchNorm2d(filter3))


    def forward(self, X):
        X_shortcut = X
        X = self.conv1(X)
        X = self.bn1(X)
        X = self.relu(X)

        X = self.conv2(X)
        X = self.bn2(X)
        X = self.relu(X)

        X = self.conv3(X)
        X = self.bn3(X)

        if self.is_downsample:
            X_shortcut = self.downsample(X_shortcut)

        X = X + X_shortcut
        X = self.relu(X)
        return X


class ResNetModel(nn.Module):

    def __init__(self):
        super(ResNetModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(num_features=64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(64, (64, 64, 256), Layers[0])
        self.layer2 = self._make_layer(256, (128, 128, 512), Layers[1], 2)
        self.layer3 = self._make_layer(512, (256, 256, 1024), Layers[2], 2)
        self.layer4 = self._make_layer(1024, (512, 512, 2048), Layers[3], 2)
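        # global average pooling to 1x1, then a 1000-way ImageNet classification head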
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(2048, 1000)

    def forward(self, input):
        # print("--ResNetModel_1--forward--input.shape={}".format(input.shape))
        X = self.conv1(input)
        X = self.bn1(X)
        X = self.relu(X)
        X = self.maxpool(X)
        X = self.layer1(X)
        X = self.layer2(X)
        X = self.layer3(X)
        X = self.layer4(X)

        X = self.avgpool(X)
        X = torch.flatten(X, 1)
        X = self.fc(X)
        return X


    def _make_layer(self, in_channels, filters, blocks, stride = 1):
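        # The first block uses a projection shortcut (1x1 conv, possibly stride 2); the rest are identity blocks.
        # e.g. _make_layer(256, (128, 128, 512), 4, stride=2) -> 1 conv block + 3 identity blocks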
        layers = []
        block_one = Bottleneck(in_channels, filters, stride=stride, is_downsample=True)
        layers.append(block_one)
        for i in range(1, blocks):
            layers.append(Bottleneck(filters[2], filters, stride=1, is_downsample=False))

        return nn.Sequential(*layers)



# Image preprocessing: resize to 224x224, convert to a torch tensor, normalize with the ImageNet mean/std
tran = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
])

if __name__ == '__main__':
    image = Image.open("tiger.jpeg").convert("RGB")  # ensure 3 channels (e.g. for grayscale/RGBA inputs)
    image = tran(image)
    image = torch.unsqueeze(image, dim=0)

    net = ResNetModel()
    # net = resnet50()
    # for name, parameter in net.named_parameters():
    #     print("name={},size={}".format(name, parameter.size()))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    net = net.to(device)
    image = image.to(device)
    net.load_state_dict(torch.load("resnet50-19c8e357.pth", map_location=device))  # load the pretrained torchvision ResNet50 weights
    net.eval()


    output = net(image)
    _, pred = torch.max(output, 1)
    synset = [l.strip() for l in open("synset.txt").readlines()]
    print("Top1:", synset[pred.item()])

    pred_index = torch.argsort(output, dim=1, descending=True)[0]
    top5 = [(synset[pred_index[i]], output[0][pred_index[i]].item()) for i in range(5)]
    print("Top5:", top5)


Running the script prints the Top1 prediction and the Top5 predictions with their scores.

2. TensorFlow Implementation

import math
import numpy as np
import tensorflow as tf
from functools import reduce

class ResNet50():
    def __init__(self, parameter_path=None):
        if parameter_path:
            # allow_pickle is required by newer numpy versions to load object arrays
            self.parameter_dict = np.load(parameter_path, encoding="latin1", allow_pickle=True).item()
        else:
            self.parameter_dict = {}
        self.var_dict = {}  # maps (layer_name, 0/1) -> tf.Variable; used by save_npy / get_var_count
        self.is_training = True

    def set_training(self, is_training):
        self.is_training = is_training


    def build(self, image):
        # Per-channel means of the ImageNet training set (Caffe convention)
        RGB_MEAN = [103.939, 116.779, 123.68]
        with tf.variable_scope("preprocess"):
            mean = tf.constant(value=RGB_MEAN, dtype=tf.float32, shape=[1, 1, 1, 3], name="preprocess_mean")
            image = image - mean

        self.conv1 = self._conv_layer(image, stride=2, filter_size=7, in_channels=3, out_channels=64, name="conv1")
        self.conv1_bn = self.batch_norm(self.conv1)
        self.conv1_relu = tf.nn.relu(self.conv1_bn)
        print("self.conv1_relu.shape={}".format(self.conv1_relu.get_shape()))

        self.pool1 = self._max_pool(self.conv1_relu, filter_size=3, stride=2)
        self.block1_1 = self._bottleneck(self.pool1, filters=(64, 64, 256), name="block1_1", change_dimens=True)
        self.block1_2 = self._bottleneck(self.block1_1, filters=(64, 64, 256), name="block1_2", change_dimens=False)
        self.block1_3 = self._bottleneck(self.block1_2, filters=(64, 64, 256), name="block1_3", change_dimens=False)
        print("self.block1_3.shape={}".format(self.block1_3.get_shape()))

        self.block2_1 = self._bottleneck(self.block1_3, filters=(128, 128, 512), name="block2_1", change_dimens=True, block_stride=2)
        self.block2_2 = self._bottleneck(self.block2_1, filters=(128, 128, 512), name="block2_2", change_dimens=False)
        self.block2_3 = self._bottleneck(self.block2_2, filters=(128, 128, 512), name="block2_3", change_dimens=False)
        self.block2_4 = self._bottleneck(self.block2_3, filters=(128, 128, 512), name="block2_4", change_dimens=False)
        print("self.block2_4.shape={}".format(self.block2_4.get_shape()))

        self.block3_1 = self._bottleneck(self.block2_4, filters=(256, 256, 1024), name="block3_1", change_dimens=True,
                                         block_stride=2)
        self.block3_2 = self._bottleneck(self.block3_1, filters=(256, 256, 1024), name="block3_2", change_dimens=False)
        self.block3_3 = self._bottleneck(self.block3_2, filters=(256, 256, 1024), name="block3_3", change_dimens=False)
        self.block3_4 = self._bottleneck(self.block3_3, filters=(256, 256, 1024), name="block3_4", change_dimens=False)
        self.block3_5 = self._bottleneck(self.block3_4, filters=(256, 256, 1024), name="block3_5", change_dimens=False)
        self.block3_6 = self._bottleneck(self.block3_5, filters=(256, 256, 1024), name="block3_6", change_dimens=False)
        print("self.block3_6.shape={}".format(self.block3_6.get_shape()))

        self.block4_1 = self._bottleneck(self.block3_6, filters=(512, 512, 2048), name="block4_1", change_dimens=True,
                                         block_stride=2)
        self.block4_2 = self._bottleneck(self.block4_1, filters=(512, 512, 2048), name="block4_2", change_dimens=False)
        self.block4_3 = self._bottleneck(self.block4_2, filters=(512, 512, 2048), name="block4_3", change_dimens=False)
        self.block4_4 = self._bottleneck(self.block4_3, filters=(512, 512, 2048), name="block4_4", change_dimens=False)

        print("self.block4_4.shape={}".format(self.block4_4.get_shape()))
        self.pool2 = self._avg_pool(self.block4_4, filter_size=7, stride=1)
        print("self.pool2.shape={}".format(self.pool2.get_shape()))
        self.fc = self._fc_layer(self.pool2, in_size=2048, out_size=1000, name="fc1200")

        return self.fc



    def _bottleneck(self, input, filters, name, change_dimens, block_stride=1):
        # 1x1 reduce -> 3x3 -> 1x1 restore; the shortcut gets a 1x1 projection when dimensions change
        filter1, filter2, filter3 = filters
        input_shortcut = input
        input_channel = input.get_shape().as_list()[-1]

        block_conv_1 = self._conv_layer(input, block_stride, 1, input_channel, filter1, name=name+"_Conv1")
        block_bn1 = self.batch_norm(block_conv_1)
        block_relu1 = tf.nn.relu(block_bn1)

        block_conv_2 = self._conv_layer(block_relu1, 1, 3, filter1, filter2, name=name + "_Conv2")
        block_bn2 = self.batch_norm(block_conv_2)
        block_relu2 = tf.nn.relu(block_bn2)

        block_conv_3 = self._conv_layer(block_relu2, 1, 1, filter2, filter3, name=name + "_Conv3")
        block_bn3 = self.batch_norm(block_conv_3)

        if change_dimens:
            input_shortcut = self._conv_layer(input, block_stride, 1, input_channel, filter3, name=name+"_ShortcutConv")
            input_shortcut = self.batch_norm(input_shortcut)

        block_res = tf.nn.relu(tf.add(input_shortcut, block_bn3))

        return block_res



    def batch_norm(self, input):
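        # NOTE: tf.layers.batch_normalization puts its moving-average update ops into
        # tf.GraphKeys.UPDATE_OPS; they must be run alongside the train op when
        # training with is_training=True.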
        return tf.layers.batch_normalization(inputs=input, axis=3, momentum=0.99,
                                             epsilon=1e-12, center=True, scale=True,
                                             training=self.is_training)

    def _avg_pool(self, input, filter_size, stride, padding="VALID"):
        return tf.nn.avg_pool(input, ksize=[1, filter_size, filter_size, 1],
                              strides=[1, stride, stride, 1], padding=padding)

    def _max_pool(self, input, filter_size, stride, padding="SAME"):
        return tf.nn.max_pool(input, ksize=[1, filter_size, filter_size, 1],
                              strides=[1, stride, stride, 1], padding=padding)

    def _conv_layer(self, input, stride, filter_size, in_channels, out_channels, name, padding="SAME"):
        '''
        Define a convolutional layer.
        '''
        with tf.variable_scope(name):
            conv_filter, bias = self._get_conv_parameter(filter_size, in_channels, out_channels, name)
            conv = tf.nn.conv2d(input, filter=conv_filter, strides=[1, stride, stride, 1], padding=padding)
            conv_bias = tf.nn.bias_add(conv, bias)
            return conv_bias

    def _fc_layer(self, input, in_size, out_size, name):
        '''
        Define a fully connected layer.
        '''
        with tf.variable_scope(name):
            input = tf.reshape(input, [-1, in_size])
            fc_weights, fc_bais = self._get_fc_parameter(in_size, out_size, name)
            fc = tf.nn.bias_add(tf.matmul(input, fc_weights), fc_bais)
            return fc


    def _get_conv_parameter(self, filter_size, in_channels, out_channels, name):
        '''
        Fetch (or initialize) the parameters of a convolutional layer.
        :param filter_size:   kernel size
        :param in_channels:   number of input channels
        :param out_channels:  number of output channels, i.e. the number of kernels
        :param name:          name of the current conv layer
        :return: the kernel weights and the bias
        '''
        if name in self.parameter_dict:
            conv_filter_initValue = self.parameter_dict[name][0]
            bias_initValue = self.parameter_dict[name][1]
        else:
            conv_filter_initValue = tf.truncated_normal(shape=[filter_size, filter_size, in_channels, out_channels],
                                            mean=0.0, stddev=1 / math.sqrt(float(filter_size * filter_size)))
            bias_initValue = tf.truncated_normal(shape=[out_channels], mean=0.0, stddev=1.0)

        conv_filter_value = tf.Variable(initial_value=conv_filter_initValue, name=name+"_weights")
        bias = tf.Variable(initial_value=bias_initValue, name=name+"_biases")
        # register the variables so save_npy / get_var_count can find them
        self.var_dict[(name, 0)] = conv_filter_value
        self.var_dict[(name, 1)] = bias

        return conv_filter_value, bias

    def _get_fc_parameter(self, in_size, out_size, name):
        '''
        Fetch (or initialize) the parameters of a fully connected layer.
        :param in_size:   input dimension
        :param out_size:  output dimension
        :param name:      name of the current fc layer
        :return: the weights and the bias
        '''
        if name in self.parameter_dict:
            fc_weights_initValue = self.parameter_dict[name][0]
            fc_bias_initValue = self.parameter_dict[name][1]
        else:
            fc_weights_initValue = tf.truncated_normal(shape=[in_size, out_size], mean=0.0,
                                                       stddev=1.0 / math.sqrt(float(in_size)))
            fc_bias_initValue = tf.truncated_normal(shape=[out_size], mean=0.0, stddev=1.0)

        fc_weights = tf.Variable(initial_value=fc_weights_initValue, name=name+"_weights")
        fc_bias = tf.Variable(initial_value=fc_bias_initValue, name=name+"_biases")
        self.var_dict[(name, 0)] = fc_weights
        self.var_dict[(name, 1)] = fc_bias
        return fc_weights, fc_bias

    def save_npy(self, sess, npy_path="./model/Resnet-save.npy"):
        """
        Save the model's variables into a npy file.
        """
        assert isinstance(sess, tf.Session)

        data_dict = {}
        for (name, idx), var in list(self.var_dict.items()):
            var_out = sess.run(var)
            if name not in data_dict:
                data_dict[name] = {}
            data_dict[name][idx] = var_out

        np.save(npy_path, data_dict)
        print("file saved", npy_path)
        return npy_path

    def get_var_count(self):
        count = 0
        for v in list(self.var_dict.values()):
            count += reduce(lambda x, y: x * y, v.get_shape().as_list())
        return count


if __name__ == '__main__':
    input = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3], name="input")
    resnet = ResNet50()
    out_put = resnet.build(input)
    print(out_put.get_shape())
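    # To actually run inference (a minimal sketch): initialize, or restore, the
    # variables in a session and feed a preprocessed image. The random array below
    # is only a stand-in for a real image; with trained parameters loaded, the
    # indices could be mapped to labels via a synset file as in the PyTorch example.
    probs = tf.nn.softmax(out_put)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())  # or restore trained parameters here
        fake_image = np.random.rand(1, 224, 224, 3).astype(np.float32)
        result = sess.run(probs, feed_dict={input: fake_image})
        top5_idx = np.argsort(result[0])[::-1][:5]
        print("Top5 indices:", top5_idx, "probs:", result[0][top5_idx])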

For the complete code and the pretrained PyTorch model weights (Baidu Netdisk download), follow my WeChat official account AI计算机视觉工坊 and reply 【代码】 to get them. The account publishes articles on machine learning, deep learning, and computer vision from time to time; everyone is welcome to learn and exchange ideas with me.