Deepseed 深度学习优化库综述

https://github.com/microsoft/DeepSpeed

教程文档:https://www.deepspeed.ai/

DeepSpeed是一个由微软开发的开源深度学习优化库,旨在提高大规模模型训练的效率和可扩展性。它通过多种技术手段来加速训练,包括模型并行化、梯度累积、动态精度缩放、本地模式混合精度等。DeepSpeed还提供了一些辅助工具,如分布式训练管理、内存优化和模型压缩等,以帮助开发者更好地管理和优化大规模深度学习训练任务。此外,deepspeed基于pytorch构建,只需要简单修改即可迁移。DeepSpeed已经在许多大规模深度学习项目中得到了应用,包括语言模型、图像分类、目标检测等等。

DeepSpeed有四大创新支柱:

DeepSpeed提供了一系列系统创新,使大规模DL培训变得有效和高效,大大提高了易用性,并在可能的规模方面重新定义了DL培训景观。这些创新,如ZeRO,3D-Mecelism,DeepSpeed-MoE,ZeRO-Infinity等都属于DeepSpeed-Training支柱。了解更多:DeepSpeed-Training

DeepSpeed汇集了张量、流水线、专家和ZeRO-parallelism等并行技术的创新,并将其与高性能自定义推理内核、通信优化和异构内存技术相结合,以前所未有的规模实现推理,同时实现无与伦比的延迟、吞吐量和成本降低。这种推理系统技术的系统组成DeepSpeed-Inference。了解更多:DeepSpeed-Inference

为了进一步提高推理效率,DeepSpeed为研究人员和从业者提供了易于使用和灵活组合的压缩技术,以压缩他们的模型,同时提供更快的速度,更小的模型大小,并显着降低压缩成本。此外,SoTA在压缩方面的创新,如ZeroQuant和XTC,都包含在DeepSpeed-Compression支柱下。了解更多:DeepSpeed-Compression

DeepSpeed库将DeepSpeed训练、推理和压缩支柱中的创新和技术实现并打包到一个易于使用的开源存储库中。它允许在单个训练,推理或压缩管道中轻松组合多种功能。DeepSpeed库被DL社区广泛采用,并已用于启用一些最强大的模型(请参阅DeepSpeed采用)。

Model Implementations for Inference(MII)是一个开源的存储库,通过减轻应用复杂系统优化技术的需求,使所有数据科学家都可以访问低延迟和高吞吐量的推理。开箱即用,MII为数千种广泛使用的DL模型提供支持,使用DeepSpeed-Inference进行优化,可以使用几行代码进行部署,同时与其香草开源版本相比,实现了显着的延迟减少。

DeepSpeed已与几种不同的流行开源DL框架集成,例如:

https://huggingface.co/docs/transformers/deepspeed

https://www.zhihu.com/people/deepspeed

NCCL–多卡训练后端

https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/overview.html

https://developer.nvidia.com/nccl

NCCL” 代表 “NVIDIA Collective Communications Library”,”NVIDIA 集体通信库“,它是一种由 NVIDIA 开发的用于高性能计算通信库。NCCL 专门设计用于加速 GPU 群集之间的通信,以便在并行计算深度学习等领域中提供更好的性能。

NVIDIA 集合通信库 (NCCL) 可实现针对 NVIDIA GPU 和网络进行性能优化的多 GPU 和多节点通信基元。NCCL 提供了 all-gather、all-reduce、broadcast、reduce、reduce-scatter、point-to-point send 和 receive 等例程,这些例程均经过优化,可通过节点内的 PCIe 和 NVLink 高速互联以及节点间的 NVIDIA Mellanox 网络实现高带宽和低延迟。

NCCL相关环境变量说明 :

【https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage.html】

  1. NCCL_TIMEOUT:设置集合操作超时阈值,单位毫秒;如果常见超时错误,适当增大该值,但不能太大NCCL_TIMEOUT 环境变量用于设置 NCCL 集体通信操作的超时时间。通过调整这个值,你可以更好地处理网络延迟和不稳定的问题,确保 NCCL 通信的稳定性和可靠性。如果在集体通信过程中遇到超时问题,可以尝试调整此环境变量以解决问题。

设置超时时间:

  • NCCL_TIMEOUT 用于定义 NCCL 集体通信操作的超时时间。超时时间是 NCCL 在执行操作时等待响应的最长时间,超出此时间将触发超时错误。

解决网络问题:

  • 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致集体通信操作超时。设置合适的 NCCL_TIMEOUT 可以帮助调节容错设置,避免训练过程中因超时错误而中断。

性能调优:

  • 根据你的集群配置和网络状况,适当调整 NCCL_TIMEOUT 可以帮助优化通信性能和稳定性。
  1. NCCL_ALGO:选择集合通信算法,如Ring, Tree;不同拓扑适合不同算法,测试选更优算法
  2. NCCL_CHUNK_SIZE:定义环形传输缓冲区大小;合理设置可提速,但也会增加内存消耗
  3. NCCL_DEBUG:打开NCCL调试日志;出现问题时打开调试,但会降低速度,不要在生产环境使用
  4. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  5. NCCL_P2P_LEVEL:设置点对点通信优化级别;增加该值可减少P2P次数,提高某些操作效率
  6. NCCL_P2P_DISABLE:禁用点对点通信,强制使用集合通信。在某些情况下,P2P 通信可能会导致性能问题或出现错误。禁用 P2P 通信可以帮助解决这些问题。如果你遇到与 P2P 通信相关的错误或不稳定性,禁用 P2P 可能有助于恢复系统的稳定性。
  7. NCCL_PXN_DISABLE:禁用使用非本地 NIC 的节点间通信,使用 NVLink 和一个中间 GPU。建议设置成1。在PyTorch中进行跨节点all-to-all通信时,如果该环境变量是0会出现异常。
  8. NCCL_SOCKET_IFNAME:选择网络接口。
  9. NCCL_SOCKET_NTHREADS 增加它的数量可以提高socker传输的效率,但是会增加CPU的负担
  10. NCCL_NET_GDR_LEVEL:设置GPUDirect RDMA的使用级别。
  11. NCCL_MAX_NRINGS:定义支持的最大NCCL环路数。
  12. NCCL_MIN_NRINGS:定义最小环路数。
  13. NCCL_BUFFSIZE:设置scratch空间大小。
  14. NCCL_BUFFLE_SIZE 缓存数据量,缓存越大一次ring传输的数据就越大自然对带宽的压力最大,但是相应的总延迟次数会少。默认值是4M(4194304),注意设置的时候使用bytes(字节大小)
  15. NCCL_NTHREADS:设置NCCL内部使用的线程数。
  16. NCCL_VERSION:显示NCCL版本信息。
  17. NCCL_MAX/MIN_NCHANNELS 最小和最大的rings,rings越多对GPU的显存、带宽的压力都越大,也会影响计算性能
  18. NCCL_CHECKS_DISABLE 在每次集合通信进行前对参数检验校对,这会增加延迟时间,在生产环境中可以设为1.默认是0
  19. NCCL_CHECK_POINTERS 在每次集合通信进行前对CUDA内存 指针进行校验,这会增加延迟时间,在生产环境中可以设为1.默认是0
  20. NCCL_NET_GDR_LEVEL GDR触发的条件,默认是当GPU和NIC挂载一个swith上面时使用GDR
  21. NCCL_IGNORE_CPU_AFFINITY 忽略CPU与应用的亲和性使用GPU与nic的亲和性为主
  22. NCCL_IB_DISABLE:禁用InfiniBand传输。

禁用 InfiniBand: 设置 NCCL_IB_DISABLE=1 会禁用 NCCL 在 InfiniBand 设备上的使用。这意味着 NCCL 将不会利用 InfiniBand 网络进行数据传输,而是回退到其他网络接口(例如以太网或其他网络接口)。

调试和兼容性: 禁用 InfiniBand 可能用于调试目的,或在系统中 InfiniBand 网络出现问题时回退到其他网络接口。如果你遇到与 InfiniBand 相关的错误或兼容性问题,禁用 InfiniBand 可能有助于解决这些问题。

  1. NCCL_IB_HCA 代表IB使用的设备:Mellanox mlx5系列的HCA设备NCCL_IB_HCA=mlx5 会默认轮询所有的设备。NCCL_IB_HCA=mlx5_0:1 指定其中一台设备。
  2. NCCL_IB_TIMEOUT 改变量用于控制InfiniBand Verbs超时。取值范围1-22。超时时间的计算公式为4.096微秒 * 2 ^ timeout,正确的值取决于网络的大小。增加该值可以在非常大的网络上提供帮助,例如 NCCL在调用ibv_poll_cq时出现错误12时。建议在大模型训练任务中设置成最大值22,可以减少不少nccl timeout异常。设置超时时间: NCCL_IB_TIMEOUT 用于控制 InfiniBand 网络操作的超时时间。通过调整这个值,你可以控制 NCCL 在遇到通信延迟或网络问题时的容忍度。解决网络问题: 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致超时错误。调整 NCCL_IB_TIMEOUT 可以帮助你在遇到网络问题时更好地调节超时设置,避免训练过程被中断。
  1. NCCL_IB_RETRY_CNT变量控制 InfiniBand 的重试次数。建议在大模型训练任务中设置成13,尽可能多重试。
  2. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  3. NCCL_IB_PCI_RELAXED_ORDERING启用 IB Verbs 传输的Relaxed Ordering。Relaxed Ordering可以极大地提高虚拟化环境下 InfiniBand 网络的性能。设置为 2,如果可用,自动使用Relaxed Ordering。设置为 1,强制使用Relaxed Ordering,如果不可用则失败。设置为 0,禁用使用Relaxed Ordering。默认值为 2。建议值为1

PyTorch 提速

摘自:https://github.com/lartpang/PyTorchTricks?tab=readme-ov-file

Note

原始文档:https://www.yuque.com/lart/ugkv9f/ugysgn

声明: 大部分内容来自知乎和其他博客的分享, 这里只作为一个收集罗列. 欢迎给出更多建议.

知乎回答 (欢迎点赞哦):

预处理提速

  • 尽量减少每次读取数据时的预处理操作, 可以考虑把一些固定的操作, 例如 resize , 事先处理好保存下来, 训练的时候直接拿来用。
  • 将预处理搬到 GPU 上加速。
    • Linux 可以使用 NVIDIA/DALI
    • 使用基于 Tensor 的图像处理操作。

IO 提速

使用更快的图片处理

  • opencv 一般要比 PIL 要快 。
    • 请注意,PIL 的惰性加载的策略使得其看上去 open 要比 opencv 的 imread 要快,但是实际上那并没有完全加载数据。可以对 open 返回的对象调用其 load() 方法,从而手动加载数据,这时的速度才是合理的。
  • 对于 jpeg 读取, 可以尝试 jpeg4py
  • 存 bmp 图 (降低解码时间)。
  • 关于不同图像处理库速度的讨论:Python 的各种 imread 函数在实现方式和读取速度上有何区别? – 知乎

整合数据为单个连续文件 (降低读取次数)

对于大规模的小文件读取,可以保存为一个可以连续读取的连续文件格式。可以选择考虑 TFRecord (Tensorflow) , recordIOhdf5pthn5lmdb

预读取数据

预读取下一次迭代需要的数据。使用案例:

借助内存

  • 直接载到内存里面。
    • 将图片读取后存到一个固定的容器对象中。
  • 把内存映射成磁盘。

借助固态

机械硬盘换成 NVME 固态。参考自 如何给你 PyTorch 里的 Dataloader 打鸡血 – MKFMIKU 的文章 – 知乎

训练策略

低精度训练

在训练中使用低精度 ( FP16 甚至 INT8 、二值网络、三值网络) 表示取代原有精度 ( FP32 ) 表示。

可以节约一定的显存并提速, 但是要小心一些不安全的操作如 mean 和 sum。

更大的 batch

更大的 batch 在固定的 epoch 的情况下往往会带来更短的训练时间。但是大的 batch 面临着超参数的设置、显存占用问题等诸多考量,这又是另一个备受关注的领域了。

代码层面

库设置

  • 在训练循环之前设置 torch.backends.cudnn.benchmark = True 可以加速计算。由于计算不同内核大小卷积的 cuDNN 算法的性能不同,自动调优器可以运行一个基准来找到最佳算法。当你的输入大小不经常改变时,建议开启这个设置。如果输入大小经常改变,那么自动调优器就需要太频繁地进行基准测试,这可能会损害性能。它可以将向前和向后传播速度提高 1.27x 到 1.70x。
  • 使用页面锁定内存,即在 DataLoader 中设定 pin_memory=True
  • 合适的 num_worker,细节讨论可见 Pytorch 提速指南 – 云梦的文章 – 知乎
  • optimizer.zero_grad(set_to_none=False 这里可以通过设置 set_to_none=True 来降低的内存占用,并且可以适度提高性能。但是这也会改变某些行为,具体可见文档。通过 model.zero_grad() 或 optimizer.zero_grad() 将对所有参数执行 memset,并通过读写操作更新梯度。但是,将梯度设置为 None 将不会执行 memset,并且将使用“只写”操作更新梯度。因此,设置梯度为 None 更快。
  • 反向传播期间设定使用 eval 模式并使用 torch.no_grad 关闭梯度计算。
  • 可以考虑使用 channels_last 的内存格式。
  • DistributedDataParallel代替DataParallel。对于多 GPU 来说,即使只有单个节点,也总是优先使用 DistributedDataParallel 而不是 DataParallel ,因为 DistributedDataParallel 应用于多进程,并为每个 GPU 创建一个进程,从而绕过 Python 全局解释器锁 (GIL) 并提高速度。

模型

  • 不要初始化任何用不到的变量,因为 PyTorch 的初始化和 forward 是分开的,他不会因为你不去使用,而不去初始化。
  • @torch.jit.script,使用 PyTroch JIT 将逐点运算融合到单个 CUDA kernel 上。PyTorch 优化了维度很大的张量的运算操作。在 PyTorch 中对小张量进行太多的运算操作是非常低效的。所以有可能的话,将计算操作都重写为批次(batch)的形式,可以减少消耗和提高性能。而如果没办法自己手动实现批次的运算操作,那么可以采用 TorchScript 来提升代码的性能。TorchScript 是一个 Python 函数的子集,但经过了 PyTorch 的验证,PyTorch 可以通过其 just in time(jtt) 编译器来自动优化 TorchScript 代码,提高性能。但更好的做法还是手动实现批次的运算操作。
  • 在使用混合精度的 FP16 时,对于所有不同架构设计,设置尺寸为 8 的倍数。
  • BN 之前的卷积层可以去掉 bias。因为在数学上,bias 可以通过 BN 的均值减法来抵消。我们可以节省模型参数、运行时的内存

数据

  • 将 batch size 设置为 8 的倍数,最大化 GPU 内存的使用。
  • GPU 上尽可能执行 NumPy 风格的操作。
  • 使用 del 释放内存占用。
  • 避免不同设备之间不必要的数据传输。
  • 创建张量的时候,直接指定设备,而不要创建后再传输到目标设备上。
  • 使用 torch.from_numpy(ndarray) 或者 torch.as_tensor(data, dtype=None, device=None),这可以通过共享内存而避免重新申请空间,具体使用细节和注意事项可参考对应文档。如果源设备和目标设备都是 CPU,torch.from_numpy 和 torch.as_tensor 不会拷贝数据。如果源数据是 NumPy 数组,使用 torch.from_numpy 更快。如果源数据是一个具有相同数据类型和设备类型的张量,那么 torch.as_tensor 可以避免拷贝数据,这里的数据可以是 Python 的 list, tuple,或者张量。
  • 使用非阻塞传输,即设定 non_blocking=True。这会在可能的情况下尝试异步转换,例如,将页面锁定内存中的 CPU 张量转换为 CUDA 张量。

对优化器的优化

模型设计

CNN

  • ShuffleNetV2,论文
    • 卷积层输入输出通道一致: 卷积层的输入和输出特征通道数相等时 MAC(内存访问消耗时间, memory access cost 缩写为 MAC ) 最小, 此时模型速度最快
    • 减少卷积分组: 过多的 group 操作会增大 MAC, 从而使模型速度变慢
    • 减少模型分支: 模型中的分支数量越少, 模型速度越快
    • 减少 element-wise 操作: element-wise 操作所带来的时间消耗远比在 FLOPs 上的体现的数值要多, 因此要尽可能减少 element-wise 操作。 depthwise convolution 也具有低 FLOPs 、高 MAC 的特点。

Vision Transformer

  • TRT-ViT: TensorRT-oriented Vision Transformer,论文解读
    • stage-level:Transformer block 适合放置到模型的后期,这可以最大化效率和性能的权衡。
    • stage-level:先浅后深的 stage 设计模式可以提升性能。
    • block-level:Transformer 和 BottleNeck 的混合 block 要比单独的 Transformer 更有效。
    • block-level:先全局再局部的 block 设计模式有助于弥补性能问题。

通用思路

  • 降低复杂度: 例如模型裁剪和剪枝, 减少模型层数和参数规模
  • 改模型结构: 例如模型蒸馏, 通过知识蒸馏方法来获取小模型

推理加速

半精度与权重量化

在推理中使用低精度 ( FP16 甚至 INT8 、二值网络、三值网络) 表示取代原有精度 ( FP32 ) 表示。

  • TensorRT 是 NVIDIA 提出的神经网络推理 (Inference) 引擎, 支持训练后 8BIT 量化, 它使用基于交叉熵的模型量化算法, 通过最小化两个分布的差异程度来实现
  • Pytorch1.3 开始已经支持量化功能, 基于 QNNPACK 实现, 支持训练后量化, 动态量化和量化感知训练等技术
  • 另外 Distiller 是 Intel 基于 Pytorch 开源的模型优化工具, 自然也支持 Pytorch 中的量化技术
  • 微软的 NNI 集成了多种量化感知的训练算法, 并支持 PyTorch/TensorFlow/MXNet/Caffe2 等多个开源框架

更多细节可参考 有三 AI:【杂谈】当前模型量化有哪些可用的开源工具?

操作融合

重参数化(Re-Parameterization)

时间分析

  • Python 自带了几个性能分析的模块 profile , cProfile 和 hotshot , 使用方法基本都差不多, 无非模块是纯 Python 还是用 C 写的。
  • PyTorch Profiler 是一种工具,可在训练和推理过程中收集性能指标。Profiler 的上下文管理器 API 可用于更好地了解哪种模型算子成本最高,检查其输入形状和堆栈记录,研究设备内核活动并可视化执行记录。

项目推荐

  • 基于 Pytorch 实现模型压缩:
    • 量化:8/4/2 bits(dorefa)、三值/二值 (twn/bnn/xnor-net)。
    • 剪枝: 正常、规整、针对分组卷积结构的通道剪枝。
    • 分组卷积结构。
    • 针对特征二值量化的 BN 融合。

扩展阅读

PyTorch 节省显存

原始文档:https://www.yuque.com/lart/ugkv9f/nvffyf

整理自: Pytorch 有什么节省内存 (显存) 的小技巧? – 知乎 https://www.zhihu.com/question/274635237

使用 In-Place 操作

  • 对于默认支持 inplace 的操作尽量启用。比如 relu 可以使用 inplace=True 。
  • 可以将 batchnorm 和一些特定的激活函数打包成 inplace_abn

损失函数

每次循环结束时删除 loss, 可以节约很少显存, 但聊胜于无。可见 Tensor to Variable and memory freeing best practices

混合精度

可以节约一定的显存并提速, 但是要小心一些不安全的操作如 mean 和 sum。

管理不需要反向传播的操作

显存清理

  • torch.cuda.empty_cache() 这是 del 的进阶版, 使用 nvidia-smi 会发现显存有明显的变化. 但是训练时最大的显存占用似乎没变. 大家可以试试: How can we release GPU memory cache?
  • 可以使用 del 删除不必要的中间变量, 或者使用 replacing variables 的形式来减少占用.

梯度累加(Gradient Accumulation)

把一个 batchsize=64 分为两个 32 的 batch,两次 forward 以后,backward 一次。但会影响 batchnorm 等和 batchsize 相关的层。

在 PyTorch 的文档 中提到了梯度累加与混合精度并用的例子。

使用梯度累加技术可以对分布式训练加速,这可以参考:[原创][深度][PyTorch] DDP 系列第三篇:实战与技巧 – 996 黄金一代的文章 – 知乎

梯度检查点(Gradient Checkpointing)

PyTorch 中提供了 torch.utils.checkpoint。这是通过在反向传播期间,在每个检查点位置重新执行一次前向传播来实现的。

论文 Training Deep Nets with Sublinear Memory Cost 基于梯度检查点技术,将显存从 O(N) 降到了 O(sqrt(N))。对于越深的模型, 这个方法省的显存就越多, 且速度不会明显变慢。

相关工具

参考资料

其他技巧

重现

可关注文档中 相关章节

强制确定性操作

避免使用非确定性算法

PyTorch 中,torch.use_deterministic_algorithms() 可以强制使用确定性算法而不是非确定性算法,并且如果已知操作是非确定性的(并且没有确定性的替代方案),则会抛出错误。

设置随机数种子

def seed_torch(seed=1029):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_torch()

参考自https://www.zdaiot.com/MLFrameworks/Pytorch/Pytorch%E9%9A%8F%E6%9C%BA%E7%A7%8D%E5%AD%90/

PyTorch 1.9 版本前 DataLoader 中的隐藏 BUG

具体细节可见 可能 95%的人还在犯的 PyTorch 错误 – serendipity 的文章 – 知乎

解决方法可参考 文档

def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    numpy.random.seed(worker_seed)
    random.seed(worker_seed)

DataLoader(..., worker_init_fn=seed_worker)

DDP分布式训练–数据加载和训练NCCL

深度学习的发展证明了大数据和大模型的价值。无论是在CV还是NLP领域,在大规模的计算资源上训练模型的能力变得日益重要。GPU以比CPU更快的矩阵乘法和加法运算,加速了模型训练。但随着数据量和模型参数的增长,单块GPU很快变得不够用。因此我们必须找到合适的方法,实现数据和模型在多个GPU甚至多个计算节点间的划分和复制,从而实现更短的训练周期和更大的模型参数量。

DDP大致的流程如下:

  1. 初始化进程组。
  2. 创建分布式并行模型,每个进程都会有相同的模型和参数。
  3. 创建数据分发Sampler,使每个进程加载一个mini batch中不同部分的数据。
  4. 网络中相邻参数分桶,一般为神经网络模型中需要进行参数更新的每一层网络。
  5. 每个进程前向传播并各自计算梯度。
  6. 模型某一层的参数得到梯度后会马上进行通讯并进行梯度平均。
  7. 各GPU更新模型参数。

今天主要来研究 3创建数据分发和Sampler :主要由三部分组成:torch.utils.data.Dataset【可以自定义】、torch.utils.data.DataLoader、以及torch.utils.data.distributed.DistributedSampler【可以自己定义】。

DistributedSampler 确保每个进程(或 GPU)处理数据集的不同部分。DataLoader 使用 DistributedSampler 生成的数据索引来分批数据,并进行数据加载和预处理。

1、 Dataset :

Dataset 是一个抽象类,用于表示数据集。你需要继承这个类并实现其方法,以定义你自己的数据集。它的主要功能包括:

  • 定义数据访问:通过实现 __getitem__ 方法,定义如何访问数据集中单个数据项。
  • 数据集大小:通过实现 __len__ 方法,返回数据集中样本的总数。
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

2、DataLoader:

DataLoader 是一个数据加载器,它负责从 Dataset 中批量加载数据。它提供了对数据的批量处理、随机打乱、并行加载等功能。DataLoader 主要功能包括:

  • 批量加载:将数据集分成多个批次,并在每次迭代中返回一个批次的数据。
  • 并行处理:使用多个工作线程(num_workers)来并行加载数据,提高数据加载速度。
  • 数据打乱:通过 shuffle 参数来随机打乱数据顺序。
  • 自动处理样本:使用 collate_fn 将单个样本组合成批次。

1. 数据加载和预处理

DataLoader 负责从数据集(Dataset)中加载数据,并进行必要的预处理操作。预处理可能包括数据增强、归一化等。它通过多线程或多进程的方式并行加载数据,减少了数据加载时间。

  • num_workers:指定用于数据加载的子进程数,帮助加快数据加载速度。

2. 数据分批

DataLoader 将数据集划分为多个批次(batches),以便于模型进行训练和评估。批次的大小可以通过 batch_size 参数进行设置。

  • batch_size:每个批次的数据量,这对于训练过程中每次迭代的数据量非常重要。

3. 分布式训练中的数据划分

在 DDP 下,DataLoader 结合 Sampler 来确保数据在各个进程之间的正确分配。Sampler 控制每个进程(或 GPU)获得数据集的哪一部分。

  • DistributedSampler:当进行分布式训练时,DistributedSampler 确保每个进程处理不同的数据子集,从而实现负载均衡和避免数据重复。

4. 数据的打乱和顺序

为了提高模型的泛化能力,数据通常在每个 epoch 开始时被打乱。DataLoader 提供了打乱数据的功能,这对于训练过程是非常重要的。

  • shuffle:指定是否在每个 epoch 开始时打乱数据,这有助于减少模型对数据顺序的过拟合。

5. 批次丢弃

在训练过程中,如果最后一个批次的样本数不足以构成完整的批次,可以选择丢弃这个批次,以保证每个批次的大小一致。

  • drop_last:指定是否丢弃最后一个批次(如果其大小小于 batch_size)。

6. Sampler 结合使用

DataLoader 可以与不同的 Sampler 结合使用,以支持各种数据加载策略。在 DDP 下,DistributedSampler 是常用的 Sampler,它将数据集划分为多个子集,每个进程处理一个子集。

  • batch_sampler:如果使用自定义的 Sampler,可以将其传递给 batch_sampler 参数来控制数据的分批方式。

data = [1, 2, 3, 4, 5]
dataset = MyDataset(data)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True, num_workers=2)

for batch in dataloader:
print(batch)

3、DistributedSampler:

DistributedSampler 用于在分布式训练中对数据进行采样。它的主要作用是确保每个进程(或 GPU)在分布式训练中获得数据的不同子集,从而避免数据重复和确保数据均匀分配。主要功能包括:

  • 分布式数据分配:根据进程的 rank 和总进程数,计算出每个进程应该处理的数据子集。
  • 随机打乱:支持在每个 epoch 重新打乱数据,以增加训练的随机性。
  • 同步:在多个进程之间协调数据的采样。

1. 数据分配

在分布式训练中,数据集被划分成多个子集,每个进程(或 GPU)处理数据集的一部分。Sampler 确保每个进程(或 GPU)得到不同的数据子集,以避免重复和数据丢失。

  • DistributedSampler:这是 PyTorch 提供的专门用于分布式训练的采样器。它根据当前进程的 rank 和总进程数 num_replicas 来划分数据集。每个进程获得数据集的不同部分,从而实现数据的有效分配和负载均衡。

2. 确保数据覆盖

在每个 epoch 中,每个进程需要获取数据集的不同部分,以确保整个数据集被覆盖。Sampler 可以帮助实现这种数据分配策略,避免数据遗漏和冗余。

  • 随机打乱DistributedSampler 还支持在每个 epoch 开始时打乱数据集,这对于训练模型具有更好的泛化能力是非常重要的。

3. 避免数据重复

如果不使用合适的 Sampler,多个进程可能会处理相同的数据,从而导致数据重复。这不仅浪费计算资源,还可能影响模型的训练效果。

  • 去重DistributedSampler 确保每个进程仅处理数据集的一部分,从而避免数据重复。

4. 适应批量大小

在分布式训练中,数据的分配和批处理需要适应分布式环境中的批量大小。Sampler 负责将数据分成适合训练的批次,并确保每个进程处理的数据量与其他进程一致。

  • BatchSamplerBatchSampler 将由 Sampler 生成的索引列表分成批次,以便用于训练。它与 DistributedSampler 结合使用时,可以确保每个进程处理的数据批次符合预期的批量大小。

5. 支持多样本处理策略

不同的任务和模型可能需要不同的数据处理策略,如排序、动态采样等。通过自定义 Sampler,可以实现特定的采样策略以满足任务需求。

  • 自定义采样器:可以实现自定义的 Sampler 类,来满足特定的需求,如按样本长度排序、动态调整批次大小等。
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=4, rank=0)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, sampler=sampler)

动手实现一个采样器:

CustomDistributedBufferDynamicBatchSampler 是一个用于分布式训练的自定义数据采样器,它结合了动态批量大小和缓冲区的排序策略。它的目的是通过更复杂的策略来生成批量,以适应各种训练需求。下面是对这个采样器的详细解释:

__iter__ 方法生成数据批次,考虑到动态批量大小和缓冲区的排序:

数据打乱:如果 shuffle 为 True,数据将被打乱。缓冲区排序:数据被分成多个缓冲区,每个缓冲区的大小由 sort_size 控制,并按样本长度进行排序。批量生成:根据 batch_sizebatch_size_sample_max 生成批量。如果当前缓冲区中的数据无法满足批次大小,则将现有数据作为一个批次。数据重复和分配:确保每个进程获得相同数量的批次。如果总批次不足以均分,重复一些批次以满足每个进程的需求。

dataset: 数据集实例。batch_size: 批次大小。batch_type: 批次的类型(例如按 token 或样本)。num_replicas: 总的进程数。rank: 当前进程的 rank。rank_split: 是否分割 rank。shuffle: 是否打乱数据。drop_last: 是否丢弃最后一个批次。is_training: 是否处于训练模式。sort_size: 缓冲区的大小,用于排序数据。start_step: 起始步数(用于从特定步数开始训练)。

def __init__(
    self,
    dataset,
    batch_size,
    batch_type="token",
    num_replicas=None,
    rank=None,
    rank_split=False,
    shuffle=True,
    drop_last=False,
    is_training: bool = True,
    sort_size: int = 1024,
    start_step: int = 0,
    **kwargs,
):
    try:
        rank = dist.get_rank()
        num_replicas = dist.get_world_size()
    except:
        rank = 0
        num_replicas = 1

    self.rank = rank
    self.num_replicas = num_replicas
    self.dataset = dataset
    self.batch_size = batch_size
    self.batch_type = batch_type
    self.is_training = is_training
    self.shuffle = shuffle and is_training
    self.drop_last = drop_last

    self.total_size = len(self.dataset)
    self.num_samples = int(math.ceil(self.total_size / self.num_replicas))
    self.epoch = 0
    self.sort_size = sort_size * num_replicas
    self.max_token_length = kwargs.get("max_token_length", 2048)
    self.length_scale_source = kwargs.get("length_scale_source", 1.0)
    self.batch_size_sample_max = kwargs.get("batch_size_sample_max", 200)
    self.start_step = start_step
    self.batch_num = 1
    if self.start_step > 0:
        logging.info(f"Warning, start_step > 0, dataloader start from step: {self.start_step}")
def __iter__(self):
    if self.shuffle:
        g = torch.Generator()
        g.manual_seed(self.epoch)
        random.seed(self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        indices = list(range(len(self.dataset)))

    # Create sorted buffers and form batches
    buffer_batches = []
    for i in range(0, len(indices), self.sort_size):
        buffer = sorted(
            indices[i : i + self.sort_size], key=lambda idx: self.dataset.get_source_len(idx)
        )
        batch = []
        max_len_in_batch = 0
        count = 1
        for idx in buffer:
            original_sample_length = self.dataset.get_source_len(idx)
            if original_sample_length > self.max_token_length:
                continue
            sample_length = 1 if self.batch_type == "example" else original_sample_length
            potential_batch_length = max(max_len_in_batch, sample_length) * (len(batch) + 1)
            if potential_batch_length <= self.batch_size and count < self.batch_size_sample_max:
                batch.append(idx)
                max_len_in_batch = max(max_len_in_batch, sample_length)
                count += 1
            else:
                buffer_batches.append(batch)
                batch = [idx]
                max_len_in_batch = sample_length
                count = 1
        if batch:
            buffer_batches.append(batch)

    # Ensure each rank gets the same number of batches, duplicate data if needed
    batches_per_rank = math.ceil(len(buffer_batches) / self.num_replicas)
    total_batches_needed = batches_per_rank * self.num_replicas
    extra_batches = total_batches_needed - len(buffer_batches)
    buffer_batches += random.choices(buffer_batches, k=extra_batches)

    # Evenly distribute batches from buffer_batches to each rank
    rank_batches = [[] for _ in range(self.num_replicas)]
    for i, batch in enumerate(buffer_batches):
        rank_batches[i % self.num_replicas].append(batch)

    # Assign all batches for the current rank directly
    final_batches = rank_batches[self.rank][self.start_step :]
    self.batch_num = len(final_batches)

    logging.info(
        f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {len(rank_batches[self.rank])}, after: {self.batch_num}"
    )
    return iter(final_batches)

CustomDistributedBufferDynamicBatchSampler 通过以下方式增强了数据采样:

  • 动态批量大小:根据数据的实际长度动态调整批量大小。
  • 缓冲区排序:使用排序缓冲区策略提高数据处理效率。
  • 数据均匀分配确保每个进程获得相同数量的批次,避免数据不均衡。

这些特性使得 CustomDistributedBufferDynamicBatchSampler 能够更好地处理大规模数据集,并在分布式训练中提供高效的数据加载和批次生成策略。

数据均匀分配至关重要:如果分配不均,会导致某个节点的GPU显存爆炸,导致短筒效应,所以需要对数据进行平均分配:

分布式训练的时候 如何定义自己的samper,如何保证不同的节点使用不同的数据训练?

根据rank数量将索引分成不同的rank份。 分割数据以确保每个进程获取不同的索引

        if self.num_replicas is not None and self.rank is not None:
            # 每个进程处理的数据索引范围
            num_samples = int(np.ceil(len(indices) / self.num_replicas))
            start = self.rank * num_samples
            end = min(start + num_samples, len(indices))
            indices = indices[start:end]

1. 定义自定义Sampler

自定义Sampler需要继承torch.utils.data.Sampler并实现__iter__方法,返回数据索引的迭代器。以下是一个简单的示例:

python复制代码import torch
import numpy as np

class CustomSampler(torch.utils.data.Sampler):
    def __init__(self, data_source, num_replicas=None, rank=None):
        self.data_source = data_source
        self.num_replicas = num_replicas
        self.rank = rank

    def __iter__(self):
        # 获取所有样本索引
        indices = np.arange(len(self.data_source))

        # 分割数据以确保每个进程获取不同的索引
        if self.num_replicas is not None and self.rank is not None:
            # 每个进程处理的数据索引范围
            num_samples = int(np.ceil(len(indices) / self.num_replicas))
            start = self.rank * num_samples
            end = min(start + num_samples, len(indices))
            indices = indices[start:end]

        # 打乱数据
        np.random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        if self.num_replicas is not None and self.rank is not None:
            num_samples = int(np.ceil(len(self.data_source) / self.num_replicas))
            return num_samples
        return len(self.data_source)

2. 初始化分布式环境

在训练脚本中,初始化分布式环境并创建自定义采样器。

python复制代码import torch
import torch.distributed as dist

dist.init_process_group(backend='nccl')  # 或 'gloo'
local_rank = dist.get_rank()
world_size = dist.get_world_size()

# 数据集
from torchvision import datasets, transforms
transform = transforms.Compose([transforms.ToTensor()])
dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)

# 创建自定义采样器
sampler = CustomSampler(dataset, num_replicas=world_size, rank=local_rank)

# 创建数据加载器
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, sampler=sampler)

3. 在训练时设置采样器的epoch

如果你的自定义Sampler需要在每个epoch中更改数据顺序,可以在每个epoch开始时调用sampler.set_epoch(epoch)

python复制代码for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # 如果你的自定义Sampler支持这个方法
    for batch in dataloader:
        # 训练代码

这样,你就可以定义一个适合你需求的自定义Sampler,并在分布式训练中使用它。

DDP分布式训练时候 batchsize设置是指单卡还多卡所有的总batch?

在分布式数据并行(DDP)训练中,batch_size的设置是指每个单卡(即每个GPU)的batch size。总的batch size是每个单卡的batch size乘以GPU的数量。【在samper采样的时候,根据rank数量,将index 分割成 rank份,每一份里面进行batchsize的采样,所以bs指的是单个GPU的bs】

例如,如果你有4个GPU,并且每个GPU的batch size设置为32,那么总的batch size就是32 * 4 = 128。每个GPU在每次训练迭代中处理32个样本,所有4个GPU在每次训练迭代中处理总共128个样本。

如果你使用的是分布式数据并行的训练策略,确保将batch_size设置为每个GPU上希望的大小,而不是总的batch size。

datalaoder中设置的 number_work在DDP训练中如何工作的?

首先明确一点: num_works指的是单个GPU的num_works数据加载进程数量。

  • **num_workers**参数定义了并行数据加载的进程数量。每个进程独立地从数据集中读取和预处理数据。
  • **collate_fn**可以自定义如何将数据项组合成batch。
  • 数据加载进程将预处理后的数据批次传递给主进程,主进程将这些批次数据送入模型进行训练。

使用多个数据加载进程可以提高数据预处理的速度,减少GPU在训练时的等待时间,从而加快整体训练过程。

num_workers的作用

  • 数据加载: num_workers决定了用于加载数据的子进程的数量。更多的工作进程可以并行地读取和预处理数据,从而加快数据加载速度,减少GPU的等待时间。
  • 性能影响: 增加num_workers的数量通常可以提高数据加载速度,但也会增加系统的内存使用。合理设置num_workers的值可以在数据加载效率和系统资源使用之间找到平衡。

在DDP训练中的考虑

  1. 每个进程的num_workers: 每个分布式进程(即每个GPU)都有自己的数据加载子进程。这意味着总的num_workers会是每个GPU上num_workers的值乘以GPU的数量(分布式进程数)。例如,如果有4个GPU,并且每个GPU的num_workers设置为4,那么总的工作进程数将是4 * 4 = 16。
  2. 避免数据重叠: 在分布式训练中,需要确保每个进程处理的数据子集是不同的。使用DistributedSampler可以确保数据在各个进程间均匀分配,从而避免数据重复和丢失。
  3. 同步和通信开销: 增加num_workers的数量可能会增加进程间的同步和通信开销,特别是在多GPU的情况下。需要根据具体的硬件配置和数据集大小来调整num_workers
  4. 内存和CPU资源: 每增加一个工作进程,都会消耗额外的CPU资源和内存。确保你的系统有足够的资源来支持设置的num_workers值。

单个numberwork子进程单独负责一个batch的数据吗,然后多个进程负责加载多个不同batch数据?

在PyTorch中,DataLoader中的子进程(由num_workers参数指定)并不一定是每个进程负责一个完整的batch的数据。实际操作中,多个子进程负责并行地预处理数据并将其送入主进程。下面是详细的解释:

数据加载进程的工作方式

  1. 子进程的任务: 每个数据加载进程从数据集中提取样本,并执行预处理任务。子进程会从数据集中读取单个样本(或多个样本),进行必要的转换和预处理,然后将这些处理后的样本返回给主进程。
  2. 批次的生成: DataLoader在主进程中处理批次的生成。主进程负责将从各个子进程中接收到的样本聚合成一个完整的batch。具体来说,当主进程需要一个新的batch时,它会从子进程中获取样本,然后使用collate_fn将这些样本组合成一个batch。
  3. 数据并行处理: 如果设置了num_workers > 1DataLoader会启动多个子进程来并行地加载数据。这些子进程并不独立处理完整的batch,而是并行地从数据集中提取和预处理样本。主进程会从这些子进程中收集样本,并在主进程中将样本组成batch。

数据加载示例

假设你设置了num_workers=4,这意味着会有4个子进程并行地处理数据。数据加载的过程大致如下:

  1. 子进程读取数据:
    • 每个子进程从数据集中读取样本。
    • 子进程对样本进行预处理(如数据增强、标准化等)。
  2. 主进程收集数据:
    • 主进程从子进程中收集处理后的样本。
    • 主进程使用collate_fn将样本组合成一个batch。
  3. 生成批次:
    • 主进程将生成的batch传递给训练循环。

为什么numberwork设置大了会增加CPU内存?

设置较大的 num_workers 值会增加 CPU 内存使用的原因有几个方面:

1. 进程数量和内存占用

  • 每个子进程的内存消耗: 每个数据加载子进程(由 num_workers 定义)都会独立地运行,并加载一部分数据集。每个子进程会使用自己的内存来存储数据和进行预处理操作。
  • 内存需求: 如果 num_workers 设置得很高,系统将会启动多个子进程,这些进程会同时存在并占用内存。每个进程都需要一定的内存来存储数据和运行预处理代码,从而导致总的内存使用增加。

2. 数据预处理和缓存

  • 数据缓冲: DataLoader 使用子进程来并行加载和预处理数据。在预处理过程中,子进程可能会创建和维护缓存,这些缓存可能会消耗额外的内存。
  • 数据加载: 进程在数据加载过程中可能会在内存中保持一定量的数据,以提高数据处理效率。这种内存的占用也会随着 num_workers 的增加而增加。

3. 并发处理

  • 并发开销: 启动大量的子进程进行数据处理会增加系统的并发开销。操作系统需要为每个进程分配内存和管理资源,这会导致系统整体的内存使用增加。
  • 进程间通信: 多个子进程之间可能会有数据交换和同步操作,这些操作也可能增加内存开销。

大模型训练中的数据加载和NCCL通信问题

A、训练大模型时候,有两亿的数据,数据索引保存到了jsonl文件中,在torch dataloader 加载数据jsonl文件时候爆内存,如何解决

1. 使用分块加载(Chunk Loading)【法1】

将数据分块处理,而不是一次性加载所有数据。可以在Dataset类中实现这一点。示例代码如下:

import json
import torch
from torch.utils.data import Dataset, DataLoader

class LargeJSONLDataset(Dataset):
    def __init__(self, jsonl_file, chunk_size=1000):
        self.jsonl_file = jsonl_file
        self.chunk_size = chunk_size
        self.data = []
        self._load_chunk(0)

    def _load_chunk(self, chunk_index):
        start_line = chunk_index * self.chunk_size
        end_line = start_line + self.chunk_size
        self.data = []
        with open(self.jsonl_file, 'r') as f:
            for i, line in enumerate(f):
                if start_line <= i < end_line:
                    self.data.append(json.loads(line))
                if i >= end_line:
                    break

    def __len__(self):
        with open(self.jsonl_file, 'r') as f:
            return sum(1 for _ in f)

    def __getitem__(self, idx):
        chunk_index = idx // self.chunk_size
        self._load_chunk(chunk_index)
        local_idx = idx % self.chunk_size
        return self.data[local_idx]

# 创建 Dataset 和 DataLoader
dataset = LargeJSONLDataset('data.jsonl')
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

实现的逻辑:采样器sampler 获取 index = len(self.dataset),然后进行index随机抽样,将抽到的id送给dataloader加载器,dataloader根据这些id,去dataset类里面执行getitem。 dataset 不在需要加载所有的jsonl文件,只需要根据id//self.chunk_size判断数据在第几个chunk,然后对应需要加载目标chunk的数据即可,然后在id% self.chunk_size 得到在该chunk的真实id,读取。这样做缺点是每次都需要重新laod jsonl文件,加载时间变慢。

2. 使用内存映射

内存映射可以帮助将大文件映射到内存中而不是完全加载。jsonl格式通常不支持直接内存映射,但可以使用分块处理与内存映射结合的方法。

内存映射是一种将磁盘上的文件映射到内存中的方法。通过使用内存映射,我们可以在不将整个文件加载到内存中的情况下访问文件的内容。这对于处理大型数据集非常有用,因为它可以节省内存空间,并且可以快速访问文件的任意部分。

内存映射:将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

使用内存映射有以下几个优点:

  1. 节省内存空间:通过内存映射,我们可以在不将整个文件加载到内存中的情况下访问文件的内容。这对于处理大型数据集非常有用,因为它可以节省大量的内存空间。
  2. 快速访问文件的任意部分:由于内存映射将文件映射到内存中,我们可以快速访问文件的任意部分,而不需要读取整个文件。这对于随机访问大型文件非常有用。
  3. 支持并发访问:多个进程可以同时访问内存映射文件,而不会发生冲突。这使得内存映射非常适合多进程的数据处理任务。

https://github.com/DACUS1995/pytorch-mmap-dataset

3. 优化数据存储格式

考虑将数据存储为其他格式,如HDF5或Parquet,这些格式支持更高效的分块读写和压缩。例如,可以使用pandas将JSONL文件转换为Parquet格式,然后使用pandas读取它们。

4. 使用数据流处理

使用生成器逐行读取数据,而不是将整个文件加载到内存中:

def data_generator(file_path):
    with open(file_path, 'r') as f:
        for line in f:
            yield json.loads(line)

# 在 DataLoader 中使用生成器
def collate_fn(batch):
    # 自定义你的批处理操作
    return batch

dataset = data_generator('data.jsonl')
dataloader = DataLoader(dataset, batch_size=32, collate_fn=collate_fn)

5. 多进程数据加载

使用torch.utils.data.DataLoadernum_workers参数来并行加载数据:

dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

6. 数据预处理

在数据加载之前进行预处理,将数据处理成更紧凑的格式或者将其划分为多个较小的文件进行分段加载。这样可以减少每次加载的数据量。

7. 使用分块加载【法2】

方法1 每次读取单个数据,都需要重新读取一边jsonl文件,大大增加了数据加载的时间,为了尽量不影响数据加载时间,我们考虑牺牲一部分随机性来提高速度。

具体方法为:jsonl数据被分成N份,在训练1轮中,数据datalaoder先加载第一份的jsonl数据,然后part1数据加载训练结束后,继续加载part2的jsonl数据…..直到所有的jsonl数据加载完成,训练1轮结束。这样做的好处是每次batch不需要重新读取jsonl,但缺点就是不同part的jsonl之间数据不互通,数据的随机性降低,具体代码实现参考:FunASR

1:在训练1个epoch时候:传递data_split_num【数据分成几份】 data_split_i 【当前第几分】

2、datalaoder的 build_iter代码实现:本质上就是 重新执行 torch.utils.data.Dataset【可以自定义】、torch.utils.data.DataLoader、以及torch.utils.data.distributed.DistributedSampler【可以自己定义】 ,需要向 Dataset 传递 data_split_i 参数;

    def build_iter(self, epoch=0, data_split_i=0, start_step=0, **kwargs):

        # reload dataset slice
        if self.data_split_num > 1:
            del self.dataset_tr
            self.dataset_tr = self.dataset_class(
                self.kwargs.get("train_data_set_list"),
                frontend=self.frontend,
                tokenizer=self.tokenizer,
                is_training=True,
                **self.kwargs.get("dataset_conf"),
                data_split_i=data_split_i,
            )

        # dataloader
        batch_sampler = self.kwargs["dataset_conf"].get("batch_sampler", "BatchSampler")
        batch_sampler_val = None
        if batch_sampler is not None:
            batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
            batch_sampler = batch_sampler_class(
                self.dataset_tr, start_step=start_step, **self.kwargs.get("dataset_conf")
            )
            batch_sampler_val = batch_sampler_class(
                self.dataset_val, is_training=False, **self.kwargs.get("dataset_conf")
            )

        batch_sampler["batch_sampler"].set_epoch(epoch)
        batch_sampler_val["batch_sampler"].set_epoch(epoch)
        dataloader_tr = torch.utils.data.DataLoader(
            self.dataset_tr, collate_fn=self.dataset_tr.collator, **batch_sampler
        )
        dataloader_val = torch.utils.data.DataLoader(
            self.dataset_val, collate_fn=self.dataset_val.collator, **batch_sampler_val
        )

        return dataloader_tr, dataloader_val

3、 Dataset 的具体实现:

可以看出,AudioDataset里面实际上利用的index_ds来具体读取jsonl文件内容的。

4、index_ds的实现:只返回部分jsonl数据,虽然函数里面加载了整个文件,但函数结束file_list_all解释放掉了,最后只有file_list一直在占用内存。

8、pytorch pin_memory 设置为Fasle【牺牲时间换空间】

在PyTorch中,何时使用pin_memory?【CPU内存不足,建议关闭该功能】 当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。

pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。pin_memory=False表示将load进数据放至非锁页内存区,速度会较慢。

当计算机的内存充足的时候,设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。

主机中的内存,有两种存在方式: 一是锁页,二是不锁页,

锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。

在使用PyTorch进行数据加载时,pin_memory是一个可选的,它通常用于将数据存储在主机内存(RAM)中的固定内存页(pinned memory)上,以便更高效地将数据传输到GPU内存。

主要作用如下:

  1. 提高数据传输效率:当使用GPU进行训练时,通常需要将数据从主机内存传输到GPU内存。使用pin_memory可以将数据存储在固定内存页中,减少数据传输的时间和开销,提高数据传输的效率。
  2. 减少数据传输延迟:主机内存和GPU内存之间的数据传输通常涉及内存拷贝操作,而内存拷贝是一项相对较慢的操作。pin_memory可以在数据加载时将数据直接存放在固定内存页中,避免不必要的内存拷贝过程,从而减少数据传输的延迟。

需要注意的是,使用pin_memory会占用额外的主机内存,并且只在使用CUDA设备的情况下才有效果。

锁页内存和GPU显存之间的拷贝速度大约是6GB/s
可分页内存和GPU显存间的拷贝速度大约是3GB/s。
GPU内存间速度是30GB/s,CPU间内存速度是10GB/s

通常我们的主机处理器是支持虚拟内存系统的,也就是使用硬盘空间来代替内存。大多数系统中虚拟内存空间被划分成许多页,它们是寻址的单元,页的大小至少是4096个字节。虚拟寻址能使一个连续的虚拟地址空间映射到物理内存并不连续的一些页。

如果某页的物理内存被标记为换出状态,它就可以被更换到磁盘上,也就是说被踢出内存了。如果下次需要该页了,则重新加载到内存里。显然如果这一页切换的非常频繁,那么会浪费不少时间。

锁页(pinned page)是操作系统常用的操作,就是为了使硬件外设直接访问CPU内存,从而避免过多的复制操作。被锁定的页面会被操作系统标记为不可被换出的,所以设备驱动程序给这些外设编程时,可以使用页面的物理地址直接访问内存,CPU也可以访问上述锁页内存,但是此内存是不能移动或换页到磁盘上的。另外,在GPU上分配的内存默认都是锁页内存,这只是因为GPU不支持将内存交换到磁盘上。

Host(例如CPU)的数据分配默认是**pageable(可分页的)**,但是GPU是没法直接读取pageable内存里的数据的,所以需要先创建一个临时的缓冲区(pinned memory),把数据从pageable内存拷贝pinned内存上,然后GPU才能从pinned内存上读取数据,如上图(左)所示。

9、number_works降低参数值

从磁盘加载数据到 host 的page-locked内存. 采用多个 worker 进程并行地数据加载 ,会增加内存占用,因此为了降低内存占用,可以考虑number_work从低到高设置:2、4、8、16,知道训练速度达到最优。

每个进程的num_workers: 每个分布式进程(即每个GPU)都有自己的数据加载子进程。这意味着总的num_workers会是每个GPU上num_workers的值乘以GPU的数量(分布式进程数)。

例如,如果有4个GPU,并且每个GPU的num_workers设置为4,那么总的工作进程数将是4 * 4 = 16。

避免数据重叠: 在分布式训练中,需要确保每个进程处理的数据子集是不同的。使用DistributedSampler可以确保数据在各个进程间均匀分配,从而避免数据重复和丢失。

同步和通信开销: 增加num_workers的数量可能会增加进程间的同步和通信开销,特别是在多GPU的情况下。需要根据具体的硬件配置和数据集大小来调整num_workers

内存和CPU资源: 每增加一个工作进程,都会消耗额外的CPU资源和内存。确保你的系统有足够的资源来支持设置的num_workers值。

在给Dataloader设置worker数量(num_worker)时,到底设置多少合适?这个worker到底怎么工作的?

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

参数详解:

每次dataloader加载数据时:dataloader一次性创建num_worker个worker,(也可以说dataloader一次性创建num_worker个工作进程,worker也是普通的工作进程),并用batch_sampler将指定第几个batch分配给指定worker,worker将它负责的batch加载进RAM。

然后,dataloader从RAM中找本轮迭代要用的batch,如果找到了,就使用。如果没找到,就要num_worker个worker继续加载batch到内存,直到dataloader在RAM中找到目标batch。一般情况下都是能找到的,因为batch_sampler指定batch时当然优先指定本轮要用的batch。

num_worker设置得大,好处是寻batch速度快,因为下一轮迭代的batch很可能在上一轮/上上一轮…迭代时已经加载好了。坏处是内存开销大,也加重了CPU负担(worker加载数据到RAM的进程是CPU复制的嘛)。num_workers的经验设置值是自己电脑/服务器的CPU核心数,如果CPU很强、RAM也很充足,就可以设置得更大些。

如果num_worker设为0,意味着每一轮迭代时,dataloader不再有自主加载数据到RAM这一步骤(因为没有worker了),而是在RAM中找batch,找不到时再加载相应的batch。缺点当然是速度更慢。

  1. 根据硬件配置调整: 在多核 CPU 环境下,设置较高的 num_workers(如 4 到 16)可以有效利用多核资源,提高数据加载速度。具体的最佳值需要根据系统的 CPU 核心数和内存情况来调整。
  2. 数据加载瓶颈: 如果你发现训练时 GPU 经常处于等待数据的状态,这可能是因为数据加载成为了瓶颈。增加 num_workers 可以帮助缓解这一问题。
  3. 系统负载: 在某些情况下,设置过高的 num_workers 可能会导致系统负载过高,影响其他任务或整体系统性能。因此需要找到一个平衡点。
  4. 实验调整: 实际应用中,最好的做法是从较小的值开始(如 2 或 4),然后逐步增加,观察训练过程中的数据加载速度和系统资源使用情况,从而确定最佳设置。

DistributedDataParallel 消除了 DataParallel 中上述不足. 其不再需要主 GPU,每个 GPU 分别进行各自任务. 每个 GPU 上的训练是其独立进程,而在 DataParallel 中是采用多线程(multi-thread) 的.

DistributedDataParallel 的工作过程如,

[1] – 从磁盘加载数据到 host 的page-locked内存. 采用多个 worker 进程并行地数据加载;其中,distributed data sampler 确保了加载的数据在跨进程间是不重叠的.

[2] – 将 mini-batch 数据由 page-locked 内存转移到 GPU. 不需要任何数据广播. 因为每个 GPU 分别有模型副本,因此也不需要模型广播.

[3] – 分别在各 GPU 独立进行前向计算和损失函数计算. 因此,也不需要收集各 GPUs 的输出.

[4] – 后向梯度计算,梯度是跨GPUs all-reduced的. 确保在后向传播结束时,每个 GPU 最终得到相同的平均梯度的副本.

[5] – 更新模型参数. 由于每个 GPU 是由相同的模型副本开始的,且梯度是 all-reduced 的,因此所有 GPUs 上的权重更新是相同的,无需再进行模型同步.

以上即完成了一次迭代. 这种设计确保了模型参数的更新是相同的,因此消除了每次开始时的模型同步.

B 、NCCL通信超时问题

[PG 1 Rank 9] Timeout at NCCL work: 957, last enqueued NCCL work: 957, last completed NCCL work: 956.
[rank9]:[E ProcessGroupNCCL.cpp:577] [Rank 9] Some NCCL operations have failed or timed out. Due to the asynchronous nature of CUDA kernels, subsequent GPU operations might run on corrupted/incomplete data.
[rank9]:[E ProcessGroupNCCL.cpp:583] [Rank 9] To avoid data inconsistency, we are taking the entire process down.

这种报错需要具体情况具体分析

1、尝试增加NCCL 超时时间/设置过NCCL变量

如何设置:

1、查看变量:查看环境变量 NCCL_IB_TIMEOUT 的值

echo $NCCL_IB_TIMEOUT # 如果环境变量已设置,这个命令将显示其值;如果没有设置,则不会有任何输出。

printenv 命令可以显示所有环境变量的值,也可以查看特定的环境变量:

printenv NCCL_IB_TIMEOUT #如果环境变量未设置,该命令不会输出任何内容。

也可以使用 env 命令来列出所有环境变量,并查找 NCCL_IB_TIMEOUT

env | grep NCCL_IB_TIMEOUT

NCCL相关环境变量说明 【https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage.html】

  1. NCCL_TIMEOUT:设置集合操作超时阈值,单位毫秒;如果常见超时错误,适当增大该值,但不能太大NCCL_TIMEOUT 环境变量用于设置 NCCL 集体通信操作的超时时间。通过调整这个值,你可以更好地处理网络延迟和不稳定的问题,确保 NCCL 通信的稳定性和可靠性。如果在集体通信过程中遇到超时问题,可以尝试调整此环境变量以解决问题。

设置超时时间:

  • NCCL_TIMEOUT 用于定义 NCCL 集体通信操作的超时时间。超时时间是 NCCL 在执行操作时等待响应的最长时间,超出此时间将触发超时错误。

解决网络问题:

  • 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致集体通信操作超时。设置合适的 NCCL_TIMEOUT 可以帮助调节容错设置,避免训练过程中因超时错误而中断。

性能调优:

  • 根据你的集群配置和网络状况,适当调整 NCCL_TIMEOUT 可以帮助优化通信性能和稳定性。
  1. NCCL_ALGO:选择集合通信算法,如Ring, Tree;不同拓扑适合不同算法,测试选更优算法
  2. NCCL_CHUNK_SIZE:定义环形传输缓冲区大小;合理设置可提速,但也会增加内存消耗
  3. NCCL_DEBUG:打开NCCL调试日志;出现问题时打开调试,但会降低速度,不要在生产环境使用
  4. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  5. NCCL_P2P_LEVEL:设置点对点通信优化级别;增加该值可减少P2P次数,提高某些操作效率
  6. NCCL_P2P_DISABLE:禁用点对点通信,强制使用集合通信。在某些情况下,P2P 通信可能会导致性能问题或出现错误。禁用 P2P 通信可以帮助解决这些问题。如果你遇到与 P2P 通信相关的错误或不稳定性,禁用 P2P 可能有助于恢复系统的稳定性。
  7. NCCL_PXN_DISABLE:禁用使用非本地 NIC 的节点间通信,使用 NVLink 和一个中间 GPU。建议设置成1。在PyTorch中进行跨节点all-to-all通信时,如果该环境变量是0会出现异常。
  8. NCCL_SOCKET_IFNAME:选择网络接口。
  9. NCCL_SOCKET_NTHREADS 增加它的数量可以提高socker传输的效率,但是会增加CPU的负担
  10. NCCL_NET_GDR_LEVEL:设置GPUDirect RDMA的使用级别。
  11. NCCL_MAX_NRINGS:定义支持的最大NCCL环路数。
  12. NCCL_MIN_NRINGS:定义最小环路数。
  13. NCCL_BUFFSIZE:设置scratch空间大小。
  14. NCCL_BUFFLE_SIZE 缓存数据量,缓存越大一次ring传输的数据就越大自然对带宽的压力最大,但是相应的总延迟次数会少。默认值是4M(4194304),注意设置的时候使用bytes(字节大小)
  15. NCCL_NTHREADS:设置NCCL内部使用的线程数。
  16. NCCL_VERSION:显示NCCL版本信息。
  17. NCCL_MAX/MIN_NCHANNELS 最小和最大的rings,rings越多对GPU的显存、带宽的压力都越大,也会影响计算性能
  18. NCCL_CHECKS_DISABLE 在每次集合通信进行前对参数检验校对,这会增加延迟时间,在生产环境中可以设为1.默认是0
  19. NCCL_CHECK_POINTERS 在每次集合通信进行前对CUDA内存 指针进行校验,这会增加延迟时间,在生产环境中可以设为1.默认是0
  20. NCCL_NET_GDR_LEVEL GDR触发的条件,默认是当GPU和NIC挂载一个swith上面时使用GDR
  21. NCCL_IGNORE_CPU_AFFINITY 忽略CPU与应用的亲和性使用GPU与nic的亲和性为主
  22. NCCL_IB_DISABLE:禁用InfiniBand传输。

禁用 InfiniBand: 设置 NCCL_IB_DISABLE=1 会禁用 NCCL 在 InfiniBand 设备上的使用。这意味着 NCCL 将不会利用 InfiniBand 网络进行数据传输,而是回退到其他网络接口(例如以太网或其他网络接口)。

调试和兼容性: 禁用 InfiniBand 可能用于调试目的,或在系统中 InfiniBand 网络出现问题时回退到其他网络接口。如果你遇到与 InfiniBand 相关的错误或兼容性问题,禁用 InfiniBand 可能有助于解决这些问题。

  1. NCCL_IB_HCA 代表IB使用的设备:Mellanox mlx5系列的HCA设备NCCL_IB_HCA=mlx5 会默认轮询所有的设备。NCCL_IB_HCA=mlx5_0:1 指定其中一台设备。
  2. NCCL_IB_TIMEOUT 改变量用于控制InfiniBand Verbs超时。取值范围1-22。超时时间的计算公式为4.096微秒 * 2 ^ timeout,正确的值取决于网络的大小。增加该值可以在非常大的网络上提供帮助,例如 NCCL在调用ibv_poll_cq时出现错误12时。建议在大模型训练任务中设置成最大值22,可以减少不少nccl timeout异常。设置超时时间: NCCL_IB_TIMEOUT 用于控制 InfiniBand 网络操作的超时时间。通过调整这个值,你可以控制 NCCL 在遇到通信延迟或网络问题时的容忍度。解决网络问题: 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致超时错误。调整 NCCL_IB_TIMEOUT 可以帮助你在遇到网络问题时更好地调节超时设置,避免训练过程被中断。
  1. NCCL_IB_RETRY_CNT变量控制 InfiniBand 的重试次数。建议在大模型训练任务中设置成13,尽可能多重试。
  2. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  3. NCCL_IB_PCI_RELAXED_ORDERING启用 IB Verbs 传输的Relaxed Ordering。Relaxed Ordering可以极大地提高虚拟化环境下 InfiniBand 网络的性能。设置为 2,如果可用,自动使用Relaxed Ordering。设置为 1,强制使用Relaxed Ordering,如果不可用则失败。设置为 0,禁用使用Relaxed Ordering。默认值为 2。建议值为1

2、增加 dist.init_process_group 超时时间,还要对应修改NCCL变量: export TORCH_NCCL_BLOCKING_WAIT !!

dist.init_process_group(backend=kwargs.get(“backend”, “nccl”), init_method=”env://”,timeout=timedelta(seconds=7200000)) # 7200s 等待2h


export TORCH_NCCL_BLOCKING_WAIT=1  # 是否堵塞等待某节点错误超时 “0” 不堵塞等待  “1” 堵塞等待
echo $TORCH_NCCL_BLOCKING_WAIT
printenv TORCH_NCCL_BLOCKING_WAIT  # 新版本torch

export TORCH_NCCL_ASYNC_ERROR_HANDLING=1 # 是否堵塞等待某节点错误超时 “0” 不堵塞等待  “1” 堵塞等待
echo $TORCH_NCCL_ASYNC_ERROR_HANDLING
printenv TORCH_NCCL_ASYNC_ERROR_HANDLING # 新版本torch

export NCCL_BLOCKING_WAIT=1
echo $NCCL_BLOCKING_WAIT
printenv NCCL_BLOCKING_WAIT      #旧版本torch

export NCCL_ASYNC_ERROR_HANDLING=1
echo $NCCL_ASYNC_ERROR_HANDLING
printenv NCCL_ASYNC_ERROR_HANDLING   #旧版本torch

在使用 torch.distributed.init_process_group 初始化分布式训练时,timeout 参数用于指定集群中进程之间进行集体通信操作时的超时时间。这个超时时间决定了分布式进程在等待其他进程响应时的最长时间。

torch.distributed.init_process_group(backend=Noneinit_method=Nonetimeout=Noneworld_size=-1rank=-1store=Nonegroup_name=”pg_options=Nonedevice_id=None)

说明文档:https://pytorch.org/docs/stable/distributed.html

新版本torch
旧版本torch

超时设置:

  • timeout 参数用于设置分布式通信操作的超时时间。超时时间是 timedelta 对象,表示在等待其他进程响应时的最长时间。
  • 在你提供的示例中,timeout 被设置为 timedelta(seconds=108000),即 30 小时。这意味着分布式通信操作将在 30 小时内等待其他进程响应。

用途:

  • 容错性: 提高容错性,确保在长时间等待期间不会因为网络延迟或通信问题导致进程失败。
  • 调试: 在调试和测试中,设置较长的超时时间可以帮助识别是否因为超时设置过短而导致的通信问题。
  • 防止死锁: 在复杂的分布式训练任务中,长时间的超时时间有助于防止因通信死锁而导致的进程失败。

超时处理:

  • 如果在指定的超时时间内没有收到预期的响应,init_process_group 将会引发超时错误。这通常表示进程之间的通信出现了问题,可能需要检查网络连接、进程配置或其他潜在问题。

TORCH_NCCL_BLOCKING_WAIT 是一个环境变量,用于控制 PyTorch 在使用 NCCL 后端时的通信等待策略。具体来说,它决定了 NCCL 操作是否使用阻塞等待方式来处理通信操作。

TORCH_NCCL_BLOCKING_WAIT 的作用

  • TORCH_NCCL_BLOCKING_WAIT=1:
    • 启用阻塞等待: 当设置为 1 时,PyTorch 在执行 NCCL 操作(如 all-reducebroadcast)时,会使用阻塞等待的方式。这意味着 PyTorch 会等待操作完全完成或超时之后才继续执行。这种设置可以帮助确保所有进程在继续之前都完成了通信,有助于解决因异步操作引起的数据同步问题或错误。
  • TORCH_NCCL_BLOCKING_WAIT=0:
    • 禁用阻塞等待: 默认情况下(即设置为 0),PyTorch 使用非阻塞等待方式。NCCL 操作在后台异步进行,可能会导致在操作完成之前程序继续执行。这种方式可能会在网络延迟或系统负载较高时引发通信超时或数据不一致的问题。

如何设置 TORCH_NCCL_BLOCKING_WAIT

你可以通过以下方式设置 TORCH_NCCL_BLOCKING_WAIT 环境变量:

  1. 临时设置: 在运行程序时,可以在命令行中临时设置环境变量:bash复制代码TORCH_NCCL_BLOCKING_WAIT=1 python your_training_script.py
  2. 永久设置: 在终端会话中,可以通过 export 命令永久设置:bash复制代码export TORCH_NCCL_BLOCKING_WAIT=1 这个设置会在当前终端会话中生效,直到会话结束或重新启动。
  3. 在脚本中设置: 如果你希望在 Python 脚本内部设置这个变量,可以在脚本的开头添加:python复制代码import os os.environ['TORCH_NCCL_BLOCKING_WAIT'] = '1'

使用场景

  • 调试和稳定性:
    • 启用阻塞等待有助于调试和解决 NCCL 操作中的同步问题。它确保所有通信操作完成后才继续执行,有助于提高系统的稳定性。
  • 网络不稳定和负载高:
    • 在网络延迟较高或系统负载较大的环境中,启用阻塞等待可以减少由于异步操作导致的超时和错误。

注意事项

  • 性能影响:
    • 阻塞等待可能会增加通信操作的等待时间,影响整体训练性能,特别是在大规模分布式训练任务中。
  • 超时问题:
    • 如果超时时间设置过短或网络状况较差,启用阻塞等待可能导致更多的超时错误。因此,需要平衡稳定性和性能。

总结

TORCH_NCCL_BLOCKING_WAIT 环境变量控制 PyTorch 使用 NCCL 后端时的通信等待策略。设置为 1 可以启用阻塞等待,有助于提高系统稳定性和调试能力,但可能会影响性能。根据具体的训练任务和环境,可以选择合适的设置来优化训练过程。

相关环境变量解释:

https://pytorch.org/docs/stable/torch_nccl_environment_variables.html

3、增加 num_workers 来加快处理数据【Dataloader阶段导致 NCCL超时】

如果是在数据加载的时间过长,导致NCCL通信超时,考虑增加num_workers来提高数据加载速度。

减少数据加载瓶颈:

  • 增加 num_workers 可以提高数据加载速度,减少训练过程中因数据加载而导致的等待时间。这可以间接减少由于数据处理缓慢而可能引发的 NCCL 超时问题。

提高训练效率:

  • 更高效的数据加载可以提高整体训练效率,使训练过程更加顺畅,从而可能减少由于系统负载不均导致的通信超时问题。

4、 DistributedSampler 采样阶段导致 NCCL超时:

如果分布式训练中 NCCL 超时问题发生在采样阶段(特别是在使用 DistributedSampler 或自定义的采样器时),可能表明存在某些潜在的问题,这些问题可能导致训练进程之间的同步或数据传输效率低下。以下是一些可能的原因和解决方法:

可能的原因

  1. 数据加载和采样速度问题:
    • 如果采样器的性能不佳,可能会导致数据加载速度变慢,从而影响训练过程。虽然这不会直接导致 NCCL 超时,但它会间接影响整体训练性能。
  2. 进程同步问题:
    • 在使用 DistributedSampler 时,所有进程需要同步以确保数据的一致性。如果采样器在某些进程中出现延迟或阻塞,可能会导致通信超时。
  3. 数据分布不均:
    • 如果数据分布不均,某些进程可能会比其他进程处理更多的数据,从而导致通信延迟和超时问题。
  4. 数据预处理复杂:数据预处理太复杂,会导致数据加载过慢,也有可能导致超时

解决方法:

  1. 优化采样器和数据加载:
    • 确保自定义采样器或 DistributedSampler 以高效的方式进行数据采样和分配。优化数据加载速度,确保每个进程在采样时不会长时间等待。
    • 使用 num_workers 设置合理的数量,以加快数据加载速度,但要注意 CPU 内存和系统负载。
  2. 调整超时时间:
    • 增加 NCCL_TIMEOUT 环境变量值或 dist.init_process_group 中的 timeout 参数,以允许更长的等待时间。

pytorch分布式 训练参数设置

# 自己的数据获取
dataset = MyDataset(input_size, data_size)
 
# 使用 DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
 
trainloader = DataLoader(dataset=dataset,
                         pin_memory=true,
                         shuffle=(train_sampler is None),   # 使用分布式训练 shuffle 应该设置为 False
                         batch_size=args.batch_size,
                         num_workers=args.workers,
                         sampler=train_sampler)

需要注意的几个参数:batch_size、num_workers、shuffle、pin_memory在进行多机多卡以及单机多卡的设置。

1、 Batch_size设置:

Dataparallel : 设置 batch_size 是指总多卡的Batch size,数据被直接划分到多个 GPU 上

DistributedDataParallel batch size 设置成单卡一样即可,因为各个GPU对应的进程独立从磁盘中加载数据这里的 Batch_size指的是单卡的。

2、shuffle设置:

shuffle:

Dataparallel  :设置 ‘shuffle’: True

DistributedDataParallel  :为了能够按顺序划分数据子集,拿到不同部分数据,所以数据集不能够进行随机打散,所以用了参数 ‘shuffle’: False

3、 pin_memory 设置:

是否提前申请CUDA内存(默认为False,但有说法除非数据集很小,否则在N卡上推荐总是打开)。

如果开了pin memory:
每个worker都需要缓存一个batch的数据.
batch size和num_workers都大, 显存会炸

为什么 设置 pip_memory=true, 看解释:
多GPU训练的时候注意机器的内存是否足够(一般内存为显卡显存x2),如果不够,建议关闭pin_memory(锁页内存)选项。
采用DistributedDataParallel多GPUs训练的方式比DataParallel更快一些,如果你的Pytorch编译时有nccl的支持,那么最好使用DistributedDataParallel方式。
关于什么是锁页内存:
pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。
主机中的内存,有两种存在方式,一是锁页,二是不锁页,锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。因为pin_memory与电脑硬件性能有关,pytorch开发者不能确保每一个炼丹玩家都有高端设备,因此pin_memory默认为False。

当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。pin_memory默认为False。

4、 num_workers 设置:num_worker的设置值一般是所运行机子上的CPU核心数

可以设置set num_workers =4 x number of available GPUs 

um_worker大: 下一轮迭代的batch可能在上一轮/上上一轮…迭代时已经加载好了。 坏处是GPU memory开销大 (这是开了pin memory的情况吧) ,也加重了CPU负担。

CPU的物理个数:grep ‘physical id’ /proc/cpuinfo | sort | uniq | wc -l 结果为2,说明CPU有两个。 每个CPU的核数:cat /proc/cpuinfo |grep “cores”|uniq 10,说明每个10核。 cpu核数 = 2×10

1、cpu个数

grep ‘physical id’ /proc/cpuinfo | sort -u

2、核心数【当数据集较大时建议采用,num_works一般设置为(CPU 核心数 +-1)为最佳】

grep ‘core id’ /proc/cpuinfo | sort -u | wc -l

3、线程数

grep ‘processor’ /proc/cpuinfo | sort -u | wc -l

一般建议 num_workers 的值接近 CPU 核心数,但不要超过,以免导致过多的上下文切换。

如果数据集较大且预处理复杂,较高的 num_workers 值可能会更有效。反之,如果数据集较小或者预处理简单,则可能不需要太多的工作线程。

Num workers:只要你的 GPU 计算占用没有用满,说明 GPU 要等数据准备。可以试着增加进程数目,同时观察是否是硬盘 IO 瓶颈,如果是多机训练,还要注意网络瓶颈。不过,最大也不能超过核心数,一般还要减一点,因为主进程,多卡多进程训练,都会占用核心。

num_worker通过影响数据加载速度,从而影响训练速度。 每轮dataloader加载数据时:dataloader一次性创建num_worker个worker,worker就是普通的工作进程。并用batch_sampler将指定batch分配给指定的worker,worker将它负责的batch加载进RAM。然后,dataloader从RAM中找本轮迭代要用的batch,如果找到了,就使用;如果没找到,就用num_worker个worker继续加载batch到RAM,直到dataloader在RAM中找到目标batch。

pytorch单机多卡训练【分布式数据并行 和 数据并行方案】

https://github.com/KaiiZhang/DDP-Tutorial/blob/main/DDP-Tutorial.md

数据并行和分布式数据并行方案:

第一: 数据并行 , 开一个进程(process),该进程下每个线程(threading)负责一部分数据,分别跑在不同卡上,前向传播,devices各玩各的,计算loss时候需要所有devices的输出输送到主GPU【默认device0】上计算梯度均值,并更新device0上的参数,然后将参数广播到其他device上。总结:单机-多线程,通过torch.nn.DataParallel 实现。
第二: 分布式数据并行,开多个进程,一个进程运行在一张卡上,每个进程负责一部分数据。在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。各进程用该梯度来更新参数。由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。

总结:单机/多机-多进程,通过torch.nn.parallel.DistributedDataParallel 实现。

毫无疑问,第一种简单,第二种复杂,毕竟 进程间 通信比较复杂。

torch.nn.DataParallel 和 torch.nn.parallel.DistributedDataParallel,下面简称为DPDDP

总结: 两个函数主要用于在多张显卡上训练模型,也就是所谓的分布式训练

数据并行 torch.nn.DataParallel  :

原理:

  • 网络前向传播前,输入数据被分成几份送到不同显卡上,网络模型每个显卡上拷贝一份。
  • 前向传播时,devices各玩各的。
  • 前向传播完成后,每张显卡上的网络输出会送到主device上(默认第一张卡),在主device上计算loss。然后,loss送给每个device,每个device计算得到梯度,再把梯度送到主device上,主device对汇总得到的梯度求均值后,更新主device上的网络参数。最后,将更新后的网络权重广播(broadcast)到其它device上,实现所有device网络权重同步。
  • torch.nn.DataParallel是把每张卡的输出聚合到GPU0上,然后在GPU0上与label计算loss,根据计算图反向传播,让每张卡上获得自己的梯度。优化器则对梯度进行聚合,在主GPU更新模型参数,再把新的参数分发到每个GPU。

从上面介绍可知,DataParallel 对主device依赖较高,会造成负载不均衡,限制模型训练速度。

DP使用教程:

主程序DP_main.py中,下面这行代码实现数据并行化分布式训练。

相比单卡单机代码:只需要修改以下代码:

model_train = torch.nn.DataParallel(model)	

通过终端运行命令,

CUDA_VISIBLE_DEVICES=0,1 python3 DP_main.py

DP_main.py代码:

import torch
import torchvision
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torchvision.transforms as transforms
from net import ToyModel
import torch.optim as optim


#---------------------------#
#   获得学习率
#---------------------------#
def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

#---------------------------#
#   获得数据集
#---------------------------#
def get_dataset():
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    CIFAR10_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, 
        download=True, transform=transform_train)
    
    # ----------------------------------------------------------#
    #   num_workers:加载数据集使用的线程数
    #   pin_memory=True:锁页内存, 可以加速数据读取. (可能会导致Bug)
    # ----------------------------------------------------------#
    trainloader = torch.utils.data.DataLoader(CIFAR10_trainset, 
        batch_size=16, num_workers=2, pin_memory=True)
    return trainloader

#---------------------------#
#   训练
#---------------------------#
def train(model, device, trainloader, optimizer, loss_func, print_frequence, epoch):
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_func(outputs, targets)
        loss.backward()
        optimizer.step()

        # loss.item()把其中的梯度信息去掉,没.item()可能会导致程序所占内存一直增长,然后被计算机killed
        train_loss += loss.item()       
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        if batch_idx % print_frequence == print_frequence - 1 or print_frequence == trainloader.__len__() - 1:
            print('epoch: %d | Loss: %.3f | Acc: %.3f%% (%d/%d)' % (
                epoch, train_loss / (batch_idx + 1), 100. * correct / total, correct, total))
    torch.save(model.state_dict(), "%d.ckpt" % epoch)	
    # torch.save(model.module.state_dict(), "%d.ckpt" % epoch)	用双卡训练保存权重,重新加载时,也需要这样保存,否则,权重前面会多module
    
    # -------------------------------------#
    #   只是想看看lr有没有衰减
    # -------------------------------------#
    lr = get_lr(optimizer)
    print("lr:", lr)
    lr_scheduler.step()


if __name__ == '__main__':
    trainloader = get_dataset()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = ToyModel()
    print(model)

    model_train = model.train()
    if torch.cuda.is_available():   
        model_train = torch.nn.DataParallel(model)  # 单GPU跑套DP的话,指标可能会降
        cudnn.benchmark = True
        model_train = model_train.cuda()            # 等效于model_train = model_train.to(device)

    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model_train.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    # -------------------------------------#
    #   step_size控制多少个epoch衰减一次学习率
    # -------------------------------------#
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)   
    
    print_frequence = 500
    epochs = 100
    for epoch in range(0, epochs):
        train(model_train, device, trainloader, optimizer, loss_func, print_frequence, epoch)

分布式并行DistributedDataParallel 

  • 更快的训练速度
  • 多进程的运行方式
  • 支持单机多卡和多机多卡
  • 平衡的GPU使用

DDP原理:

先说分布式几个名词:
一个world里进程个数为world_size,全局看,每个进程都有一个序号rank;分开看,一个进程在每台机器里面也有序号local_rank。

  • group:进程组,默认一个组,即一个world
  • world_size:全局进程个数
  • rank:进程序号,用于进程间通信。rank=0为GPU主卡,主要用于多机多卡。本文中仅涉及到一台机器内多张卡。
  • locak_rank:进程(一台机器)内的GPU编号,通过指令torch.distributed.run自动指定,不需要用户输入该参数。

DDP 在每次迭代中,操作系统会为每个GPU创建一个进程,每个进程具有自己的 optimizer ,并独立完成所有的优化步骤,进程内与一般的训练无异。在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。各进程用该梯度来更新参数。由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。

而在 DataParallel 中,全程维护一个 optimizer,对各 GPU 上梯度进行求和,在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU。相较于 DP,DDP传输的数据量更少,速度更快,效率更高。

DDP的流程示意图如上图所示,DDP需要额外的建立进程组阶段(Construction)。在Construction阶段需要首先明确通信协议和总进程数。通信协议是实现DDP的底层基础,我们在之后单独介绍。总进程数就是指有多少个独立的并行进程,被称为worldsize。根据需求每个进程可以占用一个或多个GPU,但并不推荐多个进程共享一个GPU,这会造成潜在的性能损失。为了便于理解,在本文的所有示例中我们假定每个进程只占用1个GPU,占用多个GPU的情况只需要简单的调整GPU映射关系就好。

并行组建立之后,每个GPU上会独立的构建模型,然后GPU-1中模型的状态会被广播到其它所有进程中以保证所有模型都具有相同的初始状态。值得注意的是Construction只在训练开始前执行,在训练中只会不断迭代前向和后向过程,因此不会带来额外的延迟。

相比于DataParallel,DDP的前向后向过程更加简洁。推理、损失函数计算,梯度计算都是并行独立完成的。DDP实现并行训练的核心在于梯度同步。梯度在模型间的同步使用的是allreduce通信操作,每个GPU会得到完全相同的梯度。如图中后向过程的步骤2,GPU间的通信在梯度计算完成后被触发(hook函数)。图中没有画出的是,通常每个GPU也会建立独立的优化器。由于模型具有同样的初始状态和后续相同的梯度,因此每轮迭代后不同进程间的模型是完全相同的,这保证了DDP的数理一致性。

为了优化性能,DDP中针对allreduce操作进行了更深入的设计。梯度的计算过程和进程间的通信过程分别需要消耗一定量的时间。等待模型所有的参数都计算完梯度再进行通信显然不是最优的。如下图所示,DDP中的设计是通过将全部模型参数划分为无数个小的bucket,在bucket级别建立allreduce。当所有进程中bucket0的梯度计算完成后就立刻开始通信,此时bucket1中梯度还在计算。这样可以实现计算和通信过程的时间重叠。这种设计能够使得DDP的训练更高效。

在最后我们对DDP的通信部分进行介绍。DDP后端的通信由多种CPP编写的协议支持,不同协议具有不同的通信算子的支持,在开发中可以根据需求选择。

对于CV和NLP常用GPU训练的任务而言,选择Gloo或NCCL协议即可。一个决定因素是你使用的计算机集群的网络环境:

  • 当使用的是Ethernet(以太网,大部分机器都是这个环境):那么优先选择NCCL,具有更好的性能;如果在使用中遇到了NCCL通信的问题,那么就选择Gloo作为备用。(经验:单机多卡直接NCCL;多机多卡先尝试NCCL,如果通信有问题,而且自己解决不了,那就Gloo。
  • 当使用的是InfiniBand:只支持NCCL。

另一个决定性因素是二者支持的算子范围不同,因此在使用时还需要结合代码里的功能来确定。下图记录了每种通信协议能够支持的算子,Gloo能够实现GPU中最基本的DDP训练,而NCCL能够支持更加多样的算子.

不同Backend的算子支持情况

DDP使用:

  • 设备间通信
    为了保证不同卡上的模型参数同步,设备间需要通讯。
    设备间通讯通过后端backend实现,GPU上用nccl,CPU上用gloo
torch.distributed.init_process_group('nccl')
  • 指定GPU
    指定使用哪些GPU,作用相当于CUDA_VISIBLE_DEVICES命令。
torch.cuda.set_device(args.local_rank)   
  • 构造模型
    构造DDP model,[args.local_rank]是一个list
model = DistributedDataParallel(model, device_ids=[args.local_rank], 
   										output_device=args.local_rank)
  • 构建数据集
    构建数据集中需要用到train_sampler来shuffle数据,继而实现把trainset中的样本随机分配到不同的GPU上,
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
# ---------------------------------------------------------------#
#   sampler参数和shuffle参数是互斥的,两个传一个就好,都用于数据打乱。
# ----------------------------------------------------------------#
trainloader = torch.utils.data.DataLoader(trainset, 
        batch_size=16, num_workers=2, sampler=train_sampler)
  • 数据放到多卡上
    模型、损失函数、输入数据要放到多卡上,代码例如:
data = data.to(args.local_rank)		# 等效于data.cuda(args.local_rank)

通过终端运行命令,

# CUDA_VISIBLE_DEVICES="gpu_0, gpu1,..." python -m torch.distributed.launch --nproc_per_node n_gpus DDP_main.py
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node=2 DDP_main.py # 因为是单机多卡,所以只需要指定nproc_per_node【GPU数量】即可。local_rank不需要设置。
大概内容就是,这个命令行参数“–loacl_rank”是必须声明的,但它不是由用户填写的,而是由pytorch为用户填写,也就是说这个值是会被自动赋值为当前进程在本机上的rank

DDP_main.py中内容如下:

import argparse         # 从命令行接受参数
from tqdm import tqdm   # 用于进度条
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from net import ToyModel
import torchvision.transforms as transforms
# ---------------------------#
#   下面两个包用于分布式训练
# ---------------------------#
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# ---------------------------#
#   获得数据集
# ---------------------------#
def get_dataset():
    transform = torchvision.transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True, 
        download=True, transform=transform)
    # -----------------------------------------------#
    #   train_sampler主要用于DataLoader中shuffle数据
    #       把trainset中的样本随机分配到不同的GPU上
    # -----------------------------------------------#
    train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
    # ---------------------------------------------------------------#
    #   batch_size:每个进程(GPU/卡)下的batch_size。
    #       总batch_size = 这里的batch_size * 进程并行数
    #       全局进程个数world_size = 节点数量 * 每个节点上process数量
    #       总卡数                =  电脑数  * 每台电脑上有多少张卡
    #   sampler参数和shuffle参数是互斥的,两个传一个就好,都用于数据打乱。
    #   在DDP中,用sampler参数
    # ----------------------------------------------------------------#
    trainloader = torch.utils.data.DataLoader(trainset, 
        batch_size=16, num_workers=2, sampler=train_sampler)
    return trainloader

#---------------------------#
#   训练
#---------------------------#
def train(model, trainloader, optimizer, loss_func, lr_scheduler, epoch):
    model.train()
    iterator = tqdm(range(epoch))       # 为了进度条显示而已
    for epoch in iterator:
        # ------------------------------------------------------------------#
        #   设置sampler的epoch,DistributedSampler需要这个来指定shuffle方式,
        #   通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
        #   这一步是必须的,让数据充分打乱,训练效果更好
        # ------------------------------------------------------------------#
        trainloader.sampler.set_epoch(epoch)

        for data, label in trainloader:
            data, label = data.to(args.local_rank), label.to(args.local_rank)
            optimizer.zero_grad()
            prediction = model(data)
            loss = loss_func(prediction, label)
            loss.backward()
            iterator.desc = "loss = %0.3f" % loss
            optimizer.step()
        # ------------------------------------------------------------------#
        #   save模型的时候:保存的是model.module而不是model,
        #       因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
        #   只需要在进程0(local_rank=0)上保存一次就行了,避免多次重复保存。
        # ------------------------------------------------------------------#
        if dist.get_rank() == 0:        # 等效于 if local_rank == 0:
            torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
        
        lr_scheduler.step()

# -----------------------------------------------#
# 初始化配置local_rank配置
# -----------------------------------------------#
parser = argparse.ArgumentParser()
# local_rank:当前这个节点上的第几张卡,从外部传入
#   该步骤必须有,launch会自动传入这个参数
parser.add_argument("--local_rank",help="local device id on current node", type=int)
args = parser.parse_args()
local_rank = args.local_rank        # 纯属想写代码时用local_rank还是args.local_rank都行
print('local_rank:', args.local_rank)
"""
local_rank: 0
local_rank: 1
"""


if __name__ == "__main__":
    # DDP 初始化
    torch.cuda.set_device(args.local_rank)   # 作用相当于CUDA_VISIBLE_DEVICES命令,修改环境变量
    dist.init_process_group(backend='nccl')  # 设备间通讯通过后端backend实现,GPU上用nccl,CPU上用gloo

    # 准备数据,要在DDP初始化之后进行
    trainloader = get_dataset()

    # 初始化model
    model = ToyModel().to(args.local_rank)    # 等效于model = ToyModel().cuda(args.local_rank)

    # Load模型参数要在构造DDP model之前,且只需要在 master卡 上加载即可
    ckpt_path = None
    if dist.get_rank() == 0 and ckpt_path is not None:
        model.load_state_dict(torch.load(ckpt_path))

    # 构造DDP model
    model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)

    # 初始化optimizer,要在构造DDP model之后
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

    # 学习率衰减方式
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)   

    # 初始化loss
    loss_func = nn.CrossEntropyLoss().to(args.local_rank)

    # 模型训练
    train(model, trainloader, optimizer, loss_func, lr_scheduler, epoch=100)
# ----------------------------------------------------------------------------------#
#   CUDA_VISIBLE_DEVICES:来决定使用哪些GPU,个数和后面n_gpus相同
#   torch.distributed.launch:启动DDP模式,构建多个进程,也会向代码中传入local_rank参数,
#       没有CUDA_VISIBLE_DEVICES限制的话,传入为从 0 到 n_gpus-1 的索引
#   --nproc_per_node=n_gpus:单机多卡,用几个gpu
# -----------------------------------------------------------------------------------#
# 用 2 张卡跑
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 DDP_main.py
# 用 3 张卡跑     
CUDA_VISIBLE_DEVICES="1,2,3" python -m torch.distributed.launch --nproc_per_node 3 DDP_main.py  

pytorch多机多卡训练【DistributedDataParallel】

https://github.com/KaiiZhang/DDP-Tutorial/blob/main/DDP-Tutorial.md#distributeddataparallel

原理

DDP的流程示意图如上图所示,DDP需要额外的建立进程组阶段(Construction)。在Construction阶段需要首先明确通信协议和总进程数。通信协议是实现DDP的底层基础,我们在之后单独介绍。总进程数就是指有多少个独立的并行进程,被称为worldsize。根据需求每个进程可以占用一个或多个GPU,但并不推荐多个进程共享一个GPU,这会造成潜在的性能损失。为了便于理解,在本文的所有示例中我们假定每个进程只占用1个GPU,占用多个GPU的情况只需要简单的调整GPU映射关系就好。

并行组建立之后,每个GPU上会独立的构建模型,然后GPU-1中模型的状态会被广播到其它所有进程中以保证所有模型都具有相同的初始状态。值得注意的是Construction只在训练开始前执行,在训练中只会不断迭代前向和后向过程,因此不会带来额外的延迟。

相比于DataParallel,DDP的前向后向过程更加简洁。推理、损失函数计算,梯度计算都是并行独立完成的。DDP实现并行训练的核心在于梯度同步。梯度在模型间的同步使用的是allreduce通信操作,每个GPU会得到完全相同的梯度。如图中后向过程的步骤2,GPU间的通信在梯度计算完成后被触发(hook函数)。图中没有画出的是,通常每个GPU也会建立独立的优化器。由于模型具有同样的初始状态和后续相同的梯度,因此每轮迭代后不同进程间的模型是完全相同的,这保证了DDP的数理一致性。

为了优化性能,DDP中针对allreduce操作进行了更深入的设计。梯度的计算过程和进程间的通信过程分别需要消耗一定量的时间。等待模型所有的参数都计算完梯度再进行通信显然不是最优的。如下图所示,DDP中的设计是通过将全部模型参数划分为无数个小的bucket,在bucket级别建立allreduce。当所有进程中bucket0的梯度计算完成后就立刻开始通信,此时bucket1中梯度还在计算。这样可以实现计算和通信过程的时间重叠。这种设计能够使得DDP的训练更高效。

在最后我们对DDP的通信部分进行介绍。DDP后端的通信由多种CPP编写的协议支持,不同协议具有不同的通信算子的支持,在开发中可以根据需求选择。

对于CV和NLP常用GPU训练的任务而言,选择Gloo或NCCL协议即可。一个决定因素是你使用的计算机集群的网络环境:

  • 当使用的是Ethernet(以太网,大部分机器都是这个环境):那么优先选择NCCL,具有更好的性能;如果在使用中遇到了NCCL通信的问题,那么就选择Gloo作为备用。(经验:单机多卡直接NCCL;多机多卡先尝试NCCL,如果通信有问题,而且自己解决不了,那就Gloo。)
  • 当使用的是InfiniBand:只支持NCCL。

另一个决定性因素是二者支持的算子范围不同,因此在使用时还需要结合代码里的功能来确定。下图记录了每种通信协议能够支持的算子,Gloo能够实现GPU中最基本的DDP训练,而NCCL能够支持更加多样的算子

综上,得益于DDP的分布式并行设计,DDP并不受PythonGIL争用的影响,是以多进程的方式运行的。这也使得DDP可以支持多机多卡的训练。我们将DDP的优缺点概括如下:

不同Backend的算子支持情况

优点

  • 更快的训练速度
  • 多进程的运行方式
  • 支持单机多卡和多机多卡
  • 平衡的GPU使用

缺点

  • 需要更多的代码书写和设计

代码实现和参数讲解:

本文首先会基于MNIST图像分类建立一个最小原型,然后逐步改进它以实现多机多卡的训练和混合精度的支持。在讲述的思路上本文借鉴了Kevin Kaichuang Yang的教程,但在实现细节上有较大的差异。特别的是本文增加了对DDP启动方式的探讨,并且介绍了多进程通信操作的使用样例。

名词解释:一个world里进程个数为world_size【对于2卡2GPU, world_size =4】,全局看,每个进程都有一个序号rank【0为主机GPU主卡】;分开看,一个进程在每台机器里面也有序号local_rank。

  • group:进程组,默认一个组,即一个world
  • world_size:全局进程个数【对于2卡2GPU, world_size =4】
  • rank:进程序号,用于进程间通信。rank=0为GPU主卡,主要用于多机多卡。本文中仅涉及到一台机器内多张卡。
  • locak_rank:进程内的GPU编号,通过指令torch.distributed.run自动指定,不需要认为设置。

非多进程示例

首先引入了所有用到的库。

from datetime import datetime
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from tqdm import tqdm

定义一个简单的卷积神经网络模型。

class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

定义主函数,添加一些启动脚本的可选参数。

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--gpuid', default=0, type=int,
                        help="which gpu to use")
    parser.add_argument('-e', '--epochs', default=2, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-b', '--batch_size', default=4, type=int, 
                        metavar='N',
                        help='number of batchsize')         

    args = parser.parse_args()
    train(args.gpuid, args)

然后给出训练函数的详细内容。

def train(gpu, args):
    model = ConvNet()
    model.cuda(gpu)
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().to(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=args.batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=None)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        model.train()
        for i, (images, labels) in enumerate(tqdm(train_loader)):
            images = images.to(gpu)
            labels = labels.to(gpu)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step,
                                                                   loss.item()))
    print("Training complete in: " + str(datetime.now() - start))

最后确保主函数被启动。

if __name__ == '__main__':
    main()

以上是我们的MNIST图像分类最小原型,可以通过如下命令启动在指定单个GPU上的训练:

python train.py -g 0

多进程示例

在开始对最小原型的改造之前,我们还需要交代一些事情。在DDP的代码实现中,最重要的步骤之一就是初始化。所谓初始化对应于上文介绍的Construction阶段,每个进程中需要指明几个关键的参数:

  • backend:明确后端通信方式,NCCL还是Gloo
  • init_method:初始化方式,TCP还是Environment variable(Env),可以简单理解为进程获取关键参数的地址和方式
  • world_size:总的进程数有多少
  • rank:当前进程是总进程中的第几个

初始化方式不同会影响代码的启动部分。本文会分别给出TCP和ENV模式的样例。TCP模式

让我们先从TCP开始,注意那些标记被更改的代码部分:

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--gpuid', default=0, type=int,
                        help="which gpu to use")
    parser.add_argument('-e', '--epochs', default=1, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-b', '--batch_size', default=4, type=int, 
                        metavar='N',
                        help='number of batchsize')   
    ##################################################################################
    parser.add_argument('--init_method', default='tcp://localhost:18888',            #
                        help="init-method")                                          #
    parser.add_argument('-r', '--rank', default=0, type=int,                         #
                    help='rank of current process')                                  #
    parser.add_argument('--world_size', default=2, type=int,                         #
                        help="world size")                                           #
    parser.add_argument('--use_mix_precision', default=False,                        #
                        action='store_true', help="whether to use mix precision")    #
    ##################################################################################                  
    args = parser.parse_args()
    train(args.gpuid, args)

在main函数中需要增加了以下参数:

  • args.init_method:url地址,用来指明的初始化方法。在tcp初始化方法中,其格式应为:tcp:[ IP ]:[ Port ] 。IP为rank=0进程所在的机器IP地址,Port为任意一个空闲的端口号。当采用的是单机多卡模式时,IP可以默认为//localhost
  • args.rank:当前进程在所有进程中的序号
  • args.world_size:进程总数【一共几块GPU】
  • args.use_mix_precision:布尔变量,控制是否使用混合精度
def train(gpu, args):
    ########################################    N1    ####################################################################
    dist.init_process_group(backend='nccl', init_method=args.init_method, rank=args.rank, world_size=args.world_size)    #
    ######################################################################################################################
    model = ConvNet()
    model.cuda(gpu)
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().to(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Wrap the model
    #######################################    N2    ########################
    model = nn.SyncBatchNorm.convert_sync_batchnorm(model)                  #
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])    #
    scaler = GradScaler(enabled=args.use_mix_precision)                   #
    #########################################################################
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    ####################################    N3    #######################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)      #
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,                   #
                                               batch_size=args.batch_size,              #
                                               shuffle=False,                           #
                                               num_workers=0,                           #
                                               pin_memory=True,                         #
                                               sampler=train_sampler)                   #
    #####################################################################################
    start = datetime.now()
    total_step = len(train_loader) # The number changes to orignal_length // args.world_size
    for epoch in range(args.epochs):
        ################    N4    ################
        train_loader.sampler.set_epoch(epoch)    #
        ##########################################
        model.train()
        for i, (images, labels) in enumerate(tqdm(train_loader)):
            images = images.to(gpu)
            labels = labels.to(gpu)
            # Forward pass
            ########################    N5    ################################
            with torch.cuda.amp.autocast(enabled=args.use_mix_precision):    #
                outputs = model(images)                                      #
                loss = criterion(outputs, labels)                            #
            ##################################################################  
            # Backward and optimize
            optimizer.zero_grad()
            ##############    N6    ##########
            scaler.scale(loss).backward()    #
            scaler.step(optimizer)           #
            scaler.update()                  #
            ##################################
            ################    N7    ####################
            if (i + 1) % 100 == 0 and args.rank == 0:    #
            ##############################################   
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step,
                                                                   loss.item()))            
    ############    N8    ###########
    dist.destroy_process_group()    #                                       
    if args.rank == 0:              #
    #################################
        print("Training complete in: " + str(datetime.now() - start))

在训练函数中增加/修改了以下内容:

  • N1:增加了DDP初始化的代码,需要指明backend、init_method、rank和world_size。其含义在前文都有介绍。
  • N2:在并行环境下,对于用到BN层的模型需要转换为同步BN层;其次,用DistributedDataParallel将模型封装为一个DDP模型,并复制到指定的GPU上。封装时不需要更改模型内部的代码;设置混合精度中的scaler,通过设置enabled参数控制是否生效。
  • N3:DDP要求定义distributed.DistributedSampler,通过封装train_dataset实现;在建立DataLoader时指定sampler。此外还要注意:shuffle=False。DDP的数据打乱需要通过设置sampler,参考N4。
  • N4:在每个epoch开始前打乱数据顺序。(注意total_step已经变为orignal_length // args.world_size。)
  • N5:利用torch.cuda.amp.autocast控制前向过程中是否使用半精度计算。
  • N6: 当使用混合精度时,scaler会缩放loss来避免由于精度变化导致梯度为0的情况。
  • N7:为了避免log信息的重复打印,可以只允许rank0号进程打印。
  • N8: 清理进程;然后,同上。

假设服务器环境为2台服务器(也称为2个node),每台服务器两块GPU。启动方式为:

# Node 0 : ip 192.168.1.201  port : 12345
# terminal-0
python mnist-tcp.py --init_method tcp://192.168.1.201:12345 -g 0 --rank 0 --world_size 4 --use_mix_precision
# terminal-1
python mnist-tcp.py --init_method tcp://192.168.1.201:12345 -g 1 --rank 1 --world_size 4 --use_mix_precision

# Node 1 : 
# terminal-0
python tcp_init.py --init_method tcp://192.168.1.201:12345 -g 0 --rank 2 --world_size 4 --use_mix_precision
# terminal-1
python tcp_init.py --init_method tcp://192.168.1.201:12345 -g 1 --rank 3 --world_size 4 --use_mix_precision

TCP模式启动很好理解,需要在bash中独立的启动每一个进程,并为每个进程分配好其rank序号。缺点是当进程数多的时候启动比较麻烦。完整的脚本文件见这里


ENV模式

ENV模式启动会更简洁,对于每个进程并不需要在dist.init_process_group中手动的指定其rank、world_size和url。程序会在环境变量中去寻找这些值。代码如下:

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--gpuid', default=0, type=int,
                        help="which gpu to use")
    parser.add_argument('-e', '--epochs', default=1, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-b', '--batch_size', default=4, type=int, 
                        metavar='N',
                        help='number of batchsize')   
    ##################################################################################
    parser.add_argument("--local_rank", type=int,                                    #
                        help='rank in current node')                                 #
    parser.add_argument('--use_mix_precision', default=False,                        #
                        action='store_true', help="whether to use mix precision")    #
    ##################################################################################                  
    args = parser.parse_args()
    #################################
    train(args.local_rank, args)    #
    #################################
  • args.local_rank:这里指的是当前进程在当前机器中的序号,注意和在全部进程中序号的区别。在ENV模式中,这个参数是必须的,由启动脚本自动划分,不需要手动指定。要善用local_rank来分配GPU_ID。
  • train(args.local_rank, args):一般情况下保持local_rank与进程所用GPU_ID一致。
def train(gpu, args):
    ##################################################################
    dist.init_process_group(backend='nccl', init_method='env://')    #
    args.rank = dist.get_rank()                                      #
    ##################################################################
    model = ConvNet()
    ...
  • 训练函数中仅需要更改初始化方式即可。在ENV中只需要指定init_method='env://'。TCP所需的关键参数模型会从环境变量中自动获取,环境变量可以在程序外部启动时设定,参考启动方式。
  • 当前进程的rank值可以通过dist.get_rank()得到
  • 之后的代码与TCP完全相同

假设服务器环境为2台服务器(也称为2个node),每台服务器两块GPU。ENV模式的启动方式为:

# Node 0 : ip 192.168.1.201  port : 12345
# terminal-0
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=2 --node_rank=0 --master_addr="192.168.1.201" --master_port=12345 mnist-env.py --use_mix_precision

# Node 1 : 
# terminal-0
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=2 --node_rank=1 --master_addr="192.168.1.201" --master_port=12345 mnist-env.py --use_mix_precision

ENV模式可以使用pytorch中的启动脚本torch.distributed.launch启动。在启动命令中需要指明多个参数:

  • nproc_per_node: 每台机器中运行几个进程【每台机器几个GPU】
  • nnodes:一共使用多少台机器
  • node_rank:当前机器的序号【非GPU序号】
  • master_addr:0号机器的IP
  • master_port:0号机器的可用端口

可以看到无论一台机器中的进程数为多少,只需要一行命令就可以启动,相比于TCP模式启动方式更加简洁。

训练中对模型在验证集上进行验证也是必不可少的步骤之一,那么如何在上述demo中增加模型验证的代码呢?如何实现模型的并行验证?

####################################    N11    ##################################
def evaluate(model, gpu, test_loader, rank):
    model.eval()
    size = torch.tensor(0.).to(gpu)
    correct = torch.tensor(0.).to(gpu)
    with torch.no_grad():
        for i, (images, labels) in enumerate(tqdm(test_loader)):
            images = images.to(gpu)
            labels = labels.to(gpu)
            outputs = model(images)
            size += images.shape[0]
            correct += (outputs.argmax(1) == labels).type(torch.float).sum() 
    dist.reduce(size, 0, op=dist.ReduceOp.SUM) # 群体通信 reduce 操作 change to allreduce if Gloo
    dist.reduce(correct, 0, op=dist.ReduceOp.SUM) # 群体通信 reduce 操作 change to allreduce if Gloo
    if rank==0:
        print('Evaluate accuracy is {:.2f}'.format(correct / size))
 #################################################################################

def train(gpu, args):
    ...
    ####################################    N9    ###################################
    test_dataset = torchvision.datasets.MNIST(root='./data',                        #
                                               train=False,                         #
                                               transform=transforms.ToTensor(),     #
                                               download=True)                       #
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_dataset)    #
    test_loader = torch.utils.data.DataLoader(dataset=test_dataset,                 #
                                               batch_size=args.batch_size,               #
                                               shuffle=False,                       #
                                               num_workers=0,                       #
                                               pin_memory=True,                     #
                                               sampler=test_sampler)                #
    #################################################################################
    start = datetime.now()
    total_step = len(train_loader) # The number changes to orignal_length // args.world_size
    for epoch in range(args.epochs):
        ...
        #####################    N10    #################
        evaluate(model, gpu, test_loader, args.rank)    #
        #################################################
    ...        

省略了代码不变的部分,完整的程序见脚本

  • N9:增加验证集的DataLoader,设置sampler实现数据的并行切分
  • N10:在每个epoch结束前验证模型
  • N11: 利用群体通信Reduce操作,将计算准确率所需的正确预测数和全局样本数收集到rank0进程中

只需要利用群体通信将验证集样本数和预测正确的样本数汇集在rank0中即可实现并行的模型验证,对于其它任务也可以参考这个思路实现。例如图像语义分割中计算mIoU只需要将每个进程的混淆矩阵汇总相加到rank0即可。

一些可能遇到的问题

网络防火墙有可能在首次多机多卡训练时造成计算节点间的通信失败。单机多卡成功运行的代码在扩展至多机多卡遇到问题后可以首先尝试将init_method切换为Gloo,能够回避掉一些潜在的问题。记录一下本人在实践中遇到的问题和解决方法。

address family mismatch 错误

解决方案是手动设置通信的网络端口。机器的网络端口通过ifconfig命令查询,有多个网口时可以都尝试一下。

当backend==NCCL

# Node 0 
# terminal-0
export NCCL_SOCKET_IFNAME=eth0
python ...

# Node 1 : 
# terminal-0
export NCCL_SOCKET_IFNAME=eth0
python ...

当backend==Gloo

# Node 0 
# terminal-0
export GLOO_SOCKET_IFNAME=eth0
python ...

# Node 1 : 
# terminal-0
export GLOO_SOCKET_IFNAME=eth0
python ...

参考

  1. https://pytorch.org/docs/stable/distributed.html#choosing-the-network-interface-to-use
  2. https://pytorch.org/tutorials/beginner/dist_overview.html
  3. Li, S., Zhao, Y., Varma, R., Salpekar, O., Noordhuis, P., Li, T., … & Chintala, S. (2020). Pytorch distributed: Experiences on accelerating data parallel training. arXiv preprint arXiv:2006.15704.
  4. https://zhuanlan.zhihu.com/p/76638962
  5. https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html
  6. https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255

torchrun-torch多机多卡分布式训练命令

https://pytorch.org/docs/stable/elastic/run.html

torchrun 提供了作为 torch.distributed.launch 的功能的超集,并具有以下附加功能:

  1. Worker failures are handled gracefully by restarting all workers.
    通过重新启动所有工作进程来优雅地处理工作进程故障。
  2. Worker RANK and WORLD_SIZE are assigned automatically.
    工作器 RANK 和 WORLD_SIZE 自动分配。
  3. Number of nodes is allowed to change between minimum and maximum sizes (elasticity).
    允许节点数量在最小和最大大小之间变化(弹性)。

torchrun 是setup.py中 entry_points 配置中声明的主模块torch.distributed.run的Python控制台脚本。它相当于调用 python -m torch.distributed.run 。

torchrun启动单机多卡DDP并行训练:

启动方式:

  • 使用 torchrun 命令来启动程序
  • torchrun –standalone –nproc_per_node=gpu XXX.py
  1. --standalone 代表单机运行
  2. --nproc_per_node=gpu 代表使用所有可用GPU。等于号后也可写gpu数量n,这样会使用前n个GPU

如果想要进一步指定要运行的 GPU,可以通过 CUDA_VISIBLE_DEVICES 设置GPU可见性,比如:

CUDA_VISIBLE_DEVICES=2,3 torchrun –standalone –nproc_per_node=gpu multi_gpu_torchrun.py

多机多卡

torchrun –nproc_per_node=4 –nnodes=3 –node_rank=0 –master_addr=192.168.0.101 –master_port=29500 test_mpi.py

1.指定每个节点(机器)上的进程数,这里是4个。意味着每个机器将启动4个进程来参与分布式训练。 –nproc_per_node=4 【一般设置为为节点GPU数量】

2.指定总共的节点数,这里是3个。意味着总共有3个机器参与分布式训练。

--nnodes=3

3.指定当前节点(机器)的排名,这里是0。排名从0开始,用于在分布式环境中区分不同的节点。

--node_rank=0 【0代表主节点】

4.指定主节点的IP地址,这里是192.168.0.101(更根据实际修改)。主节点用于协调分布式训练过程。

--master_addr=192.168.0.101

5.指定主节点的端口号,这里是29500。主节点使用指定的端口来与其他节点进行通信。

–master_port=29500

6.单机运行

--standalone