Pytorch深度学习实践
date
Sep 29, 2024
slug
Pytorch-Basic-Knowledge
status
Published
tags
AI
Pytorch
summary
type
Post
Basic Knowledge线性代数Hadamard product范数微积分方向导数梯度混淆矩阵Overall AccuracyAverage accuracyRecallPrecisionF1PR曲线置信度绘图epoch、batch(mini-batch)、step、iterationTensorOverview线性模型梯度下降批梯度下降 Batch Gradient Descent 随机梯度下降 Stochastic Gradient Descent反向传播y.backward()为什么 y 是标量时不需要传入同型向量,而 y 是向量时需要?1. 标量的情况2. 向量的情况线性回归1. Prepare dataset2. Design Model3. Construct Loss and Optimizer4. Training Cycle完整代码Logistic RegressionMultiple Dimension InputDataset and DataLoaderSoftmax ClassifierCNNConvolutionPaddingStrideMaxPoolingExample1x1 ConvolutionInception ModuleResidual Network权重共享RNNRNN CellRNNOne-Hot VectorsExampleRNN CellRNN ModuleEmbeddingLSTMGRUBidirectionalExample 根据人名判断国家数据处理pack_padded_sequence完整代码结束
Basic Knowledge
线性代数
默认会基本的向量、矩阵乘法、转置
Hadamard product
表示为,将矩阵对应位置元素相乘
范数
向量的范数表示一个向量有多大(不涉及维度,而是分量的大小)
他是一个将向量映射到标量的函数,具有以下性质:
当且仅当分量都为0时,
范数:
矩阵的Frobenius范数:
微积分
方向导数
方向导数为函数在某一个方向上的导数,具体地,定义平面上一点以及单位向量,在曲面上,从点出发,沿方向走单位长度后,函数值为
则点处方向的方向导数为:
上面推导中使用了链式法则。其中,和分别为函数在位置的偏导数。由上面的推导可知:
该位置处,任意方向的方向导数为偏导数的线性组合,系数为该方向的单位向量。当该方向与坐标轴正方向一致时,方向导数即偏导数,换句话说,偏导数为坐标轴方向上的方向导数,其他方向的方向导数为偏导数的合成。
写成向量形式,偏导数构成的向量为,称之为梯度。
梯度
梯度,写作,二元时为,多元时为。
我们继续上面方向导数的推导,处方向上的方向导数为
其中,为与的夹角,显然,当即与梯度同向时,方向导数取得最大值,最大值为梯度的模,当即与梯度反向时,方向导数取得最小值,最小值为梯度模的相反数。此外,根据上面方向导数的公式可知,在夹角时方向导数为正,表示该方向函数值上升,时方向导数为负,表示该方向函数值下降。
至此,方向有了梯度的几何意义:
- 当前位置的梯度方向,为函数在该位置处方向导数最大的方向,也是函数值上升最快的方向,反方向为下降最快的方向;
- 当前位置的梯度长度(模),为最大方向导数的值。
假设为n维向量,在对多元函数求微分时经常使用以下规则
- 对于所有,都有
- 对于所有,都有
- 对于所有,都有
对任何矩阵,有
对于第二个结论的证明如下:
假设,,我们要对这个表达式关于求导。
首先,是一个的行向量,其表达式可以写成:
这可以展开成:
我们现在对关于求导,即要求。根据向量的梯度定义,梯度结果是一个矩阵。
首先考虑每个分量。对于第列,即:
对每个求偏导数:
这样,我们可以得到每个分量的偏导。为了更直观地表达,我们将整个梯度矩阵写出来:
由于每个,所以整个梯度矩阵实际上就是本身。也就是说,
从矩阵形式可以看出,对于每个,其偏导数等于矩阵的对应行。因此,梯度结果就是这个矩阵本身。
综上,我们证明了:
其他几个式子的证明方式类似,都可以用定义推出。
混淆矩阵
混淆矩阵(也称误差矩阵)是机器学习和深度学习中表示精度评价的一种标准格式,常用n行n列的矩阵形式来表示。其列代表的是预测的类别,行代表的是实际的类标,以一个常见的二分类的混淆矩阵为例。我们会发现二分类的混淆矩阵包括TP, FP, FN, TN,其中TP为True Positive,True代表实际和预测相同,Positive代表预测为正样本。同理可得,False Positive (FP)代表的是实际类别和预测类标不同,并且预测类别为正样本,实际类别为负样本;False Negative (FN)代表的是实际类别和预测类标不同,并且预测类别为负样本,实际类别为正样本;True Negative (TP)代表的是实际类别和预测类标相同,预测类别和实际类别均为负样本。
在这里我们以一个二分类的例子来帮助大家理解。假设我们是某核酸检测机构,将对100个人进行核酸检测,实际结果为98个阴性,2个阳性,但是我们的模型对核酸检测结果进行预测,预测结果为94个阴性,6个阳性结果,在这里我们定义核酸结果阴性为正样本,核酸结果阳性为负样本。在这个例子中,TP代表实际为阴性且被预测为阴性的数量,共有94人;FP代表实际为阳性,模型预测为阴性的数量,共有0人;FN代表实际为阴性被模型判断为阳性的数量,共有4人;TN代表实际为阳性,被模型识别为阳性的数量,共有2人。 于是我们就可以得到下面的这个混淆矩阵。
Overall Accuracy
Overall Accuracy代表了所有预测正确的样本占所有预测样本总数的比例,结合上述例子,我们可以用下述公式表示:
这里的分类正确代表了正样本被正确分类为正样本,负样本正确分类为负样本。准确率是描述模型最简单的指标,Acc的缺点主要在于假如说我们举得例子,在样本中,核酸结果成阴性占据了模型98%。
假如说模型不调试分类,我只要一直判断核酸结果呈阴性,那我们的模型最终效果可能会更好,但是实际上模型的效率以及准确率都是非常低下的。当我们的样本正负样本极端不平衡时Acc这个评价指标其实是没有意义的。我们再回到上面的例子中,可以计算在该例子中Acc = 0.96。
Average accuracy
Average accuracy(AA) 代表的是平均精度的计算,平均精度计算的是每一类预测正确的样本与该类总体数量之间的比值,最终再取每一类的精度的平均值。
Recall
Recall也称召回率,代表了实际为正样本并且也被正确识别为正样本的数量占样本中所有为正样本的比例,可以用下述公式进行表示
Recall是判断模型正确识别所有正样本的能力。结合我们所举的例子,代表了模型对于正样本的识别能力,也能较好的反应模型的优劣。与Precision不同的是,我们是反映了模型预测时候有多少阴性(正样本)被检测出来。在例子中代表了实际为阴性被模型正确判断为阴性的数量占实际为阴性的比例,即
Precision
Precision也称精准率,代表的是在全部预测为正的结果中,被预测正确的正样本所占的比例,可以用下述公式进行表示
FP代表了阳性患者被预测为阴性(正样本),TP代表了阴性被正确预测为阴性。相较于Acc,Precision更能较好的反应出模型对于正样本(阴性)识别能力。和Recall不同的是,Precision代表了预测结果中有多少样本是分类正确的。
在实际应用场景中,我们继续结合核酸的例子,当我们在进行全面核酸检测时,我们的希望在于模型尽可能少的漏掉阳性患者,此时认为模型Precision显得更为重要。
F1
F1在模型评估中也是一种重要的评价指标,F1可以解释为召回率和精确率的加权平均,F1越高,说明模型鲁棒性越好。人们希望有一种更加广义的方法定义F-score,希望可以改变P和R的权重,于是人们定义了,其定义式如下:
- 当时,更偏好召回(Recall)
- 当时,更偏好精准(Precision)
- 当时,平衡精准和召回,即为 F1
当有多个混淆矩阵(多次训练、多个数据集、多分类任务)时,有两种方式估算 “全局” 性能:
- macro 方法:先计算每个 PR,取平均后,再计算 F1
- micro 方法:先计算混淆矩阵元素的平均,再计算 PR 和 F1
PR曲线
在深度学习常用指标中,PR曲线也能很直观的反应模型好坏。我们用一副经典图来说明问题:
横轴是召回率,纵轴代表了P(精确率),P-R曲线上的一个点代表着,在某一阈值下,模型将大于该阈值的结果判定为正样本,小于该阈值的结果判定为负样本,此时返回结果对应的召回率和精确率。整条P-R曲线是通过将阈值从高到低移动而生成的。原点附近代表当阈值最大时模型的精确率和召回率,在PR曲线我们可以从图中直观的看到某一个曲线被另外一条曲线完全包裹,所包围的面积大于另一条曲线包围的面积,举例图中可以说明A模型的性能优于B和C。
置信度
在目标检测中,我们通常需要将边界框内物体划分为正样本和负样本。我们使用置信度这个指标来进行划分,当小于置信度设置的阈值判定为负样本(背景),大于置信度设置的阈值判定为正样本.
绘图
可以用visdom
epoch、batch(mini-batch)、step、iteration
epoch
:表示将训练数据集中的所有样本都过一遍(且仅过一遍)的训练过程。在一个epoch
中,训练算法会按照设定的顺序将所有样本输入模型进行前向传播、计算损失、反向传播和参数更新。一个epoch
通常包含多个step
。batch
:一般翻译为“批次”,表示一次性输入模型的一组样本。在神经网络的训练过程中,训练数据往往是很多的,比如几万条甚至几十万条——如果我们一次性将这上万条的数据全部放入模型,对计算机性能、神经网络模型学习能力等的要求太高了;那么就可以将训练数据划分为多个batch
,并随后分批将每个batch
的样本一起输入到模型中进行前向传播、损失计算、反向传播和参数更新。但要注意,一般batch
这个词用的不多,多数情况大家都是只关注batch size
的。batch size
:一般翻译为“批次大小”,表示训练过程中一次输入模型的一组样本的具体样本数量。前面提到了,我们在神经网络训练过程中,往往需要将训练数据划分为多个batch
;而具体每一个batch
有多少个样本,那么就是batch size
指定的了。step
:一般翻译为“步骤”,表示在一个epoch
中模型进行一次参数更新的操作。通俗地说,在神经网络训练过程中,每次完成对一个batch
数据的训练,就是完成了一个step
。很多情况下,step
和iteration
表示的是同样的含义。iteration
:一般翻译为“迭代”,多数情况下就表示在训练过程中经过一个step
的操作。一个iteration
包括了一个step
中前向传播、损失计算、反向传播和参数更新的流程。当然,在某些情况下,step
和iteration
可能会有细微的区别——有时候iteration
是指完成一次前向传播和反向传播的过程,而step
是指通过优化算法对模型参数进行一次更新的操作。但是绝大多数情况下,我们就认为二者是一样的即可。以上是对这些名词的解释,我们将他们带入实际的例子就更好理解了。
假设我们现在有一个训练数据集(这个数据集不包括测试集),其中数据的样本数量为
1500
。那么,我们将这1500
条数据全部训练1
次,就是一个epoch
。其中,由于数据量较大(其实1500
个样本在神经网络研究中肯定不算大,但是我们这里只是一个例子,大家理解即可),因此我们希望将其分为多个batch
,分批加以训练;我们决定每1
批训练100
条数据,那么为了将这些数据全部训练完,就需要训练15
批——在这里,batch size
就是100
,而batch
就是15
。而前面我们提到,每次完成对一个batch
数据的训练,就是完成了一个step
,那么step
和iteration
就也都是15
。以上是我们对这一数据集加以
1
次训练(1
个epoch
)的情况,而一般情况下我们肯定是需要训练多次的,也就是多个epoch
。我们假设我们需要训练3
个epoch
,相当于需要将这1500
个样本训练3
次。那么,step
和iteration
都会随着epoch
的改变而发生改变——二者都变为45
,因为15 * 3
。但是,batch
依然是15
,因为其是在每一个epoch
的视角内来看待的,和epoch
的具体大小没有关系。Tensor
N维数组+自动微分
x=torch.arange(12) x.shape #输出形状 X=x.reshape(3,4) #改变形状 X=x.reshape(-1,4) #自动判断行数
创建全0全1张量
torch.zeros((2,3,4)) torch.ones((2,3,4)) #2x3x4
生成随机数
torch.randn(3,4) #符合均值=0,标准差=1的正态分布
对Tensor对你运算都是按元素的
x=torch.tensor([1.,2.,4.,8.]) y=torch.tensor([2,4,8,16]) x+y x-y x**y torch.exp(x) x==y
广播机制
import torch a=torch.arange(3).reshape((3,1)) b=torch.arange(2).reshape((1,2)) a+b # tensor([[0, 1], # [1, 2], # [2, 3]])
节省内存
before=id(a) a=a+b id(a)==before #False before=id(a) a+=b id(a)==before #True
Overview
Prediction:讲自然语言、图像等推理成抽象概念
维度诅咒:维度越多,需要采样的样本越多。在保留高维空间的度量信息的前提下进行降维
线性模型
loss是对一个样本而言,cost是对数据集而言
此处的cost是MSE(Mean Square Error)
先考虑
先进行随机猜测,取为随机值
import numpy as np import matplotlib.pyplot as plt # 数据 x_data = [1.0, 2.0, 3.0] y_data = [2.0, 4.0, 6.0] # 前向计算函数(预测) def forward(x, w): return x * w # 损失函数(均方误差) def loss(x, y, w): y_pred = forward(x, w) return (y_pred - y) ** 2 # 初始化列表存储不同权重和对应的均方误差 w_list = [] mse_list = [] # 遍历不同的权重值,计算MSE for w in np.arange(0.0, 4.1, 0.1): print('w =', w) l_sum = 0 for x_val, y_val in zip(x_data, y_data): y_pred_val = forward(x_val, w) loss_val = loss(x_val, y_val, w) l_sum += loss_val print('\t', x_val, y_val, y_pred_val, loss_val) mse = l_sum / 3 # 计算平均损失(MSE) print('MSE =', mse) w_list.append(w) mse_list.append(mse) # 画出权重和MSE的关系图 plt.plot(w_list, mse_list) plt.xlabel('w') plt.ylabel('MSE') plt.title('w vs MSE') plt.show()
考虑
import numpy as np import matplotlib.pyplot as plt # 数据 x_data = np.array([1.0, 2.0, 3.0]) y_data = np.array([2.0, 4.0, 6.0]) # 前向计算函数(预测) def forward(x, w, b): return x * w + b # 损失函数(均方误差) def loss(x_data, y_data, w, b): # 使用x_data的广播机制同时计算多个x的预测值 y_pred = forward(x_data, w, b) return np.mean((y_pred - y_data) ** 2) # 返回MSE # 创建w和b的值范围 w_values = np.arange(0.0, 4.1, 0.1) b_values = np.arange(-2.0, 2.1, 0.1) # 创建网格 W, B = np.meshgrid(w_values, b_values) # 初始化MSE矩阵 MSE = np.zeros(W.shape) #shape表示矩阵大小(41,41),shape[0] 表示数组的行数,而 shape[1] 表示数组的列数。 # 计算每一对(w, b)的MSE for i in range(W.shape[0]): for j in range(W.shape[1]): w = W[i, j] b = B[i, j] MSE[i, j] = loss(x_data, y_data, w, b) # 绘制3D图 fig = plt.figure() ax = fig.add_subplot(111, projection='3d') # 绘制网格点的三维图 ax.plot_surface(W, B, MSE, cmap='viridis') ax.set_xlabel('w') ax.set_ylabel('b') ax.set_zlabel('MSE') plt.title('w, b vs MSE') plt.show()
梯度下降
本质思想是贪心,因此可能陷入局部最优解,但是深度学习神经网络中没有很多局部最优点,然而有鞍点
批梯度下降 Batch Gradient Descent
x_data = [1.0, 2.0, 3.0] y_data = [2.0, 4.0, 6.0] w = 1.0 def forward(x): return x * w def cost(xs, ys): cost = 0 for x, y in zip(xs, ys): y_pred = forward(x) cost += (y_pred - y) ** 2 return cost / len(xs) def gradient(xs, ys): grad = 0 for x, y in zip(xs, ys): grad += 2 * x * (x * w - y) return grad / len(xs) print('Predict (before training)', 4, forward(4)) for epoch in range(100): cost_val = cost(x_data, y_data) grad_val = gradient(x_data, y_data) w -= 0.01 * grad_val print('Epoch:', epoch, 'w=', w, 'loss=', cost_val) print('Predict (after training)', 4, forward(4))
随机梯度下降 Stochastic Gradient Descent
对每个样本点做梯度下降(引入了噪声,有利于跨过鞍点)
x_data = [1.0, 2.0, 3.0] y_data = [2.0, 4.0, 6.0] w = 1.0 def forward(x): return x * w def loss(x, y): y_pred = forward(x) return (y_pred - y) ** 2 def gradient(x, y): return 2 * x * (x * w - y) print('Predict (before training)', 4, forward(4)) for epoch in range(100): for x, y in zip(x_data, y_data): grad = gradient(x, y) w = w - 0.01 * grad print("\tgrad: ", x, y, grad) l = loss(x, y) print("progress:", epoch, "w=", w, "loss=", l) print('Predict (after training)', 4, forward(4))
虽然 BGD 每个 epoch 的总计算量与 SGD 是相同的(都是 O(N)),但 SGD 更新频率更高,每次更新计算量更小,使得它在初期的收敛速度比 BGD 更快。而 BGD 由于一次性处理整个数据集,更新较慢,尤其是在大数据集上,计算梯度所需时间更长,因此它在很多场景中显得“更慢”,而且内存开销更大。
为了克服两种方法的缺点,现在一般采用的是一种折中手段,Mini-Batch Gradient Decent,小批的梯度下降,这种方法把数据分为若干个批,按批来更新参数,这样,一个批中的一组数据共同决定了本次梯度的方向,下降起来就不容易跑偏,减少了随机性。另一方面因为批的样本数与整个数据集相比小了很多,计算量也不是很大。
反向传播
- 前馈(从前向后计算loss)
- 反馈(从后向前计算)
构建计算图
import torch x_data = [1.0, 2.0, 3.0] y_data = [2.0, 4.0, 6.0] w = torch.Tensor([1.0]) w.requires_grad = True def forward(x): return x * w def loss(x, y): y_pred = forward(x) return (y_pred - y) ** 2 print("predict (before training)", 4, forward(4).item()) for epoch in range(100): for x, y in zip(x_data, y_data): l = loss(x, y) l.backward() print('\tgrad:', x, y, w.grad.item()) w.data = w.data - 0.01 * w.grad.data w.grad.data.zero_() print("progress:", epoch, l.item()) print("predict (after training)", 4, forward(4).item())
import torch x_data = [1.0, 2.0, 3.0] y_data = [2.0, 4.0, 6.0] # 使用torch.tensor()来创建张量 w1 = torch.tensor([1.0], requires_grad=True) w2 = torch.tensor([1.0], requires_grad=True) b = torch.tensor([1.0], requires_grad=True) def forward(x): return w1 * x * x + w2 * x + b def loss(x, y): y_pred = forward(x) return (y_pred - y) ** 2 # 训练前的预测 print("predict (before training)", 4, forward(4).item()) for epoch in range(100): for x, y in zip(x_data, y_data): l = loss(x, y) l.backward() print(f'\tgrad: x={x}, y={y}, w1_grad={w1.grad.item()}, w2_grad={w2.grad.item()}, b_grad={b.grad.item()}') # 在torch.no_grad()上下文中更新参数 with torch.no_grad(): w1 -= 0.01 * w1.grad w2 -= 0.01 * w2.grad b -= 0.01 * b.grad # 将梯度清零 w1.grad.zero_() w2.grad.zero_() b.grad.zero_() # 输出每个epoch的进展 print("progress:", epoch, "loss:", l.item()) # 训练后的预测 print("predict (after training)", 4, forward(4).item())
因为损失函数是标量,所以x.grad一定和x同型
y.backward()
为什么 y
是标量时不需要传入同型向量,而 y
是向量时需要?
这里涉及到梯度和雅可比矩阵的区别,以及 PyTorch 的自动求导机制如何处理标量和向量。
1. 标量的情况
当
y
是标量时,y
相对于 x
的梯度是一个向量。这个向量的维度与 x
相同,表示 y
对 x
的每个分量的偏导数。对于标量 y
,PyTorch 的 backward()
函数能够直接计算 y
相对于 x
的梯度,并存储在 x.grad
中。例如:
x = torch.randn(3, requires_grad=True) y = x.sum() # y 是标量 y.backward() # 直接计算梯度 print(x.grad) # 输出 x 的梯度
在这种情况下,
y.backward()
将自动计算标量 y
对 x
的梯度,无需传入向量。2. 向量的情况
x
是一个向量,例如x=[-0.9332, 1.9616, 0.1739]
,当 y
是一个向量(如在例子中的 y = [-477.7843, 1004.3264, 89.0424]
),y
相对于 x
的梯度是一个 雅可比矩阵,即 dy/dx
。雅可比矩阵的维度是 y
和 x
的维度之积。假设 y
和 x
都是 3 维向量,那么雅可比矩阵的形状将是 3x3。然而,PyTorch 的
backward()
函数默认只能计算标量的梯度,无法直接计算整个雅可比矩阵。因此,在 y
是向量的情况下,PyTorch 不能直接计算 dy/dx
,而是通过雅可比向量积的方式来简化这个过程。雅可比向量积是指将雅可比矩阵
dy/dx
与某个向量 v
相乘。通过向 backward()
传入与 y
同形的向量 v
,PyTorch 将会计算这个积,而不是计算完整的雅可比矩阵。y.backward(v)
这意味着
y.backward(v)
会计算 dy/dx
与 v
的积,并将结果存储在 x.grad
中。此时,v
的作用是指定在某个特定方向上进行雅可比向量积的计算。假设向量v恰好是标量损失l关于向量Y的梯度,如下:
则雅可比向量积为
但是因为损失函数是标量,所以这里并不会用到雅可比向量积
线性回归
线性回归输出的是向量,维度和相同
1. Prepare dataset
x_data = torch.Tensor([[1.0], [2.0], [3.0]]) y_data = torch.Tensor([[2.0], [4.0], [6.0]])
2. Design Model
要从
nn.Module
继承class LinearModel(torch.nn.Module): def __init__(self): super(LinearModel,self).__init__() self.linear=torch.nn.Linear(1,1) def forward(self,x): y_pred=self.linear(x) return y_pred model=LinearModel()
nn.Linear
类包含两个成员张量,weight
和bias
# 构造函数 def __init__(self, in_features: int, out_features: int, bias: bool = True,device=None, dtype=None) -> None
in_features
是每个输入样本的维度,out_features
是每个输出样本的维度如果
x_data = torch.Tensor([[1.0], [2.0], [3.0]]
,那么他就是3x1
的矩阵,表示3
个样本,每个样本有1
个feature,即维度是1
。换言之,如果输入为,则样本数量为,每个样本的维度为。
而
nn.Linear
类继承了nn.Module
的__call__()
方法,因此可以把类的实例当函数调用。我们需要用自己的
forward
函数Overridenn.Linear
类自带的forward
函数:# 自带的forward def forward(self, input: Tensor) -> Tensor: return F.linear(input, self.weight, self.bias)#相当于执行pass
3. Construct Loss and Optimizer
criterion = torch.nn.MSELoss(reduction='sum') optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
reduction='none' | 'mean' | 'sum'
none表示不做操作,还是Tensor;mean表示求平均,sum求和,后两者都是标量
model.parameters()
是nn.Module
的方法,可以获取所有的参数;lr
是learning rate4. Training Cycle
for epoch in range(1000): y_pred = model(x_data) loss = criterion(y_pred, y_data) print(epoch, loss.item()) optimizer.zero_grad() #清零 loss.backward() optimizer.step() #更新权重
完整代码
import torch.nn x_data = torch.Tensor([[1.0], [2.0], [3.0]]) y_data = torch.Tensor([[2.0], [4.0], [6.0]]) class LinearModel(torch.nn.Module): def __init__(self): super(LinearModel,self).__init__() self.linear=torch.nn.Linear(1,1) def forward(self,x): y_pred=self.linear(x) return y_pred model=LinearModel() criterion = torch.nn.MSELoss(reduction='sum') optimizer = torch.optim.SGD(model.parameters(), lr=0.01)#用Adam的话在100000次训练之后可以算出f(4)=8 for epoch in range(1000): y_pred = model(x_data) loss = criterion(y_pred, y_data) print(epoch, loss.item()) optimizer.zero_grad() #清零 loss.backward() optimizer.step() #更新权重 print('w = ', model.linear.weight.item()) print('b = ', model.linear.bias.item()) x_test = torch.Tensor([[4.0]]) y_test = model(x_test) print('y_pred = ', y_test.data)
Logistic Regression
虽然名字叫Regression,但事实上是一个Classification
Logistic Function:
Sigmoid functions:
Logistic Regression Model
就是在线性回归上套一个Logistic函数
对于二分类问题,单个样本损失函数(交叉熵):
如果是Mini-Batch,就取均值
import torch.nn.functional as F x_data = torch.Tensor([[1.0], [2.0], [3.0]]) y_data = torch.Tensor([[0], [0], [1]]) class LogisticRegressionModel(torch.nn.Module): def __init__(self): super(LogisticRegressionModel, self).__init__() self.linear = torch.nn.Linear(1, 1) def forward(self, x): y_pred = F.sigmoid(self.linear(x)) return y_pred model = LogisticRegressionModel() criterion = torch.nn.BCELoss(reduction='sum') optimizer = torch.optim.SGD(model.parameters(), lr=0.01) for epoch in range(1000): y_pred = model(x_data) loss = criterion(y_pred, y_data) print(epoch, loss.item()) optimizer.zero_grad() loss.backward() optimizer.step()
Logistic Regression的结果也是Sigmod曲线,因为相当于把做线性变换。
Multiple Dimension Input
X1 | X2 | X3 | X4 | X5 | X6 | X7 | X8 | Y |
-0.29 | 0.49 | 0.18 | -0.29 | 0.00 | 0.00 | -0.53 | -0.03 | 0 |
-0.88 | -0.15 | 0.08 | -0.41 | 0.00 | -0.21 | -0.77 | -0.67 | 1 |
-0.06 | 0.84 | 0.05 | 0.00 | 0.00 | -0.31 | -0.49 | -0.63 | 0 |
-0.88 | -0.11 | 0.08 | -0.54 | -0.78 | -0.16 | -0.92 | 0.00 | 1 |
0.00 | 0.38 | -0.34 | -0.29 | -0.60 | 0.28 | 0.89 | -0.60 | 0 |
-0.41 | 0.17 | 0.21 | 0.00 | 0.00 | -0.24 | -0.89 | -0.70 | 1 |
-0.65 | -0.22 | -0.18 | -0.35 | -0.79 | -0.08 | -0.85 | -0.83 | 0 |
0.18 | 0.16 | 0.00 | 0.00 | 0.00 | 0.05 | -0.95 | -0.73 | 1 |
-0.76 | 0.98 | 0.15 | -0.09 | 0.28 | -0.09 | -0.93 | 0.07 | 0 |
-0.06 | 0.26 | 0.57 | 0.00 | 0.00 | 0.00 | -0.87 | 0.10 | 0 |
对于每一行,称为一个sample(样本);每一列,表示一个feature(特征)
那么Logistic Regression Model应该表示为:
其中,i表示第i个样本。因此有:
把多个Linear Layer串在一起,就可以构建神经网络了:
import torch import numpy as np xy = np.loadtxt('diabetes.csv.gz', delimiter=',', dtype=np.float32) x_data = torch.from_numpy(xy[:,:-1]) y_data = torch.from_numpy(xy[:, [-1]]) class Model(torch.nn.Module): def __init__(self): super().__init__() self.linear1 = torch.nn.Linear(8, 6) self.linear2 = torch.nn.Linear(6, 4) self.linear3 = torch.nn.Linear(4, 1) self.activate = torch.nn.Sigmoid() def forward(self, x): x = self.activate(self.linear1(x)) x = self.activate(self.linear2(x)) x = self.activate(self.linear3(x)) return x model = Model() criterion = torch.nn.BCELoss(reduction='mean') optimizer = torch.optim.SGD(model.parameters(), lr=0.1) for epoch in range(100): # Forward y_pred = model(x_data) loss = criterion(y_pred, y_data) print(epoch, loss.item()) # Backward optimizer.zero_grad() loss.backward() # Update optimizer.step()
Dataset and DataLoader
class DiabetesDataset(Dataset): def __init__(self, filepath): xy = np.loadtxt(filepath, delimiter=',', dtype=np.float32) self.len = xy.shape[0] self.x_data = torch.from_numpy(xy[:, :-1]) self.y_data = torch.from_numpy(xy[:, [-1]]) def __getitem__(self, index): #The expression, dataset[index],will call this magic function. return self.x_data[index], self.y_data[index] def __len__(self): return self.len dataset = DiabetesDataset('diabetes.csv.gz') train_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, num_workers=2) #num_worker规定读取数据的进程数
DiabetesDataset
继承自Dataset
,他是一个抽象类,不能被实例化因为linux用的fork,而windows和mac arm都用的spawn,要把代码包在
if __name__ == "__main__"
里才不会报错。import torch from torch.utils.data import Dataset, DataLoader import numpy as np class DiabetesDataset(Dataset): def __init__(self, filepath): xy = np.loadtxt(filepath, delimiter=',', dtype=np.float32) self.len = xy.shape[0] self.x_data = torch.from_numpy(xy[:, :-1]) self.y_data = torch.from_numpy(xy[:, [-1]]) def __getitem__(self, index): return self.x_data[index], self.y_data[index] def __len__(self): return self.len class Model(torch.nn.Module): def __init__(self): super().__init__() self.linear1 = torch.nn.Linear(8, 6) self.linear2 = torch.nn.Linear(6, 4) self.linear3 = torch.nn.Linear(4, 1) self.sigmoid = torch.nn.Sigmoid() def forward(self, x): x = self.sigmoid(self.linear1(x)) x = self.sigmoid(self.linear2(x)) x = self.sigmoid(self.linear3(x)) return x if __name__ == "__main__": dataset = DiabetesDataset('diabetes.csv.gz') train_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, num_workers=2) model = Model() criterion = torch.nn.BCELoss() optimizer = torch.optim.SGD(model.parameters(), lr=0.01) for epoch in range(100): for i, data in enumerate(train_loader, 0): # 1. Prepare data inputs, labels = data # 2. Forward y_pred = model(inputs) loss = criterion(y_pred, labels) print(epoch, i, loss.item()) # 3. Backward optimizer.zero_grad() loss.backward() # 4. Update optimizer.step()
也可以调用m系列芯片的GPU进行运算
import torch from torch.utils.data import Dataset, DataLoader import numpy as np class DiabetesDataset(Dataset): def __init__(self, filepath): xy = np.loadtxt(filepath, delimiter=',', dtype=np.float32) self.len = xy.shape[0] self.x_data = torch.from_numpy(xy[:, :-1]) self.y_data = torch.from_numpy(xy[:, [-1]]) def __getitem__(self, index): return self.x_data[index], self.y_data[index] def __len__(self): return self.len class Model(torch.nn.Module): def __init__(self): super().__init__() self.linear1 = torch.nn.Linear(8, 6) self.linear2 = torch.nn.Linear(6, 4) self.linear3 = torch.nn.Linear(4, 1) self.sigmoid = torch.nn.Sigmoid() def forward(self, x): x = self.sigmoid(self.linear1(x)) x = self.sigmoid(self.linear2(x)) x = self.sigmoid(self.linear3(x)) return x if torch.backends.mps.is_available(): device = torch.device("mps") else: device = torch.device("cpu") if __name__ == "__main__": dataset = DiabetesDataset('diabetes.csv.gz') train_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, num_workers=2) model = Model().to(device) criterion = torch.nn.BCELoss() optimizer = torch.optim.SGD(model.parameters(), lr=0.01) for epoch in range(100): for i, data in enumerate(train_loader, 0): # 1. Prepare data inputs, labels = data inputs, labels = inputs.to(device), labels.to(device) # 2. Forward y_pred = model(inputs) loss = criterion(y_pred, labels) print(epoch, i, loss.item()) # 3. Backward optimizer.zero_grad() loss.backward() # 4. Update optimizer.step()
Softmax Classifier
假设是最后一层线性层的输出,Softmax函数表示为:
NLL Loss(Negative Log Likelihood Loss)表示为:
:表示损失函数,其中是输入,是目标。
:一个包含个损失值的向量,表示每个样本的损失。
:单个样本的损失计算为目标类别的对数概率的负值乘以权重。
比如说,,因此
因为,而每一列都表示对应类别的概率,因此相当于列下标
:对于类别,权重取决于是否等于
ignore_index
,如果不相等,则乘以相应的权重。而CrossEntropyLoss=NLL(LogSoftmax)
import torch input=torch.rand(3,3) target=torch.tensor([0,2,1]) softmax=torch.nn.Softmax(dim=1) loss=torch.nn.NLLLoss() CEL=torch.nn.CrossEntropyLoss() print(loss(torch.log(softmax(input)),target)) print(CEL(input,target))
可以看到两个值相等
多维情况
上面只有一个维度的,那么如果在k个维度上都有预测值,则需要对每个维度上都计算损失。设表示在k维度上的预测,表示在k维度上的label,有C个分类,表示在c分类上的权重,则reduction=sum下:
构建一个多层的神经网络
完整代码:
import torch from torchvision import transforms from torchvision import datasets from torch.utils.data import DataLoader import torch.nn.functional as F import torch.optim as optim # 检查 MPS 是否可用 device = torch.device("mps" if torch.backends.mps.is_available() else "cpu") batch_size = 64 transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307, ), (0.3081, )) ]) train_dataset = datasets.MNIST(root='../dataset/mnist/', train=True, download=True, transform=transform) train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size) test_dataset = datasets.MNIST(root='../dataset/mnist/', train=False, download=True, transform=transform) test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size) class Net(torch.nn.Module): def __init__(self): super(Net, self).__init__() self.l1 = torch.nn.Linear(784, 512) self.l2 = torch.nn.Linear(512, 256) self.l3 = torch.nn.Linear(256, 128) self.l4 = torch.nn.Linear(128, 64) self.l5 = torch.nn.Linear(64, 10) def forward(self, x): x = x.view(-1, 784) x = F.relu(self.l1(x)) x = F.relu(self.l2(x)) x = F.relu(self.l3(x)) x = F.relu(self.l4(x)) return self.l5(x) model = Net().to(device) criterion = torch.nn.CrossEntropyLoss() optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) def train(epoch): running_loss = 0.0 for batch_idx, data in enumerate(train_loader, 0): inputs, target = data # 将输入和标签移到 MPS 设备上 inputs, target = inputs.to(device), target.to(device) optimizer.zero_grad() # forward + backward + update outputs = model(inputs) loss = criterion(outputs, target) loss.backward() optimizer.step() running_loss += loss.item() if batch_idx % 300 == 299: print('[%d, %5d] loss: %.3f' % (epoch + 1, batch_idx + 1, running_loss / 300)) running_loss = 0.0 def softtest(): correct = 0 total = 0 with torch.no_grad(): for data in test_loader: images, labels = data # 将输入和标签移到 MPS 设备上 images, labels = images.to(device), labels.to(device) outputs = model(images) _, predicted = torch.max(outputs.data, dim=1) total += labels.size(0) correct += (predicted == labels).sum().item() print('Accuracy on test set: %d %%' % (100 * correct / total)) if __name__ == '__main__': for epoch in range(10): train(epoch) softtest()
CNN
Convolution
黄色的长方体是卷积核,可以自定义宽高,但这里channel数量必须是3,与输入一致。
对于n个通道,卷积核的大小就是(n,width,height),width和height可以自定义,但我们一般让width=height
我们也可以使用多个卷积核(filter),把他们的输出进行叠加
那么当中这些filter的总大小就是(m,n,width,height),他是一个四维张量
import torch in_channels, out_channels= 5, 10 width, height = 100, 100 kernel_size = 3 batch_size = 1 input = torch.randn(batch_size,in_channels,width,height) conv_layer = torch.nn.Conv2d(in_channels,out_channels,kernel_size=kernel_size) output = conv_layer(input) print(input.shape) print(output.shape) print(conv_layer.weight.shape)
输出
torch.Size([1, 5, 100, 100]) torch.Size([1, 10, 98, 98]) torch.Size([10, 5, 3, 3])
Padding
假设Kernel是的,那么当的时候,Input和Output大小相同。因为当Kernal的中心在Input边界的时候,一定有Kernal的n/2行or列在Input外面,如图所示。
Stride
步长,当Stride=2时如图所示:
MaxPooling
每个小方格(由kernal_size决定大小)取最大值,填充新的小方格
如果kernal_size=1,则同时可以划分出16个小方格,结果和原来一样。
如果kernal_size=3,则同时只能划分出1个小方格,在每个filter的结果为的
- Input: or
- Output: or
N是batch,C是channel
Example
先ReLU和先Pooling没影响,但是先Pooling可以减少计算(应该?)
import torch from torchvision import transforms from torchvision import datasets from torch.utils.data import DataLoader import torch.nn.functional as F import torch.optim as optim import matplotlib.pyplot as plt # prepare dataset batch_size = 64 transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]) train_dataset = datasets.MNIST(root='../dataset/mnist/', train=True, download=True, transform=transform) train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size) test_dataset = datasets.MNIST(root='../dataset/mnist/', train=False, download=True, transform=transform) test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size) # design model using class class Net(torch.nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = torch.nn.Conv2d(1, 10, kernel_size=5) self.conv2 = torch.nn.Conv2d(10, 20, kernel_size=5) self.pooling = torch.nn.MaxPool2d(2) self.fc = torch.nn.Linear(320, 10) def forward(self, x): # flatten data from (n,1,28,28) to (n, 784) batch_size = x.size(0) x = F.relu(self.pooling(self.conv1(x))) x = F.relu(self.pooling(self.conv2(x))) x = x.view(batch_size, -1) # -1 此处自动算出的是320 x = self.fc(x) return x model = Net() device = torch.device("mps" if torch.mps.is_available() else "cpu") model.to(device) # construct loss and optimizer criterion = torch.nn.CrossEntropyLoss() optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) # training cycle forward, backward, update def train(epoch): running_loss = 0.0 for batch_idx, data in enumerate(train_loader, 0): inputs, target = data inputs, target = inputs.to(device), target.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, target) loss.backward() optimizer.step() running_loss += loss.item() if batch_idx % 300 == 299: print('[%d, %5d] loss: %.3f' % (epoch + 1, batch_idx + 1, running_loss / 300)) running_loss = 0.0 def test(): correct = 0 total = 0 with torch.no_grad(): for data in test_loader: images, labels = data images, labels = images.to(device), labels.to(device) outputs = model(images) _, predicted = torch.max(outputs.data, dim=1) total += labels.size(0) correct += (predicted == labels).sum().item() print('accuracy on test set: %d %% ' % (100 * correct / total)) return correct / total if __name__ == '__main__': epoch_list = [] acc_list = [] for epoch in range(10): train(epoch) acc = test() epoch_list.append(epoch) acc_list.append(acc) plt.plot(epoch_list, acc_list) plt.ylabel('accuracy') plt.xlabel('epoch') plt.show()
注意要使用numpy 1.26.0跑,最新版会报错
1x1 Convolution
1×1卷积核主要功能是改变通道数目,致使减少计算量。在使用1×1卷积核的过程中,不改变原始图片的宽度和高度,它只是改变了通道数,它是一种信息融合。
可以看到计算量降低为1/10。而通道减少并不意味着信息损失,
Inception Module
同时使用不同的卷积层+池化层,通过训练确定权重(作用好的权重会越来越大)。所有的卷积层输出width和height必须都是一样的,然后在channel方向上拼接。
class InceptionA(nn.Module): def __init__(self, in_channels): super(InceptionA, self).__init__() self.branch1x1 = nn.Conv2d(in_channels, 16, kernel_size=1) self.branch5x5_1 = nn.Conv2d(in_channels,16, kernel_size=1) self.branch5x5_2 = nn.Conv2d(16, 24, kernel_size=5, padding=2) self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1) self.branch3x3_2 = nn.Conv2d(16, 24, kernel_size=3, padding=1) self.branch3x3_3 = nn.Conv2d(24, 24, kernel_size=3, padding=1) self.branch_pool = nn.Conv2d(in_channels, 24, kernel_size=1) def forward(self, x): branch1x1 = self.branch1x1(x) branch5x5 = self.branch5x5_1(x) branch5x5 = self.branch5x5_2(branch5x5) branch3x3 = self.branch3x3_1(x) branch3x3 = self.branch3x3_2(branch3x3) branch3x3 = self.branch3x3_3(branch3x3) branch_pool = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1) branch_pool = self.branch_pool(branch_pool) outputs = [branch1x1, branch5x5, branch3x3, branch_pool] return torch.cat(outputs, dim=1) class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 10, kernel_size=5) self.conv2 = nn.Conv2d(88, 20, kernel_size=5) self.incep1 = InceptionA(in_channels=10) self.incep2 = InceptionA(in_channels=20) self.mp = nn.MaxPool2d(2) self.fc = nn.Linear(1408, 10) def forward(self, x): in_size = x.size(0) x = F.relu(self.mp(self.conv1(x))) x = self.incep1(x) x = F.relu(self.mp(self.conv2(x))) x = self.incep2(x) x = x.view(in_size, -1) x = self.fc(x) return x
Residual Network
普通的CNN可能会陷入梯度消失,但Residual net不会,求偏导可知,H(x)偏导数≥1
梯度爆炸和梯度消失
BP算法基于梯度下降策略,以目标的负梯度方向对参数进行调整,计算梯度包含了是对激活函数进行求导,如果此部分大于1,那么层数增多的时候,最终的求出的梯度更新将以指数形式增加,即发生梯度爆炸,如果此部分小于1,那么随着层数增多,求出的梯度更新信息将会以指数形式衰减,即发生了梯度消失。
都表现为当前面隐藏层的学习速率低于后面隐藏层的学习速率,即随着隐藏层数目的增加,分类准确率反而下降了。
发生梯度消失的一个原因是层数过多,另一个原因是激活函数导数值域问题。例如sigmod函数的导数值域是[0,0.25],多层叠加之后就容易出现梯度消失。
class ResidualBlock(nn.Module): def __init__(self, channels): super(ResidualBlock, self).__init__() self.channels = channels self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1) self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1) def forward(self, x): y = F.relu(self.conv1(x)) y = self.conv2(y) return F.relu(x + y) #先加再ReLU
class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 16, kernel_size=5) self.conv2 = nn.Conv2d(16, 32, kernel_size=5) self.mp = nn.MaxPool2d(2) self.rblock1 = ResidualBlock(16) self.rblock2 = ResidualBlock(32) self.fc = nn.Linear(512, 10) def forward(self, x): in_size = x.size(0) x = self.mp(F.relu(self.conv1(x))) x = self.rblock1(x) x = self.mp(F.relu(self.conv2(x))) x = self.rblock2(x) x = x.view(in_size, -1) x = self.fc(x) return x
权重共享
对于每张图片的所有像素点,卷积核的参数都是相同的,且参数数量仅与输入与输出的通道数量和卷积核的大小有关,和原始数据的长宽无关。而对于全链接网络,假设将nxm的图片压缩成1x(nm)的,经过全链接转换成1xk的,那么权重矩阵就需要nmk个参数。
RNN
适合处理序列关系
RNN Cell
下图中构成一个序列,RNN Cell的输入由和构成
上图中的权重矩阵应为,偏置应该是向量?,RNN Cell可以看成
Suppose we have sequence with below properties:
- batchSize = 1
- seqLen = 3
- inputSize = 4
- hiddenSize = 2
- So the shape of inputs and outputs of RNNCell:
- input.shape = batchSize, inputSize
- output.shape = batchSize, hiddenSize
- The sequence can be wrapped in one Tensor with shape:
- dataset.shape = seqLen, batchSize, inputSize
code
import torch batch_size = 1 seq_len = 3 input_size = 4 hidden_size = 2 cell = torch.nn.RNNCell(input_size=input_size, hidden_size=hidden_size) # (seq, batch, features) dataset = torch.randn(seq_len, batch_size, input_size) hidden = torch.zeros(batch_size, hidden_size) for idx, input in enumerate(dataset): print('=' * 20, idx, '=' * 20) print('Input size: ', input.shape) hidden = cell(input, hidden) print('outputs size: ', hidden.shape) print(hidden)
RNN
Suppose we have sequence with below properties:
- batchSize
- seqLen
- inputSize, hiddenSize
- numLayers
- The shape of input and of RNN:
- input.shape = seqLen, batchSize, inputSize
- .shape = numLayers, batchSize, hiddenSize
- The shape of output and of RNN:
- output.shape = seqLen, batchSize, hiddenSize
- .shape = numLayers, batchSize, hiddenSize
网络的构建参数中不含batchSize,说到底是把batch当中的数据逐个丢进网络进行运算
code
import torch batch_size = 1 seq_len = 3 input_size = 4 hidden_size = 2 num_layers = 1 cell = torch.nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers) # (seqLen, batchSize, inputSize) inputs = torch.randn(seq_len, batch_size, input_size) hidden = torch.zeros(num_layers, batch_size, hidden_size) out, hidden = cell(inputs, hidden) print('Output size:', out.shape) print('Output:', out) print('Hidden size: ', hidden.shape) print('Hidden: ', hidden)
按序列对batch中的每个数据计算vs对batch中的每个数据按序列计算
One-Hot Vectors
这里h对应的独热向量是[0,1,0,0]的转置
Example
RNN Cell
import torch input_size = 4 # 输入维度,每个字母对应张量维度 hidden_size = 4 batch_size = 1 # 准备数据集 idx2char = ['e', 'h', 'l', 'o'] x_data = [1, 0, 2, 3, 3] # hello对应编码 y_data = [3, 1, 2, 3, 2] # ohlol对应编码 one_hot_lookup = [[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]] x_one_hot = [one_hot_lookup[x] for x in x_data] inputs = torch.Tensor(x_one_hot).view(-1, batch_size, input_size) labels = torch.LongTensor(y_data).view(-1, 1) print(inputs.shape, labels.shape) # 构建模型 class Model(torch.nn.Module): def __init__(self, input_size, hidden_size, batch_size): super(Model, self).__init__() self.batch_size = batch_size self.input_size = input_size self.hidden_size = hidden_size self.rnncell = torch.nn.RNNCell(input_size=self.input_size, hidden_size=self.hidden_size) def forward(self, inputs, hidden): hidden = self.rnncell(inputs, hidden) return hidden def init_hidden(self): return torch.zeros(self.batch_size, self.hidden_size) net = Model(input_size, hidden_size, batch_size) # 损失函数和优化器 criterion = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(net.parameters(), lr=0.1) # 训练 for epoch in range(15): loss = 0 optimizer.zero_grad() hidden = net.init_hidden() print( 'Predicted string: ', end= '') for input, label in zip(inputs, labels): hidden = net(input, hidden) loss += criterion(hidden, label) #对每个seq的所有batch求loss,这里batch就1个 _, idx = hidden.max(dim=1) print(idx2char[idx.item()], end= '') loss.backward() optimizer.step() print( ', Epoch [%d/15] loss=%.4f' % (epoch+1, loss.item()))
也可以改成一次性对所有seq的所有batch求loss,只需修改训练部分:
for epoch in range(15): loss = 0 o = [] optimizer.zero_grad() hidden = net.init_hidden() print('Predicted string: ', end='') for input, label in zip(inputs, labels): hidden = net(input, hidden) o.append(hidden) _, idx = hidden.max(dim=1) print(idx2char[idx.item()], end='') # Stack the list of hidden states to form a tensor o = torch.stack(o, dim=0) # Now o is a tensor of shape [seqLen, batchSize, hiddenSize] # Reshape `o` to match the expected input shape for the loss function loss = criterion(o.view(-1, hidden_size), labels.view(-1)) loss.backward() optimizer.step() print(', Epoch [%d/15] loss=%.4f' % (epoch + 1, loss.item()))
并把交叉熵的reduction设置为sum,默认是mean
RNN Module
import torch input_size = 4 hidden_size = 4 batch_size = 1 seq_len = 5 num_layers = 1 #1层和一个cell是一样的 # 准备数据集 idx2char = ['e', 'h', 'l', 'o'] x_data = [1, 0, 2, 3, 3] # hello对应编码 y_data = [3, 1, 2, 3, 2] # ohlol对应编码 one_hot_lookup = [[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]] x_one_hot = [one_hot_lookup[x] for x in x_data] inputs = torch.Tensor(x_one_hot).view(seq_len, batch_size, input_size) labels = torch.LongTensor(y_data) print(inputs.shape, labels.shape) # 构建模型 class Model(torch.nn.Module): def __init__(self, input_size, hidden_size, batch_size, num_layers=1): super(Model, self).__init__() self.num_layers = num_layers self.batch_size = batch_size self.input_size = input_size self.hidden_size = hidden_size self.rnn = torch.nn.RNN(input_size=self.input_size, hidden_size=self.hidden_size, num_layers=num_layers) def forward(self, inputs): hidden = torch.zeros(self.num_layers, self.batch_size, self.hidden_size) out, _ = self.rnn(inputs, hidden) # 注意维度是(seqLen, batch_size, hidden_size) return out.view(-1, self.hidden_size) # 为了容易计算交叉熵这里调整维度为(seqLen * batch_size, hidden_size) net = Model(input_size, hidden_size, batch_size) # 损失函数和优化器 criterion = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(net.parameters(), lr=0.1) # 训练 for epoch in range(15): optimizer.zero_grad() outputs = net(inputs) # print(outputs.shape, labels.shape) # 这里的outputs维度是([seqLen * batch_size, hidden]), labels维度是([seqLen*batch_size]),只不过这里batch_size刚好是1 loss = criterion(outputs, labels) #这里是对所有seq对所有batch求了交叉熵,而默认reduction=mean,因此loss是之前Cell的loss的1/seq_len loss.backward() optimizer.step() _, idx = outputs.max(dim=1) idx = idx.data.numpy() print('Predicted: ', ''.join([idx2char[x] for x in idx]), end='') print(', Epoch [%d/15] loss = %.3f' % (epoch + 1, loss.item()))
Embedding
可以用来降维也可以用来升维。
Embedding 层的功能:
Embedding 层实际上是一个查找表(lookup table),它将每个输入的整数(通常表示一个单词、符号或字符)映射到一个固定大小的向量表示。这个向量通常称为嵌入向量,它是用来捕捉输入数据的某些语义或特征信息的稠密表示。
为什么需要 Embedding 层?
如果直接将每个字符或单词以独热编码(one-hot encoding)的形式表示,数据的维度会非常大且稀疏,这样不利于模型的学习。因此,我们使用 Embedding 层将每个字符映射为一个低维稠密向量。这种方式不仅降低了维度,而且使得每个字符的表示更具有语义信息,便于后续的 RNN 或其他神经网络层处理。
加入Embedding后的网络结构如下:
最后加了一层线性层是因为RNN Cell出来的结果是hidden_size个维度,要用线性层把维度转换成和类数一样,才能做交叉熵。
import torch # parameters num_class = 4 input_size = 4 hidden_size = 8 embedding_size = 10 num_layers = 2 batch_size = 1 seq_len = 5 # 准备数据集 idx2char = ['e', 'h', 'l', 'o'] x_data = [[1, 0, 2, 2, 3]] # (batch, seq_len) y_data = [[3, 1, 2, 3, 2]] # (batch, seq_len) inputs = torch.LongTensor(x_data) # Input should be LongTensor: (batchSize, seqLen) labels = torch.LongTensor(y_data).view(-1) # Target should be LongTensor: (batchSize * seqLen) # 构建模型 class Model(torch.nn.Module): def __init__(self): super(Model, self).__init__() self.emb = torch.nn.Embedding(input_size, embedding_size) #每个input(字符)映射到一个embedding_size维的向量 self.rnn = torch.nn.RNN(input_size=embedding_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True) self.fc = torch.nn.Linear(hidden_size, num_class) def forward(self, x): hidden = torch.zeros(num_layers, x.size(0), hidden_size) x = self.emb(x) # (batch, seqLen, embeddingSize) x, _ = self.rnn(x, hidden) # 输出(𝒃𝒂𝒕𝒄𝒉𝑺𝒊𝒛𝒆, 𝒔𝒆𝒒𝑳𝒆𝒏, hidden_size) x = self.fc(x) # 输出(𝒃𝒂𝒕𝒄𝒉𝑺𝒊𝒛𝒆, 𝒔𝒆𝒒𝑳𝒆𝒏, 𝒏𝒖𝒎𝑪𝒍𝒂𝒔𝒔) return x.view(-1, num_class) # reshape to use Cross Entropy: (𝒃𝒂𝒕𝒄𝒉𝑺𝒊𝒛𝒆×𝒔𝒆𝒒𝑳𝒆𝒏, 𝒏𝒖𝒎𝑪𝒍𝒂𝒔𝒔) net = Model() # 损失函数和优化器 criterion = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(net.parameters(), lr=0.05) # 训练模型 for epoch in range(15): optimizer.zero_grad() outputs = net(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() _, idx = outputs.max(dim=1) idx = idx.data.numpy() print('Predicted: ', ''.join([idx2char[x] for x in idx]), end='') print(', Epoch [%d/15] loss = %.3f' % (epoch + 1, loss.item()))
LSTM
比起RNN多了一个c序列
GRU
Bidirectional
先正着算一遍,再反着算一遍,最后两次计算的结果共同构成输出。更具体的可以看李沐的动手学。
此时隐层的输出为
Example 根据人名判断国家
数据处理
把人名拆成字符序列,转换成ascii码后在尾部填充0,使batch的所有序列长度相同。
把国家也处理成数字:
pack_padded_sequence
pack_padded_sequence(embedding, seq_lengths)
如果Batch_first=false,则输入的embedding是SeqxBatchsizexHiddensize的,且按照Seq长度降序排列。seq_lengths是输入数据的每个序列的长度。
按Seq方向压,一个batch一个batch放,这batch_sizes应该是指一个batch中,每个序列的长度。
完整代码
import csv import gzip import time import math import torch from torch.nn.utils.rnn import pack_padded_sequence from torch.utils.data import Dataset, DataLoader import matplotlib.pyplot as plt # 超参数设置 HIDDEN_SIZE = 100 BATCH_SIZE = 256 N_LAYER = 2 N_EPOCHS = 100 N_CHARS = 128 USE_MPS = torch.backends.mps.is_available() # 自动检测MPS,不知道为什么CPU跑的比GPU快 # 定义数据集 class NameDataset(Dataset): def __init__(self, is_train_set=True): filename = 'names_train.csv.gz' if is_train_set else 'names_test.csv.gz' with gzip.open(filename, 'rt') as f: reader = csv.reader(f) rows = list(reader) self.names = [row[0] for row in rows] #提取名字 self.len = len(self.names) self.countries = [row[1] for row in rows] #提取国家 self.country_list = list(sorted(set(self.countries))) #先去重,再排序 self.country_dict = self.get_country_dict() self.country_num = len(self.country_list) def __getitem__(self, index): return self.names[index], self.country_dict[self.countries[index]] #返回名字和国家的索引 def __len__(self): return self.len def get_country_dict(self): country_dict = dict() for idx, country_name in enumerate(self.country_list, 0): #可以省略0,0表示从0开始遍历 country_dict[country_name] = idx return country_dict def idx2country(self, index): return self.country_list[index] def get_countries_num(self): return self.country_num # 数据加载 trainset = NameDataset(is_train_set=True) trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True) testset = NameDataset(is_train_set=False) testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False) N_COUNTRY = trainset.get_countries_num() # 定义模型 class RNNClassifier(torch.nn.Module): def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirectional=True): super(RNNClassifier, self).__init__() self.hidden_size = hidden_size self.n_layers = n_layers self.n_directions = 2 if bidirectional else 1 self.embedding = torch.nn.Embedding(input_size, hidden_size) #input_size=N_CHARS,input_size其实是字典的大小 self.gru = torch.nn.GRU(hidden_size, hidden_size, n_layers, bidirectional=bidirectional) self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size) def _init_hidden(self, batch_size): hidden = torch.zeros(self.n_layers * self.n_directions, batch_size, self.hidden_size) return create_tensor(hidden) def forward(self, input, seq_lengths): input = input.t() # 转置 input 形状:B x S -> S x B batch_size = input.size(1) hidden = self._init_hidden(batch_size) embedding = self.embedding(input) gru_input = pack_padded_sequence(embedding, seq_lengths) #embedding是SxBxH,且按照seq_length降序排列,seq_lengths是每个序列的长度 output, hidden = self.gru(gru_input, hidden) if self.n_directions == 2: hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1) #双向RNN的输出是两个方向的输出拼接 else: hidden_cat = hidden[-1] fc_output = self.fc(hidden_cat) return fc_output # 将张量放置到 MPS 或 CPU def create_tensor(tensor): if USE_MPS: return tensor.to('mps') else: return tensor # 计算经过的时间 def time_since(since): now = time.time() s = now - since m = math.floor(s / 60) s -= m * 60 return f'{m}m {s:.0f}s' # 构建输入张量 def make_tensors(names, countries): sequences_and_lengths = [name2list(name) for name in names] #将名字转换为ASCII序列 name_sequences = [sl[0] for sl in sequences_and_lengths] #提取ASCII序列 seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths]) #提取序列长度 countries = countries.long() #将国家转换为整数 seq_tensor = torch.zeros(len(name_sequences), seq_lengths.max()).long() #创建一个全0的张量,大小为len(name_sequences) x seq_lengths.max() for idx, (seq, seq_len) in enumerate(zip(name_sequences, seq_lengths), 0): seq_tensor[idx, :seq_len] = torch.LongTensor(seq) #填充张量 seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True) #按照序列长度降序排列 seq_tensor = seq_tensor[perm_idx] #按照perm_idx的顺序排列,tensor或者np.array都可以这样操作,普通的list不行 countries = countries[perm_idx] return create_tensor(seq_tensor), seq_lengths, create_tensor(countries) #保留seq_lengths在CPU上 # 将名字转换为 ASCII 序列 def name2list(name): arr = [ord(c) for c in name] return arr, len(arr) # 训练模型 def train_model(): total_loss = 0 for i, (names, countries) in enumerate(trainloader, 1): inputs, seq_lengths, target = make_tensors(names, countries) output = classifier(inputs, seq_lengths) loss = criterion(output, target) optimizer.zero_grad() loss.backward() optimizer.step() total_loss += loss.item() if i % 10 == 0: print(f'[{time_since(start)}] Epoch {epoch} [{i * len(inputs)}/{len(trainset)}] loss={total_loss / (i * len(inputs)):.4f}') return total_loss # 测试模型 def test_model(): correct = 0 total = 0 with torch.no_grad(): for i, (names, countries) in enumerate(testloader, 1): inputs, seq_lengths, target = make_tensors(names, countries) output = classifier(inputs, seq_lengths) pred = output.max(dim=1, keepdim=True)[1] correct += pred.eq(target.view_as(pred)).sum().item() total += target.size(0) accuracy = correct / total * 100 return accuracy # 初始化模型、损失函数、优化器 classifier = RNNClassifier(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER) if USE_MPS: classifier.to('mps') criterion = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001) # 开始训练并在每个 epoch 后测试 start = time.time() accuracy_list=[] for epoch in range(1, N_EPOCHS + 1): print(f"\nEpoch {epoch}/{N_EPOCHS}") train_loss = train_model() print(f'Epoch {epoch} training loss: {train_loss:.4f}') accuracy=test_model() accuracy_list.append(accuracy) print(f'Epoch {epoch} accuracy: {accuracy:.2f}%') # Save the Model torch.save(classifier.state_dict(), 'rnn.pth') # 作图 plt.plot(range(1,N_EPOCHS+1),accuracy_list) plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.show()
结束
無限進步
(完)