Contents
- Preface
- model
- BasicBlock and Bottleneck
- ResNet
- ResNet18/34/50/101/152
- data
- train
- test
- Running the code and test results
Preface
In a previous post I walked through the theory behind ResNet and the research context it emerged from. That background matters: once you know the state of the field at the time, ResNet's appearance no longer feels abrupt, and it gives you plenty to think about. If you have not read it yet, start with the theory article: 一文带你看透什么是ResNet - carpell - 博客园
This post walks through the ResNet code by hand. Even if you are a beginner, you can follow how the code runs; the logic is what matters most, because only by understanding it can you reproduce the code yourself. We will use flower image classification as the running example to look at the details of a ResNet implementation, which is close to PyTorch's official code. If you can, do try to write it yourself; the process will deepen your understanding far more than reading alone.
All of the code is at: fouen6/image_classification_ResNet: 基于resnet的图片分类(pytorch). Download it and it runs as-is (you only need a working PyTorch environment).
model
BasicBlock and Bottleneck
Let's start with ResNet's signature residual structure. It comes in two forms: BasicBlock, used in the shallower networks, and Bottleneck, used in the deeper ones. We will skip the theory here and look at the implementation details.
First, the BasicBlock code. Each BasicBlock contains two 3x3 convolution layers, each followed by a BN layer (which helps against vanishing and exploding gradients) and a ReLU activation that supplies the non-linearity needed for feature extraction. So the structure of every BasicBlock is simply (conv + BN + ReLU) x 2, with the block's input added to its output to form the residual connection (out += identity). One thing to watch: the second ReLU is applied after out += identity, not before. There is also the expansion attribute, which describes how the block changes the channel count: in BasicBlock the input and output channels are equal (both 64 in the figure), so expansion is 1; in Bottleneck, as we will see, the output has 4x the channels, so expansion is 4 there. The conv3x3 and conv1x1 helpers below are the usual one-line wrappers around nn.Conv2d (they were referenced but not shown in the original snippet).

```python
import torch
import torch.nn as nn


def conv3x3(in_planes, planes, stride=1):
    """3x3 convolution with padding (bias omitted because BN follows)."""
    return nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)


def conv1x1(in_planes, planes, stride=1):
    """1x1 convolution."""
    return nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=False)


class BasicBlock(nn.Module):
    expansion = 1  # output channels = planes * expansion

    def __init__(self, in_planes, planes, stride=1, downsample=None, norm_layer=nn.BatchNorm2d):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(in_planes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity       # residual connection
        out = self.relu(out)  # second ReLU comes after the addition
        return out
```
Now the Bottleneck code. Its composition and logic mirror BasicBlock, so let's focus on the differences. The input first passes through a 1x1 convolution that reduces the channel count, then a 3x3 convolution that extracts features, and finally another 1x1 convolution that expands the channels back to 4x the bottleneck width. Reducing and then restoring the dimensionality keeps the parameter count down. With that logic clear, building the Bottleneck is straightforward; note that the stride belongs on the 3x3 convolution only, so the block downsamples at most once.

```python
class Bottleneck(nn.Module):
    expansion = 4  # output channels = planes * 4

    def __init__(self, in_planes, planes, stride=1, downsample=None, norm_layer=nn.BatchNorm2d):
        super(Bottleneck, self).__init__()
        self.conv1 = conv1x1(in_planes, planes)                # 1x1: reduce channels
        self.bn1 = norm_layer(planes)
        self.conv2 = conv3x3(planes, planes, stride)           # 3x3: extract features; stride lives here only
        self.bn2 = norm_layer(planes)
        self.conv3 = conv1x1(planes, planes * self.expansion)  # 1x1: expand channels back
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv3(out)
        out = self.bn3(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out
```
One parameter used by both blocks has not been discussed yet: downsample. Its job is to downsample (and project) identity so that identity and out have the same shape before they are added; the dashed shortcuts in the architecture diagram mark exactly where this happens. Remember that ResNet is assembled in a modular fashion from four stages (layer1 to layer4), each made of several BasicBlocks or Bottlenecks. The spatial size shrinks from stage to stage (56, 28, 14, 7 for a 224 input) while the channel count grows, so at a stage boundary the identity cannot be added to the output directly. downsample, a 1x1 convolution plus BN, bridges that gap, and it is only ever attached to the first BasicBlock or Bottleneck of each stage.
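To make the shape bookkeeping concrete, here is a minimal sketch (plain Python, no torch required) using the standard convolution output-size formula: at a stage boundary the main path's first 3x3 conv uses stride 2, so the shortcut's 1x1 conv must use the same stride for the addition to work.

```python
# Convolution output-size formula: floor((size + 2p - k) / s) + 1
def conv_out(size, kernel, stride, padding):
    return (size + 2 * padding - kernel) // stride + 1

# Entering layer2: the 56x56 feature map is halved by the main path's first
# 3x3 stride-2 conv, so the shortcut's 1x1 conv must match that stride.
main_path = conv_out(56, kernel=3, stride=2, padding=1)
shortcut = conv_out(56, kernel=1, stride=2, padding=0)
print(main_path, shortcut)  # 28 28, so out += identity is shape-compatible
```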
ResNet
This part is comparatively simple. A ResNet first runs a coarse feature-encoding stem: a 7x7 convolution (stride 2), BN, ReLU, and a max-pool layer. Then come the four stages, and finally global average pooling and a fully connected layer. That is the overall skeleton; now for some finer details. The network's parameters need initialization: Kaiming initialization for the convolution weights, and weight 1 / bias 0 for the BN layers. There is also the zero_init_residual option, which initializes the last BN layer of every BasicBlock or Bottleneck to zero weight, so each residual branch starts as an identity mapping; this can improve final accuracy. Finally, _make_layer: we said only the first block of each stage gets a downsample, and the code enforces this by creating one whenever stride != 1 or the incoming channel count differs from planes * block.expansion, both of which happen exactly at a stage boundary.

```python
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False, norm_layer=nn.BatchNorm2d):
        super(ResNet, self).__init__()
        self.in_planes = 64
        # Stem: 7x7/2 conv + BN + ReLU + 3x3/2 max pool (224 -> 112 -> 56)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # Four stages; stages 2-4 halve the spatial size with stride 2
        self.layer1 = self._make_layer(block, 64, layers[0], norm_layer=norm_layer)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, norm_layer=norm_layer)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, norm_layer=norm_layer)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, norm_layer=norm_layer)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)
        # Initialization: Kaiming for convs, weight=1 / bias=0 for BN
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
        # Optionally zero the last BN of every block so the residual branch
        # initially contributes nothing (identity mapping)
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, num_blocks, stride=1, norm_layer=nn.BatchNorm2d):
        downsample = None
        # A projection shortcut is needed when the spatial size or channel
        # count changes, i.e. in the first block of a stage
        if stride != 1 or self.in_planes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.in_planes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )
        layers = []
        layers.append(block(self.in_planes, planes, stride, downsample, norm_layer))
        self.in_planes = planes * block.expansion
        for _ in range(1, num_blocks):
            layers.append(block(self.in_planes, planes, norm_layer=norm_layer))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)  # flatten to (batch, 512 * expansion)
        x = self.fc(x)
        return x
```
ResNet18/34/50/101/152
The ResNet variants of different depths are then produced simply by choosing BasicBlock or Bottleneck and setting the number of blocks per stage: BasicBlock for ResNet-18/34, Bottleneck for ResNet-50/101/152.

```python
import torch.utils.model_zoo as model_zoo

# model_urls maps each model name to its pretrained-weight URL
# (defined elsewhere in model.py)


def resnet18(pretrained=False, **kwargs):
    model = ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet18']))
    return model


def resnet34(pretrained=False, **kwargs):
    model = ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet34']))
    return model


def resnet50(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet50']))
    return model


def resnet101(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet101']))
    return model


def resnet152(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 8, 36, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet152']))
    return model
```
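As a quick sanity check on these configurations, the depth in each name counts the weighted layers: the 7x7 stem conv, every conv inside the residual blocks (2 per BasicBlock, 3 per Bottleneck), and the final fc layer. In plain Python:

```python
# (block type, blocks per stage) for each variant, matching the factories above
configs = {
    "resnet18":  ("BasicBlock", [2, 2, 2, 2]),
    "resnet34":  ("BasicBlock", [3, 4, 6, 3]),
    "resnet50":  ("Bottleneck", [3, 4, 6, 3]),
    "resnet101": ("Bottleneck", [3, 4, 23, 3]),
    "resnet152": ("Bottleneck", [3, 8, 36, 3]),
}
convs_per_block = {"BasicBlock": 2, "Bottleneck": 3}
# depth = stem conv + convs inside all blocks + fc layer
depths = {name: 1 + convs_per_block[block] * sum(layers) + 1
          for name, (block, layers) in configs.items()}
print(depths)  # {'resnet18': 18, 'resnet34': 34, 'resnet50': 50, 'resnet101': 101, 'resnet152': 152}
```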
data
File: split.py
Purpose: dataset-splitting script. Splits the raw flower_photos dataset into train and test sets, resizing every image to 224x224.
Dataset download: http://download.tensorflow.org/example_images/flower_photos.tgz
Dataset location: project root \ dataset \ flower_photos
First, the data-handling logic. Decide the split ratio (train : test = 8 : 2 here), then resize every image to 224 so ResNet can consume it directly; this resizing step is optional, since the training-time transforms could do it instead. With that settled, the code simply implements the logic: read the class folders, resize each image (padding to keep the aspect ratio), and write it into the train or test split.

```python
import os
import glob
import random
import cv2
import numpy as np

if __name__ == '__main__':
    split_rate = 0.2        # fraction of each class reserved for the test set
    resize_image = 224      # target size after resizing
    file_path = './flower_photos'  # path to the raw dataset

    # List the class sub-folders inside the dataset directory
    dirs = glob.glob(os.path.join(file_path, '*'))
    dirs = [d for d in dirs if os.path.isdir(d)]
    print("Totally {} classes: {}".format(len(dirs), dirs))

    for path in dirs:
        # Process one class at a time
        path = path.split('\\')[-1]  # keep only the folder (class) name
        # Create the per-class output folders train/<class> and test/<class>
        os.makedirs("train\\{}".format(path), exist_ok=True)
        os.makedirs("test\\{}".format(path), exist_ok=True)
        # Collect all images of this class
        files = glob.glob(os.path.join(file_path, path, '*jpg'))
        files += glob.glob(os.path.join(file_path, path, '*jpeg'))
        files += glob.glob(os.path.join(file_path, path, '*png'))
        random.shuffle(files)  # shuffle before splitting
        split_boundary = int(len(files) * split_rate)  # train/test boundary
        for i, file in enumerate(files):
            img = cv2.imread(file)
            # Resize while preserving the aspect ratio: scale so the longer
            # side becomes resize_image, the shorter side proportionally
            old_size = img.shape[:2]  # (height, width)
            ratio = float(resize_image) / max(old_size)
            new_size = tuple([int(x * ratio) for x in old_size])
            im = cv2.resize(img, (new_size[1], new_size[0]))
            # Paste the resized image, centered (via floor division), onto a
            # black 224x224 canvas
            new_im = np.zeros((resize_image, resize_image, 3), dtype=np.uint8)
            x_start = (resize_image - new_size[1]) // 2
            y_start = (resize_image - new_size[0]) // 2
            new_im[y_start:y_start + new_size[0], x_start:x_start + new_size[1]] = im
            print("Processing file {} of {}: {}".format(i + 1, len(files), file))
            # The first split_rate of the shuffled files go to test, the rest
            # to train; every output is saved with a .jpg extension
            if i < split_boundary:
                cv2.imwrite(os.path.join("test\\{}".format(path),
                                         file.split('\\')[-1].split('.')[0] + '.jpg'), new_im)
            else:
                cv2.imwrite(os.path.join("train\\{}".format(path),
                                         file.split('\\')[-1].split('.')[0] + '.jpg'), new_im)

    # Count the resulting .jpg files in each split
    train_files = glob.glob(os.path.join('train', '*', '*.jpg'))
    test_files = glob.glob(os.path.join('test', '*', '*.jpg'))
    print("Totally {} files for train".format(len(train_files)))
    print("Totally {} files for test".format(len(test_files)))
```
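The resize-and-pad arithmetic above is worth checking in isolation. A minimal sketch (a hypothetical helper `letterbox_geometry`, pure arithmetic, no image I/O) reproduces the geometry for a 300x400 photo:

```python
def letterbox_geometry(height, width, target=224):
    """Return the resized (h, w) and the (y, x) paste offsets used by split.py."""
    ratio = float(target) / max(height, width)  # scale so the longer side hits target
    new_h, new_w = int(height * ratio), int(width * ratio)
    y_start = (target - new_h) // 2             # center vertically on the canvas
    x_start = (target - new_w) // 2             # center horizontally
    return (new_h, new_w), (y_start, x_start)

# A 300x400 photo: the 400-px side shrinks to 224, the 300-px side to 168,
# and the image sits 28 px down from the top of the black canvas.
print(letterbox_geometry(300, 400))  # ((168, 224), (28, 0))
```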
train
How do we write the train script? Start from the overall logic: pick the device (GPU if available), load and preprocess the data, and build the model. One detail here: we load the pretrained weights, but the pretrained head does not match our task, because it outputs 1000 classes and we only have 5, so after loading we replace the fully connected layer with one sized for our class count. Then choose the optimizer and loss function, and run the per-epoch training loop: read a batch, predict, compute the loss, and backpropagate to update the network. After each training epoch we evaluate the current model on the test data to measure its accuracy, and we save the parameters of the model with the best accuracy so far.
Below is that logic implemented in detail. Knowing the overall design first makes the code much easier to follow: nearly every train script is built on this skeleton, and you can refine the individual steps however you like.

```python
import os
import argparse
import sys
import json
import time

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from tqdm import tqdm

from model import *


def get_argparse():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=10, help='number of epochs')
    parser.add_argument('--batch_size', type=int, default=8, help='batch size')
    parser.add_argument('--data_path', type=str, default='./dataset/', help='path to dataset')
    parser.add_argument('--model', type=str, default='resnet18', help='model name')
    parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
    parser.add_argument('--save_dir', type=str, default='./checkpoint/', help='save .pth')
    parser.add_argument('--num_classes', type=int, default=5, help='number of classes')
    return parser


def train(args):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print('Use device:', device)
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    val_transform = transforms.Compose([
        transforms.Resize(256),      # resize the shorter side to 256
        transforms.CenterCrop(224),  # center-crop to 224x224
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    train_dataset = datasets.ImageFolder(root=os.path.join(args.data_path, 'train'), transform=train_transform)
    train_num = len(train_dataset)
    val_dataset = datasets.ImageFolder(root=os.path.join(args.data_path, 'test'), transform=val_transform)
    val_num = len(val_dataset)
    # Invert class_to_idx and save it so test.py can map predictions to names
    flower_list = train_dataset.class_to_idx
    class_dict = dict((val, key) for key, val in flower_list.items())
    json_str = json.dumps(class_dict, indent=4)
    with open('class_indices.json', 'w') as json_file:
        json_file.write(json_str)
    num_workers = min([os.cpu_count(), args.batch_size if args.batch_size > 1 else 0, 8])
    print("Using {} dataloader workers.".format(num_workers))
    train_loader = Data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=num_workers)
    val_loader = Data.DataLoader(val_dataset, batch_size=args.batch_size, num_workers=num_workers, shuffle=False)
    print('Number of training images: {}, number of validation images: {}'.format(train_num, val_num))
    model = get_model(args.model)
    num_ftrs = model.fc.in_features  # input features of the original fc layer
    model.fc = torch.nn.Linear(num_ftrs, len(flower_list))  # replace the 1000-way head with our 5 classes
    model = model.to(device)
    loss_function = nn.CrossEntropyLoss()
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.Adam(params, args.lr)
    batch_num = len(train_loader)
    total_time = 0
    best_acc = 0
    for epoch in range(args.epochs):
        start_time = time.perf_counter()
        model.train()
        train_loss = 0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            train_images, train_labels = data
            train_images = train_images.to(device)
            train_labels = train_labels.to(device)
            optimizer.zero_grad()
            outputs = model(train_images)
            loss = loss_function(outputs, train_labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_bar.desc = "train epoch[{}/{}] loss: {:.3f}".format(epoch + 1, args.epochs, loss)
        model.eval()
        val_acc = 0
        val_bar = tqdm(val_loader, file=sys.stdout)
        with torch.no_grad():
            for val_data in val_bar:
                val_images, val_labels = val_data
                val_images = val_images.to(device)
                val_labels = val_labels.to(device)
                val_y = model(val_images)
                pred_y = torch.max(val_y, 1)[1]  # index of the highest logit
                val_acc += torch.eq(pred_y, val_labels).sum().item()
                val_bar.desc = "val epoch[{}/{}]".format(epoch + 1, args.epochs)
        val_accurate = val_acc / val_num
        print("[epoch {}] train_loss: {:.3f} val_accuracy: {:.3f}".format(epoch + 1, train_loss / batch_num, val_accurate))
        epoch_time = time.perf_counter() - start_time
        print("epoch_time: {:.1f}s".format(epoch_time))
        total_time += epoch_time
        print()
        # Keep only the weights with the best validation accuracy so far
        if val_accurate > best_acc:
            best_acc = val_accurate
            torch.save(model.state_dict(), os.path.join(args.save_dir, args.model + '_best.pth'))
    m, s = divmod(total_time, 60)
    h, m = divmod(m, 60)
    print("total time: {:.0f}:{:02.0f}:{:02.0f}".format(h, m, s))
    print("Finished Training!")


if __name__ == '__main__':
    args = get_argparse().parse_args()
    train(args)
```
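One small but easy-to-miss step above is the label mapping: ImageFolder assigns class indices in alphabetical folder order, and the script inverts that dict and dumps it to class_indices.json so test.py can translate a predicted index back into a class name. A tiny sketch (the five folder names below are the flower_photos classes):

```python
import json

# What ImageFolder's class_to_idx produces for the five flower_photos
# folders (indices follow alphabetical folder order)
class_to_idx = {"daisy": 0, "dandelion": 1, "roses": 2, "sunflowers": 3, "tulips": 4}
# Invert it, exactly as train.py does, so an index maps back to a class name
idx_to_class = {v: k for k, v in class_to_idx.items()}
print(json.dumps(idx_to_class, indent=4))
```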
test
The test script simply checks the model's predictions. Its logic is much like the evaluation phase inside train, with an extra visualization step. The details are in the code below.

```python
import argparse
import json
import os

import cv2
import torch
import torchvision.transforms as transforms
from matplotlib import pyplot as plt

from model import *


def get_argparse():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_path', type=str, default='./dataset/show', help='path to dataset')
    parser.add_argument('--model', type=str, default='resnet18', help='model name')
    parser.add_argument('--checkpoint', type=str, default='./checkpoint/resnet18_best.pth', help='checkpoint path')
    parser.add_argument('--num_classes', type=int, default=5, help='number of classes')
    return parser


def main():
    args = get_argparse().parse_args()
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    data_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    # Show at most 10 .jpg images from the demo folder
    image_paths = [os.path.join(args.data_path, f) for f in os.listdir(args.data_path) if f.endswith('.jpg')]
    image_paths = image_paths[:10]
    # Index-to-class mapping written by train.py
    class_indict = json.load(open("./class_indices.json"))
    model = get_model(args.model)
    num_ftrs = model.fc.in_features                         # input features of the fc layer
    model.fc = torch.nn.Linear(num_ftrs, args.num_classes)  # same head replacement as in train.py
    model.load_state_dict(torch.load(args.checkpoint, map_location=device))
    model.to(device)
    model.eval()
    fig, axes = plt.subplots(2, 5, figsize=(15, 6))
    axes = axes.flatten()
    for idx, image_path in enumerate(image_paths):
        img = cv2.imread(image_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR; convert to RGB
        img = data_transform(img).unsqueeze(0)      # add the batch dimension
        img = img.to(device)
        with torch.no_grad():
            output = model(img)
            predict = torch.softmax(output, dim=1)
            pred = torch.argmax(predict, dim=1).cpu().numpy()
        class_name = class_indict[str(pred[0])]  # predicted class name
        prob = predict[0, pred[0]].item()        # predicted probability
        print_res = "class: {} prob: {:.3f}".format(class_name, prob)
        # Undo the normalization so the image displays with natural colors
        mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1).to(device)
        std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1).to(device)
        img_for_display = img.squeeze().mul(std).add(mean).clamp(0, 1).permute(1, 2, 0).cpu().numpy()
        axes[idx].imshow(img_for_display)   # show the image
        axes[idx].set_title(print_res)      # prediction as the title
        axes[idx].axis('off')
    # Hide any unused subplots
    for idx in range(len(image_paths), len(axes)):
        axes[idx].axis('off')
    plt.tight_layout()
    plt.show()


if __name__ == '__main__':
    main()
```
Running the code and test results
If you have read this far, congratulations: you now have a fairly detailed picture of how to use a ResNet model for an image classification task. Below are the results of running and testing my code.
Complete runnable code (download and go): fouen6/image_classification_ResNet: 基于resnet的图片分类(pytorch)
The overall workflow is:
run split.py first to split the dataset, then train.py to train the network, and finally test.py to evaluate the model. Following that order reproduces everything described above. If anything here is unclear or incorrect, corrections are welcome.