3. stage2 - Initialization

As we saw at the entry point, when stage 1 or 2 is enabled, a DeepSpeedZeroOptimizer is created to replace the original optimizer; all of the stage 1/2 features are implemented inside this optimizer.

The core feature of stage 1 and 2 is to partition the parameter gradients and the optimizer states, so that each process (GPU, rank) keeps only one shard of them, which reduces GPU memory consumption. In newer versions this part also supports CPU offload.

The core idea is simple: process the param_groups of the base (original) optimizer, keep only the parameters that belong to the current process (GPU, rank), and drop the rest from the optimizer. Each rank then only keeps the gradients for its own shard and only holds optimizer states for that shard.
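
To make the idea concrete, here is a minimal sketch in plain PyTorch (this is not DeepSpeed's code; shard_param_group and all values are made up for illustration) of carving one parameter group into per-rank shards and handing only the local shard to the optimizer:

import torch

def shard_param_group(params, rank, world_size):
    # flatten all parameters of the group into one 1-D bit16 buffer
    flat = torch.cat([p.detach().reshape(-1) for p in params])
    # pad the tail so the buffer divides evenly across ranks
    pad = (-flat.numel()) % world_size
    if pad:
        flat = torch.cat([flat, flat.new_zeros(pad)])
    shard_size = flat.numel() // world_size
    # fp32 master copy of the local shard only; this is what the optimizer will update
    local = flat[rank * shard_size:(rank + 1) * shard_size].clone().float()
    return local.requires_grad_(True)

params = [torch.randn(5, dtype=torch.float16), torch.randn(3, dtype=torch.float16)]
local_shard = shard_param_group(params, rank=1, world_size=2)
optimizer = torch.optim.Adam([local_shard])  # optimizer states exist only for this shard
print(local_shard.numel())                   # 4 of the 8 total elements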

The optimizer-partitioning initialization is implemented entirely in the class's __init__ method (rants omitted), which makes this __init__ very long, so we break it into pieces for the walkthrough.

3.1. Configuration Initialization

First, the input arguments, configuration items, and internal variables are initialized; most of them are derived from the DeepSpeed config.
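
Purely for orientation, here is an illustrative zero_optimization config block (field names follow the DeepSpeed config schema, values are only examples) that feeds most of the constructor arguments below:

ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "zero_optimization": {
        "stage": 2,                    # stage 1 and stage 2 both use DeepSpeedZeroOptimizer
        "contiguous_gradients": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 5e8,
        "allgather_bucket_size": 5e8,
        "overlap_comm": False,
        "round_robin_gradients": False,
        "offload_optimizer": {         # device != "none" turns on self.cpu_offload below
            "device": "cpu",
            "pin_memory": True
        }
    }
}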

from deepspeed.runtime.zero.stage_1_and_2 import DeepSpeedZeroOptimizer
class DeepSpeedZeroOptimizer(ZeROOptimizer):
    """
    DeepSpeedZeroOptimizer designed to reduce the memory footprint
    required for training large deep learning models.

    For more details please see ZeRO: Memory Optimization Towards Training A Trillion Parameter Models
    https://arxiv.org/abs/1910.02054

    For usage examples, refer to TODO: DeepSpeed Tutorial

    """

    def __init__(self,
                 init_optimizer,
                 param_names,
                 timers,
                 static_loss_scale=1.0,
                 dynamic_loss_scale=False,
                 dynamic_loss_args=None,
                 verbose=True,
                 contiguous_gradients=True,
                 reduce_bucket_size=500000000,
                 allgather_bucket_size=5000000000,
                 dp_process_group=None,
                 expert_parallel_group=None,
                 expert_data_parallel_group=None,
                 reduce_scatter=True,
                 overlap_comm=False,
                 offload_optimizer_config=None,
                 mpu=None,
                 clip_grad=0.0,
                 gradient_accumulation_dtype=torch.float32,
                 communication_data_type=torch.float16,
                 postscale_gradients=True,
                 gradient_predivide_factor=1.0,
                 gradient_accumulation_steps=1,
                 ignore_unused_parameters=True,
                 partition_grads=True,
                 round_robin_gradients=False,
                 has_moe_layers=False,
                 fp16_master_weights_and_gradients=False,
                 elastic_checkpoint=False):
        # stage 1/2 also supports CPU offload and pinned memory; enable them based on the offload config
        if offload_optimizer_config is not None and offload_optimizer_config.device != OffloadDeviceEnum.none:
            self.cpu_offload = True
            self.cpu_offload_pin_memory = offload_optimizer_config.pin_memory
        else:
            self.cpu_offload = False
            self.cpu_offload_pin_memory = False

        if dist.get_rank() == 0:
            logger.info(f"Reduce bucket size {reduce_bucket_size}")
            logger.info(f"Allgather bucket size {allgather_bucket_size}")
            logger.info(f"CPU Offload: {self.cpu_offload}")
            logger.info(f'Round robin gradient partitioning: {round_robin_gradients}')
        # The fused optimizer does all the work. We need this layer for two reason:
        # 1. maintain same user API from apex.fp16_utils
        # 2. keep common stuff here in case we need to add new fused optimizer later

        self.elastic_checkpoint = elastic_checkpoint
        self.param_names = param_names
        self.mpu = mpu
        # differences from apex.fp16_utils:
        # - assume all model params in fp16
        # - assume all params requires grad
        # - flat by groups, not keeping state. TODO: remove state explicitly?
        # - master grad and unflat master weight never exist. TODO: a way to save out unflat master?
        if not get_accelerator().is_available():
            raise SystemError("Accelerator is not detected, cannot perform low precision training (e.g., fp16, bf16).")
        # the base (wrapped) optimizer
        self.optimizer = init_optimizer

        # Use torch (un)flatten ops
        # helpers for flattening / unflattening tensors; both are torch utility functions
        self.flatten = _flatten_dense_tensors
        self.unflatten = _unflatten_dense_tensors

        # ZeRO stage 1 (False) or 2 (True)
        # whether gradient partitioning is enabled (True for ZeRO-2, False for ZeRO-1)
        self.partition_gradients = partition_grads  # type: bool
        # readable name of the ZeRO stage
        self.zero_stage_string = "ZeRO-2" if partition_grads else "ZeRO-1"

        self.timers = timers

        self.reduce_scatter = reduce_scatter
        # config option, default False
        # attempt to overlap gradient reduction with the backward computation
        self.overlap_comm = overlap_comm # type: bool

        self.deepspeed_adam_offload = self.cpu_offload

        self.device = get_accelerator().current_device_name() if not self.cpu_offload else 'cpu'
        # the data-parallel process group this rank belongs to
        self.dp_process_group = dp_process_group

        # expert parallel group (for MoE models)
        self.ep_process_group = expert_parallel_group

        # data parallel group for experts (MoE)
        self.expert_dp_process_group = expert_data_parallel_group

        #   data parallel size for non-experts
        dp_size = dist.get_world_size(group=self.dp_process_group)

        #For MoE models this maybe different for different param group
        #It will be modified during MoE setup later in the init
        self.real_dp_process_group = [dp_process_group for i in range(len(self.optimizer.param_groups))]
        self.partition_count = [dp_size for i in range(len(self.optimizer.param_groups))]

        self.is_gradient_accumulation_boundary = True

        # CPU-Offload requires contiguous gradients
        # copy gradients into a contiguous buffer as they are produced, avoiding memory fragmentation during the backward pass
        self.contiguous_gradients = contiguous_gradients or self.cpu_offload  # type: bool
        # whether the model contains MoE layers
        self.has_moe_layers = has_moe_layers
        if self.has_moe_layers:
            self._configure_moe_settings()
        self._global_grad_norm = 0.

        if mpu is None:
            self.model_parallel_group = None
            self.model_parallel_world_size = 1
            self.model_parallel_rank = 0
        else:
            self.model_parallel_group = mpu.get_model_parallel_group()
            self.model_parallel_world_size = mpu.get_model_parallel_world_size()
            self.model_parallel_rank = bwc_tensor_model_parallel_rank(mpu)

        self.overflow = False
        self.clip_grad = clip_grad
        self.communication_data_type = communication_data_type
        self.gradient_predivide_factor = gradient_predivide_factor
        self.postscale_gradients = postscale_gradients
        self.gradient_accumulation_steps = gradient_accumulation_steps
        self.micro_step_id = 0
        self.ignore_unused_parameters = ignore_unused_parameters
        self.round_robin_gradients = round_robin_gradients

        self.extra_large_param_to_reduce = None
        self.fp16_master_weights_and_gradients = fp16_master_weights_and_gradients

        if self.fp16_master_weights_and_gradients:
            assert self.cpu_offload and type(self.optimizer) in [DeepSpeedCPUAdam], \
            f"fp16_master_and_gradients requires optimizer to support keeping fp16 master and gradients while keeping the optimizer states in fp32."\
            f"Currently only supported using ZeRO-Offload with DeepSpeedCPUAdam. But current setting is ZeRO-Offload:{self.cpu_offload} and optimizer type {type(self.optimizer)}." \
            f"Either disable fp16_master_weights_and_gradients or enable {self.zero_stage_string} Offload with DeepSpeedCPUAdam."

        if self.reduce_scatter:
            valid_reduce_scatter_dtypes = (torch.float16, torch.bfloat16, torch.float32)
            assert self.communication_data_type in valid_reduce_scatter_dtypes, f"{self.zero_stage_string} supports {valid_reduce_scatter_dtypes} communication_data_type with reduce scatter enabled. Got: '{self.communication_data_type}'"
            assert self.gradient_predivide_factor == 1.0, "gradient_predivide_factor != 1.0 is not yet supported with {self.zero_stage_string} with reduce scatter enabled"
            assert self.postscale_gradients, "pre-scale gradients is not yet supported with {self.zero_stage_string} with reduce scatter enabled"

        # param flattened by groups
        self.bit16_groups = []
        self.bit16_groups_flat = []

        # param partitioned by data parallel degree
        # this will contain a list of equal sized tensors
        # each of which will be updated by a different process
        self.parallel_partitioned_bit16_groups = []

        # a single 32-bit partition of the parallel partitioned parameters
        # that this process will update
        self.single_partition_of_fp32_groups = []

        # param partition info

        # These are the parameters in each group that will not be updated by this process directly
        self.params_not_in_partition = []

        # These are the parameters that will be updated by this process directly
        self.params_in_partition = []

        # Offset from the first parameter in the self.params_in_partition
        # the parameter boundaries may not align with partition boundaries
        # so we need to keep track of the offset
        self.first_offset = []

        # number of elements per partition in each group
        self.partition_size = []

        # align nccl all-gather send buffers to 4-byte boundary
        self.nccl_start_alignment_factor = 2  # 4-byte alignment/sizeof(fp16) = 2

        assert (
            allgather_bucket_size % self.nccl_start_alignment_factor == 0
        ), f"allgather_bucket_size must be a multiple of nccl_start_alignment_factor, {self.nccl_start_alignment_factor} "

        self.all_reduce_print = False
        self.dtype = self.optimizer.param_groups[0]['params'][0].dtype
        self.gradient_accumulation_dtype = gradient_accumulation_dtype

        if self.dtype != self.gradient_accumulation_dtype:
            self.use_separate_grad_accum = True
        else:
            self.use_separate_grad_accum = False
        if self.use_separate_grad_accum and not self.partition_gradients:
            self.use_grad_accum_for_reduction = True
        else:
            self.use_grad_accum_for_reduction = False

        self.round_robin_bit16_groups = []
        self.round_robin_bit16_indices = []

        # Use different parallel to do all_to_all_reduce related things
        # padding on each partition for alignment purposes
        self.groups_padding = []
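
The self.flatten / self.unflatten helpers bound above are torch's dense-tensor utilities; here is a quick standalone demonstration of what they do (torch._utils is a private module, so treat the import path as illustrative):

import torch
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

tensors = [torch.ones(2, 3), torch.arange(4.0)]
flat = _flatten_dense_tensors(tensors)            # one contiguous 1-D buffer (10 elements)
views = _unflatten_dense_tensors(flat, tensors)   # tensors shaped like the originals again
assert flat.numel() == sum(t.numel() for t in tensors)
assert all(v.shape == t.shape for v, t in zip(views, tensors))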

3.2. Parameter Partitioning

Next comes a large loop that processes every parameter group in self.optimizer.param_groups, so let's first recap what optimizer.param_groups is.

self.optimizer is the original base optimizer, a (compatible) instance of torch.optim.Optimizer. When creating a torch.optim.Optimizer you can split the model parameters into groups, each with its own learning rate and update settings; optimizer.param_groups: List[Dict] stores those groups. It is a list whose elements are dicts, and each dict has the keys dict_keys(['params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad']) (see the small example after this list):

  • 'params': the model parameters that need gradient updates.

  • 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad': per-group hyperparameters (learning rate and friends); they can be ignored here.
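
A quick illustration in plain PyTorch (the exact key set depends on the optimizer class and torch version):

import torch

w1 = torch.nn.Parameter(torch.randn(4))
w2 = torch.nn.Parameter(torch.randn(4))
opt = torch.optim.Adam([
    {"params": [w1], "lr": 1e-3},
    {"params": [w2], "lr": 1e-4},
])
print(len(opt.param_groups))             # 2
print(list(opt.param_groups[0].keys()))  # ['params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad', ...]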

# loop to deal with groups
 # (recap: self.optimizer.param_groups is a List[Dict]; each dict's 'params' key holds the
 #  trainable parameters of that group, while the remaining keys such as 'lr', 'betas',
 #  'eps', 'weight_decay', 'amsgrad' are per-group hyperparameters)
 for i, param_group in enumerate(self.optimizer.param_groups):
     # each parameter group is handled separately
     partition_id = dist.get_rank(group=self.real_dp_process_group[i])

     # push this group to list before modify
     # TODO: Explore simplification that avoids the extra book-keeping by pushing the reordered group
     trainable_parameters = []
     for param in param_group['params']:
         if param.requires_grad:
             param.grad_accum = None
             trainable_parameters.append(param)
     # the trainable (requires_grad) parameters of the current param_group;
     # all of the partitioning below operates on this list
     self.bit16_groups.append(trainable_parameters)

     # not sure why apex was cloning the weights before flattening
     # removing cloning here

     see_memory_usage(f"Before moving param group {i} to CPU")
     # move all the parameters to cpu to free up GPU space for creating flat buffer
     # move them to CPU memory first and do the processing there, freeing GPU memory for the flat buffer
     move_to_cpu(self.bit16_groups[i])
     empty_cache()
     see_memory_usage(f"After moving param group {i} to CPU", force=False)

     # Reorder group parameters for load balancing of gradient partitioning during backward among ranks.
     # This ensures that gradients are reduced in a fashion such that ownership round robins among the ranks.
     # For example, rather than 3 gradients (g_n+2, g_n+1, g_n) that are reduced consecutively belonging
     # to the same rank, instead they will belong to 3 ranks (r_m+2, r_m+1, r_m).
     # parameters are assigned to different ranks, and each rank owns the reduced gradients of its share;
     # the exact assignment scheme does not matter yet, the split is simply over the processes (GPUs) in the group
     if self.round_robin_gradients:
         # to balance the load as evenly as possible, assign parameters round-robin across ranks
         round_robin_tensors, round_robin_indices = self._round_robin_reorder(
             self.bit16_groups[i], dist.get_world_size(group=self.real_dp_process_group[i]))
     else:
         round_robin_tensors = self.bit16_groups[i]
         round_robin_indices = list(range(len(self.bit16_groups[i])))

     self.round_robin_bit16_groups.append(round_robin_tensors)
     self.round_robin_bit16_indices.append(round_robin_indices)

     # create flat buffer in CPU and move to GPU
     # flatten the parameter list into a single contiguous 1-D buffer (aligned, then moved to the GPU)
     self.bit16_groups_flat.append(
         self.flatten_dense_tensors_aligned(
             self.round_robin_bit16_groups[i],
             self.nccl_start_alignment_factor * dist.get_world_size(group=self.real_dp_process_group[i])).to(
                 get_accelerator().current_device_name()))
     see_memory_usage(f"After flattening and moving param group {i} to GPU", force=False)

     # Record padding required for alignment
     # flattening above may have appended padding at the tail; record the number of padded elements (the padding falls into the last rank's partition)
     if partition_id == dist.get_world_size(group=self.real_dp_process_group[i]) - 1:
         padding = self.bit16_groups_flat[i].numel() - sum(
             [t.numel() for t in self.round_robin_bit16_groups[i]])
     else:
         padding = 0
     self.groups_padding.append(padding)

     if dist.get_rank(group=self.real_dp_process_group[i]) == 0:
         see_memory_usage(f"After Flattening and after emptying param group {i} cache", force=False)

     # set model bit16 weight to slices of flattened buffer
     self._update_model_bit16_weights(i)

     # divide the flat weights into near equal partition equal to the data parallel degree
     # each process will compute on a different part of the partition
     # data_parallel_partitions holds the split result: one view of the flat buffer per rank,
     # indexed by partition id (the rank within this data-parallel group)
     data_parallel_partitions = self.get_data_parallel_partitions(self.bit16_groups_flat[i], i)
     self.parallel_partitioned_bit16_groups.append(data_parallel_partitions)

     # verify that data partition start locations are 4-byte aligned
     for partitioned_data in data_parallel_partitions:
         assert (partitioned_data.data_ptr() % (2 * self.nccl_start_alignment_factor) == 0)

     # A partition of the fp32 master weights that will be updated by this process.
     # Note that the params in single_partition_of_fp32_groups is cloned and detached
     # from the origin params of the model.
     # move the partition owned by this process (rank) to the target device and make a copy of it;
     # this master copy is what the wrapped optimizer actually updates, and depending on the config
     # it is kept in fp32 (default) or fp16 (fp16_master_weights_and_gradients)
     # note the clone().detach(): clone() gives the copy its own storage, and detach() cuts it out of
     # the autograd graph, so requires_grad is False and no gradient is tracked for it (yet)
     if not fp16_master_weights_and_gradients:
         self.single_partition_of_fp32_groups.append(self.parallel_partitioned_bit16_groups[i][partition_id].to(
             self.device).clone().float().detach())
     else:
         self.single_partition_of_fp32_groups.append(self.parallel_partitioned_bit16_groups[i][partition_id].to(
             self.device).clone().half().detach())
     # self.single_partition_of_fp32_groups only contains the partition that belongs to the current process (rank)

     # Set local optimizer to have flat params of its own partition.
     # After this, the local optimizer will only contain its own partition of params.
     # In that case, the local optimizer only saves the states(momentum, variance, etc.) related to its partition's params(zero stage1).
     # note: the detached clone is a new leaf tensor, so setting requires_grad = True simply allows it to
     # carry a .grad; that .grad is filled in manually later (not by autograd) before the inner optimizer steps
     self.single_partition_of_fp32_groups[
         i].requires_grad = True  # keep this in case internal optimizer uses it
     # reset the optimizer's param_group so that it only contains the partition assigned to the current process (rank)
     param_group['params'] = [self.single_partition_of_fp32_groups[i]]

     partition_size = len(self.bit16_groups_flat[i]) / dist.get_world_size(group=self.real_dp_process_group[i])
     params_in_partition, params_not_in_partition, first_offset = self.get_partition_info(
         self.round_robin_bit16_groups[i], partition_size, partition_id)

     self.partition_size.append(partition_size)
     self.params_in_partition.append(params_in_partition)
     self.params_not_in_partition.append(params_not_in_partition)
     self.first_offset.append(first_offset)
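
To summarize what this loop builds, here is a simplified, hypothetical sketch (split_flat_buffer is not a DeepSpeed function) of how the flat, alignment-padded buffer is cut into equal per-rank views, and how the local fp32 master copy is taken from the view owned by this rank:

import torch

def split_flat_buffer(flat, world_size):
    # the flat buffer was padded during flattening, so it divides evenly
    assert flat.numel() % world_size == 0
    part = flat.numel() // world_size
    # narrow() returns views into the same storage, so no memory is duplicated
    return [flat.narrow(0, r * part, part) for r in range(world_size)]

flat = torch.arange(12.0).half()   # stand-in for one flattened bit16 group
partitions = split_flat_buffer(flat, world_size=4)        # 4 views of 3 elements each
rank = 2
fp32_master = partitions[rank].clone().float().detach()   # what this rank's optimizer updates
fp32_master.requires_grad = True
print([p.tolist() for p in partitions])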

3.3. cpu offload

To be added.