AnyGPT:具有离散序列建模的统一多模态大语言模型

code:  https://github.com/OpenMOSS/AnyGPT
paper: https://arxiv.org/abs/2402.12226
Demos: https://junzhan2000.github.io/AnyGPT.github.io/

AnyGPT,一种任意到任意的多模态语言模型,它利用离散表示来统一处理各种模态,包括语音、文本、图像和音乐。AnyGPT可以稳定地训练,而无需对当前的大型语言模型(LLM)架构或训练范式进行任何更改。相反,它完全依赖于数据级预处理,促进新模式无缝集成到LLMs中,类似于新语言的合并。基础模型将四种模态对齐,允许不同模态和文本之间的多式联运转换。此外,我们基于各种生成模型构建了AnyInstruct数据集,其中包含任意模态相互转换的指令。它由108k个多回合对话样本组成,这些样本错综复杂地交织在各种模态中,从而使模型能够处理多模态输入和输出的任意组合。经过此数据集的训练,我们的聊天模型可以进行免费的多模式对话,其中可以随意插入多模式数据。实验结果表明,AnyGPT能够促进任何到任何多模态对话,同时实现与所有模态的专用模型相当的性能,证明离散表示可以有效且方便地统一语言模型中的多个模态。

基本模型可以执行各种任务,包括文本到图像、图像字幕、自动语音识别(ASR)、零触发文本到语音(TTS)、文本到音乐和音乐字幕。我们可以按照特定的指令格式执行推理。

所有的模态都转换为离散token

提出了一个可以统一训练的综合框架。 如图1所示,该框架由三个主要组件组成:( 1 )多模态分词器,( 2 )作为主干的多模态语言模型,以及( 3 )多模态de-tokenizer。  分词器将连续的非文本模态转换为离散的标记,这些标记随后被排列成多模态交织序列。 然后,使用下一个标记预测训练目标通过语言模型训练序列。 在推理过程中,多模态令牌被解码回其原始表示的相关的多模态token。 为了丰富生成的质量,可以部署多模态增强模块来对生成的结果进行后处理,包括语音克隆或图像超分辨率等应用。 

AnyInstruct多模态交错指令数据合成过程:

生成108k多轮对话数据,每个对话都交错包含text、speech、image、music模态的数据

1.Generation of text-based conversations incorporating multimodal elements.

•脑暴100个meta topics覆盖大量的音视元素相关场景,再用GPT-4扩展至20000个特定主题 •提示LLM生成基于这些主题的具体对话场景,由于LLM在生成多模态元素时有局限,准备了一些样例尽量包含多个模态 •根据场景使用GPT-4生成多轮对话,多模态用详细的文字进行表述’

2.用DALL-E-3、MusicGen、微软TTS API根据用户提示和模型的文本回复生成对应模态的数据。

最终包含205k图片,503k语音,113k音乐

模型预训练数据:

为了实现从任何模态到任何其他模态的生成,至关重要的是要在这些模态之间保持良好的数据一致。不幸的是,这种数据非常稀缺。为了解决这一挑战,我们构建了一个以文本为中心的双峰对齐数据集。在这里,文本是作为一个重要的中介桥梁之间的差距差距各种形式。通过将不同的模态与语言模型中的文本模态对齐,我们的目标是实现所有模态之间的相互对齐。

预训练数据分布,按令牌计数分段,内部部分指示模态,中间部分详细说明数据类型,外部部分指定单个数据集。

Image & Text :我们利用来自LAION-2B的图像-文本对(Schuhmann 等人,2022)、LAION-COCO(lai,2022 b)、LAION-Aesthetics(lai,2022 a)和JouneyDB(Pan 等人,2023年)。LAION-2B提供了与来自网络的嘈杂替代文本配对的图像,而LAION-COCO代表了其中的600 M子集,由BLIP标题。我们通过过滤文本质量、图像纵横比和剪辑得分等来细化这些数据集,产生了3亿对的高质量语料库。为了提高整体图像生成的保真度,我们用高质量的LAION-Aesthetics子集和Midjourney的合成数据集ColonneyDB补充了我们的数据。

Speech & Text :我们收集了几个大规模的英语自动语音识别(ASR)数据集,Gigaspeech  (Chen et al., 2021), Common Voice (Ardila et al., 2020), and Multilingual LibriSpeech(MLS) (Pratap et al., 2020). 共同构成了57,000小时的语音文本对语料库,涵盖了各种各样的说话人,领域和录音环境。

Music&Text:从互联网上抓取了超过一百万个音乐视频,开始了广泛的数据收集过程。核心步骤涉及使用Spotify API将这些视频的标题与相应的歌曲进行匹配。随后,我们为每个音乐音频收集了一组全面的元数据,包括视频标题、描述、关键字、播放列表名称和Spotify歌词。该元数据被格式化为JSON并输入GPT-4处理。GPT-4作为智能字幕生成器的作用至关重要;它利用嘈杂的元数据提取有意义的信息,并将其简洁地概括为连贯的句子。这种方法使我们能够为大量的音乐音频生成高质量的文本字幕,有效地减少了数据集中幻觉的发生。

Tokenization

Image Tokenizer:

使用SEED tokenizer:

ViT encoder 224x224input 16x16patches

Causal Q-Former 32token

VQ Codebook Codebook大小8192

MLP 将视觉code解码至生成空间的embedding(与unCLIP-SD对齐)

UNet decoder复原图像

Speech Tokenizer:

采用具有残差矢量量化(RVQ)的编码器-解码器架构

•RVQ 8层,每层codebook大小1024

•每秒编码为50帧,10s即500×8 •量化器第1层捕捉语义信息,
2~8层编码副语言细节 •AnyGPT中使用LLM处理第1层语义token,
其余层由声音克隆模型提供?(VALLE中表示声音主要受第1层影响),因此Speech这块新增的token数是1024

Music Tokenizer :

对于音乐,我们采用Encodec(D’efossez 等人,2022),具有使用残差矢量量化(RVQ)量化的潜在空间的卷积自动编码器,作为音乐令牌化器。

•RVQ 4层,每层codebook2048 •每秒编码为50帧 •不同之处在于LLM会依次预测每帧4层的token,然后再下一帧

Language Model Backbone

为了将多模态离散表示合并到预先训练的LLMs中,我们使用新的模态特定令牌扩展词汇表,从而扩展相应的嵌入和预测层,新合并的参数随机初始化。 来自所有模态的标记联合收割机以形成新的词汇表,其中每个模态在语言模型内被训练以在共享的表征空间中对齐。

•统一的多模态语言模型:通过特定模态的tokenizer,可以将多模态数据压缩为离散的token序列,并使用LLM的next token prediction loss训练,天然地统一各种任务形式 • •使用LLaMA-2-7B作为backbone

Multimodal Generation:

为了高质量生成采用2阶段框架,语义信息建模和感知信息建模。LLM在语义层面生成经过融合对齐的内容,然后NAR模型在感知层面将语义token转换成高保真的多模态内容,达到效果与效率间的平衡

•Image
使用Diffusion Model解码SEED tokens生成图片

•Speech
使用SoundStorm从SpeechTokenizer的semantic tokens生成acoustic tokens(LLM生成第1层,SoundStorm生成2~7层?),再用SpeechTokenizer解码生成语音
具备Zero-shot的声音克隆功能

•Music
使用Encodec解码生成音频

Instruction Tuning

实验:

局限性和未来工作:

任意到任意多模式LLM基准测试:任何对任何多模态大型语言模型(LLMs)领域是一个新兴的研究领域。然而,缺乏一个专门的基准来评估模型在多个维度上的能力,以及减轻潜在风险,这是一个相当大的挑战。因此,必须制定一个全面的基准。

增强LLMs:尽管具有离散表示的多模态LLMs可以稳定地训练,但与单模态训练相比,观察到更高的损失,从而阻止了每种模态的最佳性能。改进多模态融合的潜在策略可能涉及扩展LLMs和标记器或采用混合专家(莫伊)架构,以更好地管理不同的数据并优化性能。

更好的Tokenizer:可以从各种角度来增强 Tokenizer ,包括采用上级码本训练方法,开发更具凝聚力的多模态表示,以及在各种模态中应用信息解纠缠。

更长的上下文:多模态内容,例如图像和音频,通常跨越广泛的序列。例如,AnyGPT将音乐建模限制在5秒,这大大限制了其音频输出的实际用途。此外,对于任何对任何多模态对话,扩展的上下文允许更多数量的会话交换,从而丰富交互的深度和复杂性。

AE、VAE、VQ-VAE、GAN、Diffusion-生成模型系列

深度生成模型应用于图像、音频、视频合成和自然语言处理等不同领域。随着深度学习技术的快速发展,近年来出现了不同的深度生成模型。这导致了越来越多的兴趣,比较和评估这些模型的性能和适用性,以不同的领域。在本文中,我们旨在提供深度生成模型的全面比较,包括扩散模型,生成对抗网络(GAN)和变分自动编码器(VAE)。我将回顾它们的基本原则、优点和缺点。我的目标是提供对这些模型之间的差异和相似性的清晰理解,以指导研究人员和实践者为他们的特定应用选择最合适的深度生成模型。

GAN学习生成类似于训练数据集的新数据。它由两个神经网络,一个生成器和一个判别器组成。生成器接受从正态分布中采样的随机值并生成合成样本,而判别器尝试区分真实的和生成的样本。生成器被训练为产生可以欺骗判别器的真实输出,而 判别器 被训练为正确区分真实的和生成的数据。图1的最上面一行显示了它的工作方案。

AE包含一个编码器和一个解码器。在训练时,输入图像x会被编码成一个较短的向量z,再被解码回另一幅长得差不多的图像。网络的学习目标是让重建出来的图像和原图像尽可能相似。注意该模型只能做图像压缩,无法完成图像的生成任务。AE的编码器编码出来的向量空间是不规整的。也就是说,解码器只认识经编码器编出来的向量,而不认识其他的向量。如果你把自己随机生成出来的向量输入给解码器,解码器是生成不出有意义的图片的。AE不能够随机生成图片,所以它不能很好地完成图像生成任务,只能起到把图像压缩的作用。

VAE由编码器和解码器组成。编码器将高维输入数据映射成低维表示,而解码器试图通过将该表示映射回其原始形式来重构原始高维输入数据。编码器通过预测平均值和标准偏差向量来输出潜码的正态分布作为低维表示。图1的中间行演示了它的工作。

扩散模型包括正向扩散和反向扩散过程。前向扩散是一种马尔可夫链,它逐渐向输入数据中添加噪声,直到获得白噪声。这不是一个可以学习的过程,通常需要1000个步骤。逆扩散过程的目的是将正向过程逐步反向,去除噪声以恢复原始数据。使用可训练的神经网络来实现反向扩散过程。图1的最下面一行显示了这一点。

AE 自编码器

自编码器是一种无监督学习技术,它使用神经网络来寻找给定数据分布的非线性潜在表示。神经网络由两部分组成:编码网络(encoder) z=f(x) 和解码网络(decoder) ^x=g(z) 。

从模型图中可以很明显的看出,对于AutoEncoder模型,采取的是无监督训练的方式,对于输入的x经过一个Encoder层后得到一个特征向量z,再将该向量z通过一个Decoder层得到最终输出x%20%E2%80%99,通过最小化重构模型的输入x和模型的输出x%E2%80%99的误差作为损失函数训练模型得到一个较好的关于输入x的特征向量z,模型设计的初衷的获得一个对应于源数据x的一个低维特征向量z,在获得此向量的基础上可以应用在很多分类任务上,但是AE模型并不适用于生成任务。

尽管AE已经可以获得较好的向量表示,在还原任务上可以做出较好的效果,但其并不是一个生成式模型,这是因为对于一个生成模型而言,他一般需要满足两个条件限制:

1.生成模型的编码器和解码器是可以分离开的;

2.对于固定维度下任意采样出的编码,解码器都能产生一张真实且清晰的图片。

AE模型并不满足第二点的条件,举个例子来说,对于输入的全月图和半月图,通过对AE模型的训练可以很好的完成还原任务,但是我们对于二者特征向量中取一个点,对于一个正常的生成模型而言,应该生成一个介于全月和半月之间的图片。然而,对于真实的AE而言,它生成的结果要么是乱码要么就是异常模糊的图片。为什么会发生这种情况呢?因为模型在训练的时候并没有显性对中间变量z的分布p(z)进行建模,在模型训练时所采用的f(z)是有限的,而对于z所处的空间存在大量f(z)外的点而言模型是并不理解的,如果像该例子随机在全月和半月中采样一个点,大概率得到不能够生成有效图片的点。

变分自编码器 (VAE)

https://zhuanlan.zhihu.com/p/34998569

在 VAE 中,encoder不再直接输出 隐向量z,而是输出 隐向量z的分布 的 均值μ 与 标准差σ,再从这个分布中采样得到 隐向量z。

AE 将输入encode成隐空间里的单个点,而 VAE 则是将输入encode成隐空间里的分布(distribution)。

从encode单个点变成encode一个分布,这样就不太容易有空隙(gap)了。但是,训练过程中,为了尽可能减小 reconstruction loss,模型会学成:均值μ 相差很远,标准差σ 很小。这样一来,VAE 就跟 普通的AE 差不多了,仍然容易有空隙。而我们希望学到的 隐空间Z 是连续、稠密、聚在一团而又能很好分开(distinct)不同标签的。

为了避免这个问题,就在损失函数中加入了KL loss。这样,loss = reconstruction loss + KL loss。 KL loss 是 方差为μ,标准差为σ的分布N(μ, σ²) 与 标准正态分布N(0, 1) 的 KL散度(KL散度可以衡量两个概率分布之间的距离)。加入了 KL loss 后,隐空间Z 就会呈正态分布聚在一团。

VQ-VAE:向量量化

量化自编码器

VQ-VAE 与 VAE 的主要区别是:VAE 想学到 连续(continuous)的隐空间,而 VQ-VAE 想学到 离散(discrete)的隐空间。

VQ-VAE 通过向网络添加离散的 codebook 组件来扩展标准自编码器。codebook 是与相应索引关联的向量列表。它用于量化自编码器的瓶颈;将编码器网络的输出与 codebook 的所有向量进行比较,并将欧氏距离最接近的 codebook 向量喂给解码器。

这个 argminargmin 操作有点令人担忧,因为它相对于编码器是不可微分的。但在实践中,可以通过这样的方式将 decoder 的梯度直接传递给 encoder (encoder 和 codebook 向量的梯度设置为1,其他 codebook 向量梯度设置为0)。

然后,解码器的任务是重构来自该量化矢量的输入,就像在标准自编码器公式中那样。

生成多个codes

当解码器只能接受一组 codebook 向量作为输入时,人们怎么能指望它产生大量多样化的图像呢?

我们需要为每个训练点提供一个唯一的离散值,以便能够重建所有数据。如果情况确实如此,那么模型难道不会通过将每个训练点映射到不同的离散 code 来记住数据吗?

如果编码器只输出一个矢量,这的确会成为问题,但在实际的 VQ-VAE 中,编码器通常会产生一系列矢量。例如,对于图像,编码器可能会输出一个32×32 的矢量网格,每个网格都被量化,然后将整个网格送到解码器。所有向量都被量化为相同的 codebook,因此离散值的数量不会改变,但是通过输出多个codes,我们能够成倍地增加解码器可以构造的数据点的数量。

例如,假设我们正在处理图像,我们有一个尺寸为512的密码本,我们的编码器输出一个 32×32 的矢量网格。在这种情况下,我们的解码器可以输出 51232×32=29216 个不同图像!

当然,模型仍然可以记住训练数据,但是通过编码器中嵌入正确的归纳偏差(即对图像使用conv-net)和使用正确的隐变量结构(即用于图像的 32×32 网格),模型应该能够学习到一个很好地表示数据的离散空间。

VQ-VAE使用了如下方式关联编码器的输出与解码器的输入:假设嵌入空间已经训练完毕,对于编码器的每个输出向量ze(x),找出它在嵌入空间里的最近邻zq(x),把ze(x)替换成zq(x)作为解码器的输入。

学习 Codebook

就像编码器和解码器网络一样,codebook 通过梯度下降来学习的。理想情况下,我们的编码器将输出一个接近学习到的 codebook 向量。这里本质上存在一个双向问题:学习与编码器输出对齐的 codebook 向量和学习与codebook 向量对齐的编码器输出。

这两个问题可以通过向损失函数添加项来解决。整个VQ-VAE 损失函数是:

在这里,我们使用与上一节中相同的符号,sg[x]sg[x] 代表“停止梯度”。

第一项是标准的重构损失;第二项是 codebook 对齐损失,其目标是使所选的 codebook 矢量尽可能接近编码器输出。编码器输出有一个停止梯度运算符,因为这项仅用于更新 codebook。第三项与第二项类似,但它将停止梯度放在 codebook 向量上,因为它旨在更新编码器输出,让其尽可能接近 codebook 向量。这项称为codebook 损失,其对总体损失的重要性由超参数 ββ 调整。当然,如果有多个,则最后两项在模型的每个量化向量输出上取平均值。

这个损失函数基本上完成了我们对 VQ-VAE 的描述。

这样,我们可以完整地训练一个 VQ-VAE,能够重构一组不同的图像,这些图像与下图中的原始图像不同。我们还可以训练 VQ-VAE 来重构其他模态,如音频或视频。

优化编码器和解码器

为了优化编码器和解码器,我们先来制订一下VQ-VAE的整体优化目标。由于VQ-VAE其实是一个AE,误差函数里应该只有原图像和目标图像的重建误差。

VQ-VAE使用了一种叫做”straight-through estimator”的技术来完成梯度复制。这种技术是说,前向传播和反向传播的计算可以不对应。你可以为一个运算随意设计求梯度的方法。基于这一技术,VQ-VAE使用了一种叫做sg(stop gradient,停止梯度)的运算:

也就是说,前向传播时,sg里的值不变;反向传播时,sg按值为0求导,即此次计算无梯度。(反向传播其实不会用到式子的值,只会用到式子的梯度。反向传播用到的loss值是在前向传播中算的)。

GAN

GAN(Generative Adversarial Network)是一种由生成器和判别器组成的对抗性模型。生成器试图生成与真实数据相似的数据,而判别器则试图区分真实数据和生成器生成的数据。两个模型通过对抗训练来提高自己的性能,最终生成器可以生成高质量的数据。

Diffusion

Diffusion Models是一种基于概率的生成模型,它通过模拟数据的扩散过程来生成新的数据。这种模型的核心思想是将已有的数据视为初始状态,然后通过不断迭代的扩散过程,逐步生成新的数据。Diffusion Models的优势在于其理论基础较为严密,其背后的数学推导和概率理论较为深入。这使得Diffusion Models在一些特定的数据生成任务上具有较好的表现。

相比GAN来说,扩散模型训练更稳定,而且能够生成更多样的样本,OpenAI的论文Diffusion Models Beat GANs on Image Synthesis也证明了扩散模型能够超越GAN。简单来说,扩散模型包含两个过程:前向扩散过程反向生成过程,前向扩散过程是对一张图像逐渐添加高斯噪音直至变成随机噪音,而反向生成过程是去噪音过程,我们将从一个随机噪音开始逐渐去噪音直至生成一张图像,这也是我们要求解或者训练的部分。

扩散模型包括两个过程:前向过程(forward process)反向过程(reverse process),其中前向过程又称为扩散过程(diffusion process),如下图所示。无论是前向过程还是反向过程都是一个参数化的马尔可夫链(Markov chain),其中反向过程可以用来生成数据,这里我们将通过变分推断来进行建模和求解。

模型设计:

可以选择采用AutoEncoder架构来作为噪音预测模型。DDPM所采用的模型是一个基于residual block和attention block的U-Net模型。如下所示:

U-Net属于encoder-decoder架构,其中encoder分成不同的stages,每个stage都包含下采样模块来降低特征的空间大小(H和W),然后decoder和encoder相反,是将encoder压缩的特征逐渐恢复。U-Net在decoder模块中还引入了skip connection,即concat了encoder中间得到的同维度特征,这有利于网络优化。DDPM所采用的U-Net每个stage包含2个residual block,而且部分stage还加入了self-attention模块增加网络的全局建模能力。 另外,扩散模型其实需要的是T个噪音预测模型,实际处理时,我们可以增加一个time embedding(类似transformer中的position embedding)来将timestep编码到网络中,从而只需要训练一个共享的U-Net模型。具体地,DDPM在各个residual block都引入了time embedding,如上图所示。

ChatTTS

https://github.com/2noise/ChatTTS/blob/main/docs/cn/README.md
https://2noise.com/
https://github.com/libukai/Awesome-ChatTTS
  1. 对话式 TTS: ChatTTS 针对对话式任务进行了优化,能够实现自然且富有表现力的合成语音。它支持多个说话者,便于生成互动式对话。
  2. 精细的控制: 该模型可以预测和控制精细的韵律特征,包括笑声、停顿和插入语。
  3. 更好的韵律: ChatTTS 在韵律方面超越了大多数开源 TTS 模型。我们提供预训练模型以支持进一步的研究和开发。

数据集和模型

  • 主模型使用了 100,000+ 小时的中文和英文音频数据进行训练。
  • HuggingFace 上的开源版本是一个在 40,000 小时数据上进行无监督微调的预训练模型。
###################################
# Sample a speaker from Gaussian.

rand_spk = chat.sample_random_speaker()
print(rand_spk) # save it for later timbre recovery

params_infer_code = ChatTTS.Chat.InferCodeParams(
    spk_emb = rand_spk, # add sampled speaker 
    temperature = .3,   # using custom temperature
    top_P = 0.7,        # top P decode
    top_K = 20,         # top K decode
)

###################################
# For sentence level manual control.

# use oral_(0-9), laugh_(0-2), break_(0-7) 
# to generate special token in text to synthesize.
params_refine_text = ChatTTS.Chat.RefineTextParams(
    prompt='[oral_2][laugh_0][break_6]',
)

wavs = chat.infer(
    texts,
    params_refine_text=params_refine_text,
    params_infer_code=params_infer_code,
)

###################################
# For word level manual control.

text = 'What is [uv_break]your favorite english food?[laugh][lbreak]'
wavs = chat.infer(text, skip_refine_text=True, params_refine_text=params_refine_text,  params_infer_code=params_infer_code)
torchaudio.save("output2.wav", torch.from_numpy(wavs[0]), 24000)

模型组成:LlamaModel、DVAE(VQVAE)、VOCOS声码器

文本控制

  • 1. Input Text : 需要转换的文本,支持中文和英文混杂
  • 2. Refine text : 是否对文本进行口语化处理
  • 3. Text Seed : 配置文本种子值,不同种子对应不同口语化风格
  • 4. 🎲 : 随机产生文本种子值
  • 5. Output Text : 口语化处理后生成的文本

音色控制

  • 6. Timbre : 预设的音色种子值
  • 7. Audio Seed : 配置音色种子值,不同种子对应不同音色
  • 8. 🎲 : 随机产生音色种子值
  • 9. Speaker Embedding : 音色码,详见 音色控制

情感控制

  • 10. temperate : 控制音频情感波动性,范围为 0-1,数字越大,波动性越大
  • 11. top_P :控制音频的情感相关性,范围为 0.1-0.9,数字越大,相关性越高
  • 12. top_K :控制音频的情感相似性,范围为 1-20,数字越小,相似性越高

系数控制

  • 13. DVAE Coefficient : 模型系数码
  • 14. Reload : 重新加载模型系数

播放控制

  • 15. Auto Play : 是否在生成音频后自动播放
  • 16. Stream Mode : 是否启用流式输出
  • 17. Generate : 点击生成音频文件
  • 18. Output Audio : 音频生成结果
  • 19. ↓ : 点击下载音频文件
  • 20. ▶️ : 点击播放音频文件

示例控制

  • 21. Example : 点击切换示例配置

快速体验

网址类型
Original Web原版网页版体验
Forge WebForge 增强版体验
LinuxPython 安装包
Samples音色种子示例
Cloning音色克隆体验

热门分支

功能增强

项目Star亮点
jianchang512/ChatTTS-ui提供 API 接口,可在第三方应用中调用
6drf21e/ChatTTS_colab提供流式输出,支持长音频生成和分角色阅读
lenML/ChatTTS-Forge提供人声增强和背景降噪,可使用附加提示词
CCmahua/ChatTTS-Enhanced支持文件批量处理,以及导出 SRT 文件
HKoon/ChatTTS-OpenVoice配合 OpenVoice 进行声音克隆

功能扩展

项目Star亮点
6drf21e/ChatTTS_Speaker音色角色打标与稳定性评估
AIFSH/ComfyUI-ChatTTSComfyUi 版本,可作为工作流节点引入
MaterialShadow/ChatTTS-manager提供了音色管理系统和 WebUI 界面

Deepseed 深度学习优化库综述

https://github.com/microsoft/DeepSpeed

教程文档:https://www.deepspeed.ai/

DeepSpeed是一个由微软开发的开源深度学习优化库,旨在提高大规模模型训练的效率和可扩展性。它通过多种技术手段来加速训练,包括模型并行化、梯度累积、动态精度缩放、本地模式混合精度等。DeepSpeed还提供了一些辅助工具,如分布式训练管理、内存优化和模型压缩等,以帮助开发者更好地管理和优化大规模深度学习训练任务。此外,deepspeed基于pytorch构建,只需要简单修改即可迁移。DeepSpeed已经在许多大规模深度学习项目中得到了应用,包括语言模型、图像分类、目标检测等等。

DeepSpeed有四大创新支柱:

DeepSpeed提供了一系列系统创新,使大规模DL培训变得有效和高效,大大提高了易用性,并在可能的规模方面重新定义了DL培训景观。这些创新,如ZeRO,3D-Mecelism,DeepSpeed-MoE,ZeRO-Infinity等都属于DeepSpeed-Training支柱。了解更多:DeepSpeed-Training

DeepSpeed汇集了张量、流水线、专家和ZeRO-parallelism等并行技术的创新,并将其与高性能自定义推理内核、通信优化和异构内存技术相结合,以前所未有的规模实现推理,同时实现无与伦比的延迟、吞吐量和成本降低。这种推理系统技术的系统组成DeepSpeed-Inference。了解更多:DeepSpeed-Inference

为了进一步提高推理效率,DeepSpeed为研究人员和从业者提供了易于使用和灵活组合的压缩技术,以压缩他们的模型,同时提供更快的速度,更小的模型大小,并显着降低压缩成本。此外,SoTA在压缩方面的创新,如ZeroQuant和XTC,都包含在DeepSpeed-Compression支柱下。了解更多:DeepSpeed-Compression

DeepSpeed库将DeepSpeed训练、推理和压缩支柱中的创新和技术实现并打包到一个易于使用的开源存储库中。它允许在单个训练,推理或压缩管道中轻松组合多种功能。DeepSpeed库被DL社区广泛采用,并已用于启用一些最强大的模型(请参阅DeepSpeed采用)。

Model Implementations for Inference(MII)是一个开源的存储库,通过减轻应用复杂系统优化技术的需求,使所有数据科学家都可以访问低延迟和高吞吐量的推理。开箱即用,MII为数千种广泛使用的DL模型提供支持,使用DeepSpeed-Inference进行优化,可以使用几行代码进行部署,同时与其香草开源版本相比,实现了显着的延迟减少。

DeepSpeed已与几种不同的流行开源DL框架集成,例如:

https://huggingface.co/docs/transformers/deepspeed

https://www.zhihu.com/people/deepspeed

FunAudioLLM:人类和LLMs之间自然交互的语音理解和生成基础模型

https://funaudiollm.github.io/  [阿里团队]
arxiv.org/abs/2407.04051

[Paper] [Code] [Modelscope:SenseVoiceCosyVoice] [HuggingFace: SenseVoice]

我们介绍FunAudioLLM,本报告介绍了FunAudioLLM,这是一个旨在增强人类与大型语言模型之间的自然语音交互的框架(LLMs)。其核心是两个创新模型:SenseVoice用于高精度多语言语音识别,情感识别和音频事件检测;CosyVoice用于多语言,音色和情感控制的自然语音生成。SenseVoice具有极低的延迟并支持50多种语言,而CosyVoice在多语言语音生成、零触发语音生成、跨语言语音克隆和指令遵循功能方面表现出色。与SenseVoice和CosyVoice相关的模型已经在Modelscope和Huggingface上开源,沿着相应的训练,推理和微调代码发布在GitHub上。 通过将这些模型与LLMs集成,FunAudioLLM可以实现语音翻译、情感语音聊天、交互式播客和富有表现力的有声读物叙述等应用,从而推动语音交互技术的发展。

SenseVoice支持多语言语音识别,其训练时间超过30万小时。具体来说,SenseVoice-Small在推理方面非常高效,其中识别延迟小于80 ms,分别比Whisper-Small和Whisper-large快5倍和15倍以上,SenseVoice-Large支持50多种语言的高精度ASR。此外,SenseVoice支持丰富的转录,包括最先进的情感识别、音频事件检测、反向文本标准化(Pusateri 等人,2017)和标点符号(Chen 等人,2020年)。

语音生成模型,CosyVoice,可以生成多语言的语音,这是超过17万小时和五种语言,包括中文(ZH),英语(EN),日语(JP),广东话(Yue)和韩语(KO)的训练。CosyVoice生成的样本可以实现低于2%的WER和超过75%的说话人相似度,达到人类平价的质量水平。CosyVoice支持零样本上下文学习,这使得语音克隆只需要3秒的提示语音。音色、情感、韵律和风格可以在语言内部或跨语言复制。我们还发布了一个指令模型,它可以控制说话者身份,说话风格(例如,情感)和其他具有自然纹理指令的细粒度语言特征。

Speech-to-Speech Translation:

通过集成SenseVoice、LLMs和CosyVoice,我们可以毫不费力地执行语音到语音翻译(S2ST)。

Emotional VoiceChat 情感语音聊天:

通过集成SenseVoice、LLMs和CosyVoice,我们可以开发一个情感语音聊天应用程序。

Interactive Podcast 互动播客:通过集成SenseVoice,一个基于LLM的多智能体系统,具有实时世界知识,以及CosyVoice,我们可以创建一个交互式播客。

有声书:通过LLMs的分析能力来构建和识别书籍中的情感,并将其与CosyVoice合成,我们实现了具有增强表现力的有声读物。

CosyVoice:

在推理阶段概述CosyVoice模型。概括地说,CosyVoice包括一个自回归Transformer,用于为输入文本生成相应的语音标记,一个基于ODE的扩散模型,流匹配,用于从生成的语音标记重建Mel频谱,以及一个基于HiFiGAN的声码器,用于合成波形。虚线模块在特定模型用途中是可选的,例如跨语言、SFT推理等。[论文]

图1:所提出的CosyVoice模型的概述。(a)演示了 𝒮3
标记器,其中虚线模块仅在训练阶段使用。(b)是CosyVoice的示意图,由文本到令牌LLM和令牌到语音流匹配模型组成。 S、E和T表示“start of sequence”、“end of sequence”和“turn of speech”标记。虚线表示推理阶段的自回归解码。(c)提供了我们的流匹配模型的放大视图,该模型以概率密度路径上的时间步长 t处的说话者嵌入 𝐯、语义标记 μ、掩蔽语音特征 X~和中间状态 Xt为条件。
图2:(a)零触发上下文学习和(B)跨语言语音克隆的序列构建。LID表示目标语言标识符。

CosyVoice由四个组件组成,即文本编码器、语音分词器[ speech tokenizer]、大语言模型和条件流匹配模型。具体地说,文本编码器用于对齐文本和语音token的语义空间,而  speech tokenizer 用于提取语义记号,如图1(a)所示。我们采用一个大的语言模型来学习整个文本编码和语音标记序列,将TTS重新表述为一个给定文本作为提示的自回归序列生成问题。然后,如图1(c)所示,利用条件流匹配模型,通过最佳路径上的去噪过程将语音令牌转换为Mel频谱图 。2020)用于以所生成的Mel频谱图作为输入来合成波形。

语音的受监督语义令牌 [ speech tokenizer] :

采用有监督的自动语音识别(ASR)模型来导出用于语音的有监督的语义语音( 𝒮3 )分词器。该模型是我们专有的SenseVoice ASR模型的微调版本。它接受了多语言音频数据的训练,并具有丰富的音频内容理解能力。 与原始ASR模型不同,我们将编码器分为两部分,并在它们之间插入矢量量化层。给定Mel频谱图 X 作为输入,其经历位置编码和 Encoder1 以获得上下文感知表示 H :

然后,一个矢量量化器(VQ)参与获得离散令牌。 对于帧 l 处的隐藏表示 𝐡l ,码本 C 中的最近嵌入的索引被视为该时间步处的语音令牌 μl :

语音令牌的对应码本嵌入被用作量化的隐藏表示 H¯={𝐜μ1,𝐜μ2,…,𝐜μL} ,并通过剩余的编码器层 Encoder2 :

 在 Encoder2 之后,接下来是基于transformer的ASR解码器,预测文本标签的后验概率:

TTS的大型语言模型:

我们将TTS任务表述为具有大型语言模型的自回归语音令牌生成问题(LLM)。对于LLM,序列构建是最重要的事项,其构建如下:

S and E denote the start and end of sequence, respectively.T is “turn of speech” tokens. 𝐯 is a speaker embedding vector extracted from the speech X with a pre-trained voice-print model2. The text encodings Y¯={𝐲¯u}u⁣∈⁣[1:U] is obtained by passing the text through a Byte Pair Encoded (BPE) tokenizer and text encoder:

由于文本和语音标记位于不同的语义层,因此文本编码器用于对齐它们的语义空间并有利于LLM建模。 在文本编码和语音标记 {μl}l⁣∈⁣[1:L] 之间插入开始标识符T报告问题,语音标记 {μl}l⁣∈⁣[1:L] 是用2.1中描述的监督语义标记器提取的。在训练阶段,我们采用教师强迫方案,其中左移序列作为模式输入,原始序列作为期望输出。 注意,在训练期间仅考虑语音标记的交叉熵损失和:

Optimal-transport Conditional Flow Matching:

在CosyVoice中,采用最优传输条件流匹配模型(OT-CFM)来学习Mel谱图的分布,并以生成的语音令牌为条件从其生成样本。 与扩散概率模型(DPM)相比,OT-CFM可以实现更好的性能,具有更简单的梯度,更容易的训练和更快的生成.

  在连续时间归一化流(CNF)中,从先验分布 p0⁢(X) 到Mel谱图 q⁢(X) 的数据分布构造概率密度路径。 概率密度路径由依赖于时间的矢量场 νt⁢(X):[0,1]×ℝL∗D→ℝL∗D 定义,其通过以下常微分方程(ODE)生成流 ϕt :

流匹配模型(The flow matching model)用于估计条件概率 P(S|X, v, Sref)。其中,X 和 v 分别表示语音片段和说话人嵌入,S 和 Sref 分别表示目标和参考语音的梅尔频谱。该模型使用卷积 Transformer U-Net 来确定最优传输 ODE 中先验分布与目标分布之间的矢量场。在推理阶段,只需五到十次迭代即可生成令人满意的梅尔频谱图。此外,还采用无分类器指导技术,通过屏蔽 70% 到 100% 的前置特征条件来增强上下文学习能力。

在从预测的梅尔频谱图合成波形时,我们使用改进的 HiFTNet 声码器,以支持流式生成。

可以实现的任务:

(a)零触发上下文学习和(B)跨语言语音克隆的序列构建。LID表示语言标识符。

Multi-lingual Voice Generation 【多语言的语音合成】

Zero-shot In-context Generation 零样本上下文生成

CosyVoice模型具有零触发的上下文学习能力,允许仅用简短的参考语音样本复制任意语音。这个过程需要仔细构造令牌语言模型(LM)的输入序列,如图2所示。 对于同一语言的提示语音和输入文本,我们将它们合并成一个统一的输入,将提示语音标记视为预生成的。利用该输入序列,自回归LM迭代地预测后续令牌,直到其遇到针对前一元素的“序列结束”令牌E。 然而,当提示语音和输入文本在语言上不同时,我们省略与提示相关联的文本和标记,以防止原始语言的韵律特征影响目标语言。 重要的是要注意,提示文本(对应于提示语音的内容)可以通过人工注释或ASR模型(如SenseVoice)转录。与提示文本类似,提示令牌是使用 𝒮3 tokenizer从提示语音中提取的。在生成语音标记之后,它们被附加在提示标记之后,形成流匹配模型的复合条件。此外,说话人嵌入和梅尔声谱图的提示语音,以进一步提高音色和环境的一致性。

Instructed Voice Generation指令语音生成:

Speaker Identity Control、细粒度控制、Style Control、情感丰富的声音生成、Speaker Fine-tune、Speaker Interpolation

为了进一步实现对CosyVoice的可控性,我们尝试集成额外的指令微调(Ji 等人,2023年)。CosyVoice-instruct扩展了CosyVoice-base,具有增强的后续功能。具体地说,它支持对诸如说话人身份(即,说话者的特征)、说话风格(包括情感、性别、语速和音调)以及细粒度的副语言特征。这些功能包括插入笑声、呼吸、边笑边说以及强调某些单词的能力。

SenseVoice:

SenseVoice 是具有音频理解能力的音频基础模型,包括语音识别(ASR)、语种识别(LID)、语音情感识别(SER)和声学事件分类(AEC)或声学事件检测(AED)。提出了具有不同大小和架构的两个模型以适应不同的要求:SenseVoice-Small,用于快速语音理解的仅编码器语音基础模型,以及SenseVoice-Large,编码器-解码器(Vaswani 等人,2017)语音基础模型,用于更准确的语音理解,支持更多语言。

SenseVoice模型概述。SenseVoice是一个语音基础模型,具有多种语音理解功能,包括ASR、LID、SER和AED。SenseVoice-Small是一种用于快速语音理解的仅编码器语音基础模型,SenseVoice-Large是一种编码器-解码器语音基础模型,用于更准确的语音理解,支持更多语言。

𝐞LID 、 𝐞SER 、 𝐞AEC 、 𝐞ITN/NoITN 是四个特殊标记的嵌入:

⟨LID⟩ 表示LID任务。如果 ⟨LID⟩ 被放置 ,则模型被训练以预测输出的对应位置处的语言标记。 在训练阶段,我们根据概率0.8用真实语言标记随机替换 ⟨LID⟩ ,以便模型可以预测语言标记,或者在推理阶段配置指定的语言标记。

⟨SER⟩ 表示SER任务。如果 ⟨SER⟩ 被放置,则训练模型以预测输出的对应位置处的语音情感标签。

⟨AEC⟩ 表示AEC任务。如果 ⟨AEC⟩ 被放置 ,则模型被训练以预测输出的对应位置处的音频事件标签。

⟨ITN⟩ 或 ⟨NoITN⟩ 指定转录样式。如果提供了 ⟨ITN⟩ ,则模型被训练为使用反向文本规范化(ITN)和标点符号进行转录。如果提供了 ⟨NoITN⟩ ,则模型被训练为在没有ITN和标点符号的情况下转录。

在训练阶段,利用交叉熵损失对LID、SER和AEC任务进行优化。ASR任务使用CTC损失来优化

SenseVoice-Large是一个自回归编码器-解码器模型,用于多语言ASR和多语音理解任务。与Whisper类似(拉德福 等人,2023),SenseVoice-Large通过解码器的输入令牌序列来指定任务。具体来说,我们通过分别包括 ⟨LID⟩ 、 ⟨SER⟩ 、 ⟨AED⟩ 令牌来指定是否预测具有时间戳的语言、语音情感和音频事件。与SenseVoice-Small相比,SenseVoice-Large的优势在于转录准确性和支持大量语言(50+)。

SenseVoice 专注于高精度多语言语音识别、情感辨识和音频事件检测

  • 多语言识别: 采用超过 40 万小时数据训练,支持超过 50 种语言,识别效果上优于 Whisper 模型。
  • 富文本识别:
    • 具备优秀的情感识别,能够在测试数据上达到和超过目前最佳情感识别模型的效果。
    • 支持声音事件检测能力,支持音乐、掌声、笑声、哭声、咳嗽、喷嚏等多种常见人机交互事件进行检测。
  • 高效推理: SenseVoice-Small 模型采用非自回归端到端框架,推理延迟极低,10s 音频推理仅耗时 70ms,15 倍优于 Whisper-Large。
  • 微调定制: 具备便捷的微调脚本与策略,方便用户根据业务场景修复长尾样本问题。
  • 服务部署: 具有完整的服务部署链路,支持多并发请求,支持客户端语言有,python、c++、html、java 与 c# 等。

推理效率:

表1. 比较了SenseVoice、Paraformer和Whisper的模型结构、参数规模、支持的语言和推理效率。SenseVoice-small采用非自回归架构,与Whisper相比,它在推理效率方面具有显着优势。

性能评测:

我们在开源基准数据集(包括AISHELL-1、AISHELL-2、Wenetspeech、Librisepeech和Common Voice)上比较了SenseVoice和Whisper的多语言识别性能和推理效率。使用A800机器进行推理效率评估。SenseVoice-small采用非自回归端到端架构,推理延迟极低-与Whisper-small相比快7倍,与Whisper-large相比快17倍。

1、文本识别:[在开源基准数据集(包括 AISHELL-1、AISHELL-2、Wenetspeech、Librispeech 和 Common Voice)上比较了 SenseVoice 与 Whisper 的多语言语音识别性能和推理效率。在中文和粤语识别效果上,SenseVoice-Small 模型具有明显的效果优势。]

2、情感识别

SenseVoice也可以用于离散情感识别。支持快乐、悲伤、愤怒和中立。我们在7个流行的情感识别数据集上对其进行了评估。SenseVoice-Large可以在大多数数据集上接近或超过SOTA结果,即使没有目标语料库微调。

由于目前缺乏被广泛使用的情感识别测试指标和方法,我们在多个测试集的多种指标进行测试,并与近年来 Benchmark 上的多个结果进行了全面的对比。所选取的测试集同时包含中文 / 英文两种语言以及表演、影视剧、自然对话等多种风格的数据,在不进行目标数据微调的前提下,SenseVoice 能够在测试数据上达到和超过目前最佳情感识别模型的效果。

我们还在测试集上对多个开源情感识别模型进行对比,结果表明,SenseVoice-Large 模型可以在几乎所有数据上都达到了最佳效果,而 SenseVoice-Small 模型同样可以在多数数据集上取得超越其他开源模型的效果。

3、事件检测

SenseVoice-Small和SenseVoice-Large模型都可以检测到语音中的音频事件,包括音乐、掌声、笑声。SenseVoice-Large可以预测音频事件的开始和结束位置,而SenseVoice Small只能预测音频中发生了什么(只有一个事件),但是,它可以检测更多的事件,例如在人机交互过程中可能发生的咳嗽,打喷嚏,呼吸和哭泣。

尽管 SenseVoice 只在语音数据上进行训练,它仍然可以作为事件检测模型进行单独使用。我们在环境音分类 ESC-50 数据集上与目前业内广泛使用的 BEATS 与 PANN 模型的效果进行了对比。SenseVoice 模型能够在这些任务上取得较好的效果,但受限于训练数据与训练方式,其事件分类效果专业的事件检测模型相比仍然有一定的差距。

限制:

1、SenseVoice有一些需要解决的局限性。首先,对于资源不足的语言,ASR性能通常要低得多。其次,SenseVoice不是为流式转录而设计的。因此,未来的工作可能会集中在开发基于SenseVoice的流式语音理解模型。

2、CosyVoice也有一些限制。首先,它支持的语言数量有限。虽然它可以根据明确的指令表达情感和说话风格,但它不能根据文本的语义内容推断出适当的情感或风格。此外,CosyVoice在唱歌时表现不佳。在保持声音原有音色的同时,实现富有表现力的情感变化仍有改进的空间。

3、另一个限制是FunAudioLLM中的两个创新模型没有使用LLMs进行端到端的训练。这种流水线方法可能会引入错误传播,这可能会影响整体性能。

sensevoice 推理代码:

    def inference(
        self,
        data_in,
        data_lengths=None,
        key: list = ["wav_file_tmp_name"],
        tokenizer=None,
        frontend=None,
        **kwargs,
    ):


        meta_data = {}
        if (
            isinstance(data_in, torch.Tensor) and kwargs.get("data_type", "sound") == "fbank"
        ):  # fbank
            speech, speech_lengths = data_in, data_lengths
            if len(speech.shape) < 3:
                speech = speech[None, :, :]
            if speech_lengths is None:
                speech_lengths = speech.shape[1]
        else:
            # extract fbank feats
            time1 = time.perf_counter()
            audio_sample_list = load_audio_text_image_video(
                data_in,
                fs=frontend.fs,
                audio_fs=kwargs.get("fs", 16000),
                data_type=kwargs.get("data_type", "sound"),
                tokenizer=tokenizer,
            )
            # print(audio_sample_list)
            time2 = time.perf_counter()
            meta_data["load_data"] = f"{time2 - time1:0.3f}"
            speech, speech_lengths = extract_fbank(
                audio_sample_list, data_type=kwargs.get("data_type", "sound"), frontend=frontend
            )
            time3 = time.perf_counter()
            meta_data["extract_feat"] = f"{time3 - time2:0.3f}"
            meta_data["batch_data_time"] = (
                speech_lengths.sum().item() * frontend.frame_shift * frontend.lfr_n / 1000
            )

        speech = speech.to(device=kwargs["device"])
        speech_lengths = speech_lengths.to(device=kwargs["device"])
        print("speech", speech.shape, speech_lengths)
        language = kwargs.get("language", "auto")
        language_query = self.embed(
            torch.LongTensor(
                [[self.lid_dict[language] if language in self.lid_dict else 0]]
            ).to(speech.device)
        ).repeat(speech.size(0), 1, 1)
        print("language_query", language_query.shape)
        use_itn = kwargs.get("use_itn", False)
        textnorm = kwargs.get("text_norm", None)
        if textnorm is None:
            textnorm = "withitn" if use_itn else "woitn"
        textnorm_query = self.embed(
            torch.LongTensor([[self.textnorm_dict[textnorm]]]).to(speech.device)
        ).repeat(speech.size(0), 1, 1)
        print("textnorm_query", textnorm_query.shape)
        speech = torch.cat((textnorm_query, speech), dim=1)
        speech_lengths += 1
        print("speech_add_textnorm", speech.shape, speech_lengths)
        event_emo_query = self.embed(torch.LongTensor([[1, 2]]).to(speech.device)).repeat(
            speech.size(0), 1, 1
        )
        print("event_emo_query", event_emo_query.shape)
        input_query = torch.cat((language_query, event_emo_query), dim=1)
        print("input_query", input_query.shape)
        speech = torch.cat((input_query, speech), dim=1)
        speech_lengths += 3
        print("speech_final", speech.shape, speech_lengths)

        # Encoder
        encoder_out, encoder_out_lens = self.encoder(speech, speech_lengths)
        print("encoder_out", encoder_out.shape, encoder_out_lens)
        if isinstance(encoder_out, tuple):
            encoder_out = encoder_out[0]

        # c. Passed the encoder result and the beam search

        # 束搜索和CTC解码
        ctc_logits = self.ctc.log_softmax(encoder_out)

        results = []
        b, n, d = encoder_out.size()
        if isinstance(key[0], (list, tuple)):
            key = key[0]
        if len(key) < b:
            key = key * b
        for i in range(b):

#对每个 batch 样本提取 CTC logits 输出的前 encoder_out_lens[i] 帧。
#使用 argmax 找到每个时间步概率最大的类别 ID (yseq)。
#使用 torch.unique_consecutive 去除连续的重复类别 ID(CTC 解码中的常见步骤,用于去除重复的符号)。
            x = ctc_logits[i, : encoder_out_lens[i].item(), :]
            yseq = x.argmax(dim=-1)
            yseq = torch.unique_consecutive(yseq, dim=-1) # 使用 torch.unique_consecutive 去除连续的重复类别 ID(CTC 解码中的常见步骤,用于去除重复的符号)

            ibest_writer = None
            if kwargs.get("output_dir") is not None:
                if not hasattr(self, "writer"):
                    self.writer = DatadirWriter(kwargs.get("output_dir"))
                ibest_writer = self.writer[f"1best_recog"]

#使用 mask 去掉 CTC 解码中的 blank ID。
#将整数 ID 列表转化为对应的字符或单词(通过 tokenizer.decode)。
            mask = yseq != self.blank_id
            token_int = yseq[mask].tolist()

            # Change integer-ids to tokens
            text = tokenizer.decode(token_int)

            result_i = {"key": key[i], "text": text}
            results.append(result_i)

            if ibest_writer is not None:
                ibest_writer["text"][key[i]] = text

        return results, meta_data

CTC使用blank id来对齐不同长度的输入和输出:

  • 在语音识别等任务中,输入的语音帧数往往远多于输出的字符数。CTC 通过引入 blank ID 来解决这个问题,使模型能够生成对齐(alignment),从而允许输入长度大于输出长度。
  • blank 用来表示在某个时间步模型没有输出任何字符,或者保持上一个字符的状态不变。

去除重复和冗余:

  • 语音帧与字符之间的对齐并不是一一对应的,CTC 会允许模型在多个时间步中输出相同的字符,同时在其他时间步输出 blank
  • 解码过程中,当遇到连续的相同字符时,只保留第一个字符,忽略重复出现的字符和 blank,这帮助去除冗余。
  • 例如,模型输出可能是 [a, blank, blank, a, a, blank, t, blank, blank],最终解码结果会变为 "a, t"

NCCL–多卡训练后端[持续补充]

本文主要记录和学习pytorch后端NCCL相关的知识点,为后续大模型训练打好基础

https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/overview.html

https://developer.nvidia.com/nccl

NCCL” 代表 “NVIDIA Collective Communications Library”,”NVIDIA 集体通信库“,它是一种由 NVIDIA 开发的用于高性能计算通信库。NCCL 专门设计用于加速 GPU 群集之间的通信,以便在并行计算深度学习等领域中提供更好的性能。

NVIDIA 集合通信库 (NCCL) 可实现针对 NVIDIA GPU 和网络进行性能优化的多 GPU 和多节点通信基元。NCCL 提供了 all-gather、all-reduce、broadcast、reduce、reduce-scatter、point-to-point send 和 receive 等例程,这些例程均经过优化,可通过节点内的 PCIe 和 NVLink 高速互联以及节点间的 NVIDIA Mellanox 网络实现高带宽和低延迟。

NCCL相关环境变量说明 :

【https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage.html】

  1. NCCL_TIMEOUT:设置集合操作超时阈值,单位毫秒;如果常见超时错误,适当增大该值,但不能太大NCCL_TIMEOUT 环境变量用于设置 NCCL 集体通信操作的超时时间。通过调整这个值,你可以更好地处理网络延迟和不稳定的问题,确保 NCCL 通信的稳定性和可靠性。如果在集体通信过程中遇到超时问题,可以尝试调整此环境变量以解决问题。

设置超时时间:

  • NCCL_TIMEOUT 用于定义 NCCL 集体通信操作的超时时间。超时时间是 NCCL 在执行操作时等待响应的最长时间,超出此时间将触发超时错误。

解决网络问题:

  • 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致集体通信操作超时。设置合适的 NCCL_TIMEOUT 可以帮助调节容错设置,避免训练过程中因超时错误而中断。

性能调优:

  • 根据你的集群配置和网络状况,适当调整 NCCL_TIMEOUT 可以帮助优化通信性能和稳定性。
  1. NCCL_ALGO:选择集合通信算法,如Ring, Tree;不同拓扑适合不同算法,测试选更优算法
  2. NCCL_CHUNK_SIZE:定义环形传输缓冲区大小;合理设置可提速,但也会增加内存消耗
  3. NCCL_DEBUG:打开NCCL调试日志;出现问题时打开调试,但会降低速度,不要在生产环境使用
  4. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  5. NCCL_P2P_LEVEL:设置点对点通信优化级别;增加该值可减少P2P次数,提高某些操作效率
  6. NCCL_P2P_DISABLE:禁用点对点通信,强制使用集合通信。在某些情况下,P2P 通信可能会导致性能问题或出现错误。禁用 P2P 通信可以帮助解决这些问题。如果你遇到与 P2P 通信相关的错误或不稳定性,禁用 P2P 可能有助于恢复系统的稳定性。
  7. NCCL_PXN_DISABLE:禁用使用非本地 NIC 的节点间通信,使用 NVLink 和一个中间 GPU。建议设置成1。在PyTorch中进行跨节点all-to-all通信时,如果该环境变量是0会出现异常。
  8. NCCL_SOCKET_IFNAME:选择网络接口。
  9. NCCL_SOCKET_NTHREADS 增加它的数量可以提高socker传输的效率,但是会增加CPU的负担
  10. NCCL_NET_GDR_LEVEL:设置GPUDirect RDMA的使用级别。
  11. NCCL_MAX_NRINGS:定义支持的最大NCCL环路数。
  12. NCCL_MIN_NRINGS:定义最小环路数。
  13. NCCL_BUFFSIZE:设置scratch空间大小。
  14. NCCL_BUFFLE_SIZE 缓存数据量,缓存越大一次ring传输的数据就越大自然对带宽的压力最大,但是相应的总延迟次数会少。默认值是4M(4194304),注意设置的时候使用bytes(字节大小)
  15. NCCL_NTHREADS:设置NCCL内部使用的线程数。
  16. NCCL_VERSION:显示NCCL版本信息。
  17. NCCL_MAX/MIN_NCHANNELS 最小和最大的rings,rings越多对GPU的显存、带宽的压力都越大,也会影响计算性能
  18. NCCL_CHECKS_DISABLE 在每次集合通信进行前对参数检验校对,这会增加延迟时间,在生产环境中可以设为1.默认是0
  19. NCCL_CHECK_POINTERS 在每次集合通信进行前对CUDA内存 指针进行校验,这会增加延迟时间,在生产环境中可以设为1.默认是0
  20. NCCL_NET_GDR_LEVEL GDR触发的条件,默认是当GPU和NIC挂载一个swith上面时使用GDR
  21. NCCL_IGNORE_CPU_AFFINITY 忽略CPU与应用的亲和性使用GPU与nic的亲和性为主
  22. NCCL_IB_DISABLE:禁用InfiniBand传输。

禁用 InfiniBand: 设置 NCCL_IB_DISABLE=1 会禁用 NCCL 在 InfiniBand 设备上的使用。这意味着 NCCL 将不会利用 InfiniBand 网络进行数据传输,而是回退到其他网络接口(例如以太网或其他网络接口)。

调试和兼容性: 禁用 InfiniBand 可能用于调试目的,或在系统中 InfiniBand 网络出现问题时回退到其他网络接口。如果你遇到与 InfiniBand 相关的错误或兼容性问题,禁用 InfiniBand 可能有助于解决这些问题。

  1. NCCL_IB_HCA 代表IB使用的设备:Mellanox mlx5系列的HCA设备NCCL_IB_HCA=mlx5 会默认轮询所有的设备。NCCL_IB_HCA=mlx5_0:1 指定其中一台设备。
  2. NCCL_IB_TIMEOUT 改变量用于控制InfiniBand Verbs超时。取值范围1-22。超时时间的计算公式为4.096微秒 * 2 ^ timeout,正确的值取决于网络的大小。增加该值可以在非常大的网络上提供帮助,例如 NCCL在调用ibv_poll_cq时出现错误12时。建议在大模型训练任务中设置成最大值22,可以减少不少nccl timeout异常。设置超时时间: NCCL_IB_TIMEOUT 用于控制 InfiniBand 网络操作的超时时间。通过调整这个值,你可以控制 NCCL 在遇到通信延迟或网络问题时的容忍度。解决网络问题: 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致超时错误。调整 NCCL_IB_TIMEOUT 可以帮助你在遇到网络问题时更好地调节超时设置,避免训练过程被中断。
  1. NCCL_IB_RETRY_CNT变量控制 InfiniBand 的重试次数。建议在大模型训练任务中设置成13,尽可能多重试。
  2. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  3. NCCL_IB_PCI_RELAXED_ORDERING启用 IB Verbs 传输的Relaxed Ordering。Relaxed Ordering可以极大地提高虚拟化环境下 InfiniBand 网络的性能。设置为 2,如果可用,自动使用Relaxed Ordering。设置为 1,强制使用Relaxed Ordering,如果不可用则失败。设置为 0,禁用使用Relaxed Ordering。默认值为 2。建议值为1

PyTorch 提速

摘自:https://github.com/lartpang/PyTorchTricks?tab=readme-ov-file

Note

原始文档:https://www.yuque.com/lart/ugkv9f/ugysgn

声明: 大部分内容来自知乎和其他博客的分享, 这里只作为一个收集罗列. 欢迎给出更多建议.

知乎回答 (欢迎点赞哦):

预处理提速

  • 尽量减少每次读取数据时的预处理操作, 可以考虑把一些固定的操作, 例如 resize , 事先处理好保存下来, 训练的时候直接拿来用。
  • 将预处理搬到 GPU 上加速。
    • Linux 可以使用 NVIDIA/DALI
    • 使用基于 Tensor 的图像处理操作。

IO 提速

使用更快的图片处理

  • opencv 一般要比 PIL 要快 。
    • 请注意,PIL 的惰性加载的策略使得其看上去 open 要比 opencv 的 imread 要快,但是实际上那并没有完全加载数据。可以对 open 返回的对象调用其 load() 方法,从而手动加载数据,这时的速度才是合理的。
  • 对于 jpeg 读取, 可以尝试 jpeg4py
  • 存 bmp 图 (降低解码时间)。
  • 关于不同图像处理库速度的讨论:Python 的各种 imread 函数在实现方式和读取速度上有何区别? – 知乎

整合数据为单个连续文件 (降低读取次数)

对于大规模的小文件读取,可以保存为一个可以连续读取的连续文件格式。可以选择考虑 TFRecord (Tensorflow) , recordIOhdf5pthn5lmdb

预读取数据

预读取下一次迭代需要的数据。使用案例:

借助内存

  • 直接载到内存里面。
    • 将图片读取后存到一个固定的容器对象中。
  • 把内存映射成磁盘。

借助固态

机械硬盘换成 NVME 固态。参考自 如何给你 PyTorch 里的 Dataloader 打鸡血 – MKFMIKU 的文章 – 知乎

训练策略

低精度训练

在训练中使用低精度 ( FP16 甚至 INT8 、二值网络、三值网络) 表示取代原有精度 ( FP32 ) 表示。

可以节约一定的显存并提速, 但是要小心一些不安全的操作如 mean 和 sum。

更大的 batch

更大的 batch 在固定的 epoch 的情况下往往会带来更短的训练时间。但是大的 batch 面临着超参数的设置、显存占用问题等诸多考量,这又是另一个备受关注的领域了。

代码层面

库设置

  • 在训练循环之前设置 torch.backends.cudnn.benchmark = True 可以加速计算。由于计算不同内核大小卷积的 cuDNN 算法的性能不同,自动调优器可以运行一个基准来找到最佳算法。当你的输入大小不经常改变时,建议开启这个设置。如果输入大小经常改变,那么自动调优器就需要太频繁地进行基准测试,这可能会损害性能。它可以将向前和向后传播速度提高 1.27x 到 1.70x。
  • 使用页面锁定内存,即在 DataLoader 中设定 pin_memory=True
  • 合适的 num_worker,细节讨论可见 Pytorch 提速指南 – 云梦的文章 – 知乎
  • optimizer.zero_grad(set_to_none=False 这里可以通过设置 set_to_none=True 来降低的内存占用,并且可以适度提高性能。但是这也会改变某些行为,具体可见文档。通过 model.zero_grad() 或 optimizer.zero_grad() 将对所有参数执行 memset,并通过读写操作更新梯度。但是,将梯度设置为 None 将不会执行 memset,并且将使用“只写”操作更新梯度。因此,设置梯度为 None 更快。
  • 反向传播期间设定使用 eval 模式并使用 torch.no_grad 关闭梯度计算。
  • 可以考虑使用 channels_last 的内存格式。
  • DistributedDataParallel代替DataParallel。对于多 GPU 来说,即使只有单个节点,也总是优先使用 DistributedDataParallel 而不是 DataParallel ,因为 DistributedDataParallel 应用于多进程,并为每个 GPU 创建一个进程,从而绕过 Python 全局解释器锁 (GIL) 并提高速度。

模型

  • 不要初始化任何用不到的变量,因为 PyTorch 的初始化和 forward 是分开的,他不会因为你不去使用,而不去初始化。
  • @torch.jit.script,使用 PyTroch JIT 将逐点运算融合到单个 CUDA kernel 上。PyTorch 优化了维度很大的张量的运算操作。在 PyTorch 中对小张量进行太多的运算操作是非常低效的。所以有可能的话,将计算操作都重写为批次(batch)的形式,可以减少消耗和提高性能。而如果没办法自己手动实现批次的运算操作,那么可以采用 TorchScript 来提升代码的性能。TorchScript 是一个 Python 函数的子集,但经过了 PyTorch 的验证,PyTorch 可以通过其 just in time(jtt) 编译器来自动优化 TorchScript 代码,提高性能。但更好的做法还是手动实现批次的运算操作。
  • 在使用混合精度的 FP16 时,对于所有不同架构设计,设置尺寸为 8 的倍数。
  • BN 之前的卷积层可以去掉 bias。因为在数学上,bias 可以通过 BN 的均值减法来抵消。我们可以节省模型参数、运行时的内存

数据

  • 将 batch size 设置为 8 的倍数,最大化 GPU 内存的使用。
  • GPU 上尽可能执行 NumPy 风格的操作。
  • 使用 del 释放内存占用。
  • 避免不同设备之间不必要的数据传输。
  • 创建张量的时候,直接指定设备,而不要创建后再传输到目标设备上。
  • 使用 torch.from_numpy(ndarray) 或者 torch.as_tensor(data, dtype=None, device=None),这可以通过共享内存而避免重新申请空间,具体使用细节和注意事项可参考对应文档。如果源设备和目标设备都是 CPU,torch.from_numpy 和 torch.as_tensor 不会拷贝数据。如果源数据是 NumPy 数组,使用 torch.from_numpy 更快。如果源数据是一个具有相同数据类型和设备类型的张量,那么 torch.as_tensor 可以避免拷贝数据,这里的数据可以是 Python 的 list, tuple,或者张量。
  • 使用非阻塞传输,即设定 non_blocking=True。这会在可能的情况下尝试异步转换,例如,将页面锁定内存中的 CPU 张量转换为 CUDA 张量。

对优化器的优化

模型设计

CNN

  • ShuffleNetV2,论文
    • 卷积层输入输出通道一致: 卷积层的输入和输出特征通道数相等时 MAC(内存访问消耗时间, memory access cost 缩写为 MAC ) 最小, 此时模型速度最快
    • 减少卷积分组: 过多的 group 操作会增大 MAC, 从而使模型速度变慢
    • 减少模型分支: 模型中的分支数量越少, 模型速度越快
    • 减少 element-wise 操作: element-wise 操作所带来的时间消耗远比在 FLOPs 上的体现的数值要多, 因此要尽可能减少 element-wise 操作。 depthwise convolution 也具有低 FLOPs 、高 MAC 的特点。

Vision Transformer

  • TRT-ViT: TensorRT-oriented Vision Transformer,论文解读
    • stage-level:Transformer block 适合放置到模型的后期,这可以最大化效率和性能的权衡。
    • stage-level:先浅后深的 stage 设计模式可以提升性能。
    • block-level:Transformer 和 BottleNeck 的混合 block 要比单独的 Transformer 更有效。
    • block-level:先全局再局部的 block 设计模式有助于弥补性能问题。

通用思路

  • 降低复杂度: 例如模型裁剪和剪枝, 减少模型层数和参数规模
  • 改模型结构: 例如模型蒸馏, 通过知识蒸馏方法来获取小模型

推理加速

半精度与权重量化

在推理中使用低精度 ( FP16 甚至 INT8 、二值网络、三值网络) 表示取代原有精度 ( FP32 ) 表示。

  • TensorRT 是 NVIDIA 提出的神经网络推理 (Inference) 引擎, 支持训练后 8BIT 量化, 它使用基于交叉熵的模型量化算法, 通过最小化两个分布的差异程度来实现
  • Pytorch1.3 开始已经支持量化功能, 基于 QNNPACK 实现, 支持训练后量化, 动态量化和量化感知训练等技术
  • 另外 Distiller 是 Intel 基于 Pytorch 开源的模型优化工具, 自然也支持 Pytorch 中的量化技术
  • 微软的 NNI 集成了多种量化感知的训练算法, 并支持 PyTorch/TensorFlow/MXNet/Caffe2 等多个开源框架

更多细节可参考 有三 AI:【杂谈】当前模型量化有哪些可用的开源工具?

操作融合

重参数化(Re-Parameterization)

时间分析

  • Python 自带了几个性能分析的模块 profile , cProfile 和 hotshot , 使用方法基本都差不多, 无非模块是纯 Python 还是用 C 写的。
  • PyTorch Profiler 是一种工具,可在训练和推理过程中收集性能指标。Profiler 的上下文管理器 API 可用于更好地了解哪种模型算子成本最高,检查其输入形状和堆栈记录,研究设备内核活动并可视化执行记录。

项目推荐

  • 基于 Pytorch 实现模型压缩:
    • 量化:8/4/2 bits(dorefa)、三值/二值 (twn/bnn/xnor-net)。
    • 剪枝: 正常、规整、针对分组卷积结构的通道剪枝。
    • 分组卷积结构。
    • 针对特征二值量化的 BN 融合。

扩展阅读

PyTorch 节省显存

原始文档:https://www.yuque.com/lart/ugkv9f/nvffyf

整理自: Pytorch 有什么节省内存 (显存) 的小技巧? – 知乎 https://www.zhihu.com/question/274635237

使用 In-Place 操作

  • 对于默认支持 inplace 的操作尽量启用。比如 relu 可以使用 inplace=True 。
  • 可以将 batchnorm 和一些特定的激活函数打包成 inplace_abn

损失函数

每次循环结束时删除 loss, 可以节约很少显存, 但聊胜于无。可见 Tensor to Variable and memory freeing best practices

混合精度

可以节约一定的显存并提速, 但是要小心一些不安全的操作如 mean 和 sum。

管理不需要反向传播的操作

显存清理

  • torch.cuda.empty_cache() 这是 del 的进阶版, 使用 nvidia-smi 会发现显存有明显的变化. 但是训练时最大的显存占用似乎没变. 大家可以试试: How can we release GPU memory cache?
  • 可以使用 del 删除不必要的中间变量, 或者使用 replacing variables 的形式来减少占用.

梯度累加(Gradient Accumulation)

把一个 batchsize=64 分为两个 32 的 batch,两次 forward 以后,backward 一次。但会影响 batchnorm 等和 batchsize 相关的层。

在 PyTorch 的文档 中提到了梯度累加与混合精度并用的例子。

使用梯度累加技术可以对分布式训练加速,这可以参考:[原创][深度][PyTorch] DDP 系列第三篇:实战与技巧 – 996 黄金一代的文章 – 知乎

梯度检查点(Gradient Checkpointing)

PyTorch 中提供了 torch.utils.checkpoint。这是通过在反向传播期间,在每个检查点位置重新执行一次前向传播来实现的。

论文 Training Deep Nets with Sublinear Memory Cost 基于梯度检查点技术,将显存从 O(N) 降到了 O(sqrt(N))。对于越深的模型, 这个方法省的显存就越多, 且速度不会明显变慢。

相关工具

参考资料

其他技巧

重现

可关注文档中 相关章节

强制确定性操作

避免使用非确定性算法

PyTorch 中,torch.use_deterministic_algorithms() 可以强制使用确定性算法而不是非确定性算法,并且如果已知操作是非确定性的(并且没有确定性的替代方案),则会抛出错误。

设置随机数种子

def seed_torch(seed=1029):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_torch()

参考自https://www.zdaiot.com/MLFrameworks/Pytorch/Pytorch%E9%9A%8F%E6%9C%BA%E7%A7%8D%E5%AD%90/

PyTorch 1.9 版本前 DataLoader 中的隐藏 BUG

具体细节可见 可能 95%的人还在犯的 PyTorch 错误 – serendipity 的文章 – 知乎

解决方法可参考 文档

def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    numpy.random.seed(worker_seed)
    random.seed(worker_seed)

DataLoader(..., worker_init_fn=seed_worker)

DDP分布式训练–数据加载和训练NCCL

深度学习的发展证明了大数据和大模型的价值。无论是在CV还是NLP领域,在大规模的计算资源上训练模型的能力变得日益重要。GPU以比CPU更快的矩阵乘法和加法运算,加速了模型训练。但随着数据量和模型参数的增长,单块GPU很快变得不够用。因此我们必须找到合适的方法,实现数据和模型在多个GPU甚至多个计算节点间的划分和复制,从而实现更短的训练周期和更大的模型参数量。

DDP大致的流程如下:

  1. 初始化进程组。
  2. 创建分布式并行模型,每个进程都会有相同的模型和参数。
  3. 创建数据分发Sampler,使每个进程加载一个mini batch中不同部分的数据。
  4. 网络中相邻参数分桶,一般为神经网络模型中需要进行参数更新的每一层网络。
  5. 每个进程前向传播并各自计算梯度。
  6. 模型某一层的参数得到梯度后会马上进行通讯并进行梯度平均。
  7. 各GPU更新模型参数。

今天主要来研究 3创建数据分发和Sampler :主要由三部分组成:torch.utils.data.Dataset【可以自定义】、torch.utils.data.DataLoader、以及torch.utils.data.distributed.DistributedSampler【可以自己定义】。

DistributedSampler 确保每个进程(或 GPU)处理数据集的不同部分。DataLoader 使用 DistributedSampler 生成的数据索引来分批数据,并进行数据加载和预处理。

1、 Dataset :

Dataset 是一个抽象类,用于表示数据集。你需要继承这个类并实现其方法,以定义你自己的数据集。它的主要功能包括:

  • 定义数据访问:通过实现 __getitem__ 方法,定义如何访问数据集中单个数据项。
  • 数据集大小:通过实现 __len__ 方法,返回数据集中样本的总数。
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

2、DataLoader:

DataLoader 是一个数据加载器,它负责从 Dataset 中批量加载数据。它提供了对数据的批量处理、随机打乱、并行加载等功能。DataLoader 主要功能包括:

  • 批量加载:将数据集分成多个批次,并在每次迭代中返回一个批次的数据。
  • 并行处理:使用多个工作线程(num_workers)来并行加载数据,提高数据加载速度。
  • 数据打乱:通过 shuffle 参数来随机打乱数据顺序。
  • 自动处理样本:使用 collate_fn 将单个样本组合成批次。

1. 数据加载和预处理

DataLoader 负责从数据集(Dataset)中加载数据,并进行必要的预处理操作。预处理可能包括数据增强、归一化等。它通过多线程或多进程的方式并行加载数据,减少了数据加载时间。

  • num_workers:指定用于数据加载的子进程数,帮助加快数据加载速度。

2. 数据分批

DataLoader 将数据集划分为多个批次(batches),以便于模型进行训练和评估。批次的大小可以通过 batch_size 参数进行设置。

  • batch_size:每个批次的数据量,这对于训练过程中每次迭代的数据量非常重要。

3. 分布式训练中的数据划分

在 DDP 下,DataLoader 结合 Sampler 来确保数据在各个进程之间的正确分配。Sampler 控制每个进程(或 GPU)获得数据集的哪一部分。

  • DistributedSampler:当进行分布式训练时,DistributedSampler 确保每个进程处理不同的数据子集,从而实现负载均衡和避免数据重复。

4. 数据的打乱和顺序

为了提高模型的泛化能力,数据通常在每个 epoch 开始时被打乱。DataLoader 提供了打乱数据的功能,这对于训练过程是非常重要的。

  • shuffle:指定是否在每个 epoch 开始时打乱数据,这有助于减少模型对数据顺序的过拟合。

5. 批次丢弃

在训练过程中,如果最后一个批次的样本数不足以构成完整的批次,可以选择丢弃这个批次,以保证每个批次的大小一致。

  • drop_last:指定是否丢弃最后一个批次(如果其大小小于 batch_size)。

6. Sampler 结合使用

DataLoader 可以与不同的 Sampler 结合使用,以支持各种数据加载策略。在 DDP 下,DistributedSampler 是常用的 Sampler,它将数据集划分为多个子集,每个进程处理一个子集。

  • batch_sampler:如果使用自定义的 Sampler,可以将其传递给 batch_sampler 参数来控制数据的分批方式。

data = [1, 2, 3, 4, 5]
dataset = MyDataset(data)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True, num_workers=2)

for batch in dataloader:
print(batch)

3、DistributedSampler:

DistributedSampler 用于在分布式训练中对数据进行采样。它的主要作用是确保每个进程(或 GPU)在分布式训练中获得数据的不同子集,从而避免数据重复和确保数据均匀分配。主要功能包括:

  • 分布式数据分配:根据进程的 rank 和总进程数,计算出每个进程应该处理的数据子集。
  • 随机打乱:支持在每个 epoch 重新打乱数据,以增加训练的随机性。
  • 同步:在多个进程之间协调数据的采样。

1. 数据分配

在分布式训练中,数据集被划分成多个子集,每个进程(或 GPU)处理数据集的一部分。Sampler 确保每个进程(或 GPU)得到不同的数据子集,以避免重复和数据丢失。

  • DistributedSampler:这是 PyTorch 提供的专门用于分布式训练的采样器。它根据当前进程的 rank 和总进程数 num_replicas 来划分数据集。每个进程获得数据集的不同部分,从而实现数据的有效分配和负载均衡。

2. 确保数据覆盖

在每个 epoch 中,每个进程需要获取数据集的不同部分,以确保整个数据集被覆盖。Sampler 可以帮助实现这种数据分配策略,避免数据遗漏和冗余。

  • 随机打乱DistributedSampler 还支持在每个 epoch 开始时打乱数据集,这对于训练模型具有更好的泛化能力是非常重要的。

3. 避免数据重复

如果不使用合适的 Sampler,多个进程可能会处理相同的数据,从而导致数据重复。这不仅浪费计算资源,还可能影响模型的训练效果。

  • 去重DistributedSampler 确保每个进程仅处理数据集的一部分,从而避免数据重复。

4. 适应批量大小

在分布式训练中,数据的分配和批处理需要适应分布式环境中的批量大小。Sampler 负责将数据分成适合训练的批次,并确保每个进程处理的数据量与其他进程一致。

  • BatchSamplerBatchSampler 将由 Sampler 生成的索引列表分成批次,以便用于训练。它与 DistributedSampler 结合使用时,可以确保每个进程处理的数据批次符合预期的批量大小。

5. 支持多样本处理策略

不同的任务和模型可能需要不同的数据处理策略,如排序、动态采样等。通过自定义 Sampler,可以实现特定的采样策略以满足任务需求。

  • 自定义采样器:可以实现自定义的 Sampler 类,来满足特定的需求,如按样本长度排序、动态调整批次大小等。
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=4, rank=0)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, sampler=sampler)

动手实现一个采样器:

CustomDistributedBufferDynamicBatchSampler 是一个用于分布式训练的自定义数据采样器,它结合了动态批量大小和缓冲区的排序策略。它的目的是通过更复杂的策略来生成批量,以适应各种训练需求。下面是对这个采样器的详细解释:

__iter__ 方法生成数据批次,考虑到动态批量大小和缓冲区的排序:

数据打乱:如果 shuffle 为 True,数据将被打乱。缓冲区排序:数据被分成多个缓冲区,每个缓冲区的大小由 sort_size 控制,并按样本长度进行排序。批量生成:根据 batch_sizebatch_size_sample_max 生成批量。如果当前缓冲区中的数据无法满足批次大小,则将现有数据作为一个批次。数据重复和分配:确保每个进程获得相同数量的批次。如果总批次不足以均分,重复一些批次以满足每个进程的需求。

dataset: 数据集实例。batch_size: 批次大小。batch_type: 批次的类型(例如按 token 或样本)。num_replicas: 总的进程数。rank: 当前进程的 rank。rank_split: 是否分割 rank。shuffle: 是否打乱数据。drop_last: 是否丢弃最后一个批次。is_training: 是否处于训练模式。sort_size: 缓冲区的大小,用于排序数据。start_step: 起始步数(用于从特定步数开始训练)。

def __init__(
    self,
    dataset,
    batch_size,
    batch_type="token",
    num_replicas=None,
    rank=None,
    rank_split=False,
    shuffle=True,
    drop_last=False,
    is_training: bool = True,
    sort_size: int = 1024,
    start_step: int = 0,
    **kwargs,
):
    try:
        rank = dist.get_rank()
        num_replicas = dist.get_world_size()
    except:
        rank = 0
        num_replicas = 1

    self.rank = rank
    self.num_replicas = num_replicas
    self.dataset = dataset
    self.batch_size = batch_size
    self.batch_type = batch_type
    self.is_training = is_training
    self.shuffle = shuffle and is_training
    self.drop_last = drop_last

    self.total_size = len(self.dataset)
    self.num_samples = int(math.ceil(self.total_size / self.num_replicas))
    self.epoch = 0
    self.sort_size = sort_size * num_replicas
    self.max_token_length = kwargs.get("max_token_length", 2048)
    self.length_scale_source = kwargs.get("length_scale_source", 1.0)
    self.batch_size_sample_max = kwargs.get("batch_size_sample_max", 200)
    self.start_step = start_step
    self.batch_num = 1
    if self.start_step > 0:
        logging.info(f"Warning, start_step > 0, dataloader start from step: {self.start_step}")
def __iter__(self):
    if self.shuffle:
        g = torch.Generator()
        g.manual_seed(self.epoch)
        random.seed(self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        indices = list(range(len(self.dataset)))

    # Create sorted buffers and form batches
    buffer_batches = []
    for i in range(0, len(indices), self.sort_size):
        buffer = sorted(
            indices[i : i + self.sort_size], key=lambda idx: self.dataset.get_source_len(idx)
        )
        batch = []
        max_len_in_batch = 0
        count = 1
        for idx in buffer:
            original_sample_length = self.dataset.get_source_len(idx)
            if original_sample_length > self.max_token_length:
                continue
            sample_length = 1 if self.batch_type == "example" else original_sample_length
            potential_batch_length = max(max_len_in_batch, sample_length) * (len(batch) + 1)
            if potential_batch_length <= self.batch_size and count < self.batch_size_sample_max:
                batch.append(idx)
                max_len_in_batch = max(max_len_in_batch, sample_length)
                count += 1
            else:
                buffer_batches.append(batch)
                batch = [idx]
                max_len_in_batch = sample_length
                count = 1
        if batch:
            buffer_batches.append(batch)

    # Ensure each rank gets the same number of batches, duplicate data if needed
    batches_per_rank = math.ceil(len(buffer_batches) / self.num_replicas)
    total_batches_needed = batches_per_rank * self.num_replicas
    extra_batches = total_batches_needed - len(buffer_batches)
    buffer_batches += random.choices(buffer_batches, k=extra_batches)

    # Evenly distribute batches from buffer_batches to each rank
    rank_batches = [[] for _ in range(self.num_replicas)]
    for i, batch in enumerate(buffer_batches):
        rank_batches[i % self.num_replicas].append(batch)

    # Assign all batches for the current rank directly
    final_batches = rank_batches[self.rank][self.start_step :]
    self.batch_num = len(final_batches)

    logging.info(
        f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {len(rank_batches[self.rank])}, after: {self.batch_num}"
    )
    return iter(final_batches)

CustomDistributedBufferDynamicBatchSampler 通过以下方式增强了数据采样:

  • 动态批量大小:根据数据的实际长度动态调整批量大小。
  • 缓冲区排序:使用排序缓冲区策略提高数据处理效率。
  • 数据均匀分配确保每个进程获得相同数量的批次,避免数据不均衡。

这些特性使得 CustomDistributedBufferDynamicBatchSampler 能够更好地处理大规模数据集,并在分布式训练中提供高效的数据加载和批次生成策略。

数据均匀分配至关重要:如果分配不均,会导致某个节点的GPU显存爆炸,导致短筒效应,所以需要对数据进行平均分配:

分布式训练的时候 如何定义自己的samper,如何保证不同的节点使用不同的数据训练?

根据rank数量将索引分成不同的rank份。 分割数据以确保每个进程获取不同的索引

        if self.num_replicas is not None and self.rank is not None:
            # 每个进程处理的数据索引范围
            num_samples = int(np.ceil(len(indices) / self.num_replicas))
            start = self.rank * num_samples
            end = min(start + num_samples, len(indices))
            indices = indices[start:end]

1. 定义自定义Sampler

自定义Sampler需要继承torch.utils.data.Sampler并实现__iter__方法,返回数据索引的迭代器。以下是一个简单的示例:

python复制代码import torch
import numpy as np

class CustomSampler(torch.utils.data.Sampler):
    def __init__(self, data_source, num_replicas=None, rank=None):
        self.data_source = data_source
        self.num_replicas = num_replicas
        self.rank = rank

    def __iter__(self):
        # 获取所有样本索引
        indices = np.arange(len(self.data_source))

        # 分割数据以确保每个进程获取不同的索引
        if self.num_replicas is not None and self.rank is not None:
            # 每个进程处理的数据索引范围
            num_samples = int(np.ceil(len(indices) / self.num_replicas))
            start = self.rank * num_samples
            end = min(start + num_samples, len(indices))
            indices = indices[start:end]

        # 打乱数据
        np.random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        if self.num_replicas is not None and self.rank is not None:
            num_samples = int(np.ceil(len(self.data_source) / self.num_replicas))
            return num_samples
        return len(self.data_source)

2. 初始化分布式环境

在训练脚本中,初始化分布式环境并创建自定义采样器。

python复制代码import torch
import torch.distributed as dist

dist.init_process_group(backend='nccl')  # 或 'gloo'
local_rank = dist.get_rank()
world_size = dist.get_world_size()

# 数据集
from torchvision import datasets, transforms
transform = transforms.Compose([transforms.ToTensor()])
dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)

# 创建自定义采样器
sampler = CustomSampler(dataset, num_replicas=world_size, rank=local_rank)

# 创建数据加载器
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, sampler=sampler)

3. 在训练时设置采样器的epoch

如果你的自定义Sampler需要在每个epoch中更改数据顺序,可以在每个epoch开始时调用sampler.set_epoch(epoch)

python复制代码for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # 如果你的自定义Sampler支持这个方法
    for batch in dataloader:
        # 训练代码

这样,你就可以定义一个适合你需求的自定义Sampler,并在分布式训练中使用它。

DDP分布式训练时候 batchsize设置是指单卡还多卡所有的总batch?

在分布式数据并行(DDP)训练中,batch_size的设置是指每个单卡(即每个GPU)的batch size。总的batch size是每个单卡的batch size乘以GPU的数量。【在samper采样的时候,根据rank数量,将index 分割成 rank份,每一份里面进行batchsize的采样,所以bs指的是单个GPU的bs】

例如,如果你有4个GPU,并且每个GPU的batch size设置为32,那么总的batch size就是32 * 4 = 128。每个GPU在每次训练迭代中处理32个样本,所有4个GPU在每次训练迭代中处理总共128个样本。

如果你使用的是分布式数据并行的训练策略,确保将batch_size设置为每个GPU上希望的大小,而不是总的batch size。

datalaoder中设置的 number_work在DDP训练中如何工作的?

首先明确一点: num_works指的是单个GPU的num_works数据加载进程数量。

  • **num_workers**参数定义了并行数据加载的进程数量。每个进程独立地从数据集中读取和预处理数据。
  • **collate_fn**可以自定义如何将数据项组合成batch。
  • 数据加载进程将预处理后的数据批次传递给主进程,主进程将这些批次数据送入模型进行训练。

使用多个数据加载进程可以提高数据预处理的速度,减少GPU在训练时的等待时间,从而加快整体训练过程。

num_workers的作用

  • 数据加载: num_workers决定了用于加载数据的子进程的数量。更多的工作进程可以并行地读取和预处理数据,从而加快数据加载速度,减少GPU的等待时间。
  • 性能影响: 增加num_workers的数量通常可以提高数据加载速度,但也会增加系统的内存使用。合理设置num_workers的值可以在数据加载效率和系统资源使用之间找到平衡。

在DDP训练中的考虑

  1. 每个进程的num_workers: 每个分布式进程(即每个GPU)都有自己的数据加载子进程。这意味着总的num_workers会是每个GPU上num_workers的值乘以GPU的数量(分布式进程数)。例如,如果有4个GPU,并且每个GPU的num_workers设置为4,那么总的工作进程数将是4 * 4 = 16。
  2. 避免数据重叠: 在分布式训练中,需要确保每个进程处理的数据子集是不同的。使用DistributedSampler可以确保数据在各个进程间均匀分配,从而避免数据重复和丢失。
  3. 同步和通信开销: 增加num_workers的数量可能会增加进程间的同步和通信开销,特别是在多GPU的情况下。需要根据具体的硬件配置和数据集大小来调整num_workers
  4. 内存和CPU资源: 每增加一个工作进程,都会消耗额外的CPU资源和内存。确保你的系统有足够的资源来支持设置的num_workers值。

单个numberwork子进程单独负责一个batch的数据吗,然后多个进程负责加载多个不同batch数据?

在PyTorch中,DataLoader中的子进程(由num_workers参数指定)并不一定是每个进程负责一个完整的batch的数据。实际操作中,多个子进程负责并行地预处理数据并将其送入主进程。下面是详细的解释:

数据加载进程的工作方式

  1. 子进程的任务: 每个数据加载进程从数据集中提取样本,并执行预处理任务。子进程会从数据集中读取单个样本(或多个样本),进行必要的转换和预处理,然后将这些处理后的样本返回给主进程。
  2. 批次的生成: DataLoader在主进程中处理批次的生成。主进程负责将从各个子进程中接收到的样本聚合成一个完整的batch。具体来说,当主进程需要一个新的batch时,它会从子进程中获取样本,然后使用collate_fn将这些样本组合成一个batch。
  3. 数据并行处理: 如果设置了num_workers > 1DataLoader会启动多个子进程来并行地加载数据。这些子进程并不独立处理完整的batch,而是并行地从数据集中提取和预处理样本。主进程会从这些子进程中收集样本,并在主进程中将样本组成batch。

数据加载示例

假设你设置了num_workers=4,这意味着会有4个子进程并行地处理数据。数据加载的过程大致如下:

  1. 子进程读取数据:
    • 每个子进程从数据集中读取样本。
    • 子进程对样本进行预处理(如数据增强、标准化等)。
  2. 主进程收集数据:
    • 主进程从子进程中收集处理后的样本。
    • 主进程使用collate_fn将样本组合成一个batch。
  3. 生成批次:
    • 主进程将生成的batch传递给训练循环。

为什么numberwork设置大了会增加CPU内存?

设置较大的 num_workers 值会增加 CPU 内存使用的原因有几个方面:

1. 进程数量和内存占用

  • 每个子进程的内存消耗: 每个数据加载子进程(由 num_workers 定义)都会独立地运行,并加载一部分数据集。每个子进程会使用自己的内存来存储数据和进行预处理操作。
  • 内存需求: 如果 num_workers 设置得很高,系统将会启动多个子进程,这些进程会同时存在并占用内存。每个进程都需要一定的内存来存储数据和运行预处理代码,从而导致总的内存使用增加。

2. 数据预处理和缓存

  • 数据缓冲: DataLoader 使用子进程来并行加载和预处理数据。在预处理过程中,子进程可能会创建和维护缓存,这些缓存可能会消耗额外的内存。
  • 数据加载: 进程在数据加载过程中可能会在内存中保持一定量的数据,以提高数据处理效率。这种内存的占用也会随着 num_workers 的增加而增加。

3. 并发处理

  • 并发开销: 启动大量的子进程进行数据处理会增加系统的并发开销。操作系统需要为每个进程分配内存和管理资源,这会导致系统整体的内存使用增加。
  • 进程间通信: 多个子进程之间可能会有数据交换和同步操作,这些操作也可能增加内存开销。

大模型训练中的数据加载和NCCL通信问题

A、训练大模型时候,有两亿的数据,数据索引保存到了jsonl文件中,在torch dataloader 加载数据jsonl文件时候爆内存,如何解决

1. 使用分块加载(Chunk Loading)【法1】

将数据分块处理,而不是一次性加载所有数据。可以在Dataset类中实现这一点。示例代码如下:

import json
import torch
from torch.utils.data import Dataset, DataLoader

class LargeJSONLDataset(Dataset):
    def __init__(self, jsonl_file, chunk_size=1000):
        self.jsonl_file = jsonl_file
        self.chunk_size = chunk_size
        self.data = []
        self._load_chunk(0)

    def _load_chunk(self, chunk_index):
        start_line = chunk_index * self.chunk_size
        end_line = start_line + self.chunk_size
        self.data = []
        with open(self.jsonl_file, 'r') as f:
            for i, line in enumerate(f):
                if start_line <= i < end_line:
                    self.data.append(json.loads(line))
                if i >= end_line:
                    break

    def __len__(self):
        with open(self.jsonl_file, 'r') as f:
            return sum(1 for _ in f)

    def __getitem__(self, idx):
        chunk_index = idx // self.chunk_size
        self._load_chunk(chunk_index)
        local_idx = idx % self.chunk_size
        return self.data[local_idx]

# 创建 Dataset 和 DataLoader
dataset = LargeJSONLDataset('data.jsonl')
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

实现的逻辑:采样器sampler 获取 index = len(self.dataset),然后进行index随机抽样,将抽到的id送给dataloader加载器,dataloader根据这些id,去dataset类里面执行getitem。 dataset 不在需要加载所有的jsonl文件,只需要根据id//self.chunk_size判断数据在第几个chunk,然后对应需要加载目标chunk的数据即可,然后在id% self.chunk_size 得到在该chunk的真实id,读取。这样做缺点是每次都需要重新laod jsonl文件,加载时间变慢。

2. 使用内存映射

内存映射可以帮助将大文件映射到内存中而不是完全加载。jsonl格式通常不支持直接内存映射,但可以使用分块处理与内存映射结合的方法。

内存映射是一种将磁盘上的文件映射到内存中的方法。通过使用内存映射,我们可以在不将整个文件加载到内存中的情况下访问文件的内容。这对于处理大型数据集非常有用,因为它可以节省内存空间,并且可以快速访问文件的任意部分。

内存映射:将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

使用内存映射有以下几个优点:

  1. 节省内存空间:通过内存映射,我们可以在不将整个文件加载到内存中的情况下访问文件的内容。这对于处理大型数据集非常有用,因为它可以节省大量的内存空间。
  2. 快速访问文件的任意部分:由于内存映射将文件映射到内存中,我们可以快速访问文件的任意部分,而不需要读取整个文件。这对于随机访问大型文件非常有用。
  3. 支持并发访问:多个进程可以同时访问内存映射文件,而不会发生冲突。这使得内存映射非常适合多进程的数据处理任务。

https://github.com/DACUS1995/pytorch-mmap-dataset

3. 优化数据存储格式

考虑将数据存储为其他格式,如HDF5或Parquet,这些格式支持更高效的分块读写和压缩。例如,可以使用pandas将JSONL文件转换为Parquet格式,然后使用pandas读取它们。

4. 使用数据流处理

使用生成器逐行读取数据,而不是将整个文件加载到内存中:

def data_generator(file_path):
    with open(file_path, 'r') as f:
        for line in f:
            yield json.loads(line)

# 在 DataLoader 中使用生成器
def collate_fn(batch):
    # 自定义你的批处理操作
    return batch

dataset = data_generator('data.jsonl')
dataloader = DataLoader(dataset, batch_size=32, collate_fn=collate_fn)

5. 多进程数据加载

使用torch.utils.data.DataLoadernum_workers参数来并行加载数据:

dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

6. 数据预处理

在数据加载之前进行预处理,将数据处理成更紧凑的格式或者将其划分为多个较小的文件进行分段加载。这样可以减少每次加载的数据量。

7. 使用分块加载【法2】

方法1 每次读取单个数据,都需要重新读取一边jsonl文件,大大增加了数据加载的时间,为了尽量不影响数据加载时间,我们考虑牺牲一部分随机性来提高速度。

具体方法为:jsonl数据被分成N份,在训练1轮中,数据datalaoder先加载第一份的jsonl数据,然后part1数据加载训练结束后,继续加载part2的jsonl数据…..直到所有的jsonl数据加载完成,训练1轮结束。这样做的好处是每次batch不需要重新读取jsonl,但缺点就是不同part的jsonl之间数据不互通,数据的随机性降低,具体代码实现参考:FunASR

1:在训练1个epoch时候:传递data_split_num【数据分成几份】 data_split_i 【当前第几分】

2、datalaoder的 build_iter代码实现:本质上就是 重新执行 torch.utils.data.Dataset【可以自定义】、torch.utils.data.DataLoader、以及torch.utils.data.distributed.DistributedSampler【可以自己定义】 ,需要向 Dataset 传递 data_split_i 参数;

    def build_iter(self, epoch=0, data_split_i=0, start_step=0, **kwargs):

        # reload dataset slice
        if self.data_split_num > 1:
            del self.dataset_tr
            self.dataset_tr = self.dataset_class(
                self.kwargs.get("train_data_set_list"),
                frontend=self.frontend,
                tokenizer=self.tokenizer,
                is_training=True,
                **self.kwargs.get("dataset_conf"),
                data_split_i=data_split_i,
            )

        # dataloader
        batch_sampler = self.kwargs["dataset_conf"].get("batch_sampler", "BatchSampler")
        batch_sampler_val = None
        if batch_sampler is not None:
            batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
            batch_sampler = batch_sampler_class(
                self.dataset_tr, start_step=start_step, **self.kwargs.get("dataset_conf")
            )
            batch_sampler_val = batch_sampler_class(
                self.dataset_val, is_training=False, **self.kwargs.get("dataset_conf")
            )

        batch_sampler["batch_sampler"].set_epoch(epoch)
        batch_sampler_val["batch_sampler"].set_epoch(epoch)
        dataloader_tr = torch.utils.data.DataLoader(
            self.dataset_tr, collate_fn=self.dataset_tr.collator, **batch_sampler
        )
        dataloader_val = torch.utils.data.DataLoader(
            self.dataset_val, collate_fn=self.dataset_val.collator, **batch_sampler_val
        )

        return dataloader_tr, dataloader_val

3、 Dataset 的具体实现:

可以看出,AudioDataset里面实际上利用的index_ds来具体读取jsonl文件内容的。

4、index_ds的实现:只返回部分jsonl数据,虽然函数里面加载了整个文件,但函数结束file_list_all解释放掉了,最后只有file_list一直在占用内存。

8、pytorch pin_memory 设置为Fasle【牺牲时间换空间】

在PyTorch中,何时使用pin_memory?【CPU内存不足,建议关闭该功能】 当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。

pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。pin_memory=False表示将load进数据放至非锁页内存区,速度会较慢。

当计算机的内存充足的时候,设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。

主机中的内存,有两种存在方式: 一是锁页,二是不锁页,

锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。

在使用PyTorch进行数据加载时,pin_memory是一个可选的,它通常用于将数据存储在主机内存(RAM)中的固定内存页(pinned memory)上,以便更高效地将数据传输到GPU内存。

主要作用如下:

  1. 提高数据传输效率:当使用GPU进行训练时,通常需要将数据从主机内存传输到GPU内存。使用pin_memory可以将数据存储在固定内存页中,减少数据传输的时间和开销,提高数据传输的效率。
  2. 减少数据传输延迟:主机内存和GPU内存之间的数据传输通常涉及内存拷贝操作,而内存拷贝是一项相对较慢的操作。pin_memory可以在数据加载时将数据直接存放在固定内存页中,避免不必要的内存拷贝过程,从而减少数据传输的延迟。

需要注意的是,使用pin_memory会占用额外的主机内存,并且只在使用CUDA设备的情况下才有效果。

锁页内存和GPU显存之间的拷贝速度大约是6GB/s
可分页内存和GPU显存间的拷贝速度大约是3GB/s。
GPU内存间速度是30GB/s,CPU间内存速度是10GB/s

通常我们的主机处理器是支持虚拟内存系统的,也就是使用硬盘空间来代替内存。大多数系统中虚拟内存空间被划分成许多页,它们是寻址的单元,页的大小至少是4096个字节。虚拟寻址能使一个连续的虚拟地址空间映射到物理内存并不连续的一些页。

如果某页的物理内存被标记为换出状态,它就可以被更换到磁盘上,也就是说被踢出内存了。如果下次需要该页了,则重新加载到内存里。显然如果这一页切换的非常频繁,那么会浪费不少时间。

锁页(pinned page)是操作系统常用的操作,就是为了使硬件外设直接访问CPU内存,从而避免过多的复制操作。被锁定的页面会被操作系统标记为不可被换出的,所以设备驱动程序给这些外设编程时,可以使用页面的物理地址直接访问内存,CPU也可以访问上述锁页内存,但是此内存是不能移动或换页到磁盘上的。另外,在GPU上分配的内存默认都是锁页内存,这只是因为GPU不支持将内存交换到磁盘上。

Host(例如CPU)的数据分配默认是**pageable(可分页的)**,但是GPU是没法直接读取pageable内存里的数据的,所以需要先创建一个临时的缓冲区(pinned memory),把数据从pageable内存拷贝pinned内存上,然后GPU才能从pinned内存上读取数据,如上图(左)所示。

9、number_works降低参数值

从磁盘加载数据到 host 的page-locked内存. 采用多个 worker 进程并行地数据加载 ,会增加内存占用,因此为了降低内存占用,可以考虑number_work从低到高设置:2、4、8、16,知道训练速度达到最优。

每个进程的num_workers: 每个分布式进程(即每个GPU)都有自己的数据加载子进程。这意味着总的num_workers会是每个GPU上num_workers的值乘以GPU的数量(分布式进程数)。

例如,如果有4个GPU,并且每个GPU的num_workers设置为4,那么总的工作进程数将是4 * 4 = 16。

避免数据重叠: 在分布式训练中,需要确保每个进程处理的数据子集是不同的。使用DistributedSampler可以确保数据在各个进程间均匀分配,从而避免数据重复和丢失。

同步和通信开销: 增加num_workers的数量可能会增加进程间的同步和通信开销,特别是在多GPU的情况下。需要根据具体的硬件配置和数据集大小来调整num_workers

内存和CPU资源: 每增加一个工作进程,都会消耗额外的CPU资源和内存。确保你的系统有足够的资源来支持设置的num_workers值。

在给Dataloader设置worker数量(num_worker)时,到底设置多少合适?这个worker到底怎么工作的?

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

参数详解:

每次dataloader加载数据时:dataloader一次性创建num_worker个worker,(也可以说dataloader一次性创建num_worker个工作进程,worker也是普通的工作进程),并用batch_sampler将指定第几个batch分配给指定worker,worker将它负责的batch加载进RAM。

然后,dataloader从RAM中找本轮迭代要用的batch,如果找到了,就使用。如果没找到,就要num_worker个worker继续加载batch到内存,直到dataloader在RAM中找到目标batch。一般情况下都是能找到的,因为batch_sampler指定batch时当然优先指定本轮要用的batch。

num_worker设置得大,好处是寻batch速度快,因为下一轮迭代的batch很可能在上一轮/上上一轮…迭代时已经加载好了。坏处是内存开销大,也加重了CPU负担(worker加载数据到RAM的进程是CPU复制的嘛)。num_workers的经验设置值是自己电脑/服务器的CPU核心数,如果CPU很强、RAM也很充足,就可以设置得更大些。

如果num_worker设为0,意味着每一轮迭代时,dataloader不再有自主加载数据到RAM这一步骤(因为没有worker了),而是在RAM中找batch,找不到时再加载相应的batch。缺点当然是速度更慢。

  1. 根据硬件配置调整: 在多核 CPU 环境下,设置较高的 num_workers(如 4 到 16)可以有效利用多核资源,提高数据加载速度。具体的最佳值需要根据系统的 CPU 核心数和内存情况来调整。
  2. 数据加载瓶颈: 如果你发现训练时 GPU 经常处于等待数据的状态,这可能是因为数据加载成为了瓶颈。增加 num_workers 可以帮助缓解这一问题。
  3. 系统负载: 在某些情况下,设置过高的 num_workers 可能会导致系统负载过高,影响其他任务或整体系统性能。因此需要找到一个平衡点。
  4. 实验调整: 实际应用中,最好的做法是从较小的值开始(如 2 或 4),然后逐步增加,观察训练过程中的数据加载速度和系统资源使用情况,从而确定最佳设置。

DistributedDataParallel 消除了 DataParallel 中上述不足. 其不再需要主 GPU,每个 GPU 分别进行各自任务. 每个 GPU 上的训练是其独立进程,而在 DataParallel 中是采用多线程(multi-thread) 的.

DistributedDataParallel 的工作过程如,

[1] – 从磁盘加载数据到 host 的page-locked内存. 采用多个 worker 进程并行地数据加载;其中,distributed data sampler 确保了加载的数据在跨进程间是不重叠的.

[2] – 将 mini-batch 数据由 page-locked 内存转移到 GPU. 不需要任何数据广播. 因为每个 GPU 分别有模型副本,因此也不需要模型广播.

[3] – 分别在各 GPU 独立进行前向计算和损失函数计算. 因此,也不需要收集各 GPUs 的输出.

[4] – 后向梯度计算,梯度是跨GPUs all-reduced的. 确保在后向传播结束时,每个 GPU 最终得到相同的平均梯度的副本.

[5] – 更新模型参数. 由于每个 GPU 是由相同的模型副本开始的,且梯度是 all-reduced 的,因此所有 GPUs 上的权重更新是相同的,无需再进行模型同步.

以上即完成了一次迭代. 这种设计确保了模型参数的更新是相同的,因此消除了每次开始时的模型同步.

B 、NCCL通信超时问题

[PG 1 Rank 9] Timeout at NCCL work: 957, last enqueued NCCL work: 957, last completed NCCL work: 956.
[rank9]:[E ProcessGroupNCCL.cpp:577] [Rank 9] Some NCCL operations have failed or timed out. Due to the asynchronous nature of CUDA kernels, subsequent GPU operations might run on corrupted/incomplete data.
[rank9]:[E ProcessGroupNCCL.cpp:583] [Rank 9] To avoid data inconsistency, we are taking the entire process down.

这种报错需要具体情况具体分析

1、尝试增加NCCL 超时时间/设置过NCCL变量

如何设置:

1、查看变量:查看环境变量 NCCL_IB_TIMEOUT 的值

echo $NCCL_IB_TIMEOUT # 如果环境变量已设置,这个命令将显示其值;如果没有设置,则不会有任何输出。

printenv 命令可以显示所有环境变量的值,也可以查看特定的环境变量:

printenv NCCL_IB_TIMEOUT #如果环境变量未设置,该命令不会输出任何内容。

也可以使用 env 命令来列出所有环境变量,并查找 NCCL_IB_TIMEOUT

env | grep NCCL_IB_TIMEOUT

NCCL相关环境变量说明 【https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage.html】

  1. NCCL_TIMEOUT:设置集合操作超时阈值,单位毫秒;如果常见超时错误,适当增大该值,但不能太大NCCL_TIMEOUT 环境变量用于设置 NCCL 集体通信操作的超时时间。通过调整这个值,你可以更好地处理网络延迟和不稳定的问题,确保 NCCL 通信的稳定性和可靠性。如果在集体通信过程中遇到超时问题,可以尝试调整此环境变量以解决问题。

设置超时时间:

  • NCCL_TIMEOUT 用于定义 NCCL 集体通信操作的超时时间。超时时间是 NCCL 在执行操作时等待响应的最长时间,超出此时间将触发超时错误。

解决网络问题:

  • 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致集体通信操作超时。设置合适的 NCCL_TIMEOUT 可以帮助调节容错设置,避免训练过程中因超时错误而中断。

性能调优:

  • 根据你的集群配置和网络状况,适当调整 NCCL_TIMEOUT 可以帮助优化通信性能和稳定性。
  1. NCCL_ALGO:选择集合通信算法,如Ring, Tree;不同拓扑适合不同算法,测试选更优算法
  2. NCCL_CHUNK_SIZE:定义环形传输缓冲区大小;合理设置可提速,但也会增加内存消耗
  3. NCCL_DEBUG:打开NCCL调试日志;出现问题时打开调试,但会降低速度,不要在生产环境使用
  4. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  5. NCCL_P2P_LEVEL:设置点对点通信优化级别;增加该值可减少P2P次数,提高某些操作效率
  6. NCCL_P2P_DISABLE:禁用点对点通信,强制使用集合通信。在某些情况下,P2P 通信可能会导致性能问题或出现错误。禁用 P2P 通信可以帮助解决这些问题。如果你遇到与 P2P 通信相关的错误或不稳定性,禁用 P2P 可能有助于恢复系统的稳定性。
  7. NCCL_PXN_DISABLE:禁用使用非本地 NIC 的节点间通信,使用 NVLink 和一个中间 GPU。建议设置成1。在PyTorch中进行跨节点all-to-all通信时,如果该环境变量是0会出现异常。
  8. NCCL_SOCKET_IFNAME:选择网络接口。
  9. NCCL_SOCKET_NTHREADS 增加它的数量可以提高socker传输的效率,但是会增加CPU的负担
  10. NCCL_NET_GDR_LEVEL:设置GPUDirect RDMA的使用级别。
  11. NCCL_MAX_NRINGS:定义支持的最大NCCL环路数。
  12. NCCL_MIN_NRINGS:定义最小环路数。
  13. NCCL_BUFFSIZE:设置scratch空间大小。
  14. NCCL_BUFFLE_SIZE 缓存数据量,缓存越大一次ring传输的数据就越大自然对带宽的压力最大,但是相应的总延迟次数会少。默认值是4M(4194304),注意设置的时候使用bytes(字节大小)
  15. NCCL_NTHREADS:设置NCCL内部使用的线程数。
  16. NCCL_VERSION:显示NCCL版本信息。
  17. NCCL_MAX/MIN_NCHANNELS 最小和最大的rings,rings越多对GPU的显存、带宽的压力都越大,也会影响计算性能
  18. NCCL_CHECKS_DISABLE 在每次集合通信进行前对参数检验校对,这会增加延迟时间,在生产环境中可以设为1.默认是0
  19. NCCL_CHECK_POINTERS 在每次集合通信进行前对CUDA内存 指针进行校验,这会增加延迟时间,在生产环境中可以设为1.默认是0
  20. NCCL_NET_GDR_LEVEL GDR触发的条件,默认是当GPU和NIC挂载一个swith上面时使用GDR
  21. NCCL_IGNORE_CPU_AFFINITY 忽略CPU与应用的亲和性使用GPU与nic的亲和性为主
  22. NCCL_IB_DISABLE:禁用InfiniBand传输。

禁用 InfiniBand: 设置 NCCL_IB_DISABLE=1 会禁用 NCCL 在 InfiniBand 设备上的使用。这意味着 NCCL 将不会利用 InfiniBand 网络进行数据传输,而是回退到其他网络接口(例如以太网或其他网络接口)。

调试和兼容性: 禁用 InfiniBand 可能用于调试目的,或在系统中 InfiniBand 网络出现问题时回退到其他网络接口。如果你遇到与 InfiniBand 相关的错误或兼容性问题,禁用 InfiniBand 可能有助于解决这些问题。

  1. NCCL_IB_HCA 代表IB使用的设备:Mellanox mlx5系列的HCA设备NCCL_IB_HCA=mlx5 会默认轮询所有的设备。NCCL_IB_HCA=mlx5_0:1 指定其中一台设备。
  2. NCCL_IB_TIMEOUT 改变量用于控制InfiniBand Verbs超时。取值范围1-22。超时时间的计算公式为4.096微秒 * 2 ^ timeout,正确的值取决于网络的大小。增加该值可以在非常大的网络上提供帮助,例如 NCCL在调用ibv_poll_cq时出现错误12时。建议在大模型训练任务中设置成最大值22,可以减少不少nccl timeout异常。设置超时时间: NCCL_IB_TIMEOUT 用于控制 InfiniBand 网络操作的超时时间。通过调整这个值,你可以控制 NCCL 在遇到通信延迟或网络问题时的容忍度。解决网络问题: 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致超时错误。调整 NCCL_IB_TIMEOUT 可以帮助你在遇到网络问题时更好地调节超时设置,避免训练过程被中断。
  1. NCCL_IB_RETRY_CNT变量控制 InfiniBand 的重试次数。建议在大模型训练任务中设置成13,尽可能多重试。
  2. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  3. NCCL_IB_PCI_RELAXED_ORDERING启用 IB Verbs 传输的Relaxed Ordering。Relaxed Ordering可以极大地提高虚拟化环境下 InfiniBand 网络的性能。设置为 2,如果可用,自动使用Relaxed Ordering。设置为 1,强制使用Relaxed Ordering,如果不可用则失败。设置为 0,禁用使用Relaxed Ordering。默认值为 2。建议值为1

2、增加 dist.init_process_group 超时时间,还要对应修改NCCL变量: export TORCH_NCCL_BLOCKING_WAIT !!

dist.init_process_group(backend=kwargs.get(“backend”, “nccl”), init_method=”env://”,timeout=timedelta(seconds=7200000)) # 7200s 等待2h


export TORCH_NCCL_BLOCKING_WAIT=1  # 是否堵塞等待某节点错误超时 “0” 不堵塞等待  “1” 堵塞等待
echo $TORCH_NCCL_BLOCKING_WAIT
printenv TORCH_NCCL_BLOCKING_WAIT  # 新版本torch

export TORCH_NCCL_ASYNC_ERROR_HANDLING=1 # 是否堵塞等待某节点错误超时 “0” 不堵塞等待  “1” 堵塞等待
echo $TORCH_NCCL_ASYNC_ERROR_HANDLING
printenv TORCH_NCCL_ASYNC_ERROR_HANDLING # 新版本torch

export NCCL_BLOCKING_WAIT=1
echo $NCCL_BLOCKING_WAIT
printenv NCCL_BLOCKING_WAIT      #旧版本torch

export NCCL_ASYNC_ERROR_HANDLING=1
echo $NCCL_ASYNC_ERROR_HANDLING
printenv NCCL_ASYNC_ERROR_HANDLING   #旧版本torch

在使用 torch.distributed.init_process_group 初始化分布式训练时,timeout 参数用于指定集群中进程之间进行集体通信操作时的超时时间。这个超时时间决定了分布式进程在等待其他进程响应时的最长时间。

torch.distributed.init_process_group(backend=Noneinit_method=Nonetimeout=Noneworld_size=-1rank=-1store=Nonegroup_name=”pg_options=Nonedevice_id=None)

说明文档:https://pytorch.org/docs/stable/distributed.html

新版本torch
旧版本torch

超时设置:

  • timeout 参数用于设置分布式通信操作的超时时间。超时时间是 timedelta 对象,表示在等待其他进程响应时的最长时间。
  • 在你提供的示例中,timeout 被设置为 timedelta(seconds=108000),即 30 小时。这意味着分布式通信操作将在 30 小时内等待其他进程响应。

用途:

  • 容错性: 提高容错性,确保在长时间等待期间不会因为网络延迟或通信问题导致进程失败。
  • 调试: 在调试和测试中,设置较长的超时时间可以帮助识别是否因为超时设置过短而导致的通信问题。
  • 防止死锁: 在复杂的分布式训练任务中,长时间的超时时间有助于防止因通信死锁而导致的进程失败。

超时处理:

  • 如果在指定的超时时间内没有收到预期的响应,init_process_group 将会引发超时错误。这通常表示进程之间的通信出现了问题,可能需要检查网络连接、进程配置或其他潜在问题。

TORCH_NCCL_BLOCKING_WAIT 是一个环境变量,用于控制 PyTorch 在使用 NCCL 后端时的通信等待策略。具体来说,它决定了 NCCL 操作是否使用阻塞等待方式来处理通信操作。

TORCH_NCCL_BLOCKING_WAIT 的作用

  • TORCH_NCCL_BLOCKING_WAIT=1:
    • 启用阻塞等待: 当设置为 1 时,PyTorch 在执行 NCCL 操作(如 all-reducebroadcast)时,会使用阻塞等待的方式。这意味着 PyTorch 会等待操作完全完成或超时之后才继续执行。这种设置可以帮助确保所有进程在继续之前都完成了通信,有助于解决因异步操作引起的数据同步问题或错误。
  • TORCH_NCCL_BLOCKING_WAIT=0:
    • 禁用阻塞等待: 默认情况下(即设置为 0),PyTorch 使用非阻塞等待方式。NCCL 操作在后台异步进行,可能会导致在操作完成之前程序继续执行。这种方式可能会在网络延迟或系统负载较高时引发通信超时或数据不一致的问题。

如何设置 TORCH_NCCL_BLOCKING_WAIT

你可以通过以下方式设置 TORCH_NCCL_BLOCKING_WAIT 环境变量:

  1. 临时设置: 在运行程序时,可以在命令行中临时设置环境变量:bash复制代码TORCH_NCCL_BLOCKING_WAIT=1 python your_training_script.py
  2. 永久设置: 在终端会话中,可以通过 export 命令永久设置:bash复制代码export TORCH_NCCL_BLOCKING_WAIT=1 这个设置会在当前终端会话中生效,直到会话结束或重新启动。
  3. 在脚本中设置: 如果你希望在 Python 脚本内部设置这个变量,可以在脚本的开头添加:python复制代码import os os.environ['TORCH_NCCL_BLOCKING_WAIT'] = '1'

使用场景

  • 调试和稳定性:
    • 启用阻塞等待有助于调试和解决 NCCL 操作中的同步问题。它确保所有通信操作完成后才继续执行,有助于提高系统的稳定性。
  • 网络不稳定和负载高:
    • 在网络延迟较高或系统负载较大的环境中,启用阻塞等待可以减少由于异步操作导致的超时和错误。

注意事项

  • 性能影响:
    • 阻塞等待可能会增加通信操作的等待时间,影响整体训练性能,特别是在大规模分布式训练任务中。
  • 超时问题:
    • 如果超时时间设置过短或网络状况较差,启用阻塞等待可能导致更多的超时错误。因此,需要平衡稳定性和性能。

总结

TORCH_NCCL_BLOCKING_WAIT 环境变量控制 PyTorch 使用 NCCL 后端时的通信等待策略。设置为 1 可以启用阻塞等待,有助于提高系统稳定性和调试能力,但可能会影响性能。根据具体的训练任务和环境,可以选择合适的设置来优化训练过程。

相关环境变量解释:

https://pytorch.org/docs/stable/torch_nccl_environment_variables.html

3、增加 num_workers 来加快处理数据【Dataloader阶段导致 NCCL超时】

如果是在数据加载的时间过长,导致NCCL通信超时,考虑增加num_workers来提高数据加载速度。

减少数据加载瓶颈:

  • 增加 num_workers 可以提高数据加载速度,减少训练过程中因数据加载而导致的等待时间。这可以间接减少由于数据处理缓慢而可能引发的 NCCL 超时问题。

提高训练效率:

  • 更高效的数据加载可以提高整体训练效率,使训练过程更加顺畅,从而可能减少由于系统负载不均导致的通信超时问题。

4、 DistributedSampler 采样阶段导致 NCCL超时:

如果分布式训练中 NCCL 超时问题发生在采样阶段(特别是在使用 DistributedSampler 或自定义的采样器时),可能表明存在某些潜在的问题,这些问题可能导致训练进程之间的同步或数据传输效率低下。以下是一些可能的原因和解决方法:

可能的原因

  1. 数据加载和采样速度问题:
    • 如果采样器的性能不佳,可能会导致数据加载速度变慢,从而影响训练过程。虽然这不会直接导致 NCCL 超时,但它会间接影响整体训练性能。
  2. 进程同步问题:
    • 在使用 DistributedSampler 时,所有进程需要同步以确保数据的一致性。如果采样器在某些进程中出现延迟或阻塞,可能会导致通信超时。
  3. 数据分布不均:
    • 如果数据分布不均,某些进程可能会比其他进程处理更多的数据,从而导致通信延迟和超时问题。
  4. 数据预处理复杂:数据预处理太复杂,会导致数据加载过慢,也有可能导致超时

解决方法:

  1. 优化采样器和数据加载:
    • 确保自定义采样器或 DistributedSampler 以高效的方式进行数据采样和分配。优化数据加载速度,确保每个进程在采样时不会长时间等待。
    • 使用 num_workers 设置合理的数量,以加快数据加载速度,但要注意 CPU 内存和系统负载。
  2. 调整超时时间:
    • 增加 NCCL_TIMEOUT 环境变量值或 dist.init_process_group 中的 timeout 参数,以允许更长的等待时间。

4、基于HugingFace的Trainer多级多卡训练LLM导致NCCL超时

  1. 启动命令前增加了OMP_NUM_THREADS=1 MKL_NUM_THREADS=1,避免多线程导致死锁;
  2. 去掉了加载数据时的tqdm;
  3. 记在数据的DataLoader的drop_last设置为True,pin_memory设置为True,num_workers设置为0;
  4. 设置训练批大小为auto/设置小一点

查阅了一些资料

  1. pytorch 多机多卡卡住问题汇总
  2. Script freezes with no output when using DistributedDataParallel
  3. PyTorch 训练时中遇到的卡住停住等问题
  4. PyTorch训练时,Dataloader卡死、挂起,跑一个epoch停了,问题解决方案
  5. 运行开始训练,卡住半小时,一直不动
  6. 关于炼丹,你是否知道这些细节?
  7. ultralytics/yolov5#7481
  8. https://www.zhihu.com/question/512132168
  9. https://discuss.pytorch.org/t/nccl-timed-out-when-using-the-torch-distributed-run/153276
  10. https://stackoverflow.com/questions/69693950/error-some-nccl-operations-have-failed-or-timed-out