LLM训练-对齐算法综述:RLHF, RLAIF, PPO, DPO and More

参考论文:A Comprehensive Survey of LLM Alignment Techniques: RLHF, RLAIF, PPO, DPO and More

一个完整的LLM训练过程包含以下几步:

  • Model Initialization:加载模型和处理器
  • 数据准备:解析数据集并设置其格式
  • 模型推理:将数据输入到模型中并获取输出
  • 梯度更新:根据损失函数更新模型参数

对齐(alignment)其作用就是让 LLM 与人类的价值观保持一致。在对齐 LLM 方面,基于人类反馈的强化学习(RLHF)是一种突破性的技术。该方法催生了 GPT-4、Claude 和 Gemini 等强大模型。RLHF 之后,人们也探索了多种多样的对齐 LLM 的方法。但是,此前还没有人全面总结对齐 LLM 与人类偏好的方法。

Salesforce 决定填补这一空白,于近日发布了一份 37 页的综述报告,其中按类别总结了现有的研究文献,并详细分析了各篇论文。

Introduction

这篇论文分为四大主题:奖励模型、反馈、强化学习(RL)、优化。每个主题又包含进一步的子主题,如图 1 所示。

xPO LLM 与人类偏好保持一致的 13 个分类方向

奖励模型的子主题包括:1. 显式奖励模型与隐式奖励模型;2. 逐点奖励模型与偏好模型;3. 响应层面的奖励与 token 层面的奖励;4. 负偏好优化。

反馈的子主题包括:1. 偏好反馈与二元反馈;2. 成对反馈与列表反馈;3. 人类反馈与 AI 反馈。

强化学习的子主题包括:1. 基于参考的强化学习与无参考的强化学习;2. 长度控制式强化学习;3. 强化学习中的不同分支;4. 在线策略强化学习与离线策略强化学习

优化的子主题包括:1. 在线 / 迭代式偏好优化与离线 / 非迭代式偏好优化;2. 分离 SFT 和对齐与合并 SFT 和对齐。

Individual Paper Reviews in Detail

1. RLHF/PPO

LLM 的预训练要用到大量来自不同来源的语料库,而这本身就无法确保这些数据集的质量。此外,LLM 的主要目标是预测下一个 token,这个目标与「有用且安全地遵从用户指令」的目标并不一致。因此,LLM 可能会输出不真实、有害或对用户无用的内容。本质上讲,这些模型并未与用户意图对齐。RLHF/PPO 的主要目标是在各种任务上对齐语言模型与用户意图,其做法是使用人类反馈来微调模型。有关这个主题的研究有很多。

2. RLAIF

获取人类偏好数据集的成本不低,因此基于人工智能反馈的强化学习(RLAIF)诞生了。此外,随着 LLM 的能力不断进步,所能收集到的 AI 偏好数据集的质量也不断提高,由此可提升 LLM 的对齐效果。

3.直接人类偏好优化

传统 RLHF 方法通常涉及到优化源自人类偏好的奖励函数。该方法虽有效,但也可能带来一些难题,比如增大计算复杂度以及在估计和优化奖励时需要考虑偏置 – 方差权衡。参阅论文《High-dimensional continuous control using generalized advantage estimation》。

近期有研究探索了其它一些旨在根据人类偏好(无需依赖某个标量的奖励信号)来直接优化 LLM 策略的方法。

这些方法的目标是通过更直接地使用偏好数据来简化对齐流程、降低计算开销以及实现更稳健的优化。通过将该问题描述为一个偏好优化问题,而不是奖励估计和最大化问题,这些方法能提供一种将语言模型与人类判断对齐的不同视角

4.token 级 DPO

使用 DPO 时,奖励会被一起分配给 prompt 和响应。相反,使用 MDP 时,奖励会被分配给各个动作。后续的两篇论文在 token 层面阐述了 DPO 并将其应用扩展到了 token 级的分析。

5.迭代式 / 在线 DPO

使用 DPO 时,会使用所有可用的偏好数据集来对齐 LLM。为了持续提升 LLM,应当实现迭代式 / 在线 DPO。这就引出了一个有趣的问题:如何高效地收集新的偏好数据集。下面两篇论文深入探讨了这一主题。

  • 自我奖励式语言模型,参阅论文《Self-rewarding language models》。
  • CRINGE,参阅论文《The cringe loss: Learning what language not to model》。

6.二元反馈

事实证明,收集偏好反馈比收集二元反馈(比如点赞或点踩)的难度大,因此后者可促进对齐过程的扩展。KTO 和 DRO 这两项研究关注的便是使用二元反馈来对齐 LLM。

  • KTO,Kahneman-Tversky 优化,参阅论文《KTO: Model alignment as prospect theoretic optimization》。
  • DRO,直接奖励优化,参阅论文《Offline regularised reinforcement learning for large language models alignment》。

7.融合 SFT 和对齐

之前的研究主要还是按顺序执行 SFT 和对齐,但事实证明这种方法很费力,并会导致灾难性遗忘。后续的研究有两个方向:一是将这两个过程整合成单一步骤;二是并行地微调两个模型,最终再进行融合。

  • ORPO,比值比偏好优化,参阅论文《ORPO: Monolithic preference optimization without reference model》。
  • PAFT,并行微调,参阅论文《PAFT: A parallel training paradigm for effective llm fine-tuning》。

8.长度控制式 DPO 和无参考 DPO

之前有研究表明,LLM 的输出往往过于冗长。为了解决这个问题,R-DPO 和 SimPO 的关注重心是在不影响生成性能的前提下实现对响应长度的控制。

此外,DPO 必需参考策略来确保已对齐模型不会与参考模型有太大偏差。相较之下,SimPO 和 RLOO 提出了一些方法,可以在不影响 LLM 效果的情况下消除对参考模型的需求

9.逐列表的偏好优化

之前在 PPO 和 DPO 方面的研究关注的是成对偏好,而 RLHF 方面的研究则是收集逐列表的偏好来加速数据收集过程,之后再将它们转换成成对偏好。尽管如此,为了提升 LLM 的性能,直接使用逐列表的数据集来执行偏好优化是可行的。以下三篇论文专门讨论了这种方法。

  • LiPO,逐列表偏好优化,参阅论文《LIPO: Listwise preference optimization through learning-to-rank》。
  • RRHF,参阅论文《RRHF: Rank responses to align language models with human feedback without tears》。
  • PRO,偏好排名优化,参阅论文《Preference ranking optimization for human alignment》。

10.负偏好优化

这些研究有一个共同前提:当前这一代 LLM 已经在翻译和总结等任务上超越了人类性能。因此,可以将 LLM 的输出视为期望响应,而无需依靠将人类标注的数据视为偏好响应;这样做是有好处的。反过来,不期望得到的响应依然也可被用于对齐 LLM,这个过程就是所谓的负偏好优化(NPO)。

  • NN,否定负例方法,参阅论文《Negating negatives: Alignment without human positive samples via distributional dispreference optimization》。
  • NPO,负例偏好优化,参阅论文《Negative preference optimization: From catastrophic collapse to effective unlearning》。
  • CPO,对比偏好优化,参阅论文《Contrastive preference optimization: Pushing the boundaries of llm performance in machine translation》。

11.纳什学习

之前的研究通常是使用逐点奖励和 BT 模型来得到成对偏好。但是,这种方法比不上直接成对偏好建模并且无法解决成对偏好中的不一致问题。为了克服这些局限,一些研究提出了纳什学习方法。

  • 根据人类反馈的纳什学习,参阅论文《Nash learning from human feedback》。
  • SPPO,自博弈偏好优化,参阅论文《A minimaximalist approach to reinforcement learning from human feedback》。
  • DNO,直接纳什优化,参阅论文《Direct nash optimization: Teaching language models to self-improve with general preferences》。

LLM 对齐(Alignment)方法:SFT、PPO、DPO 详细介绍

LLM(大语言模型)的对齐(Alignment)方法旨在让 AI 的输出更加符合人类预期,减少错误信息、有害内容或不准确的回答。主要总结LLM训练中的基本的对齐算法, 监督微调 (SFT)、直接偏好优化 (DPO) 和近端策略优化 (PPO)。

SFT(Supervised Fine-Tuning,监督微调)

监督微调(SFT)是 LLM 训练中的第一步,通过高质量的人工标注数据集对模型进行微调,使其具备基础的任务能力。SFT 是所有对齐方法的基础,如 RLHF、DPO 等都依赖于一个经过 SFT 训练的模型作为初始状态。

过程

  1. 数据准备:收集高质量的指令-响应(Instruction-Response)数据集,例如人类标注的数据或合成的数据。
  2. 模型微调:使用交叉熵损失(Cross-Entropy Loss)训练模型,使其学习提供与标注数据匹配的答案。
  3. 效果:使模型在常见任务(如问答、代码生成、对话等)中表现更好,提高其对指令的遵循能力。

给定输入 x(Prompt) 和目标输出 y(Response),模型的目标是最大化生成目标文本的概率:

其中:

  • Pθ​(yt​∣x,y<t​) 是 LLM 在给定上下文下预测下一个 token yt​ 的概率。
  • 训练时采用交叉熵损失(Cross Entropy Loss)来优化模型参数 θ。

SFT 仅依赖于人工标注数据,无法让模型学习偏好信息(比如不同回答的优劣)。无法动态调整:SFT 训练后,模型固定,难以针对用户反馈进行调整。缺乏探索性:模型只会学到训练数据中的模式,无法进行强化学习优化。

DPO(Direct Preference Optimization,直接偏好优化)

DPO(直接偏好优化)是一种比 RLHF 更简单、更高效的对齐方法。
它不需要训练奖励模型(RM)和使用强化学习(RL),而是直接优化 LLM,使其更符合人类偏好数据

准备偏好数据

  • 每个输入 Prompt 对应两个候选回答:一个优选(Preferred y+),一个劣选(Dispreferred y−)。
  • 例如:

Prompt: “如何写一封正式的电子邮件?”
Response 1 (优选): “在邮件中应保持正式语气,并包含称呼、正文和署名。”
Response 2 (劣选): “随便写就行了,不要太在意格式。”

优化 LLM 使其更倾向于优选回答

  • 计算优选回答的概率Pθ​(y+∣x) 和劣选回答的概率 Pθ​(y−∣x)。
  • 目标是优化模型,使得: Pθ(y+∣x)>Pθ(y−∣x)

损失函数: DPO 直接优化模型输出的偏好分布:LDPO=−log⁡σ(β(log⁡Pθ(y+∣x)−log⁡Pθ(y−∣x)))

其中:

  • σ(⋅) 是 Sigmoid 函数,确保优化方向正确。
  • β 是一个温度参数,控制学习速率。

PPO(Proximal Policy Optimization,近端策略优化)

PPO 是一种强化学习(RL)方法,通常用于 RLHF(基于人类反馈的强化学习)其目标是通过奖励模型(Reward Model, RM)指导 LLM 生成更符合人类偏好的内容

人类偏好数据(Human Preference Data)
人工标注的问答对,并标注哪个回答更优。

奖励模型(Reward Model, RM)
训练一个模型来预测人类对答案的偏好,帮助 LLM 学习优化目标。

强化学习(PPO 训练)
使用 RL 算法(如 PPO)优化 LLM,使其生成更符合人类期望的回答。

PPO 训练流程

  1. 初始化策略模型(LLM)
    • 采用 SFT 训练后的模型 作为 RLHF 训练的起点。
  2. 生成候选回答
    • LLM 生成多个答案,并让奖励模型(RM)进行打分。
  3. 计算奖励
    • 奖励模型对 LLM 的回答 y 计算奖励分数 R(y)。
  4. PPO 更新策略:计算策略更新梯度,优化 LLM 以生成更高奖励的答案。采用截断重要性采样(Clipped Importance Sampling) 控制更新幅度,防止策略崩溃。

PPO 公式

PPO 通过策略梯度(Policy Gradient)优化 LLM 的生成策略:

其中:

  • rt(θ) 是新旧策略的比率。
  • E 是期望,PPO 损失函数的平均值。
  • At​ 是优势函数(Advantage Function),衡量当前策略比旧策略好多少。
  • ϵ是超参数(通常取 0.2),限制更新幅度,防止训练不稳定。

优点

  • 能动态优化 LLM,使其不断学习更符合人类偏好的输出。
  • 能够处理长文本生成任务,适用于 GPT-4、Claude 这类大模型的对齐。

缺点

  • 计算成本高:训练需要采样大量数据,计算复杂。
  • 依赖奖励模型(RM):如果 RM 质量不好,PPO 可能会强化错误模式。
  • 训练不稳定:需要 carefully 选择超参数,避免梯度爆炸或模型崩溃。

奖励模型 RM

该阶段是RHLF的第一个阶段,训练得到一个rm模型用于rl阶段的模型打分,数据可以由人工标注(如 OpenAI 的 RLHF 训练)或合成数据(如基于规则的自动标注)生成。

RM 通常基于预训练的大型语言模型(如 GPT、LLaMA),然后添加一个奖励预测头(Reward Head),用于输出一个标量奖励分数。

例如,对于一个文本回答 R,RM 计算出:Rθ​(x,y)=s

其中:

  • x 是输入文本
  • y 是模型的回答
  • s 是 RM 计算出的偏好分数

为了让 RM 进行有效的偏好学习,通常使用 成对排名损失(Pairwise Ranking Loss) 训练 RM:L=−log⁡(σ(Rθ(x,ypreferred)−Rθ(x,ydispreferred)))

其中:

  • Rθ(x,ypreferred 是偏好回答的得分
  • Rθ(x,ydispreferred)是不被偏好的回答的得分
  • σ 是 Sigmoid 函数

目标
最大化更优答案次优答案的分数差,使 RM 更准确地预测人类偏好。

对比:

代码实现:

基于pytorch、deepspeed、transformers,代码:https://github.com/PKU-Alignment/align-anything/tree/main/align_anything/trainers/text_to_text

sft训练代码:

def loss(self, sft_batch: SupervisedBatch) -> dict[str, torch.Tensor]:
    """Loss function for supervised finetuning."""
    outputs = self.model(**self.infer_batch(sft_batch))
    return {'loss': outputs.loss}

def train_step(self, sft_batch: SupervisedBatch) -> dict[str, Any]:
    """Performs a single training step."""
    loss = self.loss(sft_batch)['loss']
    self.model.backward(loss)
    self.model.step()

    return {
        'train/loss': loss.item(),
        'train/lr': self.model.optimizer.param_groups[0]['lr'],
    }

dpo训练代码:

def compute_log_probs(
    self,
    model: AutoModelForCausalLM,
    batch: PreferenceBatch,
) -> torch.Tensor:
    """Compute log probabilities of given sequences."""
    # 获得所有可能输出的log概率,logits 表示每个 token 位置的 未归一化的概率分布
    logits = model(**self.infer_batch(batch)).logits
    device = logits.device
    input_ids = batch['input_ids']
    #取得每个样本的回复长度,用于截取模型输出
    batch_size = len(batch['meta_info']['response_lens'])
    logprob_list = []
    for idx in range(batch_size):
        response_length = batch['meta_info']['response_lens'][idx]
        # 去除填充 (PAD) token,避免计算无效 token 的概率。
        raw_input_id = strip_pad(input_ids[idx], self.tokenizer.pad_token_id)
        #只保留 回复部分的 logits,丢弃 prompt 部分。 
        logit = logits[idx][-response_length:].unsqueeze(0)
        input_id = raw_input_id[-response_length:].unsqueeze(0)
        #计算对应的better 和worse 序列token 对数概率
        log_p = gather_log_probabilities(logit[:, :-1], input_id[:, 1:])
        logprob_list.append(log_p.squeeze(0))
    # 不同样本的 log_probs 长度不同,使用 pad_sequence 进行 padding,补齐到相同长度。
    return torch.nn.utils.rnn.pad_sequence(
        logprob_list, batch_first=True, padding_value=0.0
    ).to(device)

def loss(  # pylint: disable=too-many-locals
    self,
    batch: PreferenceBatch,
) -> dict[str, torch.Tensor]:
    """Loss function for the DPO algorithm."""
   #计算当前模型 (self.model.module) 在 batch 上的 log 概率。
    sequence_log_probs = self.compute_log_probs(
        self.model.module,
        batch,
    )
# better_sequence_log_probs (用户偏好的回复)
# worse_sequence_log_probs (用户不喜欢的回复)
    (
        better_sequence_log_probs,  # size = (B, L - 1)
        worse_sequence_log_probs,  # size = (B, L - 1)
    ) = sequence_log_probs.chunk(chunks=2, dim=0)
# 计算参考模型 (self.reference_model.module) 的对数概率 (log_probs)。
# reference_model 通常是 原始未优化的模型,作为对比基准。
# torch.no_grad() 表示 不计算梯度,避免影响参考模型。
    with torch.no_grad():
        ref_sequence_log_probs = self.compute_log_probs(  # size = (2 * B, L - 1)
            self.reference_model.module,
            batch,
        )
        ref_better_sequence_log_probs, ref_worse_sequence_log_probs = (
            ref_sequence_log_probs.chunk(chunks=2, dim=0)
        )

    losses = []
    better_sample_rewards = []
    worse_sample_rewards = []

    batch_size = better_sequence_log_probs.size(0)
    for i in range(batch_size):
# 计算 更好/更差回复的总 log 概率(即累加 token 级别 log 概率)。
        better_log_prob = better_sequence_log_probs[i, :].sum(dim=-1)
        worse_log_prob = worse_sequence_log_probs[i, :].sum(dim=-1)
        ref_better_log_prob = ref_better_sequence_log_probs[i, :].sum(dim=-1)
        ref_worse_log_prob = ref_worse_sequence_log_probs[i, :].sum(dim=-1)
# 当前模型比参考模型更偏好 better 回复 的程度。
        better_log_ratio = better_log_prob - ref_better_log_prob
# 当前模型比参考模型更偏好 worse 回复 的程度。
        worse_log_ratio = worse_log_prob - ref_worse_log_prob
# 计算 better 和 worse 的 log 比值差
# 使用 -logsigmoid(x) 计算负对数 sigmoid 损失,优化模型使其更倾向 better 回复。
# logsigmoid 的性质:
# 如果 x 很大,logsigmoid(x) ≈ 0,意味着损失小,模型已经正确偏好 better response。
# 如果 x 很小或负,logsigmoid(x) ≈ x,意味着损失大,模型没有正确区分 better 和 worse,需要优化。
        losses.append(
            -F.logsigmoid(
                self.cfgs.train_cfgs.scale_coeff * (better_log_ratio - worse_log_ratio),
            ),
        )
        better_sample_rewards.append(
            self.cfgs.train_cfgs.scale_coeff * better_log_ratio.detach(),
        )
        worse_sample_rewards.append(self.cfgs.train_cfgs.scale_coeff * worse_log_ratio.detach())
    loss = torch.stack(losses).mean()  # size = ()
    better_sample_reward = torch.stack(better_sample_rewards)  # size = (B,)
    worse_sample_reward = torch.stack(worse_sample_rewards)  # size = (B,)
# 计算 奖励 (reward)、准确率 (accuracy) 和奖励间距 (margin)。
    reward = better_sample_reward + worse_sample_reward  # size = (B,)
    reward_accuracy = (better_sample_reward > worse_sample_reward).float().mean()  # size = ()
    reward_margin = better_sample_reward - worse_sample_reward  # size = (B,)

    return {
        'loss': loss,
        'reward': reward,
        'better_sample_reward': better_sample_reward,
        'worse_sample_reward': worse_sample_reward,
        'reward_accuracy': reward_accuracy,
        'reward_margin': reward_margin,
    }

def train_step(
    self,
    batch: PreferenceBatch,
) -> dict[str, Any]:
    """Perform a single training step for DPO."""
    loss_dict = self.loss(batch=batch)
    loss = loss_dict['loss']
    self.model.backward(loss)
    self.model.step()

    with torch.no_grad():
        reward = loss_dict['reward'].mean()
        better_sample_reward = loss_dict['better_sample_reward'].mean()
        worse_sample_reward = loss_dict['worse_sample_reward'].mean()
        reward_accuracy = loss_dict['reward_accuracy']
        reward_margin = loss_dict['reward_margin'].mean()

        loss = get_all_reduce_mean(loss)
        reward = get_all_reduce_mean(reward)
        better_sample_reward = get_all_reduce_mean(better_sample_reward)
        worse_sample_reward = get_all_reduce_mean(worse_sample_reward)
        reward_accuracy = get_all_reduce_mean(reward_accuracy)
        reward_margin = get_all_reduce_mean(reward_margin)

    return {
        'train/loss': loss.item(),
        'train/reward': reward.item(),
        'train/better_sample_reward': better_sample_reward.item(),
        'train/worse_sample_reward': worse_sample_reward.item(),
        'train/reward_accuracy': reward_accuracy.item(),
        'train/reward_margin': reward_margin.item(),
        'train/lr': self.model.optimizer.param_groups[0]['lr'],
    }

ppo训练代码:

#使用策略模型 (Actor Model) 生成文本,并返回其 input_ids 和 attention_mask。
def actor_step(self, mini_prompt_only_batch: PromptOnlyBatch) -> dict[str, Any]:
    infer_batch = self.infer_batch(mini_prompt_only_batch)
    actor_batch = copy.deepcopy(infer_batch)
    sequences = self.actor_model.module.generate(
        **infer_batch,
        generation_config=self.generation_config,
        synced_gpus=True,
        do_sample=True,
    )
    attention_mask = sequences.not_equal(self.tokenizer.pad_token_id)
    actor_batch['input_ids'] = sequences
    actor_batch['attention_mask'] = attention_mask

    return actor_batch
# 计算奖励值 (reward) 和对抗奖励值 (reward_values)。 
def reward_model_step(self, actor_batch: PromptOnlyBatch) -> dict[str, Any]:
        reward_batch = copy.deepcopy(actor_batch)
        if self.reward_tokenizer is not self.tokenizer:
            reward_tokenize_output = batch_retokenize(
                actor_batch['input_ids'],
                src_tokenizer=self.tokenizer,
                dest_tokenizer=self.reward_tokenizer,
                skip_special_tokens=True,
                device=self.args.device,
            )
            reward_batch['input_ids'] = reward_tokenize_output['input_ids']
            reward_batch['attention_mask'] = reward_tokenize_output['attention_mask']
        reward_infer_batch = self.reward_infer_batch(reward_batch)
        reward_batch['reward'] = self.reward_model(**reward_infer_batch).end_scores.squeeze(dim=-1)
        critic_infer_batch = self.reward_infer_batch(actor_batch)
        scores = self.reward_critic_model(**critic_infer_batch).scores
        reward_batch['reward_values'] = scores.squeeze(dim=-1)[:, :-1]

        return reward_batch
#冻结模型参数,避免影响训练,采样多个 mini-batch,生成文本,计算奖励,计算 log 概率 (log_probs),计算参考模型的 log 概率 (ref_log_probs)
  @torch.no_grad()
    def rollout(self, prompt_only_batch: PromptOnlyBatch) -> list[dict[str, Any]]:
        """Rollout a batch of experiences."""
        # freeze the model for rolling out
        self.set_train(mode=False)

        total_batch_size = prompt_only_batch['input_ids'].size(0)
        micro_batch_size = int(self.cfgs.train_cfgs.per_device_train_batch_size)
        micro_inference_batches = []
        micro_training_batches = []
        mini_batch = {}
        for i in range(0, total_batch_size, micro_batch_size):

            mini_batch = {
                key: prompt_only_batch[key][i : i + micro_batch_size] for key in prompt_only_batch
            }

            # actor generation
            actor_batch = self.actor_step(mini_batch)
            # reward model and reward critic model scoring
            reward_batch = self.reward_model_step(actor_batch)
            # calculate the log probabilities
            logits = self.actor_model(**actor_batch).logits
            ref_logits = self.actor_reference_model(**actor_batch).logits
            log_probs = gather_log_probabilities(logits[:, :-1], actor_batch['input_ids'][:, 1:])
            ref_log_probs = gather_log_probabilities(
                ref_logits[:, :-1], actor_batch['input_ids'][:, 1:]
            )

            micro_training_batch = {}
            micro_training_batch['prompt_idx'] = mini_batch['input_ids'].size(-1) - 1
            micro_training_batch['log_probs'] = log_probs
            micro_training_batch['ref_log_probs'] = ref_log_probs
            micro_training_batch['reward'] = reward_batch['reward']
            micro_training_batch['reward_values'] = reward_batch['reward_values']

            mini_batch['input_ids'] = reward_batch['input_ids']
            mini_batch['attention_mask'] = actor_batch['attention_mask']
            # add rollout results to the batches
            micro_inference_batches.append(mini_batch)
            micro_training_batches.append(micro_training_batch)

        # unfreeze the model for training
        self.set_train()

        return micro_inference_batches, micro_training_batches

#计算策略梯度损失
# 计算 PPO 损失函数:
# ratios = exp(new_log_probs - old_log_probs)(新旧策略比)。
# 裁剪 ratios 避免策略剧烈变化(PPO 关键)。
# return -masked_mean(surrogate, mask):最大化优势 𝐴𝑡
   
def actor_loss_fn(
        self,
        log_probs: torch.Tensor,  # size = (B, L - S)
        old_log_probs: torch.Tensor,  # size = (B, L - S)
        advantages: torch.Tensor,  # size = (B, L - S)
        mask: torch.BoolTensor,  # size = (B, L - S)
    ) -> torch.Tensor:  # size = ()
        # size = (B, L - S)
        ratios = torch.exp(log_probs - old_log_probs)
        surrogate1 = advantages * ratios
        surrogate2 = advantages * torch.clamp(
            ratios,
            1.0 - self.clip_range_ratio,
            1.0 + self.clip_range_ratio,
        )
        surrogate = torch.minimum(surrogate1, surrogate2)
        return -masked_mean(surrogate, mask)  # size = ()
#  rl_step函数是训练过程中使用强化学习(RL)更新策略的一步。在PPo算法中,rl_step是用来更新策略网络(actor)和价值网络(critic)的一部分。具体来说,这个函数通过计算强化学习损失(actor loss和critic loss),并通过反向传播优化这两个网络。
# reward_critic_model 评估奖励函数的 价值估计,用于计算 优势函数 𝐴𝑡不是直接计算奖励,而是估算未来可能获得的奖励。主要用于时间差分(TD learning)更新策略,类似于 价值函数。

def rl_step(
        self, inference_batch: dict[str, torch.Tensor], training_batch: dict[str, torch.Tensor]
    ) -> dict[str, Any]:
        """Perform a single update step with RL loss."""
        old_log_probs = training_batch['log_probs']
        ref_log_probs = training_batch['ref_log_probs']
        reward = training_batch['reward']
        old_reward_values = training_batch['reward_values']
        start = training_batch['prompt_idx']

        input_ids = inference_batch['input_ids']
        attention_mask = inference_batch['attention_mask']

        sequence_mask = attention_mask[:, 1:]

        with torch.no_grad():
            old_rewards = self.add_kl_divergence_regularization(
                reward,
                old_log_probs,
                ref_log_probs,
                sequence_mask,
            )
            reward_advantages, reward_returns = self.get_advantages_and_returns(
                old_reward_values,
                old_rewards,
                sequence_mask,
                start,
            )

        logits = self.actor_model(**inference_batch, use_cache=False).logits
        log_probs = gather_log_probabilities(logits[:, :-1], input_ids[:, 1:])
        actor_loss = self.actor_loss_fn(
            log_probs[:, start:],
            old_log_probs[:, start:],
            reward_advantages,
            sequence_mask[:, start:],
        )
        self.actor_model.backward(actor_loss)
        self.actor_model.step()

        reward_values = self.reward_critic_model(**inference_batch).scores
        reward_values = reward_values.squeeze(dim=-1)[:, :-1]
        reward_critic_loss = self.critic_loss_fn(
            reward_values[:, start:],
            old_reward_values[:, start:],
            reward_returns,
            sequence_mask[:, start:],
        )
        self.reward_critic_model.backward(reward_critic_loss)
        self.reward_critic_model.step()

        with torch.no_grad():
            mask = sequence_mask[:, start:]
            kl_divergence = ((old_log_probs - ref_log_probs)[:, start:] * mask).sum(dim=-1).mean()
            mean_generated_length = mask.sum(dim=-1).float().mean()
            max_generated_length = mask.sum(dim=-1).float().max()

            reward = reward.mean()
            reward_with_kl_penalty = (old_rewards[:, start:] * mask).sum(dim=-1).mean()
            reward_advantage = masked_mean(reward_advantages, mask)
            reward_return = masked_mean(reward_returns, mask)
            reward_value = masked_mean(reward_values[:, start:], mask)

            actor_loss = get_all_reduce_mean(actor_loss)
            reward_critic_loss = get_all_reduce_mean(reward_critic_loss)
            reward = get_all_reduce_mean(reward)
            reward_with_kl_penalty = get_all_reduce_mean(reward_with_kl_penalty)
            reward_advantage = get_all_reduce_mean(reward_advantage)
            reward_return = get_all_reduce_mean(reward_return)
            reward_value = get_all_reduce_mean(reward_value)
            kl_divergence = get_all_reduce_mean(kl_divergence)
            mean_generated_length = get_all_reduce_mean(mean_generated_length)
            max_generated_length = get_all_reduce_max(max_generated_length)

        dist.barrier()

        return {
            'train/actor_loss': actor_loss.item(),
            'train/reward_critic_loss': reward_critic_loss.item(),
            'train/reward': reward.item(),
            'train/reward_with_kl_penalty': reward_with_kl_penalty.item(),
            'train/reward_advantage': reward_advantage.item(),
            'train/reward_return': reward_return.item(),
            'train/reward_value': reward_value.item(),
            'train/kl_divergence': kl_divergence.item(),
            'train/actor_lr': self.actor_model.optimizer.param_groups[0]['lr'],
            'train/reward_critic_lr': self.reward_critic_model.optimizer.param_groups[0]['lr'],
            'train/mean_generated_length': mean_generated_length.item(),
            'train/max_generated_length': max_generated_length.item(),
        }

    def ptx_step(self, ptx_batch: dict[str, torch.Tensor]) -> dict[str, Any]:
        """Perform a single update step with PTX loss."""
        ptx_loss = self.actor_model(**self.infer_batch(ptx_batch)).loss
        self.actor_model.backward(self.ptx_coeff * ptx_loss)
        self.actor_model.step()
        ptx_loss = get_all_reduce_mean(ptx_loss)
        return {
            'train/ptx_loss': ptx_loss.item(),
        }

    def train(self) -> None:
        """Train the model."""
        self.logger.print('***** Running training *****')

        progress_bar = tqdm(
            total=self.total_training_steps,
            desc=f'Training 1/{self.cfgs.train_cfgs.epochs} epoch',
            position=0,
            leave=True,
            disable=not is_main_process(),
        )

        if self.cfgs.data_cfgs.eval_datasets:
            self.logger.print('\n***** Evaluating at the beginning *****')
            self.eval()

        num_prompt_only_batches = len(self.prompt_only_dataloader)
        num_ptx_batches = len(self.ptx_dataloader)
        num_ptx_replicas = (num_prompt_only_batches + num_ptx_batches - 1) // num_ptx_batches
        for epoch in range(int(self.cfgs.train_cfgs.epochs)):
            for prompt_only_batch, ptx_batch in zip(
                self.prompt_only_dataloader,
                itertools.chain.from_iterable([self.ptx_dataloader] * num_ptx_replicas),
            ):
                inference_batches, training_batches = self.rollout(prompt_only_batch)

                if self.use_ptx:
                    ptx_batches = self.split_ptx_micro_batches(ptx_batch)
                else:
                    ptx_batches = [None for _ in range(len(inference_batches))]
                torch.cuda.empty_cache()

                for _ in range(self.cfgs.train_cfgs.update_iters):
                    for inference_batch, training_batch, ptx_batch in zip(
                        inference_batches, training_batches, ptx_batches
                    ):
                        rl_info = self.rl_step(inference_batch, training_batch)

                        torch.cuda.empty_cache()
                        self.logger.log(rl_info, step=self.global_step)
                        if self.use_ptx:
                            ptx_info = self.ptx_step(ptx_batch)
                            torch.cuda.empty_cache()
                            self.logger.log(ptx_info, step=self.global_step)

                        self.global_step += 1
                        progress_bar.set_description(
                            f'Training {epoch + 1}/{self.cfgs.train_cfgs.epochs} epoch '
                            f'(reward {rl_info["train/reward"]:.4f})',
                        )
                        progress_bar.update(1)

                        if self.global_step % self.cfgs.logger_cfgs.save_interval == 0:
                            self.logger.print(f'Saving checkpoint at step {self.global_step} ...')
                            self.save(tag=self.global_step)
                            self.logger.print('Checkpoint saved.')

                        if (
                            self.cfgs.data_cfgs.eval_datasets
                            and self.cfgs.train_cfgs.eval_strategy == 'steps'
                            and self.global_step % self.cfgs.train_cfgs.eval_interval == 0
                        ):
                            self.logger.print(
                                f'\n***** Evaluating at step {self.global_step} *****',
                            )
                            self.eval()

RM奖励模型训练代码:

    def loss(
        self,
        batch: PreferenceBatch,
    ) -> dict[str, torch.Tensor]:
        """Loss function for the reward model."""
        (
            better_input_ids,  # size = (B, L)
            worse_input_ids,  # size = (B, L)
        ) = batch[
            'input_ids'
        ].chunk(chunks=2, dim=0)
        assert better_input_ids.size(0) == worse_input_ids.size(0), 'batch size mismatch!'

# scores:一般来说,这代表模型在每个时间步骤(或输入分段)上的奖励得分,通常是一个形状为 (B, L, 1) 的张量,其中 B 是批量大小,L 是输入序列的长度,1 是奖励得分的维度。
#end_scores:通常表示输入序列的结束阶段的奖励得分,这可能是在整个序列处理完成后,模型计算出的最终奖励。
        output = self.model(**self.infer_batch(batch))
        scores = output.scores
        end_scores = output.end_scores
        higher_rewards, lower_rewards = scores.squeeze(dim=-1).chunk(chunks=2, dim=0)
        higher_end_reward, lower_end_reward = end_scores.squeeze(dim=-1).chunk(chunks=2, dim=0)

        loss = -F.logsigmoid(higher_end_reward - lower_end_reward).mean()

        if self.cfgs.train_cfgs.regularization > 0.0:
            loss = (
                loss
                + self.cfgs.train_cfgs.regularization
                * torch.stack([lower_end_reward, higher_end_reward]).square().mean()
            )

        accuracy = (higher_end_reward > lower_end_reward).float().mean()  # size = ()
        return {
            'loss': loss,  # size = ()
            'higher_end_reward': higher_end_reward,  # size = (B,)
            'lower_end_reward': lower_end_reward,  # size = (B,)
            'higher_rewards': higher_rewards,  # size = (B, L)
            'lower_rewards': lower_rewards,  # size = (B, L)
            'accuracy': accuracy,  # size = ()
        }

    def train_step(
        self,
        batch: PreferenceBatch,
    ) -> dict[str, Any]:
        """Perform a single training step."""
        loss_dict = self.loss(batch)
        loss = loss_dict['loss']
        self.model.backward(loss)
        self.model.step()

        accuracy = loss_dict['accuracy']

        loss = get_all_reduce_mean(loss)
        accuracy = get_all_reduce_mean(accuracy)

        return {
            'train/loss': loss.item(),
            'train/accuracy': accuracy.item(),
            'train/lr': self.model.optimizer.param_groups[0]['lr'],
        }

相关论文:

  • KTO,Kahneman-Tversky 优化,参阅论文《KTO: Model alignment as prospect theoretic optimization》。
  • DRO,直接奖励优化,参阅论文《Offline regularised reinforcement learning for large language models alignment》。
  • SimPO,简单偏好优化,参阅论文《SimPO: Simple preference optimization with a reference-free reward》

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注