【黑曜石】 五分钟让你会从零手写Pytorch【是不是有点标题党hh】

2022-03-25   359 次阅读


事不宜迟马上开始

一共有五步这样
1.自定义数据集、
2.加载自定义数据集、
3.网络模型结构定义、
4.[定义损失函数、定义优化器]、
5.[训练模型、测试模型、保存与加载模型]

定义数据集

我们就只需要用到这个Dataset父类

from torch.utils.data import Dataset

将其继承到子类中
而真正继承时就是

from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __init__(self):
        # TODO
        # 1. Initialize file path or list of file names.
        pass
    def __len__(self):
        # TODO
        # 1. Read one data from file (e.g. using numpy.fromfile, PIL.Image.open).
        # 2. Preprocess the data (e.g. torchvision.Transform).
        # 3. Return a data pair (e.g. image and label).
        pass
     def __getitem__(self, index):
       # TODO
       # You should change 0 to the total size of your dataset.
       pass

__init__就是初始化一些filePath,实体化上之后马上运行的东西。不需要return
len 运动后取一个数据,还是要return的
__getitem__是取出来一个迭代器,分image和label,return出来就是[image,label]的这样

初始化的时候还可以考虑对数据进行微调

image_transform = transforms.Compose([
    transforms.Resize(256),               # 把图片resize为256*256
    transforms.RandomCrop(224),           # 随机裁剪224*224
    transforms.RandomHorizontalFlip(),    # 水平翻转
    transforms.ToTensor(),                # 将图像转为Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])   # 标准化
])

#然后在MyDataset的getitem啥的
if self.transform:
    image = self.transform(image)

读取自定义数据库

from torch.utils.data import DataLoader
# 下面是可选的一些参数
class torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, num_workers=0, collate_fn=<function default_collate>, pin_memory=False, drop_last=False)

dataset (Dataset) – 加载数据的数据集。这个参数可以传递自定义的数据集对象
batch_size (int, optional) – 每个batch加载多少个样本(默认: 1)。
shuffle (bool, optional) – 设置为True时会在每个epoch重新打乱数据(默认: False).
sampler (Sampler, optional) – 定义从数据集中提取样本的策略。如果指定,则忽略shuffle参数。
num_workers (int, optional) – 用多少个子进程加载数据。0表示数据将在主进程中加载(默认: 0)
collate_fn (callable, optional) – 将一个list的sample组成一个mini-batch的函数
pin_memory (bool, optional) – 如果设置为True,那么data loader将会在返回它们之前,将tensors拷贝到CUDA中的固定内存(CUDA pinned memory)中.
drop_last (bool, optional) – 如果数据集大小不能被batch size整除,则设置为True后可删除最后一个不完整的batch。如果设为False并且数据集的大小不能被batch size整除,则最后一个batch将更小。(默认: False)。这个是对最后的未完成的batch来说的,比如你的batch_size设置为64,而一个epoch只有100个样本,那么训练的时候后面的36个就被扔掉了,如果为False(默认),那么会继续正常执行,只是最后的batch_size会小一点。
timeout(numeric, optional): 如果是正数,表明等待从worker进程中收集一个batch等待的时间,若超出设定的时间还没有收集到,那就不收集这个内容了。这个numeric应总是大于等于0。默认为0
worker_init_fn (callable, optional): 每个worker初始化函数 If not None, this will be called on each

一般的读取数据库的样子是这样的

mydataset = MyDataset()

dataloader = DataLoader(dataset=mydataset,batch_size=32,shuffle=True,num_workers=8,drop_last=True)

for batch_index ,(data,label) in enumerate(dataloader):
    data = Variable(data.cuda(0),requires_grad=False)
    label = Variable(label.cuda(0),requires_grad=False)

定义模型

import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

主要用nn.做cov和liner之类的做定义到forward里进行处理
主要用F. 做max_pool2d,relu,softmax等中间处理

定义损失函数

loss = nn.CrossEntropyLoss().cuda(output_device)

这就是定义了,只要取nn就行

损失函数可以选

torch.nn.L1Loss(size_average=True):创建一个衡量输入x(模型预测输出)和目标y之间差的绝对值的平均值的标准。
torch.nn.MSELoss(size_average=True):创建一个衡量输入x(模型预测输出)和目标y之间均方误差标准。
torch.nn.CrossEntropyLoss(weight=None, size_average=True):此标准将LogSoftMax和NLLLoss集成到一个类中,当训练一个多类分类器的时候,这个方法是十分有用的。
torch.nn.NLLLoss(weight=None, size_average=True):负的log likelihood loss损失。用于训练一个n类分类器。
torch.nn.NLLLoss2d(weight=None, size_average=True):对于图片的 negative log likehood loss。计算每个像素的 NLL loss。
torch.nn.KLDivLoss(weight=None, size_average=True):计算 KL 散度损失。KL散度常用来描述两个分布的距离,并在输出分布的空间上执行直接回归是有用的。
torch.nn.BCELoss(weight=None, size_average=True):计算 target 与 output 之间的二进制交叉熵。
torch.nn.MarginRankingLoss(margin=0, size_average=True)
torch.nn.HingeEmbeddingLoss(size_average=True):这个loss通常用来测量两个输入是否相似,即:使用L1 成对距离。典型是用在学习非线性 embedding或者半监督学习中。
torch.nn.MultiLabelMarginLoss(size_average=True):计算多标签分类的 hinge loss(margin-based loss) 。
torch.nn.SmoothL1Loss(size_average=True):平滑版L1 loss。
torch.nn.SoftMarginLoss(size_average=True):创建一个标准,用来优化2分类的logistic loss。
torch.nn.MultiLabelSoftMarginLoss(weight=None, size_average=True):创建一个标准,基于输入x和目标y的 max-entropy,优化多标签 one-versus-all 的损失。
torch.nn.CosineEmbeddingLoss(margin=0, size_average=True):此标准使用cosine距离测量两个输入是否相似,一般用来用来学习非线性embedding或者半监督学习。
torch.nn.MultiMarginLoss(p=1, margin=1, weight=None, size_average=True):用来计算multi-class classification的hinge loss(magin-based loss)

定义优化器

import torch.optim

具体的反向传播可以这样来用

# 定义损失函数
loss_fn = nn.CrossEntropyLoss().cuda(output_device)

# 定义优化器
optimizer = optim.Adam([var1, var2], lr = 0.0001)

for batch_index ,(data,label) in enumerate(dataloader):
    data = Variable(data.cuda(0),requires_grad=False)
    label = Variable(label.cuda(0),requires_grad=False)

    # 清空所有被优化过的梯度
    optimizer.zero_grad()

    # 执行模型forward
    pred_label = model(data)

    # 损失函数比较差异
    loss = loss_fn(pred_label, label)

    # 反向传播
    loss.backward()

    # 优化模型参数
    optimizer.step()
# setp才更新,要用step更新
# 虽然反向传播本身不用算全部loss[不用loss.backward()],但是这里不loss.backward()也不能更新,因为backward是把要更新的梯度一起算出来了

训练模型

def train(args, model, device, train_loader, optimizer, epoch):
    # 切换到train状态【最重要的一点,因为特别多模型trian和test不长一个样】
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # 从数据集获取数据以及对应标签
        data, target = data.to(device), target.to(device)
        # 清空所有优化过的梯度。不清空梯度必出错,因为不会自动清空
        optimizer.zero_grad()
        # 执行模型forward
        output = model(data)
        # 计算损失
        loss = torch.nn.functional.nll_loss(output, target)
        # 反向传播
        loss.backward()
        # 优化模型参数
        optimizer.step()

测试模型

def test(args, model, device, test_loader):
    # 切换到evaluation`模式
    model.eval()
    test_loss = 0
    correct = 0
    # 标志不进行反向传播
    with torch.no_grad():   # 这个no_grad特别重要,不然会浪费算力
        for data, target in test_loader:
            # 从数据集获取数据以及对应标签
            data, target = data.to(device), target.to(device)
            # 执行模型forward
            output = model(data)
            test_loss += torch.nn.functional.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            # 获取推理的最大可能标签
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

给你一个easystart

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))


def test(args, model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


def main():
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--save-model', action='store_true', default=False,
                        help='For Saving the current Model')

    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()
    torch.manual_seed(args.seed)
    device = torch.device("cuda:0" if use_cuda and torch.cuda.is_available() else "cpu")
    print(device)

    kwargs = {'num_workers': 4, 'pin_memory': True} if use_cuda else {}
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('./mnist', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('./mnist', train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=args.test_batch_size, shuffle=True, **kwargs)

    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)

    test(args, model, device, test_loader)

    if (args.save_model):
        torch.save(model.state_dict(), "mnist_cnn.pt")

if __name__ == '__main__':
    main()


Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

无论在未来前做什么,未来都会普通的到来