加载配置文件
script_dir = Path.cwd() # 获取当前工作目录的路径:d:/..../lsq-net-master
args = util.get_config(default_file = script_dir / 'config.yaml') # 参数为当前目录下的配置文件路径,返回为 Munch 类型
其中处理配置项的功能函数定义了命令行参数解析器,并且能解析多个命令行配置文件:
def get_config(default_file):
p = argparse.ArgumentParser(description='Learned Step Size Quantization')
p.add_argument('config_file', metavar='PATH', nargs='+',
help='path to a configuration file') # 添加命令行参数
arg = p.parse_args() # 解析命令行参数,默认为 config_file = ['config.yaml']
with open(default_file) as yaml_file: # 打开默认指定文件(只有一个文件),将内容读入yaml_file
'''
例如cfg的默认值为:
{'name': 'MyProject',
'output_dir': 'out',
'device': {'type': 'cuda', 'gpu': [...]},
...
'''
cfg = yaml.safe_load(yaml_file) # 该函数将YAML内容转换为Python对象,并将结果存储在cfg中
for f in arg.config_file: # 遍历命令行每个参数文件,因为设置了nargs为+,可接收多个配置文件
if not os.path.isfile(f): # 检查f是否为一个存在的文件
raise FileNotFoundError('Cannot find a configuration file at', f)
with open(f) as yaml_file:
c = yaml.safe_load(yaml_file)
cfg = merge_nested_dict(cfg, c) # 将 cfg(第一个cfg是初始默认配置文件) 和 c 合并
return munch.munchify(cfg) # 转换为 Munch 对象并返回,Munch对象允许你可以像访问对象的属性一样来访问值
【技巧代码】如何合并多个配置文件
def merge_nested_dict(d, other): # 初始配置文件 其他命令行配置文件
new = dict(d) # 创建 d 的字典的副本
for k, v in other.items():
if d.get(k, None) is not None and type(v) is dict: # 检查 k 是否在字典 d 中存在(两个字典有同一个键),并且v的类型是否为字典
new[k] = merge_nested_dict(d[k], v) # 递归合并,例如初始参数中 gpu 就符合条件
else:
new[k] = v
return new
创建输出目录
output_dir = script_dir / args.output_dir # 默认 args.output_dir 为 out
output_dir.mkdir(exist_ok=True) #这个参数决定了当目录已经存在时,是否抛出一个异常。如果exist_ok被设置为True,那么如果目录已经存在,将不会抛出异常,而是会忽略这个操作。如果exist_ok被设置为False(这也是默认值),那么如果目录已经存在,将会抛出一个FileExistsError异常。
初始化日志记录器并保存当前配置
log_dir = util.init_logger(args.name, output_dir, script_dir / 'logging.conf') # 返回初始实验日志路径
logger = logging.getLogger() # 创建一个日志对象
with open(log_dir / "args.yaml", "w") as yaml_file: # 将 args 转换为 YAML 格式,并写入 yaml_file
yaml.safe_dump(args, yaml_file)
【技巧代码】创建每一次的实验文件及日志
def init_logger(experiment_name, output_dir, cfg_file=None):
time_str = time.strftime("%Y%m%d-%H%M%S") # 获取当前时间 例如20231228-181202
exp_full_name = time_str if experiment_name is None else experiment_name + '_' + time_str # 拼接实验名称
log_dir = output_dir / exp_full_name # 设置新的日志路径
log_dir.mkdir(exist_ok=True) # 创建路径
log_file = log_dir / (exp_full_name + '.log') # 创建日志文件
logging.config.fileConfig(cfg_file, defaults={'logfilename': log_file})
logger = logging.getLogger() # 获取一个日志记录器对象,并将其存储在logger中
logger.info('Log file for this run: ' + str(log_file)) # 例如 Log file for this run : d:\...\MyProject_20231228-181202\MyProject_20231228-181202.log
return log_dir
初始化监视器
pymonitor = util.ProgressMonitor(logger) # 进度监视
tbmonitor = util.TensorBoardMonitor(logger, log_dir) # 可视化监视
monitors = [pymonitor, tbmonitor]
计算和存储平均值和当前值
class AverageMeter:
"""Computes and stores the average and current value"""
def __init__(self, fmt='%.6f'): # 控制平均值的格式
self.fmt = fmt
self.val = self.avg = self.sum = self.count = 0
def reset(self): # 重置所有属性为0
self.val = self.avg = self.sum = self.count = 0
def update(self, val, n=1):
self.val = val # val 为新的值
self.sum += val * n # n 为权重
self.count += n # 计算权重个数
self.avg = self.sum / self.count # 计算平均值
def __str__(self): # 返回对象的字符串表示
s = self.fmt % self.avg # 首先使用格式化平均值,再返回
return s
进度监视器
class ProgressMonitor(Monitor): # 用于监视和记录训练过程中的进度
def __init__(self, logger):
super().__init__()
self.logger = logger # 接受参数为日志记录器对象
def update(self, epoch, step_idx, step_num, prefix, meter_dict): # 当前的训练轮数 当前的步骤索引 总的步骤数 日志消息的前缀 一个包含度量信息的字典
msg = prefix # 创建一个新的字符串msg,初始化为日志信息的前缀
if epoch > -1: # 除预训练模型以外
msg += ' [%d][%5d/%5d] ' % (epoch, step_idx, int(step_num)) # 在msg的末尾加上一些格式化的信息
else:
msg += ' [%5d/%5d] ' % (step_idx, int(step_num)) # 否则只包括两个,没有epoch
for k, v in meter_dict.items():
msg += k + ' '
if isinstance(v, AverageMeter): # 如果 v 是 AverageMeter 的实例
msg += str(v) # msg后缀添加
else:
msg += '%.6f' % v # 添加v的浮点数表示
msg += ' ' # msg末尾添加三个空格
self.logger.info(msg) # 使用info方法来记录msg
TensorBoard进度监视器
class TensorBoardMonitor(Monitor):
def __init__(self, logger, log_dir): # 日志记录器对象 日志目录
super().__init__()
self.writer = SummaryWriter(log_dir / 'tb_runs') # 用于在TensorBoard中写入数据,这个对象将数据写入到指定目录中
logger.info('TensorBoard data directory: %s/tb_runs' % log_dir) # 表示数据目录的位置
def update(self, epoch, step_idx, step_num, prefix, meter_dict):
current_step = epoch * step_num + step_idx # 计算当前的步骤数
for k, v in meter_dict.items():
val = v.val if isinstance(v, AverageMeter) else v
self.writer.add_scalar(prefix + '/' + k, val, current_step) # TensorBoard添加标量数据
处理和设置GPU设备
if args.device.type == 'cpu' or not t.cuda.is_available() or args.device.gpu == []:
args.device.gpu = []
else:
available_gpu = t.cuda.device_count()
for dev_id in args.device.gpu:
if dev_id >= available_gpu: # 如果参数中的gpu数量大于系统可用gpu,则会报错
logger.error('GPU device ID {0} requested, but only {1} devices available'
.format(dev_id, available_gpu))
exit(1)
# Set default device in case the first one on the list
t.cuda.set_device(args.device.gpu[0])
# Enable the cudnn built-in auto-tuner to accelerating training, but it
# will introduce some fluctuations in a narrow range.
t.backends.cudnn.benchmark = True # 当你的输入数据的尺寸不变时,这个选项可以加速训练。cuDNN会自动寻找最适合当前配置的高效算法来执行卷积等操作。但是,这可能会在一定范围内引入一些波动。
t.backends.cudnn.deterministic = False # 这行代码表示训练过程中的一些操作允许有一定的随机性。这可能会导致你每次运行模型时,即使输入数据和模型参数完全相同,也可能得到略有不同的结果。如果你需要完全可复现的结果,你应该将此选项设置为True。
初始化数据加载器
# Initialize data loader
train_loader, val_loader, test_loader = util.load_data(args.dataloader)
logger.info('Dataset `%s` size:' % args.dataloader.dataset +
'\n Training Set = %d (%d)' % (len(train_loader.sampler), len(train_loader)) +
'\n Validation Set = %d (%d)' % (len(val_loader.sampler), len(val_loader)) +
'\n Test Set = %d (%d)' % (len(test_loader.sampler), len(test_loader)))
-
检测用于验证的数据的比例是否合法:
if cfg.val_split < 0 or cfg.val_split >= 1: # 检测用于验证的数据的比例是否合法 raise ValueError('val_split should be in the range of [0, 1) but got %.3f' % cfg.val_split)
-
图像归一化
tv_normalize = tv.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # 用于对图像进行归一化,这里是预先计算好的RGB通道的均值和标准差
-
统一预处理数据集
if cfg.dataset == 'imagenet': train_transform = tv.transforms.Compose([ tv.transforms.RandomResizedCrop(224), tv.transforms.RandomHorizontalFlip(), tv.transforms.ToTensor(), tv_normalize ]) # 训练数据的预处理,包括随机裁剪、随机水平翻转、转换为张量、归一化等 val_transform = tv.transforms.Compose([ tv.transforms.Resize(256), tv.transforms.CenterCrop(224), tv.transforms.ToTensor(), tv_normalize ]) # 调整大小 中心裁剪 转换为张量 归一化 train_set = tv.datasets.ImageFolder( root=os.path.join(cfg.path, 'train'), transform=train_transform) test_set = tv.datasets.ImageFolder( root=os.path.join(cfg.path, 'val'), transform=val_transform) elif cfg.dataset == 'cifar10': train_transform = tv.transforms.Compose([ tv.transforms.RandomHorizontalFlip(), tv.transforms.RandomCrop(32, 4), # size 和 padding:在裁剪前每一边都会被填充4个元素,然后从裁剪后的图像中随机裁剪出一个32×32像素的区域。 tv.transforms.ToTensor(), tv_normalize ]) val_transform = tv.transforms.Compose([ tv.transforms.ToTensor(), tv_normalize ]) # 参数文件中的path:'/localhome/fair/Dataset/cifar10' train_set = tv.datasets.CIFAR10(cfg.path, train=True, transform=train_transform, download=True) test_set = tv.datasets.CIFAR10(cfg.path, train=False, transform=val_transform, download=True) else: raise ValueError('load_data does not support dataset %s' % cfg.dataset)
注意:该项目可使用imagenet或cifar10,imagenet需要提前下载,而cifar可借助torchvision自动下载。
- 如果你的图像是彩色的,你可能需要使用
transforms.ToTensor()
将PIL图像或NumPy ndarray转换为torch.Tensor,并且在[0., 1.]范围内缩放图像的像素强度。 - 如果你的图像的大小不一致,你可能需要使用
transforms.Resize()
将它们调整为相同的大小。 - 许多预训练的模型要求输入的图像是224x224或其他特定大小的,因此你可能需要使用
transforms.Resize()
或transforms.RandomResizedCrop()
。 - 如果你的模型是在具有特定均值和标准差的图像上预训练的,你可能需要使用
transforms.Normalize()
来对你的图像进行归一化。 - 为了提高模型的泛化能力,你可能需要在训练集上应用一些随机的变换,如
transforms.RandomHorizontalFlip()
或transforms.RandomRotation()
。
- 如果你的图像是彩色的,你可能需要使用
-
根据验证集的比例划分训练集/验证集/测试集
if cfg.val_split != 0: # 需要验证数据 train_set, val_set = __balance_val_split(train_set, cfg.val_split) # 根据验证集的占比划分训练集和验证集 else: # In this case, use the test set for validation val_set = test_set # 如果不需要验证集,则测试集全部为验证集
【技巧代码】根据比例划分训练集和验证集
def __balance_val_split(dataset, val_split=0.): # 训练数据集 验证数据集的占比 targets = np.array(dataset.targets) # 获取数据集的标签并转换为Numpy数组 train_indices, val_indices = train_test_split( np.arange(targets.shape[0]), # 训练集中的样本总数并转换为数组 test_size=val_split, # 表示验证集的占比,如果不设置训练集的占比会默认验证集的占比的补数 stratify=targets # 表示分割时要保持每个类别的比例,在这里实际分割的是上面的样本的索引数组,这样就可以根据索引来获取数据 ) train_dataset = t.utils.data.Subset(dataset, indices=train_indices) # 只包含训练集的数据 val_dataset = t.utils.data.Subset(dataset, indices=val_indices) # 只包含测试集的数据 return train_dataset, val_dataset
-
设置每个工作进程的状态
worker_init_fn = None if cfg.deterministic: worker_init_fn = __deterministic_worker_init_fn
【技巧代码】工作进程的可重复性
def __deterministic_worker_init_fn(worker_id, seed=0): # 用于设置每个工作进程的随机种子 import random random.seed(seed) np.random.seed(seed) # 影响Numpy生成的所有随机数 t.manual_seed(seed) # 影响PyTorch生成的所有随机数
-
使用数据加载器加载训练集/验证集/测试集
train_loader = t.utils.data.DataLoader( train_set, cfg.batch_size, shuffle=True, num_workers=cfg.workers, pin_memory=True, worker_init_fn=worker_init_fn) # pin_memory会在返回批次之前,将数据放入CUDA固定的内存中,这可以加速将数据移动到GPU的速度 val_loader = t.utils.data.DataLoader( val_set, cfg.batch_size, num_workers=cfg.workers, pin_memory=True, worker_init_fn=worker_init_fn) test_loader = t.utils.data.DataLoader( test_set, cfg.batch_size, num_workers=cfg.workers, pin_memory=True, worker_init_fn=worker_init_fn) return train_loader, val_loader, test_loader
创建模型
# Create the model
model = create_model(args)
import logging
from .resnet import *
from .resnet_cifar import *
def create_model(args):
logger = logging.getLogger()
model = None
if args.dataloader.dataset == 'imagenet':
if args.arch == 'resnet18':
model = resnet18(pretrained=args.pre_trained)
elif args.arch == 'resnet34':
model = resnet34(pretrained=args.pre_trained)
elif args.arch == 'resnet50':
model = resnet50(pretrained=args.pre_trained)
elif args.arch == 'resnet101':
model = resnet101(pretrained=args.pre_trained)
elif args.arch == 'resnet152':
model = resnet152(pretrained=args.pre_trained)
elif args.dataloader.dataset == 'cifar10':
if args.arch == 'resnet20':
model = resnet20(pretrained=args.pre_trained)
elif args.arch == 'resnet32':
model = resnet32(pretrained=args.pre_trained)
elif args.arch == 'resnet44':
model = resnet44(pretrained=args.pre_trained)
elif args.arch == 'resnet56':
model = resnet56(pretrained=args.pre_trained)
elif args.arch == 'resnet110':
model = resnet152(pretrained=args.pre_trained)
elif args.arch == 'resnet1202':
model = resnet1202(pretrained=args.pre_trained)
if model is None:
logger.error('Model architecture `%s` for `%s` dataset is not supported' % (args.arch, args.dataloader.dataset))
exit(-1)
msg = 'Created `%s` model for `%s` dataset' % (args.arch, args.dataloader.dataset)
msg += '\n Use pre-trained model = %s' % args.pre_trained
logger.info(msg)
return model
以resnet20为例:
def _resnet(arch, block, layers, pretrained, progress, **kwargs): # 模型架构的名称 ResNet中的块类型 每个块中的层次数 是否加载预训练权重 是否显示下载进度条
model = ResNet(block, layers, **kwargs) # ResNet模型
if pretrained: # 是否需要加载预训练权重
s = load_state_dict_from_url(model_urls[arch], progress=progress) # 从指定URL下载预训练权重
state_dict = OrderedDict() # 有序字典,用于存储预训练权重
for k, v in s['state_dict'].items():
if k.startswith('module.'):
state_dict[k[7:]] = v # 如果 k 以 'module.' 开头,则添加到字典中,但是去掉前缀
model.load_state_dict(state_dict) # 将预训练权重加载到模型中
return model # 返回模型
def resnet20(pretrained=False, progress=True):
return _resnet('resnet20', BasicBlock, [3, 3, 3], pretrained, progress)
【技巧代码】预加载训练权重
if pretrained: # 是否需要加载预训练权重
s = load_state_dict_from_url(model_urls[arch], progress=progress) # 从指定URL下载预训练权重
state_dict = OrderedDict() # 有序字典,用于存储预训练权重
for k, v in s['state_dict'].items():
if k.startswith('module.'):
state_dict[k[7:]] = v # 如果 k 以 'module.' 开头,则添加到字典中,但是去掉前缀
model.load_state_dict(state_dict) # 将预训练权重加载到模型中
ResNet模型
class ResNet(nn.Module):
def __init__(self, block, num_blocks, num_classes=10):
super(ResNet, self).__init__()
self.in_planes = 16
self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False) # 网络
self.bn1 = nn.BatchNorm2d(16)
self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
self.linear = nn.Linear(64, num_classes)
self.apply(_weights_init)
def _make_layer(self, block, planes, num_blocks, stride): # 第一个卷积块的步长为stride,其余块的步长都为1
strides = [stride] + [1] * (num_blocks - 1) # 初始步长 + 除第一个块之外的卷积块的数量
layers = []
for stride in strides:
layers.append(block(self.in_planes, planes, stride))
self.in_planes = planes * block.expansion
return nn.Sequential(*layers)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
# out = F.avg_pool2d(out, out.size()[3])
pool = nn.AdaptiveAvgPool2d((1,1))
out = pool(out)
out = out.view(out.size(0), -1)
out = self.linear(out)
return out
基本块的构造
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_planes, planes, stride=1, option='A'):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.shortcut = nn.Sequential() # 快捷连接
if stride != 1 or in_planes != planes: # 检查是否需要在快捷连接中添加卷积层。如果步长不为1或输入通道数与输出通道数不同,那么需要添加卷积层
if option == 'A': # 如果选项为A
"""
For CIFAR10 ResNet paper uses option A.
LambdaLayer 是一个自定义的层,它允许你定义一个任意的函数作为网络的一部分
x[:, :, ::2, ::2]: 对输入x进行下采样,即每隔一个像素取一个像素,这样可以将图像的高度和宽度减半。(batch_size,channels,height,width) ::2是从开始到结束,步长为2
F.pad(..., (0, 0, 0, 0, planes // 4, planes // 4), 'constant', 0): 即在通道维度前后各添加 planes // 4 个零通道
因为CIFAR10的图像尺寸较小,如果在shortcut中使用卷积操作可能会引入额外的复杂性和计算成本
在PyTorch中,F.pad(input, pad, mode='constant', value=0) 是一个用于对输入张量进行填充的函数。这里的 F 是 torch.nn.functional 的别名。
1.input 是需要填充的输入张量。
2.pad 是一个元组,定义了各个维度两侧的填充量。长度应为2倍的张量维度,例如对于一个4维张量,pad 应为长度为8的元组。元组中的元素按照最后一个维度的后面,最后一个维度的前面,倒数第二个维度的后面,倒数第二个维度的前面,…,第一个维度的后面,第一个维度的前面的顺序排列。
3.mode 定义了填充的方式,可以是 ‘constant’、‘reflect’ 或 ‘replicate’。默认为 ‘constant’。
4.value 在 mode='constant' 时定义了填充的值。默认为0。
"""
self.shortcut = LambdaLayer(lambda x:
F.pad(x[:, :, ::2, ::2], (0, 0, 0, 0, planes // 4, planes // 4), "constant",
0)) # 添加一个LambdaLayer,它会对输入进行下采样并进行零填充
elif option == 'B':
self.shortcut = nn.Sequential(
nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False), # 添加一个卷积层
nn.BatchNorm2d(self.expansion * planes) # 添加一个BN层
)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x)
out = F.relu(out)
return out
找到需要量化的模块
def find_modules_to_quantize(model, quan_scheduler): # 用于在给定的模型中找到需要量化的模块
replaced_modules = dict() # 初始化空字典,用于存储需要被替换的模块
for name, module in model.named_modules(): # 遍历模型中的所有模块 named_modules() 方法 会递归地返回模型中所有模块的名称和模块本身
if type(module) in QuanModuleMapping.keys(): # 检查当前模块的类型是否在键中:t.nn.Conv2d 或 t.nn.Linear(通用量化处理)
if name in quan_scheduler.excepts: # 说明这个模块有特殊的量化配置
replaced_modules[name] = QuanModuleMapping[type(module)](
module,
quan_w_fn=quantizer(quan_scheduler.weight,
quan_scheduler.excepts[name].weight),
quan_a_fn=quantizer(quan_scheduler.act,
quan_scheduler.excepts[name].act)
)
else:
replaced_modules[name] = QuanModuleMapping[type(module)](
module,
quan_w_fn=quantizer(quan_scheduler.weight),
quan_a_fn=quantizer(quan_scheduler.act)
)
elif name in quan_scheduler.excepts: # 说明用户想要对这个模块进行特殊的量化配置,但我们找不到对应的量化模块
logging.warning('Cannot find module %s in the model, skip it' % name)
return replaced_modules
QuanModuleMapping = {
t.nn.Conv2d: QuanConv2d,
t.nn.Linear: QuanLinear
}
根据位宽对激活函数/权重执行不同的量化配置函数
def quantizer(default_cfg, this_cfg=None): # 默认的量化配置 特定模块的量化配置
target_cfg = dict(default_cfg)
if this_cfg is not None: # 如果有特定模块的量化配置
for k, v in this_cfg.items():
target_cfg[k] = v # 覆盖默认的量化配置
if target_cfg['bit'] is None:
q = IdentityQuan # 不进行任何量化
elif target_cfg['mode'] == 'lsq':
q = LsqQuan
else:
raise ValueError('Cannot find quantizer `%s`', target_cfg['mode'])
target_cfg.pop('mode') # 移除mode字段
return q(**target_cfg) # 执行相应的量化算法
【关键代码】LSQ量化激活函数/权重
import torch as t
from .quantizer import Quantizer
def grad_scale(x, scale):
y = x
y_grad = x * scale # 乘法操作可以直接改变梯度的大小,而不改变其方向
return (y - y_grad).detach() + y_grad # 这个结果与x相同,但是梯度不同,因为(y - y_grad).detach()梯度为0,而y_grad的梯度是x的梯度乘以scale。这样,整个表达式的梯度就是x的梯度乘以scale。
def round_pass(x):
y = x.round()
y_grad = x
return (y - y_grad).detach() + y_grad
class LsqQuan(Quantizer):
def __init__(self, bit, all_positive=False, symmetric=False, per_channel=True): # 量化位宽 是否将所有数字量化为非负数 是否使用对称量化 是否每个输出通道使用自己的缩放因子
super().__init__(bit)
if all_positive:
assert not symmetric, "Positive quantization cannot be symmetric" # 在量化为非负数的情况下,必须使用非对称量化
# unsigned activation is quantized to [0, 2^b-1]
self.thd_neg = 0
self.thd_pos = 2 ** bit - 1
else:
if symmetric: # 对称量化
# signed weight/activation is quantized to [-2^(b-1)+1, 2^(b-1)-1]
self.thd_neg = - 2 ** (bit - 1) + 1
self.thd_pos = 2 ** (bit - 1) - 1
else: # 非对称量化
# signed weight/activation is quantized to [-2^(b-1), 2^(b-1)-1]
self.thd_neg = - 2 ** (bit - 1)
self.thd_pos = 2 ** (bit - 1) - 1
self.per_channel = per_channel
self.s = t.nn.Parameter(t.ones(1)) # 创建一个全为1的新参数
def init_from(self, x, *args, **kwargs): # 初始化缩放因子
if self.per_channel:
self.s = t.nn.Parameter(
x.detach().abs().mean(dim=list(range(1, x.dim())), keepdim=True) * 2 / (self.thd_pos ** 0.5)) # 在除第一位(通道维)以外的所有维度上计算均值,然后将均值乘以2,然后除以上限的平方根、这是一个缩放操作,用于将s的初始值调整到合适的范围
else:
self.s = t.nn.Parameter(x.detach().abs().mean() * 2 / (self.thd_pos ** 0.5)) # 计算所有维度的均值
def forward(self, x):
if self.per_channel:
s_grad_scale = 1.0 / ((self.thd_pos * x.numel()) ** 0.5) # 计算梯度缩放因子
else:
s_grad_scale = 1.0 / ((self.thd_pos * x.numel()) ** 0.5)
s_scale = grad_scale(self.s, s_grad_scale) # 返回的是值与s相同但梯度不同的s_scale
x = x / s_scale
x = t.clamp(x, self.thd_neg, self.thd_pos) # 限制在量化的范围内
x = round_pass(x) # 进行四舍五入而不改变梯度
x = x * s_scale # 四舍五入后再返回
return x
注意:在PyTorch中,许多操作(如加、减、乘、除)都定义了如何计算梯度,这使得我们可以使用反向传播算法来训练神经网络。然而,四舍五入操作(round)并没有定义梯度,因为它在整数处是不连续的,所以在这些点上没有定义导数。
- 因此,如果你直接对一个Tensor进行四舍五入,然后尝试计算梯度,你会发现梯度为None。因此作者使用了一个特殊的
round_pass
函数来进行四舍五入,而不是直接使用round
函数。round_pass
函数的作用是返回一个值等于四舍五入后的x
,但梯度等于x
的梯度的Tensor,这样就可以在进行四舍五入操作的同时保持梯度不变。- 同理
grad_scale
函数,如果你直接返回x * scale
,那么得到的Tensor的值会是x
的scale
倍,这并不是我们想要的。我们希望得到的Tensor的值仍然是x
,但其梯度是x
的梯度乘以scale
。
量化后的卷积层/线性层
import torch as t
class QuanConv2d(t.nn.Conv2d):
def __init__(self, m: t.nn.Conv2d, quan_w_fn=None, quan_a_fn=None):
assert type(m) == t.nn.Conv2d
super().__init__(m.in_channels, m.out_channels, m.kernel_size,
stride=m.stride,
padding=m.padding,
dilation=m.dilation,
groups=m.groups,
bias=True if m.bias is not None else False,
padding_mode=m.padding_mode)
self.quan_w_fn = quan_w_fn
self.quan_a_fn = quan_a_fn
self.weight = t.nn.Parameter(m.weight.detach())
self.quan_w_fn.init_from(m.weight)
if m.bias is not None:
self.bias = t.nn.Parameter(m.bias.detach())
def forward(self, x):
quantized_weight = self.quan_w_fn(self.weight)
quantized_act = self.quan_a_fn(x)
return self._conv_forward(quantized_act, quantized_weight, bias = None)
class QuanLinear(t.nn.Linear):
def __init__(self, m: t.nn.Linear, quan_w_fn=None, quan_a_fn=None):
assert type(m) == t.nn.Linear
super().__init__(m.in_features, m.out_features,
bias=True if m.bias is not None else False)
self.quan_w_fn = quan_w_fn
self.quan_a_fn = quan_a_fn
self.weight = t.nn.Parameter(m.weight.detach())
self.quan_w_fn.init_from(m.weight)
if m.bias is not None:
self.bias = t.nn.Parameter(m.bias.detach())
def forward(self, x):
quantized_weight = self.quan_w_fn(self.weight)
quantized_act = self.quan_a_fn(x)
return t.nn.functional.linear(quantized_act, quantized_weight, self.bias)
QuanModuleMapping = {
t.nn.Conv2d: QuanConv2d,
t.nn.Linear: QuanLinear
}
替换原有模型的网络层
modules_to_replace = quan.find_modules_to_quantize(model, args.quan)
model = quan.replace_module_by_names(model, modules_to_replace)
tbmonitor.writer.add_graph(model, input_to_model=train_loader.dataset[0][0].unsqueeze(0))
logger.info('Inserted quantizers into the original model')
【技巧代码】替换原有模型的网络层为量化后的网络层
def replace_module_by_names(model, modules_to_replace): # 将初始化量化后的模块替换掉原模型中的模块
def helper(child: t.nn.Module):
for n, c in child.named_children():
if type(c) in QuanModuleMapping.keys():
for full_name, m in model.named_modules():
if c is m:
child.add_module(n, modules_to_replace.pop(full_name))
break
else:
helper(c)
helper(model)
return model
加载预训练模型
if args.device.gpu and not args.dataloader.serialized:
model = t.nn.DataParallel(model, device_ids=args.device.gpu) # 该类运行你在多个GPU上并行运行模型
model.to(args.device.type)
start_epoch = 0
if args.resume.path: # 如果参数文件有预训练模型,则加载预训练模型
model, start_epoch, _ = util.load_checkpoint(
model, args.resume.path, args.device.type, lean=args.resume.lean)
【技巧代码】如何加载预训练模型
def load_checkpoint(model, chkp_file, model_device=None, strict=False, lean=False):
"""Load a pyTorch training checkpoint.
Args:
model: the pyTorch model to which we will load the parameters. You can
specify model=None if the checkpoint contains enough metadata to infer
the model. The order of the arguments is misleading and clunky, and is
kept this way for backward compatibility. 我们将加载参数的Pytorch模型,如果检查点包含足够的元数据以推断模型,则可以指定model = None。参数的顺序令人误解和笨拙,并且保持这种方式是为了向后兼容。
chkp_file: the checkpoint file 检查点文件
lean: if set, read into model only 'state_dict' field 如果设置,只读取模型中的“state_dict”字段
model_device [str]: if set, call model.to($model_device)
This should be set to either 'cpu' or 'cuda'. 如果设置,调用model.to($ model_device)这应该设置为“cpu”或“cuda”之一。
:returns: updated model, optimizer, start_epoch 返回更新的模型,优化器,start_epoch
"""
if not os.path.isfile(chkp_file): # 如果 chkp_file 不是文件,则报错
raise IOError('Cannot find a checkpoint at', chkp_file)
# 加载通过torch.save()保存的序列化对象,这些对象通常包括模型的状态字典、优化器的状态字典等
checkpoint = t.load(chkp_file, map_location=lambda storage, loc: storage) # lambada 函数无论原来的位置是什么,都会返回 storage,这意味着,无论张量原来在什么设备上,都会被放到内存cpu中
if 'state_dict' not in checkpoint: # 检查点必须包含模型参数
raise ValueError('Checkpoint must contain model parameters')
extras = checkpoint.get('extras', None) # 获取额外的参数
arch = checkpoint.get('arch', '_nameless_') # 获取模型的名称
checkpoint_epoch = checkpoint.get('epoch', None) # 获取检查点的 epoch
start_epoch = checkpoint_epoch + 1 if checkpoint_epoch is not None else 0 # 获取开始的 epoch
anomalous_keys = model.load_state_dict(checkpoint['state_dict'], strict) # 加载模型参数
if anomalous_keys: # 如果anomalous_keys不为空,说明加载的状态字典和模型的状态字典不完全匹配
missing_keys, unexpected_keys = anomalous_keys # 包含了模型中没有的键和检查点中没有的键
if unexpected_keys: # 如果有意外的键,发出警告
logger.warning("The loaded checkpoint (%s) contains %d unexpected state keys" %
(chkp_file, len(unexpected_keys)))
if missing_keys: # 如果有缺失的键,报错
raise ValueError("The loaded checkpoint (%s) is missing %d state keys" %
(chkp_file, len(missing_keys)))
if model_device is not None:
model.to(model_device)
if lean: # 如果 lean 为 True,说明只需要模型,不需要其他状态信息
logger.info("Loaded checkpoint %s model (next epoch %d) from %s", arch, 0, chkp_file)
return model, 0, None
else: # 否则返回模型,开始的 epoch,额外的状态信息
logger.info("Loaded checkpoint %s model (next epoch %d) from %s", arch, start_epoch, chkp_file)
return model, start_epoch, extras
参数器/学习率等调整函数
# Define loss function (criterion) and optimizer
criterion = t.nn.CrossEntropyLoss().to(args.device.type)
# optimizer = t.optim.Adam(model.parameters(), lr=args.optimizer.learning_rate)
optimizer = t.optim.SGD(model.parameters(),
lr=args.optimizer.learning_rate,
momentum=args.optimizer.momentum,
weight_decay=args.optimizer.weight_decay)
lr_scheduler = util.lr_scheduler(optimizer,
batch_size=train_loader.batch_size,
num_samples=len(train_loader.sampler),
**args.lr_scheduler) # 学习率调度器,需要优化器,批量大小,样本数量,以及其他参数
logger.info(('Optimizer: %s' % optimizer).replace('\n', '\n' + ' ' * 11))
logger.info('LR scheduler: %s\n' % lr_scheduler)
得分输出函数
perf_scoreboard = process.PerformanceScoreboard(args.log.num_best_scores)
【技巧代码】得分输出函数top1/top5/top10
class PerformanceScoreboard: # 用于记录和更新训练过程中的最佳得分
def __init__(self, num_best_scores):
self.board = list()
self.num_best_scores = num_best_scores
def update(self, top1, top5, epoch):
""" Update the list of top training scores achieved so far, and log the best scores so far"""
self.board.append({'top1': top1, 'top5': top5, 'epoch': epoch})
# Keep scoreboard sorted from best to worst, and sort by top1, top5 and epoch
curr_len = min(self.num_best_scores, len(self.board)) # 最佳得分数量,为参数文件中的 num_best_scores 和当前得分数量的最小值
self.board = sorted(self.board,
key=operator.itemgetter('top1', 'top5', 'epoch'),
reverse=True)[0:curr_len] # 按照 top1, top5, epoch 从大到小排序,取前 curr_len 个
for idx in range(curr_len):
score = self.board[idx]
logger.info('Scoreboard best %d ==> Epoch [%d][Top1: %.3f Top5: %.3f]',
idx + 1, score['epoch'], score['top1'], score['top5'])
def is_best(self, epoch):
return self.board[0]['epoch'] == epoch # 判断是否对应最佳得分
验证与训练数据集
if args.eval:
process.validate(test_loader, model, criterion, -1, monitors, args)
else: # training
if args.resume.path or args.pre_trained:
logger.info('>>>>>>>> Epoch -1 (pre-trained model evaluation)')
top1, top5, _ = process.validate(val_loader, model, criterion,
start_epoch - 1, monitors, args)
perf_scoreboard.update(top1, top5, start_epoch - 1)
for epoch in range(start_epoch, args.epochs):
logger.info('>>>>>>>> Epoch %3d' % epoch)
t_top1, t_top5, t_loss = process.train(train_loader, model, criterion, optimizer,
lr_scheduler, epoch, monitors, args)
v_top1, v_top5, v_loss = process.validate(val_loader, model, criterion, epoch, monitors, args)
tbmonitor.writer.add_scalars('Train_vs_Validation/Loss', {'train': t_loss, 'val': v_loss}, epoch)
tbmonitor.writer.add_scalars('Train_vs_Validation/Top1', {'train': t_top1, 'val': v_top1}, epoch)
tbmonitor.writer.add_scalars('Train_vs_Validation/Top5', {'train': t_top5, 'val': v_top5}, epoch)
perf_scoreboard.update(v_top1, v_top5, epoch)
is_best = perf_scoreboard.is_best(epoch)
util.save_checkpoint(epoch, args.arch, model, {'top1': v_top1, 'top5': v_top5}, is_best, args.name, log_dir)
logger.info('>>>>>>>> Epoch -1 (final model evaluation)')
process.validate(test_loader, model, criterion, -1, monitors, args)
tbmonitor.writer.close() # close the TensorBoard
logger.info('Program completed successfully ... exiting ...')
logger.info('If you have any questions or suggestions, please visit: github.com/zhutmost/lsq-net')
有两部分可以影响到
s
的值:
init_from
方法:在这个方法中,s
的初始值是根据输入数据x
的统计信息计算得到的。如果per_channel
为真,那么x
的每个通道的均值会被用来计算s
的初始值;否则,x
的全局均值会被用来计算s
的初始值。forward
方法:在这个方法中,s
被用来计算梯度缩放因子s_grad_scale
,然后用grad_scale
函数对其进行梯度缩放,得到用于量化x
的s_scale
。这个s_scale
的梯度会在反向传播过程中被计算,然后在优化步骤中被用来更新s
的值。