.. _ch_deepspeed_stage3_init:

############################################
Stage3 - Parameter Partitioning
############################################

In the ``DeepSpeedEngine``, an optimizer instance matching the configured stage
is created. For ``Stage-3`` the corresponding optimizer is the class
``DeepSpeedZeroOptimizer_Stage3``, so we start drilling down from there.

DeepSpeedZeroOptimizer_Stage3
############################################

First, where the class comes from:

.. code-block:: python

    from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3

Next, its ``__init__`` method:

.. code-block:: python

    def __init__(
        self,
        module,                # the model to process
        init_optimizer,        # the base optimizer
        timers,
        ds_config,             # configuration
        static_loss_scale=1.0,
        dynamic_loss_scale=False,
        dynamic_loss_args=None,
        verbose=True,
        contiguous_gradients=True,
        reduce_bucket_size=500000000,
        prefetch_bucket_size=50000000,
        max_reuse_distance=1000000000,
        max_live_parameters=1000000000,
        param_persistence_threshold=100000,
        model_persistence_threshold=sys.maxsize,
        dp_process_group=None,
        reduce_scatter=True,
        overlap_comm=False,
        offload_optimizer_config=None,
        offload_param_config=None,
        sub_group_size=1000000000000,
        mpu=None,
        clip_grad=0.0,
        gradient_accumulation_dtype=torch.float32,
        communication_data_type=torch.float16,
        postscale_gradients=True,
        gradient_predivide_factor=1.0,
        gradient_accumulation_steps=1,
        elastic_checkpoint=False,
        aio_config=None,
        all2all_process_group=None,
        zero_hpz_partition_size=1,
        zero_quantized_weights=False,
        zero_quantized_nontrainable_weights=False,
    ):
        ...
        self.optimizer = init_optimizer  # the base optimizer, e.g. a plain torch optimizer
        ...
        # the core parameter handling happens here;
        # self.parameter_offload is a DeepSpeedZeRoOffload object
        self.parameter_offload: DeepSpeedZeRoOffload = self.initialize_ds_offload(
            module=module,
            timers=timers,
            ds_config=ds_config,
            overlap_comm=overlap_comm,
            prefetch_bucket_size=prefetch_bucket_size,
            max_reuse_distance=max_reuse_distance,
            max_live_parameters=max_live_parameters,
            param_persistence_threshold=param_persistence_threshold,
            model_persistence_threshold=model_persistence_threshold,
            offload_param_config=offload_param_config,
            mpu=mpu,
            zero_param_parallel_group=zero_param_parallel_group,
            zero_quantized_weights=zero_quantized_weights,
            zero_quantized_nontrainable_weights=zero_quantized_nontrainable_weights)
        ...

Following the call chain, the class method ``initialize_ds_offload`` does
nothing more than create a ``DeepSpeedZeRoOffload`` object:

.. code-block:: python

    def initialize_ds_offload(
        self,
        module,
        timers,
        ds_config,
        overlap_comm,
        prefetch_bucket_size,
        max_reuse_distance,
        max_live_parameters,
        param_persistence_threshold,
        model_persistence_threshold,
        offload_param_config,
        mpu,
        zero_param_parallel_group,
        zero_quantized_weights,
        zero_quantized_nontrainable_weights,
    ):
        return DeepSpeedZeRoOffload(module=module,
                                    timers=timers,
                                    ds_config=ds_config,
                                    overlap_comm=overlap_comm,
                                    prefetch_bucket_size=prefetch_bucket_size,
                                    max_reuse_distance=max_reuse_distance,
                                    max_live_parameters=max_live_parameters,
                                    param_persistence_threshold=param_persistence_threshold,
                                    model_persistence_threshold=model_persistence_threshold,
                                    offload_param_config=offload_param_config,
                                    mpu=mpu,
                                    zero_param_parallel_group=zero_param_parallel_group,
                                    zero_quantized_weights=zero_quantized_weights,
                                    zero_quantized_nontrainable_weights=zero_quantized_nontrainable_weights)
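Before drilling further, it helps to see where these arguments come from on the
user side. Below is a minimal ``Stage-3`` configuration sketch; the values are
illustrative, the key names follow DeepSpeed's public ``zero_optimization``
config schema, and the arrows note how each key maps onto the ``__init__``
parameters above. Treat it as a reading aid, not a complete or recommended
config.

.. code-block:: python

    # Illustrative Stage-3 config; values are examples, not recommendations.
    ds_config = {
        "train_micro_batch_size_per_gpu": 1,
        "zero_optimization": {
            "stage": 3,                                     # selects DeepSpeedZeroOptimizer_Stage3
            "reduce_bucket_size": 500_000_000,              # -> reduce_bucket_size
            "stage3_prefetch_bucket_size": 50_000_000,      # -> prefetch_bucket_size
            "stage3_max_live_parameters": 1_000_000_000,    # -> max_live_parameters
            "stage3_max_reuse_distance": 1_000_000_000,     # -> max_reuse_distance
            "stage3_param_persistence_threshold": 100_000,  # -> param_persistence_threshold
            # -> offload_param_config; "device" may also be "nvme"
            "offload_param": {"device": "cpu", "pin_memory": True},
        },
    }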
DeepSpeedZeRoOffload
##############################################

The core parameter-partitioning logic of Stage3 is implemented in the class
``DeepSpeedZeRoOffload``, which lives in
``deepspeed.runtime.zero.parameter_offload``. As before, look at its
initialization method:

.. code-block:: python

    class DeepSpeedZeRoOffload(object):

        def __init__(
            self,
            module,
            timers,
            ds_config,
            overlap_comm=True,
            prefetch_bucket_size=50000000,
            max_reuse_distance=1000000000,
            max_live_parameters=1000000000,
            param_persistence_threshold=100000,
            model_persistence_threshold=sys.maxsize,
            offload_param_config=None,
            mpu=None,
            zero_param_parallel_group=None,
            zero_quantized_weights=False,
            zero_quantized_nontrainable_weights=False,
        ):
            ...
            # the parameter-partitioning logic
            self._convert_to_zero_parameters(ds_config, module, mpu)

            for m in module.modules():
                _init_external_params(m)

            _inject_parameters(module, ZeROOrderedDict)
            ...

``__init__`` calls the method ``_convert_to_zero_parameters``; the name alone
tells us this is what we are after.

.. code-block:: python

    def _convert_to_zero_parameters(self, ds_config, module, mpu):
        # parameters without a ds_id attribute, i.e. not yet processed by deepspeed
        non_zero_params = [p for p in module.parameters() if not is_zero_param(p)]
        if non_zero_params:  # there are unprocessed parameters
            zero_params = [p for p in module.parameters() if is_zero_param(p)]
            if zero_params:  # some parameters have already been processed
                # this means the model was initialized before and has now gained
                # some new, unprocessed parameters; for consistency, reuse the
                # original conversion procedure
                zero_params[0].convert_to_zero_parameters(param_list=non_zero_params)
            else:
                # the model parameters are being initialized for the first time
                group = None
                if mpu:
                    group = mpu.get_data_parallel_group()

                # the core partitioning happens here, via
                # deepspeed.runtime.zero.partition_parameters.Init
                Init(module=module,
                     data_parallel_group=group,
                     dtype=self.dtype,
                     config_dict_or_path=ds_config,
                     remote_device=self.offload_device,
                     pin_memory=self.offload_param_pin_memory,
                     mpu=mpu,
                     zero_param_parallel_group=self.zero_param_parallel_group,
                     zero_quantized_weights=self.zero_quantized_weights,
                     zero_quantized_nontrainable_weights=self.zero_quantized_nontrainable_weights)

So this method in turn jumps to the ``Init`` class and creates an ``Init``
instance. Quite a detour.

The Init Module
####################################################

The ``Init`` class comes from:

.. code-block:: python

    from deepspeed.runtime.zero.partition_parameters import Init

First, the official docstring::

    A context to enable massive model construction for training with
    ZeRO-3. Models are automatically partitioned (or, sharded) across the
    system and converted to half precision.

This ``Init`` class is the heart of ``Stage-3`` parameter partitioning. As
usual, start with the ``__init__`` method.

.. code-block:: python

    # Replaces all parameters in module with Scattered Parameters
    class Init(InsertPostInitMethodToModuleSubClasses):
        param_id = 0
        param_persistence_threshold = get_config_default(DeepSpeedZeroConfig, "param_persistence_threshold")
        model_persistence_threshold = get_config_default(DeepSpeedZeroConfig, "model_persistence_threshold")
        num_persisted_parameters = 0
        num_persisted_elements = 0
        apply_param_persistence = False
        override_module_apply = get_config_default(DeepSpeedZeroConfig, "override_module_apply")

        def __init__(
            self,
            module=None,
            data_parallel_group=None,
            mem_efficient_linear=True,
            remote_device=None,
            pin_memory=False,
            config_dict_or_path=None,
            config=None,
            enabled=True,
            dtype=None,
            mpu=None,
            zero_param_parallel_group=None,
            zero_quantized_weights=False,
            zero_quantized_nontrainable_weights=False,
        ):
            ...
            # the device this process should use
            self.local_device = torch.device(get_accelerator().device_name(os.environ["LOCAL_RANK"]))
            get_accelerator().set_device(self.local_device)

            if module is not None:
                assert isinstance(module, torch.nn.Module)
                # parameter partitioning happens here
                self._convert_to_zero_parameters(module.parameters(recurse=True))
            ...
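The code path traced here passes ``module=`` so that an already-built model is
converted after the fact. The docstring hints at the more common user-facing
pattern: entering ``Init`` as a context manager, so each parameter is
partitioned the moment its submodule allocates it. A minimal sketch, assuming
a distributed environment has already been set up and reusing the illustrative
``ds_config`` from earlier:

.. code-block:: python

    import torch
    import deepspeed

    # Sketch only: assumes torch.distributed is initialized (e.g. via
    # deepspeed.init_distributed()) and ds_config is a Stage-3 config dict.
    with deepspeed.zero.Init(config_dict_or_path=ds_config):
        # parameters created inside the context are partitioned across the
        # data-parallel group instead of being fully materialized on every rank
        model = torch.nn.Sequential(
            torch.nn.Linear(8192, 8192),
            torch.nn.ReLU(),
            torch.nn.Linear(8192, 8192),
        )

    # after the context, each rank holds only its ds_tensor shard of every weight

Both entry points end up in the same ``_convert_to_zero_parameters`` /
``_convert_to_deepspeed_param`` machinery examined next.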
.. _ch_deepspeed_stage3_convert_to_deepspeed_param:

_convert_to_deepspeed_param
---------------------------------------------------

From here we actually enter the partitioning process.

.. code-block:: python

    def _convert_to_zero_parameters(self, param_list):
        for param in param_list:
            # skip parameters that have already been processed
            if is_zero_param(param):
                continue

            # first move the parameter data to the target device
            param.data = param.data.to(self.local_device)
            # then process each parameter one by one
            self._zero_init_param(param)

    def _zero_init_param(self, param):
        # first extend the original torch parameter object,
        # i.e. attach the necessary attributes and methods
        self._convert_to_deepspeed_param(param)

        if dist.get_world_group() == self.get_dp_process_group():
            dist.broadcast(param, 0, self.get_dp_process_group())
        else:
            dist.broadcast(param, dist.get_global_rank(self.get_dp_process_group(), 0),
                           self.get_dp_process_group())

        # self._convert_to_deepspeed_param(param) above attached a partition()
        # method to param; call it here to actually partition the parameter
        param.partition()

    def _convert_to_deepspeed_param(self, param):
        """
        This method extends the original param instance with the necessary
        attributes and methods. Frankly, the code here is a lot to take in...
        """
        # Partitioned, Normal, Remote
        param.ds_param_type = ZeroParamType.PARTITIONED

        # Replicated vs Partitioned vs Inflight
        param.ds_status = ZeroParamStatus.AVAILABLE

        # Stores the shape of the original tensor
        param.ds_shape = param.shape

        # Stores the number of elements in the original parameter without padding
        param.ds_numel = param.numel()

        # Stores the partitioned copy of the tensor
        # (only the slice of the original parameter owned by this rank)
        param.ds_tensor = None

        # Keeps track of how many active sub-modules need this param at any given point in time
        param.ds_active_sub_modules = set()

        # If this flag is true, then the parameters are replicated throughput training
        # And only partitioned before the step
        if Init.apply_param_persistence and param.ds_numel <= Init.param_persistence_threshold and Init.num_persisted_elements + param.ds_numel <= Init.model_persistence_threshold:
            param.ds_persist = True
            Init.num_persisted_parameters += 1
            Init.num_persisted_elements += param.ds_numel
        else:
            param.ds_persist = False

        param.is_external_param = False

        # The group that the parameter is scattered across.
        param.ds_process_group = self.ds_process_group

        # Stores the secondary partitioned copy of the tensor
        param.ds_secondary_tensor = None

        # Process group for secondary partition all (group) gather
        param.ds_zero_param_process_group = self.zero_param_process_group
        param.ds_secondary_tensor_group_size = self.num_ranks_in_param_group
        param.ds_secondary_tensor_num_of_groups = self.num_param_groups

        # This is set to the Async Param swapper if remote device is nvme
        # else this is set to None
        param.nvme_swapper = self.param_swapper

        # DeepSpeed Param ID
        param.ds_id = Init.param_id
        Init.param_id += 1

        def all_gather(param_list=None, async_op=False, hierarchy=0):
            cls = param
            if param_list is None:
                param_list = [cls]
            return self._all_gather(param_list, async_op=async_op, hierarchy=hierarchy)

        @instrument_w_nvtx
        def all_gather_coalesced(params: Iterable[Parameter],
                                 forward: bool = True,
                                 safe_mode: bool = False,
                                 quantize: bool = False) -> AllGatherCoalescedHandle:
            """Restores (re-gathers) the full parameter; discussed in detail later."""
            # This method is long, so it is omitted for now and revisited
            # in the forward-pass chapter.
            ...
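Before reading the remaining helpers, it is worth isolating the pattern:
DeepSpeed does not subclass ``Parameter``. It attaches plain attributes and
closure-based functions to each existing instance. A toy sketch of the same
technique (illustrative names only, not DeepSpeed code):

.. code-block:: python

    import torch

    # PyTorch tensors accept arbitrary Python attributes, which is exactly
    # what _convert_to_deepspeed_param relies on.
    param = torch.nn.Parameter(torch.randn(4, 4))

    param.ds_numel = param.numel()   # bookkeeping attribute, like ds_numel above
    param.ds_shape = param.shape

    def partition():
        # the closure captures `param`, mirroring `cls = param` in the real code
        print(f"partitioning {param.ds_numel} elements of shape {tuple(param.ds_shape)}")

    param.partition = partition      # attach the "method"
    param.partition()                # -> partitioning 16 elements of shape (4, 4)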
The rest of ``_convert_to_deepspeed_param`` attaches the remaining helpers:

.. code-block:: python

    def _convert_to_deepspeed_param(self, param):
        # ... (continued from above) ...

        def partition(param_list=None, backward=False, hierarchy=0, has_been_updated=False):
            cls = param
            print_rank_0(f"{'--' * hierarchy}----Partitioning param {debug_param2name_id_shape_device(cls)}",
                         force=False)
            if param_list is None:
                param_list = [cls]
            self._partition(param_list, has_been_updated=has_been_updated)

        def reduce_gradients_at_owner(param_list=None, hierarchy=0):
            cls = param
            if param_list is None:
                param_list = [cls]
            print_rank_0(
                f"{'--' * hierarchy}----Reducing Gradients for param with ids {[param.ds_id for param in param_list]} to owner"
            )
            self._reduce_scatter_gradients(param_list)

        def partition_gradients(param_list=None, partition_buffers=None, hierarchy=0, accumulate=False):
            cls = param
            print_rank_0(
                f"{'--' * hierarchy}----Partitioning param gradient with id {debug_param2name_id_shape_device(cls)}")
            if param_list is None:
                param_list = [cls]
            if isinstance(partition_buffers, torch.Tensor):
                partition_buffers = [partition_buffers]

            self._partition_gradients(param_list, partition_buffers=partition_buffers, accumulate=accumulate)

        def aligned_size():
            return self._aligned_size(param)

        def padding_size():
            return self._padding_size(param)

        def partition_numel():
            return self._partition_numel(param)

        def item_override():
            param.all_gather()
            return param._orig_item()

        def ds_summary(slf: torch.Tensor, use_debug_name: bool = False) -> dict:
            return {
                "id": debug_param2name_id(slf) if use_debug_name else slf.ds_id,
                "status": slf.ds_status.name,
                "numel": slf.numel(),
                "ds_numel": slf.ds_numel,
                "shape": tuple(slf.shape),
                "ds_shape": tuple(slf.ds_shape),
                "requires_grad": slf.requires_grad,
                "grad_shape": tuple(slf.grad.shape) if slf.grad is not None else None,
                "persist": slf.ds_persist,
                "active_sub_modules": slf.ds_active_sub_modules,
                "ds_tensor.shape": slf.ds_tensor.shape if slf.ds_tensor is not None else None
            }

        def convert_to_zero_parameters(param_list):
            self._convert_to_zero_parameters(param_list)

        def allgather_before(func: Callable) -> Callable:

            def wrapped(*args, **kwargs):
                param.all_gather()
                return func(*args, **kwargs)

            return wrapped

        # Collectives for gathering and partitioning parameters
        param.all_gather = all_gather
        param.all_gather_coalesced = all_gather_coalesced
        param.partition = partition

        # Collective for averaging gradients
        param.reduce_gradients_at_owner = reduce_gradients_at_owner
        param.partition_gradients = partition_gradients

        # Partitioning size utilities
        param.aligned_size = aligned_size
        param.padding_size = padding_size
        param.partition_numel = partition_numel

        param.ds_summary = types.MethodType(ds_summary, param)

        param.item = allgather_before(param.item)

        param.convert_to_zero_parameters = convert_to_zero_parameters
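One detail in the block above deserves a note: ``allgather_before`` wraps the
existing ``param.item`` so that the full tensor is re-gathered before any
scalar read. The wrapper pattern in isolation looks like the following toy
sketch (made-up names; ``FakeParam`` is a stand-in, not a DeepSpeed class):

.. code-block:: python

    from typing import Callable

    class FakeParam:
        """Stand-in for a partitioned parameter (illustrative only)."""

        def __init__(self) -> None:
            self.gathered = False

        def all_gather(self) -> None:
            self.gathered = True  # pretend to restore the full tensor

        def item(self) -> float:
            assert self.gathered, "full tensor must be gathered before reading"
            return 42.0

    def allgather_before(prepare: Callable, func: Callable) -> Callable:
        def wrapped(*args, **kwargs):
            prepare()                 # e.g. param.all_gather()
            return func(*args, **kwargs)
        return wrapped

    p = FakeParam()
    # mirrors `param.item = allgather_before(param.item)` in the real code,
    # except the real version captures `param` in a closure
    p.item = allgather_before(p.all_gather, p.item)
    print(p.item())  # gathers first, then returns 42.0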
Among the helpers above, ``partition`` is the one we care about here. Shown
again below, it accepts a parameter list and forwards to ``Init::_partition``
for the actual split.

.. code-block:: python

    def partition(param_list=None, backward=False, hierarchy=0, has_been_updated=False):
        cls = param
        print_rank_0(f"{'--' * hierarchy}----Partitioning param {debug_param2name_id_shape_device(cls)}",
                     force=False)
        if param_list is None:
            param_list = [cls]
        self._partition(param_list, has_been_updated=has_been_updated)

.. _ch_deepspeed_stage3_partition:

partition
-------------------------------------

Another gripe: this code reads as if it were written to keep anyone from
understanding it.

.. code-block:: python

    self.remote_device = self.local_device if remote_device in [None, OffloadDeviceEnum.none] else remote_device

    # Enable fp16 param swapping to NVMe
    # depending on the config file, self.dtype may be half precision
    if self.remote_device == OffloadDeviceEnum.nvme:
        self.param_swapper = AsyncPartitionedParameterSwapper(_ds_config, self.dtype)
    else:
        self.param_swapper = None

With these supporting variables laid out, we finally reach the partitioning
logic itself. It took a while...

.. code-block:: python

    def _partition_param(self, param, buffer=None, has_been_updated=False):
        assert param.ds_status is not ZeroParamStatus.INFLIGHT, f" {param} Cannot partition a param in flight"
        global reuse_buffers
        print_rank_0(f"Param id {param.ds_id} status is {param.ds_status}", force=False)

        if param.ds_status is ZeroParamStatus.AVAILABLE:
            print_rank_0(f"Partitioning param id {param.ds_id} reuse buffers {reuse_buffers}", force=False)

            # this parameter has already been partitioned
            if param.ds_tensor is not None and not has_been_updated:  ##param already partitioned
                see_memory_usage(f'Before partitioning param {param.ds_id} {param.shape}', force=False)
                # param.data does not store anything meaningful in partitioned state
                # release the full parameter tensor
                free_param(param)
                see_memory_usage(f'After partitioning param {param.ds_id} {param.shape}', force=False)

                if param.ds_tensor.final_location == OffloadDeviceEnum.nvme:
                    print_rank_0(f"Param {param.ds_id} partition released since it exists in nvme", force=False)
                    param.nvme_swapper.remove_partition_and_release_buffers([param])
                    print_rank_0(
                        f"after swap Param {param.ds_id} {param.ds_tensor.shape} partition released since it exists in nvme",
                        force=False)

                return

            # the parameter tensor is split into self.num_partitions shards;
            # if its element count is not a multiple of self.num_partitions,
            # it is padded for alignment
            tensor_size = self._aligned_size(param)

            # self.num_partitions == self.dp_world_size == dist.get_world_size(group=self.ds_process_group)
            # self.ds_process_group defaults to the group of all processes, but it
            # can be overridden via the data_parallel_group argument
            partition_size = tensor_size // self.num_partitions  # size of each shard

            if param.ds_tensor is None:
                # records which device the shard finally lands on
                final_location = None
                # self.remote_device is the final offload target;
                # self.param_swapper is the helper for swapping with the nvme
                # device, already configured with self.dtype
                if self.remote_device == OffloadDeviceEnum.nvme and self.param_swapper.swappable_tensor(
                        numel=partition_size):
                    # offloading to nvme and the nvme buffer still has room:
                    # create partitioned_tensor directly in the nvme buffer
                    final_location = OffloadDeviceEnum.nvme
                    buffer = self.param_swapper.get_buffer(param, partition_size)
                    # note: the new tensor keeps the original parameter's dtype
                    partitioned_tensor = torch.empty(0, dtype=param.dtype, device=buffer.device)
                    partitioned_tensor.data = buffer.data
                    print_rank_0(f"ID {param.ds_id} Initializing partition for the first time for nvme offload.")

                else:
                    # not offloading to nvme, or nvme has no room left
                    if param.ds_persist:
                        # ds_persist means this parameter is never offloaded
                        device = self.local_device  # keep it on the original device
                    elif self.remote_device == OffloadDeviceEnum.nvme:
                        # nvme has no room left: even though the final target is
                        # nvme, buffer the shard in cpu memory for now
                        device = OffloadDeviceEnum.cpu
                    else:
                        device = self.remote_device

                    # first create an empty tensor on the chosen device (gpu or cpu)
                    partitioned_tensor = torch.empty(partition_size, dtype=param.dtype, device=device)

                    # quantize the tensor if it's not trainable:
                    # if the parameter needs no gradient and quantization is
                    # enabled, quantize the shard
                    if not param.requires_grad and self.quantized_nontrainable_weights:
                        partitioned_tensor, partitioned_tensor.ds_quant_scale = self.quantizer_module.quantize(
                            partitioned_tensor)

                    if device == OffloadDeviceEnum.cpu and self.pin_memory:
                        # enable pin_memory
                        partitioned_tensor = get_accelerator().pin_memory(partitioned_tensor)

                # the shard is just storage, it never needs gradients
                partitioned_tensor.requires_grad = False
                param.ds_tensor = partitioned_tensor  # stores the partitioned slice
                param.ds_tensor.ds_numel = partition_size
                param.ds_tensor.status = PartitionedParamStatus.AVAILABLE  # mark the status
                param.ds_tensor.final_location = final_location  # record the shard's device
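To make the size arithmetic concrete, here is a small worked example of the
alignment and slicing logic (plain Python with made-up numbers, mirroring
``_aligned_size`` and the copy branches shown next):

.. code-block:: python

    import math

    world_size = 4    # self.num_partitions
    ds_numel = 10     # unpadded element count of the parameter

    # _aligned_size pads the element count up to a multiple of num_partitions
    aligned = math.ceil(ds_numel / world_size) * world_size  # 12
    padding = aligned - ds_numel                             # 2 padded elements
    partition_size = aligned // world_size                   # 3 elements per rank

    # each rank owns the flattened slice [start, end)
    for rank in range(world_size):
        start = partition_size * rank
        end = start + partition_size
        valid = max(0, min(end, ds_numel) - start)
        print(f"rank {rank}: [{start}, {end}) valid={valid}")
    # rank 3 gets [9, 12) but only 1 real element; the rest is padding,
    # which is exactly what the `start < param.ds_numel` branch below handles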
The second half of the method then copies the slice owned by the current rank
into the shard and frees the full tensor:

.. code-block:: python

    def _partition_param(self, param, buffer=None, has_been_updated=False):
        # ... (continued from above) ...
        if param.ds_status is ZeroParamStatus.AVAILABLE:
            ...
            # locate the start of this rank's slice based on its rank index
            start = partition_size * self.get_partition_rank()
            end = start + partition_size

            # flatten the tensor into a contiguous 1-D array
            one_dim_param = param.contiguous().view(-1)

            if start < param.ds_numel and end <= param.ds_numel:
                # extract the slice and copy it into param.ds_tensor
                src_tensor = one_dim_param.narrow(0, start, partition_size)
                param.ds_tensor.copy_(src_tensor)
            else:
                # this rank's slice extends into the padding;
                # copy only the remaining valid elements
                if start < param.ds_numel:
                    elements_to_copy = param.ds_numel - start
                    param.ds_tensor.narrow(0, 0,
                                           elements_to_copy).copy_(one_dim_param.narrow(0, start, elements_to_copy))

            see_memory_usage(f'Before partitioning param {param.ds_id} {param.shape}', force=False)
            # release the original full parameter tensor
            free_param(param)
            see_memory_usage(f'After partitioning param {param.ds_id} {param.shape}', force=False)

            # if the final target is nvme, swap the shard out via self.param_swapper
            if param.ds_tensor.final_location == OffloadDeviceEnum.nvme:
                self.param_swapper.swap_out_and_release([param])
                print_rank_0(f"ID {param.ds_id} Offloaded to nvme offload and buffers released.")
                see_memory_usage(f"ID {param.ds_id} Offloaded to nvme offload and buffers released.", force=False)

            print_rank_0(f"ID {param.ds_id} partitioned type {param.dtype} dev {param.device} shape {param.shape}")

The partitioning logic itself is not complicated, just full of details. Each
rank keeps only a single shard of every parameter, and that shard can further
be stored in ``cpu`` memory or on an ``nvme`` SSD, which minimizes the demand
on ``GPU`` memory.

With the partitioning process clear, the next question is how parameters are
restored during the forward and backward passes; jump to
:numref:`ch_deepspeed_hook` to follow that.

partition_param_sec
-------------------------------------

To be completed...