ML|Hung-yi Lee
date
Oct 19, 2024
slug
ml-hung-yi-lee
status
Published
tags
AI
Pytorch
type
Post
Introduction
Machine Learning ≈ Looking for a Function
- Regression: The function outputs a scalar
- Classification: Given options, the function outputs the correct one
- Structured Learning: Create something with structure
Steps to find the function(or ML)
- function with unknown
- define loss from training data
- optimization
Neuron and Neural Network
我们可以用很多Sigmoid函数叠加,去拟合任何函数;而通过调整w、b和c,可以创建出我们想要的Sigmoid函数。每个Sigmoid函数是一个神经元
Hw1
Code
import pandas as pd
import torch
import torch.nn as nn
from sklearn.feature_selection import SelectKBest, f_regression
from torch.utils.data import Dataset, DataLoader
import numpy as np
import csv
import os
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure


def get_device():
    """Return 'mps' when the MPS backend is available, otherwise 'cpu'."""
    return 'mps' if torch.backends.mps.is_available() else 'cpu'


def plot_learning_curve(loss_record, title=''):
    """Plot the training loss (per step) and the dev loss (per epoch).

    Args:
        loss_record: dict with 'train' and 'dev' lists, as returned by train().
        title: suffix appended to the figure title.
    """
    total_steps = len(loss_record['train'])
    x_1 = range(total_steps)
    # `//` is floor division: one dev point is drawn every
    # (number of train steps // number of dev points) steps.
    x_2 = x_1[::len(loss_record['train']) // len(loss_record['dev'])]
    figure(figsize=(6, 4))
    plt.plot(x_1, loss_record['train'], c='tab:red', label='train')
    plt.plot(x_2, loss_record['dev'], c='tab:cyan', label='dev')
    plt.ylim(0.0, 5.0)
    plt.xlabel('Training steps')
    plt.ylabel('MSE loss')
    plt.title(f'Learning curve of {title}')
    plt.legend()
    plt.show()


def plot_pred(dv_set, model, device, lim=35., preds=None, targets=None):
    """Scatter-plot predictions against ground truth.

    When preds or targets is None, both are computed by running `model`
    over `dv_set` in eval mode.
    """
    if preds is None or targets is None:
        model.eval()
        preds, targets = [], []
        for x, y in dv_set:
            x, y = x.to(device), y.to(device)
            with torch.no_grad():
                pred = model(x)
                preds.append(pred.detach().cpu())
                targets.append(y.detach().cpu())
        preds = torch.cat(preds, dim=0).numpy()
        targets = torch.cat(targets, dim=0).numpy()

    figure(figsize=(5, 5))
    plt.scatter(targets, preds, c='r', alpha=0.5)
    plt.plot([-0.2, lim], [-0.2, lim], c='b')
    plt.xlim(-0.2, lim)
    plt.ylim(-0.2, lim)
    plt.xlabel('ground truth value')
    plt.ylabel('predicted value')
    plt.title('Ground Truth vs Prediction')
    plt.show()


def feature_selection():
    """Return the column indices of the 17 best-scoring features.

    Features are scored with f_regression on the min-max normalised columns
    1..93 of 'covid.train.csv' against column 94 (the target).
    """
    data = pd.read_csv(r'covid.train.csv')
    x = data[data.columns[1:94]]
    y = data[data.columns[94]]
    # Min-max normalisation (used for scoring only)
    x = (x - x.min()) / (x.max() - x.min())
    # SelectKBest is only used to obtain the per-feature scores
    bestfeatures = SelectKBest(score_func=f_regression)
    fit = bestfeatures.fit(x, y)
    # Put scores next to column names so they can be ranked
    dfscores = pd.DataFrame(fit.scores_)
    dfcolumns = pd.DataFrame(x.columns)
    featureScores = pd.concat([dfcolumns, dfscores], axis=1)
    featureScores.columns = ['Specs', 'Score']
    # Indices of the top features
    top_rows = featureScores.nlargest(20, 'Score').index.tolist()[:17]
    return top_rows


class COVID19Dataset(Dataset):
    """Dataset over the COVID-19 CSV files.

    Args:
        path: CSV file to read (header row and first column are skipped).
        mode: 'train', 'dev' or 'test'. 'train'/'dev' split the rows 9:1 by
            index; 'test' has no target column.
        target_only: when True use the features chosen by feature_selection(),
            otherwise the first 93 columns.
    """

    def __init__(self, path, mode='train', target_only=True):
        self.mode = mode
        with open(path, 'r') as fp:
            data = list(csv.reader(fp))
            data = np.array(data[1:])[:, 1:].astype(float)  # skip first row and column

        if not target_only:
            feats = list(range(93))
        else:
            feats = feature_selection()

        if mode == 'test':
            data = data[:, feats]
            self.data = torch.FloatTensor(data)
        else:
            target = data[:, -1]
            data = data[:, feats]
            indices = []
            if mode == 'train':
                indices = [i for i in range(len(data)) if i % 10 != 0]
            elif mode == 'dev':
                indices = [i for i in range(len(data)) if i % 10 == 0]
            self.data = torch.FloatTensor(data[indices])
            self.target = torch.FloatTensor(target[indices])

        # Standardise every column from index 40 onwards.
        # NOTE(review): with target_only=True only 17 columns remain, so this
        # slice is empty and the statement is a no-op - confirm that is intended.
        self.data[:, 40:] = \
            (self.data[:, 40:] - self.data[:, 40:].mean(dim=0, keepdim=True)) \
            / self.data[:, 40:].std(dim=0, keepdim=True)

        self.dim = self.data.shape[1]
        print(f'Finished reading the {mode} set of COVID19 Dataset ({len(self.data)} samples found, each dim = {self.dim})')

    def __getitem__(self, index):
        if self.mode in ['train', 'dev']:
            return self.data[index], self.target[index]
        else:
            return self.data[index]

    def __len__(self):
        return len(self.data)


def prep_dataloader(path, mode, batch_size, n_jobs=0, target_only=False):
    """Build a COVID19Dataset and wrap it in a DataLoader (shuffled only for 'train')."""
    dataset = COVID19Dataset(path, mode=mode, target_only=target_only)
    dataloader = DataLoader(
        dataset, batch_size,
        shuffle=(mode == 'train'), drop_last=False,
        num_workers=n_jobs, pin_memory=True)
    return dataloader


class NeuralNet(nn.Module):
    """Small fully-connected regressor: Linear -> BatchNorm -> Dropout -> ReLU -> Linear."""

    def __init__(self, input_dim):
        super(NeuralNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 16),
            nn.BatchNorm1d(16),
            nn.Dropout(p=0.2),
            nn.ReLU(),
            nn.Linear(16, 1)
        )
        self.criterion = nn.MSELoss(reduction='mean')

    def forward(self, x):
        return self.net(x).squeeze(1)

    def cal_loss(self, pred, target):
        """Return the MSE loss plus an L2 penalty over this module's parameters."""
        regularization_loss = 0
        # Use self.parameters(): the previous code read a module-level `model`
        # variable, which only worked when the script's global happened to exist.
        for param in self.parameters():
            regularization_loss += torch.sum(param ** 2)
        return self.criterion(pred, target) + 0.00075 * regularization_loss


def train(tr_set, dv_set, model, config, device):
    """Train `model` with early stopping; return (best dev loss, loss record).

    The state dict is written to config['save_path'] each time the dev loss
    improves. Training stops after config['n_epochs'] epochs or when the dev
    loss has not improved for more than config['early_stop'] epochs.
    """
    n_epochs = config['n_epochs']
    optimizer = getattr(torch.optim, config['optimizer'])(model.parameters(), **config['optim_hparas'])
    min_mse = 1000.0
    loss_record = {'train': [], 'dev': []}
    early_stop_cnt = 0
    epoch = 0

    while epoch < n_epochs:
        model.train()
        # Training mode (model.train()):
        #   Dropout is active, so some units are randomly dropped.
        #   BatchNorm normalises with the current batch's mean/variance and
        #   updates its running statistics.
        # Evaluation mode (model.eval()):
        #   Dropout is disabled, so all units take part.
        #   BatchNorm uses the running mean/variance accumulated in training,
        #   not the current batch's statistics.
        for x, y in tr_set:
            optimizer.zero_grad()
            x, y = x.to(device), y.to(device)
            pred = model(x)
            mse_loss = model.cal_loss(pred, y)
            mse_loss.backward()
            optimizer.step()
            loss_record['train'].append(mse_loss.detach().cpu().item())

        dev_mse = dev(dv_set, model, device)
        if dev_mse < min_mse:
            min_mse = dev_mse
            print(f'Saving model (epoch = {epoch + 1:4d}, loss = {min_mse:.4f})')
            torch.save(model.state_dict(), config['save_path'])
            early_stop_cnt = 0
        else:
            early_stop_cnt += 1

        epoch += 1
        loss_record['dev'].append(dev_mse)
        if early_stop_cnt > config['early_stop']:
            break

    print(f'Finished training after {epoch} epochs')
    return min_mse, loss_record


def dev(dv_set, model, device):
    """Return the sample-weighted average of model.cal_loss over `dv_set`.

    Note that cal_loss includes the L2 penalty, so this is not a pure MSE.
    """
    model.eval()
    total_loss = 0
    for x, y in dv_set:
        x, y = x.to(device), y.to(device)
        with torch.no_grad():
            pred = model(x)
            mse_loss = model.cal_loss(pred, y)
        total_loss += mse_loss.detach().cpu().item() * len(x)
    total_loss /= len(dv_set.dataset)
    return total_loss


def test(tt_set, model, device):
    """Run `model` over `tt_set` in eval mode and return predictions as a NumPy array."""
    model.eval()
    preds = []
    for x in tt_set:
        x = x.to(device)
        with torch.no_grad():
            pred = model(x)
            preds.append(pred.detach().cpu())
    preds = torch.cat(preds, dim=0).numpy()
    return preds


def save_pred(preds, file):
    """Write predictions to `file` as CSV with columns id,tested_positive."""
    print(f'Saving results to {file}')
    # newline='' is what the csv module expects; without it blank rows appear
    # on platforms that translate '\n' to '\r\n'.
    with open(file, 'w', newline='') as fp:
        writer = csv.writer(fp)
        writer.writerow(['id', 'tested_positive'])
        for i, p in enumerate(preds):
            writer.writerow([i, p])


if __name__ == '__main__':
    device = get_device()
    os.makedirs('models', exist_ok=True)
    target_only = True

    myseed = 42069
    np.random.seed(myseed)
    torch.manual_seed(myseed)

    config = {
        'n_epochs': 10000,
        'batch_size': 256,
        'optimizer': 'Adam',
        'optim_hparas': {
            'lr': 0.0005,
        },
        'early_stop': 1000,
        'save_path': 'models/model.pth'
    }

    tr_path = 'covid.train.csv'  # path to training data
    tt_path = 'covid.test.csv'   # path to testing data

    tr_set = prep_dataloader(tr_path, 'train', config['batch_size'], target_only=target_only)
    dv_set = prep_dataloader(tr_path, 'dev', config['batch_size'], target_only=target_only)
    tt_set = prep_dataloader(tt_path, 'test', config['batch_size'], target_only=target_only)

    # tr_set.dataset.dim == tr_set.dataset.data.shape[1]
    model = NeuralNet(tr_set.dataset.dim).to(device)

    model_loss, model_loss_record = train(tr_set, dv_set, model, config, device)
    plot_learning_curve(model_loss_record, title='deep model')

    # Reload the best checkpoint into a fresh model before evaluating
    del model
    model = NeuralNet(tr_set.dataset.dim).to(device)
    ckpt = torch.load(config['save_path'], map_location=device)
    model.load_state_dict(ckpt)
    plot_pred(dv_set, model, device)

    preds = test(tt_set, model, device)
    save_pred(preds, 'pred.csv')
General Guide
graph TD A[Loss on Training Data] --> B[Large] A --> C[Small] B --> D[Model Bias] B --> F[Optimization] C --> H[Loss on Testing Data] D --> E[Make your model complex] H --> I[Large] H --> J[Small] I --> K[Overfitting] K --> L[More training data not in HWs] K --> M[Data Augmentation] K --> N[Make your model simpler] I --> O[Mismatch] E <-->|Trade-off| N
Optimization Issue
- Gaining the insights from comparison
- Start from shallower networks (or other models),which are easier to optimize.
- If deeper networks do not obtain smaller loss on training data, then there is optimization issue.
Solution: More powerful optimization technology
Local minima and Saddle point
把gradient=0的点称为critical point
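可以用泰勒级数在 $\theta'$ 附近估计损失函数:$L(\theta) \approx L(\theta') + (\theta-\theta')^T g + \frac{1}{2}(\theta-\theta')^T H (\theta-\theta')$
其中 $g=\nabla L(\theta')$,是梯度也是一阶偏导,$g_i=\frac{\partial L(\theta')}{\partial \theta_i}$;H是Hessian矩阵,是二阶偏导,$H_{ij}=\frac{\partial^2 L(\theta')}{\partial \theta_i \partial \theta_j}$。在critical point处 $g=0$,记 $v=\theta-\theta'$,则 $L(\theta) \approx L(\theta') + \frac{1}{2}v^T H v$
- For all $v$, $v^T H v > 0$
- Around $\theta'$: $L(\theta) > L(\theta')$
- Local minima
- For all $v$, $v^T H v < 0$
- Around $\theta'$: $L(\theta) < L(\theta')$
- Local maxima
- Sometimes $v^T H v > 0$, sometimes $v^T H v < 0$
- Saddle point
此时如果把 $v$ 取为 $H$ 的特征向量 $u$,则 $u^T H u = u^T(\lambda u) = \lambda \lVert u \rVert^2$
取特征值 $\lambda < 0$,则 $u^T H u < 0$,$L(\theta) < L(\theta')$。取 $\theta = \theta' + u$,就可以脱离Saddle Point让L下降,但实际上不会用这种方法,计算量太大。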
当参数很多的时候,Local minima是很少的,大多是Saddle Point;在高维下,总有路可以走。
Overfitting
Cross Validation
N-fold Cross Validation
Batch and Momentum
Small Batch and Large Batch
Momentum
Starting at 𝜽𝟎,Movement 𝒎𝟎 = 𝟎
Compute gradient 𝒈𝟎,Movement 𝒎𝟏 = λ𝒎𝟎− 𝜂𝒈𝟎,Move to 𝜽𝟏 = 𝜽𝟎 + 𝒎𝟏
Compute gradient 𝒈𝟏,Movement 𝒎𝟐 = λ𝒎𝟏− 𝜂𝒈𝟏,Move to 𝜽𝟐 = 𝜽𝟏 + 𝒎𝟐
Adaptive Learning Rate
Adagrad
RMSProp
增加了最近的梯度的权重,这样过去的梯度影响更小。
Adam=RMSProp+Momentum
Warm Up
左侧是一般的Learning Rate,右侧是Warm Up。Warm Up可以使得模型在一开始用小步长探索信息,降低统计信息的方差
Batch Normalization
如果 $x_2$ 的取值很大,$x_1$ 的取值很小,那么 $w_2$ 的变化对L的贡献就会比 $w_1$ 大得多,就会出现左图的情况。通过Normalization,可以变成右图
又因为我们不可能考虑整个Network的平均值和标准差,因此只考虑一个Batch
Hw2
Code
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# For data preprocess
import numpy as np
import csv
import os
import gc

myseed = 42069  # set a random seed for reproducibility
np.random.seed(myseed)
torch.manual_seed(myseed)
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

# np.load returns the arrays stored in the .npy files
train = np.load('train_11.npy')
train_label = np.load('train_label_11.npy')
test = np.load('test_11.npy')


class TIMITDataset(Dataset):
    """Dataset of feature frames with optional integer labels.

    Args:
        X: NumPy array of features, converted to a float tensor.
        y: optional NumPy array of labels; None for the test set.
    """

    def __init__(self, X, y=None):
        self.data = torch.from_numpy(X).float()
        if y is not None:
            y = y.astype(int)
            self.label = torch.LongTensor(y)
        else:
            self.label = None

    def __getitem__(self, idx):
        if self.label is not None:
            return self.data[idx], self.label[idx]
        else:
            return self.data[idx]

    def __len__(self):
        return len(self.data)


# Hold out the last VAL_RATIO of the training data for validation
VAL_RATIO = 0.2
percent = int(train.shape[0] * (1 - VAL_RATIO))
train_x, train_y, val_x, val_y = train[:percent], train_label[:percent], train[percent:], train_label[percent:]

BATCH_SIZE = 64
train_set = TIMITDataset(train_x, train_y)
val_set = TIMITDataset(val_x, val_y)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)  # only shuffle the training data
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False)
test_set = TIMITDataset(test, None)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# The datasets hold their own tensors; free the raw arrays
del train, train_label, train_x, train_y, val_x, val_y
gc.collect()


class Classifier(nn.Module):
    """MLP mapping 429-dim inputs to 39 class logits.

    Every hidden layer is Linear -> BatchNorm1d -> ReLU -> Dropout.
    """

    def __init__(self):
        super(Classifier, self).__init__()
        self.layer0 = nn.Linear(429, 2048)
        self.layer1 = nn.Linear(2048, 1024)
        self.layer2 = nn.Linear(1024, 512)
        self.layer3 = nn.Linear(512, 128)
        # layer3 (and batchnorm3) emit 128 features, so layer4 must accept 128.
        # It was declared as nn.Linear(256, 128), which made forward() fail
        # with a shape mismatch.
        self.layer4 = nn.Linear(128, 128)
        self.out = nn.Linear(128, 39)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.25)

        self.batchnorm0 = nn.BatchNorm1d(2048)
        self.batchnorm1 = nn.BatchNorm1d(1024)
        self.batchnorm2 = nn.BatchNorm1d(512)
        self.batchnorm3 = nn.BatchNorm1d(128)
        self.batchnorm4 = nn.BatchNorm1d(128)

    def forward(self, x):
        x = self.layer0(x)
        x = self.batchnorm0(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.layer1(x)
        x = self.batchnorm1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.layer2(x)
        x = self.batchnorm2(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.layer3(x)
        x = self.batchnorm3(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.layer4(x)
        x = self.batchnorm4(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.out(x)
        return x


device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
epochs = 40
learning_rate = 0.0001
model_path = 'model.ckpt'
model = Classifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.RAdam(model.parameters(), lr=learning_rate)


def train():
    """Train the global model for `epochs` epochs.

    With a non-empty validation set the checkpoint with the best validation
    accuracy is saved to `model_path`; otherwise the model is saved once after
    the last epoch.
    """
    best_acc = 0.0
    for epoch in range(epochs):
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()
        for i, data in enumerate(train_loader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, train_pred = torch.max(outputs, 1)
            loss.backward()
            optimizer.step()

            train_acc += (train_pred.cpu() == labels.cpu()).sum().item()
            train_loss += loss.item()

        if len(val_set) > 0:
            model.eval()
            with torch.no_grad():
                for i, data in enumerate(val_loader):
                    inputs, labels = data
                    inputs = inputs.to(device)
                    labels = labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    _, val_pred = torch.max(outputs, 1)
                    val_acc += (val_pred.cpu() == labels.cpu()).sum().item()
                    val_loss += loss.item()

                print('Epoch [{}/{}], Train Accuracy: {:.2f}%, Train Loss: {:.4f}, Validation Accuracy: {:.2f}%, Validation Loss: {:.4f}'.format(epoch+1, epochs, train_acc/len(train_set)*100, train_loss, val_acc/len(val_set)*100, val_loss))

                # val_acc is a count of correct samples; keep the best checkpoint
                if val_acc > best_acc:
                    best_acc = val_acc
                    torch.save(model.state_dict(), model_path)
                    print('Model Saved')
        else:
            print('Epoch [{}/{}], Train Accuracy: {:.2f}%, Train Loss: {:.4f}'.format(epoch+1, epochs, train_acc/len(train_set)*100, train_loss))

    if len(val_set) == 0:
        torch.save(model.state_dict(), model_path)
        print('saving model at last epoch')


def predict():
    """Load the checkpoint at `model_path` and write test predictions to prediction.csv."""
    predictions = []
    model = Classifier().to(device)
    # map_location lets a checkpoint saved on one device load on another
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            inputs = data
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, test_pred = torch.max(outputs, 1)  # index of the class with the highest score
            for y in test_pred.cpu().numpy():
                predictions.append(y)

    with open('prediction.csv', 'w') as f:
        f.write('Id,Class\n')
        for i, y in enumerate(predictions):
            f.write('{},{}\n'.format(i, y))


if __name__ == '__main__':
    train()
    predict()
    print('done')
CNN
Pooling可以没有,比如AlphaGo使用了CNN,但没有Pooling
这部分内容可以看另一篇文章的CNN部分
Hw3
Code
# Required packages
import os
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader
from torchvision.datasets import DatasetFolder
from torchvision.models import resnet18
from tqdm.auto import tqdm

# Device configuration
device = "cuda" if torch.cuda.is_available() else "cpu"

# Training transforms (with data augmentation)
train_tfm = transforms.Compose([
    transforms.RandomResizedCrop((128, 128)),
    transforms.RandomChoice(
        [transforms.AutoAugment(),
         transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10),
         transforms.AutoAugment(transforms.AutoAugmentPolicy.SVHN)]
    ),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.5),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
])

# Validation / test transforms (no augmentation)
test_tfm = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Batch sizes
batch_size = 32
test_batch_size = 512


def custom_loader(path):
    """Open the image at `path` and convert it to RGB."""
    return Image.open(path).convert('RGB')


# Construct datasets
train_set = DatasetFolder(
    "food-11/training/labeled",
    loader=custom_loader,
    extensions="jpg",
    transform=train_tfm
)
valid_set = DatasetFolder(
    "food-11/validation",
    loader=custom_loader,
    extensions="jpg",
    transform=test_tfm
)
unlabeled_set = DatasetFolder(
    "food-11/training/unlabeled",
    loader=custom_loader,
    extensions="jpg",
    transform=train_tfm
)
test_set = DatasetFolder(
    "food-11/testing",
    loader=custom_loader,
    extensions="jpg",
    transform=test_tfm
)

# Construct data loaders
train_loader = DataLoader(
    train_set, batch_size=batch_size, shuffle=True, pin_memory=True
)
valid_loader = DataLoader(
    valid_set, batch_size=batch_size, shuffle=False, pin_memory=True
)
test_loader = DataLoader(
    test_set, batch_size=batch_size, shuffle=False
)
unlabel_loader = DataLoader(
    unlabeled_set, batch_size=batch_size, shuffle=False, pin_memory=True
)


class PseudoLabeledDataset(torch.utils.data.Dataset):
    """The labelled dataset followed by pseudo-labelled (image, label) pairs."""

    def __init__(self, original_dataset):
        self.original_dataset = original_dataset
        self.pseudo_labels = []
        # Maps a caller-supplied key to its position in self.pseudo_labels so
        # that the same unlabelled sample is stored at most once.
        self._key_to_pos = {}

    def add_pseudo_label(self, img, label, key=None):
        """Store a pseudo-labelled sample.

        Args:
            img: image tensor.
            label: predicted class index.
            key: optional hashable identity of the sample. When given and
                already present, the stored entry is replaced instead of a
                duplicate being appended. With key=None the sample is always
                appended.
        """
        if key is not None:
            pos = self._key_to_pos.get(key)
            if pos is not None:
                self.pseudo_labels[pos] = (img, label)
                return
            self._key_to_pos[key] = len(self.pseudo_labels)
        self.pseudo_labels.append((img, label))

    def __len__(self):
        return len(self.original_dataset) + len(self.pseudo_labels)

    def __getitem__(self, idx):
        if idx < len(self.original_dataset):
            return self.original_dataset[idx]
        else:
            return self.pseudo_labels[idx - len(self.original_dataset)]


# Dataset that accumulates pseudo-labelled samples on top of the labelled set
pseudo_dataset = PseudoLabeledDataset(train_set)


class Classifier(nn.Module):
    """ImageNet-pretrained ResNet18 with the final layer replaced for 11 classes."""

    def __init__(self):
        super(Classifier, self).__init__()
        # Use the `weights` argument rather than the older `pretrained` flag
        self.model = resnet18(weights='IMAGENET1K_V1')
        # Replace the final fully-connected layer to match the class count (11)
        num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Linear(num_ftrs, 11)

    def forward(self, x):
        x = self.model(x)
        return x


def get_pseudo_labels(model, dataloader, threshold=0.95):
    """Add confidently predicted samples from `dataloader` to `pseudo_dataset`.

    A sample is added when its highest softmax probability exceeds `threshold`.
    Samples are keyed by their position in the loader, so running this more
    than once refreshes existing entries instead of piling up duplicates. This
    relies on `dataloader` not being shuffled.
    """
    model.eval()
    softmax = nn.Softmax(dim=-1)
    offset = 0  # index of the first sample of the current batch

    for batch in tqdm(dataloader):
        imgs = batch[0]  # batch[0] holds the images
        n_imgs = imgs.size(0)
        with torch.no_grad():
            # Follow the configured device instead of assuming CUDA
            logits = model(imgs.to(device))
        probs = softmax(logits)
        confidences, labels = probs.max(dim=-1)

        for i in range(n_imgs):
            if confidences[i].item() > threshold:
                pseudo_dataset.add_pseudo_label(imgs[i], labels[i].item(), key=offset + i)
        offset += n_imgs

    print(f"New pseudo-labeled samples added.")
    model.train()


# Build the model on the selected device
model = Classifier().to(device)
# Loss function
criterion = nn.CrossEntropyLoss()
# Optimizer (RAdam)
optimizer = torch.optim.RAdam(model.parameters(), lr=0.0001, weight_decay=1e-5)


def train():
    """Train with pseudo-labelling and early stopping; save the best model to best_model.pth."""
    global train_loader
    n_epochs = 512
    valid_acc_threshold = 0.8
    valid_acc_last = 0
    best_valid_acc = 0.0
    epochs_no_improve = 0
    n_epochs_stop = 32  # early-stopping patience
    # Upper bound: every labelled plus every unlabelled sample (3080 + 6786)
    max_pseudo_size = len(train_set) + len(unlabeled_set)

    for epoch in range(n_epochs):
        # Re-label only after validation accuracy beats the best seen so far
        if valid_acc_last > valid_acc_threshold:
            valid_acc_threshold = valid_acc_last
            if len(pseudo_dataset) < max_pseudo_size:
                # Use the current model to pseudo-label the unlabelled data
                get_pseudo_labels(model, unlabel_loader)
                # Rebuild the training loader over the enlarged dataset
                train_loader = DataLoader(pseudo_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)

        model.train()
        train_loss = []
        train_accs = []

        for batch in tqdm(train_loader):
            imgs, labels = batch
            imgs = imgs.to(device)
            labels = labels.to(device)

            logits = model(imgs)
            loss = criterion(logits, labels)

            optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
            optimizer.step()

            acc = (logits.argmax(dim=-1) == labels).float().mean()
            train_loss.append(loss.item())
            train_accs.append(acc.item())

        train_loss = sum(train_loss) / len(train_loss)
        train_acc = sum(train_accs) / len(train_accs)
        print(f"[ Train | {epoch + 1:03d}/{n_epochs:03d} ] loss = {train_loss:.5f}, acc = {train_acc:.5f}]")

        model.eval()
        valid_loss = []
        valid_accs = []

        for batch in tqdm(valid_loader):
            imgs, labels = batch
            imgs = imgs.to(device)
            labels = labels.to(device)

            with torch.no_grad():
                logits = model(imgs)
                loss = criterion(logits, labels)

            acc = (logits.argmax(dim=-1) == labels).float().mean()
            valid_loss.append(loss.item())
            valid_accs.append(acc.item())

        valid_loss_epoch = sum(valid_loss) / len(valid_loss)
        valid_acc_epoch = sum(valid_accs) / len(valid_accs)
        valid_acc_last = valid_acc_epoch
        print(f"[ Valid | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss_epoch:.5f}, acc = {valid_acc_epoch:.5f}")

        if valid_acc_epoch > best_valid_acc:
            best_valid_acc = valid_acc_epoch
            torch.save(model.state_dict(), 'best_model.pth')
            print(f"Best model saved with accuracy: {best_valid_acc:.5f}")
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            print(f'Epochs with no improvement: {epochs_no_improve}')

        if epochs_no_improve >= n_epochs_stop:
            print('Early stopping!')
            break


def mytest():
    """Load best_model.pth and write test-set predictions to predict.csv."""
    # map_location lets a checkpoint saved on one device load on another
    model.load_state_dict(torch.load('best_model.pth', map_location=device))
    model.eval()
    predictions = []

    for batch in tqdm(test_loader):
        imgs, _ = batch
        imgs = imgs.to(device)
        with torch.no_grad():
            logits = model(imgs)
        predictions.extend(logits.argmax(dim=-1).cpu().numpy().tolist())

    with open("predict.csv", "w") as f:
        f.write("Id,Category\n")
        for i, pred in enumerate(predictions):
            f.write(f"{i},{pred}\n")


if __name__ == "__main__":
    train()
    mytest()
ResNet18
from CSDN
import os

import torch
import torch.nn as nn

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


class BasicBlock(nn.Module):
    """Two-convolution residual block.

    It comes in two forms:
      1. s=1: input and output have the same shape.
      2. s=2: the feature map is halved spatially and the channels are doubled;
         a 1x1 strided convolution projects the shortcut to the new shape.

    :param in_channel: number of input channels
    :param s: 1 keeps the resolution, 2 downsamples
    """

    def __init__(self, in_channel, s):
        super().__init__()
        out_channel = in_channel * s
        self.s = s

        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3,
                               stride=s, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)

        # A projection shortcut exists only for the downsampling form
        if s == 2:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=2, bias=False),
                nn.BatchNorm2d(out_channel),
            )

    def forward(self, x):
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        # Identity shortcut unless the block downsamples
        shortcut = self.downsample(x) if self.s == 2 else x
        return self.relu(residual + shortcut)


class ResNet18(nn.Module):
    """ResNet-18 classifier built from BasicBlock stages.

    :param n_class: number of output classes
    :param zero_init_residual: when True, zero the last BatchNorm weight of
        every BasicBlock
    """

    def __init__(self, n_class, zero_init_residual=True):
        super().__init__()
        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages of two blocks each
        self.layer1 = self._make_stage(64, 1)
        self.layer2 = self._make_stage(64, 2)
        self.layer3 = self._make_stage(128, 2)
        self.layer4 = self._make_stage(256, 2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, n_class)

        self._init_weights(zero_init_residual)

    @staticmethod
    def _make_stage(in_channel, s):
        """Return a stage: one block with stride `s`, then one shape-preserving block."""
        return nn.Sequential(
            BasicBlock(in_channel=in_channel, s=s),
            BasicBlock(in_channel=in_channel * s, s=1),
        )

    def _init_weights(self, zero_init_residual):
        """Initialise conv and norm layers, then optionally zero each block's bn2 weight."""
        # Parameter initialisation (source note: affects accuracy by about 7%)
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

        if not zero_init_residual:
            return
        # BasicBlock initialisation (source note: affects accuracy by 1-2%).
        # This must be a second pass: modules() yields a block before its bn2,
        # so a single pass would reset the weight to 1.
        for module in self.modules():
            if isinstance(module, BasicBlock):
                nn.init.constant_(module.bn2.weight, 0)

    def forward(self, x):
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)


if __name__ == '__main__':
    inputs = torch.rand(10, 3, 224, 224)
    model = ResNet18(n_class=9)
    print(model)
    outputs = model(inputs)
    print(outputs.shape)
Self-attention
Multi-head
Multi-Head Self-Attention是CNN Pro Max
Multi-Head=CNN里多个卷积核(决定输出通道数)
同时每个Head的Self-Attention又可以通过调整权重自定义考虑的范围,而不是CNN里的Kernel Size大小的方框
每个Head对于每个Pixel都会做Self-Attention,相当于Filter扫一次图片
Positional Encoding
No position information in self-attention.
- Each position has a unique positional vector 𝑒𝑖
- hand-crafted
- learned from data
Hw4
Code(没调参,稀烂)
import os
import json
import math
import csv
import random
from pathlib import Path
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW, Optimizer
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence


class myDataset(Dataset):
    """Dataset of (mel segment, speaker id) pairs for training and validation."""

    def __init__(self, data_dir, segment_len=128):
        self.data_dir = data_dir
        self.segment_len = segment_len

        # Load the mapping from speaker name to id.
        mapping_path = Path(data_dir) / "mapping.json"
        mapping = json.load(mapping_path.open())
        self.speaker2id = mapping["speaker2id"]

        # Load the metadata of the training data.
        metadata_path = Path(data_dir) / "metadata.json"
        metadata = json.load(open(metadata_path))["speakers"]

        # Total number of speakers.
        self.speaker_num = len(metadata.keys())
        self.data = []
        for speaker in metadata.keys():
            for utterances in metadata[speaker]:
                self.data.append([utterances["feature_path"], self.speaker2id[speaker]])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        feat_path, speaker = self.data[index]
        # Load the preprocessed mel-spectrogram.
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        # Cut the mel-spectrogram to a fixed-length segment.
        if len(mel) > self.segment_len:
            # Pick a random starting point for the segment.
            start = random.randint(0, len(mel) - self.segment_len)
            # Take a segment of length segment_len.
            mel = torch.FloatTensor(mel[start:start+self.segment_len])
        else:
            mel = torch.FloatTensor(mel)
        # Convert the speaker id to long for the loss computation.
        speaker = torch.FloatTensor([speaker]).long()
        return mel, speaker

    def get_speaker_number(self):
        return self.speaker_num


def collate_batch(batch):
    """Collate a batch: pad the mels to equal length and stack the speaker ids."""
    mel, speaker = zip(*batch)
    # Pad the features in the batch to the same length with a very small value.
    mel = pad_sequence(mel, batch_first=True, padding_value=-20)
    # mel: (batch size, length, 40)
    return mel, torch.FloatTensor(speaker).long()


def get_dataloader(data_dir, batch_size, n_workers):
    """Return (train_loader, valid_loader, speaker_num) with a 90/10 random split."""
    dataset = myDataset(data_dir)
    speaker_num = dataset.get_speaker_number()
    # Split the dataset into training and validation sets.
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)

    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=n_workers,
        pin_memory=True,
        collate_fn=collate_batch,
    )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        num_workers=n_workers,
        drop_last=True,
        pin_memory=True,
        collate_fn=collate_batch,
    )

    return train_loader, valid_loader, speaker_num


class FeedForwardModule(nn.Module):
    """Feed-forward module of a Conformer block."""

    def __init__(self, d_model, dim_feedforward, dropout):
        super(FeedForwardModule, self).__init__()
        self.seq = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, dim_feedforward),
            nn.SiLU(),  # SiLU (Swish) activation
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, d_model),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.seq(x)


class ConvolutionModule(nn.Module):
    """Convolution module of a Conformer block."""

    def __init__(self, d_model, kernel_size=31, dropout=0.1):
        super(ConvolutionModule, self).__init__()
        self.layer_norm = nn.LayerNorm(d_model)
        self.pointwise_conv1 = nn.Conv1d(d_model, 2 * d_model, kernel_size=1, padding=0)
        self.glu = nn.GLU(dim=1)
        self.depthwise_conv = nn.Conv1d(d_model, d_model, kernel_size=kernel_size,
                                        padding=(kernel_size - 1) // 2, groups=d_model)
        self.batch_norm = nn.BatchNorm1d(d_model)
        self.activation = nn.SiLU()  # SiLU (Swish) activation
        self.pointwise_conv2 = nn.Conv1d(d_model, d_model, kernel_size=1, padding=0)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x: (seq_len, batch_size, d_model)
        x = x.permute(1, 0, 2)  # (batch_size, seq_len, d_model)
        x = self.layer_norm(x)
        x = x.transpose(1, 2)  # (batch_size, d_model, seq_len)
        x = self.pointwise_conv1(x)
        x = self.glu(x)  # (batch_size, d_model, seq_len)
        x = self.depthwise_conv(x)
        x = self.batch_norm(x)
        x = self.activation(x)
        x = self.pointwise_conv2(x)
        x = self.dropout(x)
        x = x.transpose(1, 2)  # (batch_size, seq_len, d_model)
        x = x.permute(1, 0, 2)  # (seq_len, batch_size, d_model)
        return x


class ConformerBlock(nn.Module):
    """Conformer block combining feed-forward, attention and convolution modules."""

    def __init__(self, d_model, nhead, dim_feedforward=256, kernel_size=31, dropout=0.1):
        super(ConformerBlock, self).__init__()
        self.ffn1 = FeedForwardModule(d_model, dim_feedforward, dropout)
        self.mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=nhead, dropout=dropout)
        self.conv_module = ConvolutionModule(d_model, kernel_size, dropout)
        self.ffn2 = FeedForwardModule(d_model, dim_feedforward, dropout)
        self.layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x: (seq_len, batch_size, d_model)
        residual = x
        x = residual + 0.5 * self.ffn1(x)  # half-step feed-forward residual
        residual = x
        x, _ = self.mha(x, x, x)
        x = residual + x
        x = x + self.conv_module(x)
        x = x + 0.5 * self.ffn2(x)  # half-step feed-forward residual
        x = self.layer_norm(x)
        return x


class Classifier(nn.Module):
    """Speaker classifier: prenet -> Conformer blocks -> mean pooling -> prediction head."""

    def __init__(self, d_model=80, n_spks=600, num_layers=2, dropout=0.1):
        super().__init__()
        # Project the input feature dimension from 40 to d_model.
        self.prenet = nn.Linear(40, d_model)
        # Stack of Conformer blocks used in place of Transformer layers.
        self.conformer_blocks = nn.ModuleList([
            ConformerBlock(d_model=d_model, nhead=2, dim_feedforward=256, dropout=dropout)
            for _ in range(num_layers)
        ])
        # Project the feature dimension from d_model to the number of speakers.
        self.pred_layer = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 40)
        return:
            out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model)
        out = self.prenet(mels)
        # out: (length, batch size, d_model)
        out = out.permute(1, 0, 2)
        # Pass through the Conformer blocks
        for block in self.conformer_blocks:
            out = block(out)
        # out: (batch size, length, d_model)
        out = out.transpose(0, 1)
        # Mean pooling over the length dimension
        stats = out.mean(dim=1)
        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out


def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float = 0.5,
    last_epoch: int = -1,
):
    """
    Create a schedule whose learning rate rises linearly from 0 to the initial
    value during warm-up and then decays following a cosine curve.

    Args:
        optimizer (:class:`~torch.optim.Optimizer`):
            The optimizer to schedule.
        num_warmup_steps (:obj:`int`):
            Number of warm-up steps.
        num_training_steps (:obj:`int`):
            Total number of training steps.
        num_cycles (:obj:`float`, `optional`, defaults to 0.5):
            Number of cosine cycles in the schedule.
        last_epoch (:obj:`int`, `optional`, defaults to -1):
            Index of the last epoch.

    Return:
        :obj:`torch.optim.lr_scheduler.LambdaLR` learning-rate scheduler.
    """

    def lr_lambda(current_step):
        # Warm-up phase
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # Decay phase
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(
            0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
        )

    return LambdaLR(optimizer, lr_lambda, last_epoch)


def model_fn(batch, model, criterion, device):
    """Forward one batch through the model and return (loss, accuracy)."""
    mels, labels = batch
    mels = mels.to(device)
    labels = labels.to(device)

    outs = model(mels)

    loss = criterion(outs, labels)

    # Speaker id with the highest score.
    preds = outs.argmax(1)
    # Batch accuracy.
    accuracy = torch.mean((preds == labels).float())

    return loss, accuracy


def valid(dataloader, model, criterion, device):
    """Evaluate on the validation loader and return the mean batch accuracy."""
    model.eval()
    running_loss = 0.0
    running_accuracy = 0.0
    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr")

    for i, batch in enumerate(dataloader):
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
            running_loss += loss.item()
            running_accuracy += accuracy.item()

        pbar.update(dataloader.batch_size)
        pbar.set_postfix(
            loss=f"{running_loss / (i+1):.2f}",
            accuracy=f"{running_accuracy / (i+1):.2f}",
        )

    pbar.close()
    model.train()

    return running_accuracy / len(dataloader)


def parse_args():
    """Return the run configuration."""
    config = {
        "data_dir": "./Dataset",       # dataset directory
        "save_path": "model.ckpt",     # where the model is saved
        "batch_size": 32,              # batch size
        "n_workers": 8,                # number of DataLoader workers
        "valid_steps": 2000,           # validate every this many steps
        "warmup_steps": 1000,          # learning-rate warm-up steps
        "save_steps": 10000,           # save the model every this many steps
        "total_steps": 70000,          # total number of training steps
        "model_path": "model.ckpt",    # model loaded for inference
        "output_path": "output.csv",   # where inference results are written
    }
    return config


# Defined at module level (not inside main) so that DataLoader worker processes
# can pickle them when num_workers > 0 under the spawn start method.
class InferenceDataset(Dataset):
    """Test-set dataset yielding (feature_path, mel) pairs."""

    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / "testdata.json"
        metadata = json.load(testdata_path.open())
        self.data_dir = data_dir
        self.data = metadata["utterances"]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        utterance = self.data[index]
        feat_path = utterance["feature_path"]
        mel = torch.load(os.path.join(self.data_dir, feat_path))
        return feat_path, mel


def inference_collate_batch(batch):
    """Collate a batch of (feature_path, mel) pairs."""
    feat_paths, mels = zip(*batch)
    return feat_paths, torch.stack(mels)


def main():
    """Train the classifier, then run inference on the test set."""
    # Parse configuration
    config = parse_args()
    data_dir = config["data_dir"]
    save_path = config["save_path"]
    batch_size = config["batch_size"]
    n_workers = config["n_workers"]
    valid_steps = config["valid_steps"]
    warmup_steps = config["warmup_steps"]
    total_steps = config["total_steps"]
    save_steps = config["save_steps"]
    model_path = config["model_path"]
    output_path = config["output_path"]

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Info]: 使用 {device} 作为计算设备。")

    # Data loaders and number of speakers
    train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
    train_iterator = iter(train_loader)
    print(f"[Info]: 数据加载完成。", flush=True)

    # Model, loss, optimizer and learning-rate scheduler
    model = Classifier(n_spks=speaker_num).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)
    print(f"[Info]: 模型创建完成。", flush=True)

    best_accuracy = -1.0
    best_state_dict = None

    pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

    for step in range(total_steps):
        # Fetch a batch, restarting the iterator when the loader is exhausted
        try:
            batch = next(train_iterator)
        except StopIteration:
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)
        batch_loss = loss.item()
        batch_accuracy = accuracy.item()

        # Update the model
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        # Logging
        pbar.update()
        pbar.set_postfix(
            loss=f"{batch_loss:.2f}",
            accuracy=f"{batch_accuracy:.2f}",
            step=step + 1,
        )

        # Validation
        if (step + 1) % valid_steps == 0:
            pbar.close()

            valid_accuracy = valid(valid_loader, model, criterion, device)

            # Keep the best model. state_dict() returns references to the live
            # tensors, so take a detached copy; otherwise later optimizer steps
            # would overwrite the "best" weights in place.
            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                best_state_dict = {
                    name: tensor.detach().cpu().clone()
                    for name, tensor in model.state_dict().items()
                }

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

        # Periodic save
        if (step + 1) % save_steps == 0 and best_state_dict is not None:
            torch.save(best_state_dict, save_path)
            pbar.write(f"步骤 {step + 1},最佳模型已保存。(准确率={best_accuracy:.4f})")

    pbar.close()

    # Final save
    if best_state_dict is not None:
        torch.save(best_state_dict, save_path)
        print(f"[Info]: 训练完成,最佳模型已保存。(准确率={best_accuracy:.4f})")

    # Inference
    print("[Info]: 开始推理...", flush=True)

    mapping_path = Path(data_dir) / "mapping.json"
    mapping = json.load(mapping_path.open())

    # Load the test dataset
    inference_dataset = InferenceDataset(data_dir)
    inference_dataloader = DataLoader(
        inference_dataset,
        batch_size=1,
        shuffle=False,
        drop_last=False,
        num_workers=n_workers,
        collate_fn=inference_collate_batch,
    )
    print(f"[Info]: 推理数据加载完成,共有 {len(inference_dataset)} 条数据。", flush=True)

    # Load the trained model; map_location lets a checkpoint saved on one
    # device load on another
    model = Classifier(n_spks=speaker_num).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    print(f"[Info]: 模型加载完成。", flush=True)

    # Predict
    results = [["Id", "Category"]]
    for feat_paths, mels in tqdm(inference_dataloader):
        with torch.no_grad():
            mels = mels.to(device)
            outs = model(mels)
            preds = outs.argmax(1).cpu().numpy()
            for feat_path, pred in zip(feat_paths, preds):
                results.append([feat_path, mapping["id2speaker"][str(pred)]])

    # Write the results to a CSV file
    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerows(results)

    print(f"[Info]: 推理完成,结果已保存到 {output_path}。")


if __name__ == "__main__":
    main()
Transformer
Encoder
Decoder
Comparison
除了当中和Encoder输出Cross Attention的部分,和Encoder结构一样
Masked Self-attention
为了使得decoder不能看见未来的信息,也就是对于一个序列中的第i个token,解码的时候只能够依靠i时刻之前(包括i)的输出,而不能依赖于i时刻之后的输出,我们要采取一个遮盖的方法(Mask)使得其在计算self-attention的时候只用i时刻及之前的token进行计算。因为Decoder是用来做预测的,而在训练预测能力的时候,我们不能够"提前看答案",因此要将未来的信息给遮盖住。
比如Decoder输入“器”的时候,只能看到“Start”、“机”和“器”
Autoregressive和Non-autoregressive
Autoregressive会在词库中加入一个END MARK,如果Decoder判断下一个词是END MARK,那么这个Sequence就结束了。
Non-autoregressive可以:
Another predictor for output length
Output a very long sequence, ignore tokens after END
好处是可以并行化,缺点是效果不如Autoregressive
Encoder-Decoder
Cross Attention
不一定要和Encoder最后一层的Output做Cross Attention
Training
训练的时候,Decoder看到的是Ground Truth,即真实值,而Testing的时候Decoder看到的是Decoder上一时刻的输出(左边是Training)
这样会有问题:训练的时候都用真实值,没见过噪声。那么在测试的时候如果某时刻的输出错了,那么之后整个序列都会爆炸。可以用Scheduled Sampling(把Decoder输出和Ground Truth一起喂)
左边是交叉熵,右边BLEU(越大越好)
BLEU没法微分,没法做梯度下降。When you don’t know how to optimize, just use
reinforcement learning (RL)!
Hw5
to do
感觉挺难的