Direct Preference Optimization (DPO)

Peng Xia

Direct Preference Optimization (DPO) 是一种专为大型语言模型(LLMs)设计的训练方法,旨在通过人类偏好数据来优化模型,而无需使用复杂的强化学习算法(如 Proximal Policy Optimization, PPO)。DPO 的核心思想是通过偏好数据直接调整模型参数,绕过显式奖励模型的拟合以及复杂的强化学习优化过程。

preliminaries

KL 散度

Kullback-Leibler 散度(KL 散度),又称为相对熵,是信息论中的一个重要概念。它用于衡量两个概率分布之间的差异,特别是衡量用一个分布 去近似另一个目标分布 时的效率损失。

KL 散度可以理解为两种分布之间的 “信息差异”。具体而言,KL 散度衡量的是用分布 来编码分布 所需要的额外信息量。假设分布 是我们想要捕捉的真实分布,而我们用分布 来表示它。如果 偏离 ,我们在编码时会付出信息损失。KL 散度值越小,表示两个分布越接近。


其中: 是“真实分布”或“目标分布”; 是“估计分布”或“模型分布”。KL 散度衡量的是,如果 是 “真实” 的概率分布,而我们使用 来表示 ,我们将损失多少信息。

KL 散度具有以下重要性质:

  • 非负性,且当且仅当 时, 。这意味着两个分布完全一致时,KL 散度为零。
  • 非对称性。KL 散度并不是衡量两个分布对称差异的度量,所以 的值可能不同。

假设有两个分布:

  • (真实分布)
  • (模型预测)

!!! 计算 KL 散度时,必须确保两个分布都“合法” !!!,否则 KL 散度要么不定义,要么变为无穷大(∞)。

  1. 都是合法的概率分布:

    • 所有元素满足

  2. 对于所有 如果 ,则必须有 ,否则:

    • 会导致 log 取无穷;

    • 数学上定义为


BT模型

Bradley–Terry 模型是用于成对比较(pairwise comparison)的概率模型,广泛用于排名、偏好建模、体育竞技建模等场景。

设有一组对象 ,每个对象 都具有一个正数能力值 。对于两个对象 ,Bradley–Terry 模型定义它们之间比较时 胜过 (符号为 ,不是 ) 的概率为:

假设我们有两个人:Alice 的能力值 , Bob 的能力值 那么 Alice 赢过 Bob 的概率为:


但更广义的表达是,假设有一个函数 ,可以计算每个对象的能力值,两个对象 偏好概率为:


在 ML 里,为了方便建模,我们通常设置:,这样偏好概率就变成:

与原始公式的区别在于以 e 为底,如果直接用一个线性模型或 LLM 计算 ,那么 可能是负数,不能直接当作能力值。所以我们取指数:。同时Sigmoid 和 softmax 的梯度在深度学习中非常稳定,有助于收敛。

DPO 原理

设计 DPO 的初始目的

image-20250624175053638

DPO optimizes for human preferences while avoiding reinforcement learning. Existing methods for fine-tuning language models with human feedback first fit a reward model to a dataset of prompts and human preferences over pairs of responses, and then use RL to find a policy that maximizes the learned reward. In contrast, DPO directly optimizes for the policy best satisfying the preferences with a simple classification objective, fitting an implicit reward model whose corresponding optimal policy can be extracted in closed form.

DPO 是设计来解决 RLHF 或者 PPO的一个缺点的,即奖励模型 reward model。奖励模型本质上是基于人类打分数据来近似人类的偏好的,其训练前需要采集相对推理数据,和人类偏好结果。

In the second phase the SFT model is prompted with prompts x to produce pairs of answers (y1, y2) ∼ π(y|x). These are then presented to human labelers who express preferences for one answer, denoted as yw ≻ yl | x where yw and yl denotes the preferred and dispreferred completion amongst (y1, y2) respectively.

而且在语言模型领域,奖励模型是从微调对象模型来初始化的,通常是在 transformer 最后加上一层线性层,产生 reward 的标量。

In the context of LMs, the network rφ(x, y) is often initialized from the SFT model π(y|x) with the addition of a linear layer on top of the final transformer layer that produces a single scalar prediction for the reward value .

而且 PPO 是需要分别计算 reward 和 advantage 的,这意味着你需要额外训练两个模型分别作为 reward model 和 advantage model,这就是 Actor-Critic 框架。因此 PPO 训练需要 4 个模型,训练模型,参考模型 (用于产生推理数据和计算 KL 散度),reward model 和 advantage model。奖励模型不仅需要很多数据来单独训练,而且需要额外显存,所以最好把奖励模型剔除。这就是 DPO 的核心任务,但剔除奖励模型后,如何量化 policy (模型的推理)成为了新的问题

Motivated by the challenges of applying reinforcement learning algorithms on large-scale problems such as fine-tuning language models, our goal is to derive a simple approach for policy optimization using preferences directly.

Unlike prior RLHF methods, which learn a reward and then optimize it via RL, our approach leverages a particular choice of reward model parameterization that enables extraction of its optimal policy in closed form, without an RL training loop.

DPO 的做法就是基于**!!! 从奖励模型到最佳策略的分析映射关系,将基于奖励的 loss 函数转换为基于策略的 !!!**。这里记住这个映射关系。

our key insight is to leverage an analytical mapping from reward functions to optimal policies, which enables us to transform a loss function over reward functions into a loss function over policies.


原始目标函数

接下来就是公式分析,从最原始的基于奖励模型的目标开始:

这个公式由两项组成

  • 代表最大化 reward(其中 出自数据集 , , 出自最佳模型 的推理
  • 代表最佳模型 和参考模型 的 KL 散度,这个要限制漂移,因为训练数据是参考模型生成的在训练模型上训练的。

其中 是隐式奖励模型。核心目标是找到一个最佳的策略 ,使得最终的得分期望最高,同时不能与原模型差的太多。


奖励模型–最佳策略的分析映射关系

以下推理我们假定奖励模型已经确定

我们将KL 散度展开为期望格式, 被省略了:

将其与奖励项合并

由于 ML 里一般用最小化来表示目标,所以统一乘以 将 argmax 变成 argmin

我们继续合并里面的公式,将整体合并成一个log,这里分子本身只有一个发布项,所以不要动分子。

然后目标函数变成了类似于 KL 散度的形式,分子是一个确定存在的分布。

但是分母的分布不一定合法,概率和不一定为 1, 所以我们将分母归一化,归一化因子 ,其实就是一个常数,值为原分布的概率和。(PS: 上面 KL 散度章节有解释)

转换后的分母是一个复合分布

归一化的目标函数变为

其中 是常数,可以忽略,忽略它后 就是 KL 散度形式。

回忆下 KL 散度的非负性,,且当且仅当 时, 。这意味着两个分布完全一致时,KL 散度为零。因此我们最优模型是

但其实我们没法直接得到 ,因为其中 还是未知的,我们以上只是当作它已知。但**!!! 我们已经得到了最优策略和奖励模型之间的映射关系 !!!**。将上面公式变换可得到奖励模型的公式,就得到了理论上的奖励模型

基于BT模型的新目标函数

Fortunately, the Bradley-Terry model depends only on the difference of rewards between two completions. Substituting the reparameterization in Eq. 5 for r∗(x, y) into the preference model Eq. 1, the partition function cancels, and we can express the human preference probability in terms of only the optimal policy and reference policy.


因为 BT 模型只取决于两个推理之间的奖励差,我们就可以把 BT 模型中的 reward 换成上述公式,整理完后公式里只有最佳策略模型和参考模型。其中 是代表 loser,不偏好的回答, 代表 winner,用户偏好的回答。

Now that we have the probability of human preference data in terms of the optimal policy rather than the reward model, we can formulate a maximum likelihood objective for a parametrized policy πθ.

在拿到人类在最佳策略模型于参考模型的偏好概率后,我们就可以为策略生成一个最大似然公式,顺便调整了 的顺序。

DPO 的梯度更新通过以下公式计算 ( PS: )

其中, 是隐式奖励函数,定义为:


论文中有一段说明,如下图:

其中有 2 部分:

  1. 表示对于一个策略,要偏向于增加出现 的可能性,降低出现 的可能性;
  2. 表示当奖励模型 评估出现错误时,可获得更高的权重

梯度更新的直观理解是增加优胜者的生成概率,减少劣胜者的生成概率。

Intuitively, the gradient of the loss function LDPO increases the likelihood of the preferred completions yw and decreases the likelihood of dispreferred completions yl.

以上是 DPO 的所有公式推理,整体上基于正常 RL 目标得到奖励模型与最佳模型的映射关系,然后代入基于 BT 模型的 RL 目标


Loss 函数

总结以下,DPO loss 为

主要参数如下:

  • 是要微调的大模型,即策略模型
  • 是参考模型,即冻结的强化学习前模型
  • 是偏好数据
  • 是数据集 中采样的 prompt
  • 对应的人类偏好的回答
  • 对应的人类不偏好的回答
  • 是超参,用于控制偏离 的程度

The DPO loss function can be broken down into two main terms, the first term represents the log probability of the human-preferred response . This term aims to maximize the probability of as generated by the model , relative to the reference model . The division by serves as a regularizing factor, ensuring that the fine-tuning does not cause the model to deviate excessively from its original training. Maximizing this term effectively increases the likelihood of generating responses similar to in response to inputs like , reinforcing the human preference patterns. Conversely, the second term focuses on minimizing the log probability of the human-dispreferred response . This is achieved by reducing the model’s tendency to generate type responses, as indicated by the negative sign.

DPO 损失函数可以分为两个主要部分:第一项表示人类偏好回答 的对数概率。这一项的目标是最大化模型 生成 的概率,相对于参考模型 。这里除以 起到了正则化的作用,用以确保微调过程不会使模型过度偏离其原始训练状态。最大化这一项,实质上是在提高 在输入 下生成类似 的回答的可能性,从而强化人类的偏好模式。注意原始目标函数里是有策略模型和参考模型的 KL 散度的,最后转换变成了这样子,但实际还是带 KL 散度控制的,只是最终公式里没有显式的出现

相反,第二项的目标是最小化人类不偏好回答 的对数概率。由于负号的存在,这一项通过降低模型生成类似 的回答倾向,来达到惩罚不良回答的效果。

DPO from scratch

下面是我手搓的 DPO。我参考了 Allam https://github.com/0xallam/Direct-Preference-Optimization.git 这个 repo 中关于 log prob 和 DPO loss 的写法。Allam 和我在计算 log prob 上有些差别,我只计算了回答部分,而且计算前还 shift label了,Allam 计算了 prompt 所有部分的。
数据集预处理也部分参考了 Allam 的写法,但是额外遵守了 qwen 原本的 chat template。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import os

os.environ["WANDB_MODE"] = "offline" # Disable Weights & Biases logging
os.environ['CUDA_VISIBLE_DEVICES'] = '1' # Set visible GPUs if needed
import random, torch, wandb, copy
import torch.nn as nn
import numpy as np
from tqdm import tqdm
from functools import partial
import torch.nn.functional as F
from torch.optim import AdamW

from torch.utils.data import DataLoader
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, DataCollatorForSeq2Seq


max_length = 512
batch_size = 8
model_name_or_path = "Qwen/Qwen2.5-0.5B-Instruct"

config = {
"max_length": max_length,
"batch_size": batch_size,
"model_name_or_path": model_name_or_path,
}

wandb.init(project='DPO_scratch', config=config)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token

# 策略模型
model = AutoModelForCausalLM.from_pretrained(
model_name_or_path,
torch_dtype=torch.bfloat16,
device_map="auto"
)
# 参考模型
ref_model = copy.deepcopy(model)
ref_model.eval()

def freeze_model(model):
model.eval()
for param in model.parameters():
param.requires_grad = False

freeze_model(ref_model)

数据集 tokenize

DPO 数据集 belike

1
2
3
4
5
6
7
8
{
'id': '04c275bf738fd391b7fe25e25fe7bed3',
'source': 'truthy_dpo',
'system': 'You are an unbiased, uncensored, helpful assistant.',
'prompt': "What's the nearest national park to you?",
'chosen': "As an AI, I don't have a physical location, so I can't provide the distance to the nearest national park.",
'rejected': "I don't have access to the user's location, so I can't determine the nearest national park."
}

只有提示词相关的字段有用: system, prompt, chosen, rejected, 其中 system 和 prompt 组成了整体的 prompt, chosen 和 rejected 分别作为同一个 prompt 的偏好回答和不偏好回答。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
ds = load_dataset("jondurbin/truthy-dpo-v0.1")

def tokenize_fn(sample, tokenizer, max_length:int=None):

if max_length is None:
max_length = tokenizer.model_max_length

system_message_content = sample["system"]
user_message_content = sample["prompt"]
chosen_assistant_message_content = sample["chosen"]
rejected_assistant_message_content = sample["rejected"]

# === 仅 prompt ===
messages_without_response = [
{"role": "system", "content": system_message_content},
{"role": "user", "content": user_message_content}
]
# 由于 tokenizer.apply_chat_template 没法实现截断,所以分为两步
messages_without_response_str = tokenizer.apply_chat_template(
messages_without_response,
tokenize=False,
add_generation_prompt=True,
)
messages_without_response_input_ids = tokenizer(
messages_without_response_str,
max_length=max_length,
truncation=True,
)['input_ids']

# === 带偏好回答 ===
messages_with_chosen_response = messages_without_response + [
{"role": "assistant", "content": chosen_assistant_message_content}
]
messages_with_chosen_response_str = tokenizer.apply_chat_template(
messages_with_chosen_response,
tokenize=False,
add_generation_prompt=False,
)
messages_with_chosen_response_input_ids = tokenizer(
messages_with_chosen_response_str,
max_length=max_length,
truncation=True,
)['input_ids']
messages_with_chosen_response_lables = copy.deepcopy(messages_with_chosen_response_input_ids)
for i in range(len(messages_without_response_input_ids)):
messages_with_chosen_response_lables[i] = -100 # 将 prompt 部分的标签设置为 -100,表示不计算损失
messages_with_chosen_response_attention_mask = [1] * len(messages_with_chosen_response_input_ids)

# === 带不偏好回答 ===
messages_with_rejected_response = messages_without_response + [
{"role": "assistant", "content": rejected_assistant_message_content}
]
messages_with_rejected_response_str = tokenizer.apply_chat_template(
messages_with_rejected_response,
tokenize=False,
add_generation_prompt=False,
)
messages_with_rejected_response_input_ids = tokenizer(
messages_with_rejected_response_str,
max_length=max_length,
truncation=True,
)['input_ids']
messages_with_rejected_response_lables = copy.deepcopy(messages_with_rejected_response_input_ids)
for i in range(len(messages_without_response_input_ids)):
messages_with_rejected_response_lables[i] = -100 # 将 prompt 部分的标签设置为 -100,表示不计算损失
messages_with_rejected_response_attention_mask = [1] * len(messages_with_rejected_response_input_ids)

length = max(
len(messages_with_chosen_response_input_ids),
len(messages_with_rejected_response_input_ids)
)

ret = {
"respone_preferred": {
"input_ids": messages_with_chosen_response_input_ids,
"attention_mask": messages_with_chosen_response_attention_mask,
"labels": messages_with_chosen_response_lables,
},
"respone_rejected": {
"input_ids": messages_with_rejected_response_input_ids,
"attention_mask": messages_with_rejected_response_attention_mask,
"labels": messages_with_rejected_response_lables,
},
"length": length,
}

return ret

ds_tokenized = ds.map(
partial(tokenize_fn, tokenizer=tokenizer, max_length=max_length),
remove_columns=ds['train'].column_names,
)
# ds_tokenized = ds_tokenized.sort("length", reverse=True)

处理为 batch tensor 数据

其实每个 sample 里有两组 inputs,处理逻辑和正常数据集没什么差别。
还有 dataloader 构造时不要直接都放到 cuda 上,显存会直接都占用了,最好在迭代时将 cpu 上的数据移动到 cuda。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def collate_fn(batch, data_collator, device=torch.device("cpu")):

respone_preferred_list = [item['respone_preferred'] for item in batch]
respone_rejected_list = [item['respone_rejected'] for item in batch]

respone_preferred_batch_data = data_collator(respone_preferred_list)
respone_rejected_batch_data = data_collator(respone_rejected_list)
if device.type != 'cpu':
respone_preferred_batch_data = {k: v.to(device) for k, v in respone_preferred_batch_data.items()}
respone_rejected_batch_data = {k: v.to(device) for k, v in respone_rejected_batch_data.items()}

ret = {
"respone_preferred_batch_data": respone_preferred_batch_data,
"respone_rejected_batch_data": respone_rejected_batch_data,
}
return ret

data_collator = DataCollatorForSeq2Seq(
tokenizer=tokenizer,
max_length=max_length,
padding="longest",
)

train_dataloader = DataLoader(
ds_tokenized['train'],
batch_size=batch_size,
shuffle=True, # 如果提前按长度排序了,不要shuffle,我个人习惯先跑最大的batch来确保不会OOM
collate_fn=partial(collate_fn, data_collator=data_collator),
)

计算 DPO loss 的函数

DPO 公式是

其中 代表 winner,用户偏好的回答, 是代表 loser,不偏好的回答。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def get_log_prob_by_labels(logits, labels):
"""
计算 $、log \pi(y|x)$
"""

# logits: [B, L, V] labels: [B, L]
assert logits.shape[:2] == labels.shape

# 先 shift logits 和 labels
shift_logits = logits[:, :-1, :] # [B, L-1, V]
shift_labels = labels[:, 1:] # [B, L-1]
shift_mask = (shift_labels != -100) # [B, L-1]

# 如果用原始的 shift_mask 去执行 torch.gather 会导致异常,因为 -100 不是合法的索引
# 这里我们暂时把 -100 变成 0,然后再 ele-wise 乘以 mask,-100/0 位置的 logits 都会被置为 0
safe_labels = shift_labels.clone()
safe_labels[~shift_mask] = 0

# 先计算 log prob,再取出 labels 对应的 log prob,最后 mask 处理-100 位置,最后取 mean
log_probs = F.log_softmax(shift_logits, dim=-1) # [B, L-1, V]
log_probs_at_labels = torch.gather(log_probs, 2, safe_labels.unsqueeze(-1)).squeeze(-1) # [B, L-1]
log_probs_at_labels = log_probs_at_labels * shift_mask # [B, L-1]

log_prob_per_example = log_probs_at_labels.sum(dim=1) # [B]

return log_prob_per_example


def get_DPO_loss(
respone_preferred_logprob_policy,
respone_preferred_logprob_ref,
respone_rejected_logprob_policy,
respone_rejected_logprob_ref,
beta=0.1, # beta 是一个超参数,用于平衡奖励与KL散度
):
"""
计算 DPO 损失
"""

respone_preferred_logprob_relative = respone_preferred_logprob_policy - respone_preferred_logprob_ref
respone_rejected_logprob_relative = respone_rejected_logprob_policy - respone_rejected_logprob_ref

# y_w 奖励 > y_l 奖励的概率统计
reward_accuracies = (respone_preferred_logprob_relative > respone_rejected_logprob_relative).float().mean()
# y_w 与 y_l奖励之差的平均值
reward_margins = (respone_preferred_logprob_relative - respone_rejected_logprob_relative).mean()

# logsigmoid 函数就是 log(sigmoid(x))
loss = - F.logsigmoid(
beta * (respone_preferred_logprob_relative - respone_rejected_logprob_relative)
).mean()

return (
loss,
respone_preferred_logprob_relative.mean().item(),
respone_rejected_logprob_relative.mean().item(),
reward_accuracies.item(),
reward_margins.item()
)

主循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
epochs = 3
lr = 1e-6
optimizer = AdamW(model.parameters(), lr=lr)

model.train()
ref_model.eval()

for _ in range(epochs):

for batch in tqdm(train_dataloader):

optimizer.zero_grad()

respone_preferred_batch_data = batch['respone_preferred_batch_data']
respone_preferred_batch_data = {k: v.to(DEVICE) for k, v in respone_preferred_batch_data.items()}
respone_rejected_batch_data = batch['respone_rejected_batch_data']
respone_rejected_batch_data = {k: v.to(DEVICE) for k, v in respone_rejected_batch_data.items()}

# ===== 先计算策略模型和参考模型的 logprob =====
# $\log \pi_\theta(y_w|x)$
input_ids = respone_preferred_batch_data['input_ids']
attention_mask = respone_preferred_batch_data['attention_mask']
labels = respone_preferred_batch_data['labels']

logits = model(
input_ids=input_ids,
attention_mask=attention_mask,
).logits
respone_preferred_logprob_policy = get_log_prob_by_labels(
logits = logits,
labels = labels
)

# $\log \pi_{\text{ref}}(y_w|x))$
with torch.no_grad():

logits = ref_model(
input_ids=input_ids,
attention_mask=attention_mask,
).logits
respone_preferred_logprob_ref = get_log_prob_by_labels(
logits = logits,
labels = labels
)

# ===== 再计算参考模型的 logprob =====
# $\log \pi_\theta(y_l|x)$
input_ids = respone_rejected_batch_data['input_ids']
attention_mask = respone_rejected_batch_data['attention_mask']
labels = respone_rejected_batch_data['labels']

logits = model(
input_ids=input_ids,
attention_mask=attention_mask,
).logits
respone_rejected_logprob_policy = get_log_prob_by_labels(
logits = logits,
labels = labels
)

# $\log \pi_{\text{ref}}(y_l|x))$
with torch.no_grad():

logits = ref_model(
input_ids=input_ids,
attention_mask=attention_mask,
).logits
respone_rejected_logprob_ref = get_log_prob_by_labels(
logits = logits,
labels = labels
)

# ===== 计算 DPO 损失 =====
loss, respone_preferred_logprob_relative, respone_rejected_logprob_relative, reward_accuracies, reward_margins = get_DPO_loss(
respone_preferred_logprob_policy,
respone_preferred_logprob_ref,
respone_rejected_logprob_policy,
respone_rejected_logprob_ref,
beta=0.1
)

loss.backward()
optimizer.step()

# torch.cuda.empty_cache()

wandb.log({
'loss': loss.item(),
'preferred_relative_logprob': respone_preferred_logprob_relative,
'dispreferred_relative_logprob': respone_rejected_logprob_relative,
'reward_accuracy': reward_accuracies,
'reward_margin': reward_margins,
})

wandb.finish()

可视化结果

image-20250624201116637

loss 从 0.69 降到 0.47,大概1/3。reward margin不断变大,accuracy 第一个epoch结束就达到了0.9,后面一直随着数据在波动,但从margin不断变大可看出还在正向训练。

DPO trainer by HF

以下是 huggingface 实现的 DPO trainer 的调用方法。出自其官方文档 https://huggingface.co/docs/trl/en/dpo_trainer#loss-functions。

1
2
3
4
5
6
7
8
9
10
11
# train_dpo.py
import json
from datasets import load_dataset
from trl import DPOConfig, DPOTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer

model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct")
train_dataset = load_dataset("trl-lib/ultrafeedback_binarized", split="train").select(range(1000)) # 仅选取前1000条数据进行测试

# print(json.dumps(train_dataset[0], indent=2, ensure_ascii=False))

trl-lib/ultrafeedback_binarized 这个数据集 belike.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"chosen": [
{
"content": "Use the pygame library to write a version of the classic game Snake, with a unique twist",
"role": "user"
},
{
"content": "Sure, I'd be happy to help you write a version of the classic game Snake using the pygame library! Here's a basic outline of how we can approach this:\n\n1. First, we'll need to set up the game display and create a game object that we can use to handle the game's state.\n2. Next, we'll create the game's grid, which will be used to represent the game board. We'll need to define the size of the grid and the spaces within it.\n3. After that, we'll create the snake object, which will be used to represent the player's movement. We'll need to define the size of the snake and the speed at which it moves.\n4. We'll also need to create a food object, which will be used to represent the food that the player must collect to score points. We'll need to define the location of the food and the speed at which it moves.\n5. Once we have these objects set up, we can start handling the game's logic. We'll need to define the rules for the player's movements, such as how the player can move the snake and how the snake will grow as the player collects more food.\n6. We'll also need to add collisions detection to the game, so that the snake and the food can collide with each other.\n7. Finally, we'll need to add a user interface to the game, such as a menu and a scoreboard.\n\nNow, as for the unique twist, we could add a few features to make the game more interesting. For example, we could add power-ups that give the player special abilities, such as the ability to grow the snake faster or to change its direction. We could also add obstacles, such as walls or pits, that the player must avoid.\n\nHere's some sample code to get us started:\n```\nimport pygame\n\n# Initialize pygame\npygame.init()\n\n# Set up the game display\nwidth = 800\nheight = 600\nscreen = pygame.display.set_mode((width, height))\n\n# Define the colors\nWHITE = (255, 255, 255)\nBLACK = (0, 0, 0)\nGREEN = (0, 255, 0)\n\n# Define the game objects\nsnake = pygame.Rect(50, 50, 10, 10)\nfood = pygame.Rect(100, 100, 10, 10)\n\n# Define the game logic\ndef update_snake():\n # Get the mouse position\n mouse_pos = pygame.mouse.get_pos()\n\n # Move the snake based on the mouse position\n if mouse_pos[0] > snake.x:\n snake.x += 10\n elif mouse_pos[0] < snake.x:\n snake.x -= 10\n elif mouse_pos[1] > snake.y:\n snake.y += 10\n elif mouse_pos[1] < snake.y:\n snake.y -= 10\n\n # Update the snake's size\n if snake.x == food.x and snake.y == food.y:\n snake.width += 10\n snake.height += 10\n\n# Define the game loop\ndef game_loop():\n # Clear the screen\n screen.fill(BLACK)\n\n # Update the game objects\n update_snake()\n\n # Draw the game objects\n screen.fill(WHITE)\n screen.draw.rect(snake, GREEN)\n screen.draw.rect(food, BLACK)\n\n # Update the display\n pygame.display.flip()\n\n# Start the game loop\ngame_loop()\n\n# Run the game\nwhile True:\n for event in pygame.event.get():\n if event.type == pygame.QUIT:\n pygame.quit()\n break\n\n pygame.time.Clock().tick(60)\n```\nThis code sets up a basic game display, defines the snake and food objects, and updates the game state based on the player's movements. We've also added a simple user interface and some basic collision detection.\n\nAs for the unique twist, we could add a few features to make the game",
"role": "assistant"
}
],
"rejected": [
{
"content": "Use the pygame library to write a version of the classic game Snake, with a unique twist",
"role": "user"
},
{
"content": "Sure, here's an example of how to write a version of Snake game with a unique twist using the Pygame library:\n```python\nimport pygame\n\nclass SnakeGame:\n def __init__(self, game_width, game_height):\n pygame.init()\n screen = pygame.display.set_mode((game_width, game_height))\n pygame.display.set_caption(\"Snake Game\")\n self.speed = 5 # Speed of the snake\n self.food_speed = 1 # Speed of the food\n self.direction = 0 # Initial direction of the snake\n self.snakelen = 0 # Length of the snake\n self.food = pygame.image.load(\"snake_food.png\")\n self.head = pygame.image.load(\"snake_head.png\")\n self.tail = pygame.image.load(\"snake_tail.png\")\n self.game Quint()\n def Quint(self):\n for i in range(50):\n pygame.draw.line(screen, (180, 100, 220), (0, 0, 200), 2)\n pygame.draw.line(screen, (180, 100, 220), (50, 0, 200), 2)\n pygame.draw.line(screen, (180, 100, 220), (50, 50, 200), 2)\n pygame.draw.line(screen, (180, 100, 220), (50, 50, 300), 2)\n pygame.draw.line(screen, (180, 100, 220), (50, 80, 200), 2)\n pygame.draw.line(screen, (180, 100, 220), (50, 80, 300), 2)\n pygame.draw.line(screen, (180, 100, 220), (0, 50, 200), 2)\n pygame.draw.line(screen, (180, 100, 220), (0, 50, 300), 2)\n pygame.draw.line(screen, (180, 100, 220), (0, 80, 200), 2)\n pygame.draw.line(screen, (180, 100, 220), (0, 80, 300), 2)\n pygame.display.flip()\n self.game.run()\n def run(self):\n while True:\n for event in pygame.event. pygame.KEYDOWN:\n if event.key == pygame.K_LEFT:\n self.direction = -1\n if event.key == pygame.K_RIGHT:\n self.direction = 1\n self.snakelen += 1\n if self.snakelen == 0:\n self.snakelen = 10\n if self.snakelen > 20:\n self.snakelen = 20\n self.gameQuint()\n self.foodCrossing()\n self.headRun()\n pygame.display.update()\ngame = SnakeGame(800, 600)\ngame.run()\n```\nIn this game, the snake moves with a constant speed, but the direction of the snake can be controlled by the user using the left and right arrow keys. The snake grows in length every 10 segments, and when it reaches a certain length, it resets to 10 segments. The food moves fast and randomly crosses the screen, and the snake can eat it by colliding with it. The snake's head and tail move independently of each other. The game ends when the snake dies or reaches the end of the screen.",
"role": "assistant"
}
],
"score_chosen": 6.0,
"score_rejected": 4.0
}

它数据已经部分组织好了,chosen 和 rejected 都是包含了问题与回答的完整 messages。还额外多了两个 score 代表满意度之类的。

1
2
3
training_args = DPOConfig(output_dir="Qwen2.5-0.5B-DPO")
trainer = DPOTrainer(model=model, args=training_args, processing_class=tokenizer, train_dataset=train_dataset)
trainer.train()

tokenize

DPOTrainer.__init__() 主要负责数据处理

1
2
3
4
# 出自 DPOTrainer
if data_collator is None:
data_collator = DataCollatorForPreference(pad_token_id=self.padding_value)
train_dataset = self._prepare_dataset(train_dataset, processing_class, args, "train")

其中 processing_class 就是 tokenizer。DPOTrainer._prepare_dataset 里经过了3个变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dataset = dataset.map(maybe_extract_prompt, **map_kwargs)
dataset = dataset.map(
maybe_apply_chat_template, fn_kwargs={"tokenizer": processing_class, "tools": args.tools}, **map_kwargs
)
dataset = dataset.map(
self.tokenize_row if not self.is_vision_model else self.process_row,
remove_columns=["chosen", "rejected"],
fn_kwargs={
"processing_class": processing_class,
"max_prompt_length": args.max_prompt_length,
"max_completion_length": args.max_completion_length,
# for enc-dec, we add the special tokens ([bos_token] + prompt + [eos_token]; completion + [eos_token])
"add_special_tokens": False,
},
**map_kwargs,
)

maybe_extract_promptmaybe_apply_chat_template 都是嵌套函数,内部简单判断下是否需要执行 extract_promptapply_chat_template
extract_prompt直接把对话 messages 拆分为共同的 messages 前缀 prompt 和之后各自的回答 chosen 与 rejected。

1
2
3
4
5
6
7
8
9
10
for idx in range(min(len(example["chosen"]), len(example["rejected"]))):
if example["chosen"][idx] != example["rejected"][idx]:
if example["chosen"][idx - 1] == " ": # remove space before the prompt
idx -= 1
break
return {
"prompt": example["chosen"][:idx],
"chosen": example["chosen"][idx:],
"rejected": example["rejected"][idx:],
}

apply_chat_template 就是分别为 prompt, chosen, rejected 转换为文本形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# Apply the chat template to the whole conversation
if "messages" in example:
messages = tokenizer.apply_chat_template(example["messages"], tools=tools, tokenize=False)

# Apply the chat template to the prompt, adding the generation prompt
if "prompt" in example:
last_role = example["prompt"][-1]["role"]
if last_role == "user":
add_generation_prompt = True
continue_final_message = False
elif last_role == "assistant":
add_generation_prompt = False
continue_final_message = True
else:
raise ValueError(f"Invalid role in the last message: {last_role}")
prompt = tokenizer.apply_chat_template(
example["prompt"],
tools=tools,
continue_final_message=continue_final_message,
tokenize=False,
add_generation_prompt=add_generation_prompt,
)
if "prompt" in example: # explicit prompt and prompt-completion case
if "chosen" in example:
prompt_chosen = tokenizer.apply_chat_template(
example["prompt"] + example["chosen"], tools=tools, tokenize=False
)
# DeepSeek-R1 inserts a <think> token when using `add_generation_prompt`, which can cause discrepancies
# between the prompt alone and the combined prompt+completion. To ensure consistency, we extract the
# common prefix between the two. In most cases, this is a no-op.
prompt = "".join(x for x, _ in takewhile(lambda x: x[0] == x[1], zip(prompt, prompt_chosen)))

chosen = prompt_chosen[len(prompt) :]
if "rejected" in example and "prompt" in example: # explicit prompt
prompt_rejected = tokenizer.apply_chat_template(
example["prompt"] + example["rejected"], tools=tools, tokenize=False
)
# Handle DeepSeek-R1 <think> token, see the above comment for details
prompt = "".join(x for x, _ in takewhile(lambda x: x[0] == x[1], zip(prompt, prompt_rejected)))
rejected = prompt_rejected[len(prompt) :]
if "completion" in example:
prompt_completion = tokenizer.apply_chat_template(
example["prompt"] + example["completion"], tools=tools, tokenize=False
)
# Handle DeepSeek-R1 <think> token, see the above comment for details
prompt = "".join(x for x, _ in takewhile(lambda x: x[0] == x[1], zip(prompt, prompt_completion)))
completion = prompt_completion[len(prompt) :]
else: # implicit prompt case
if "chosen" in example:
chosen = tokenizer.apply_chat_template(example["chosen"], tools=tools, tokenize=False)
if "rejected" in example:
rejected = tokenizer.apply_chat_template(example["rejected"], tools=tools, tokenize=False)

# Extract the completion by removing the prompt part from the prompt-completion string
output = {}
if "messages" in example:
output["text"] = messages
if "prompt" in example:
output["prompt"] = prompt
if "chosen" in example:
output["chosen"] = chosen
if "rejected" in example:
output["rejected"] = rejected
if "completion" in example:
output["completion"] = completion
if "label" in example:
output["label"] = example["label"]

return output

最后一个函数就是正常的 tokenize 为 ids.

此外,它还单独设计了一个 data collater, DataCollatorForPreference,分别对 prompt, chosen, rejected 做 padding。

forward

forward 函数是 DPOTrainer.concatenated_forward, 它内部太长了,只抽取部分内容简单讲下, 内容直接写在代码段里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 把 prompt x 2, 同时 chosen, rejected 拼接,分别作为 prompt_input_ids, completion_input_ids
concatenated_batch = self.concatenated_inputs(batch, padding_value=self.padding_value)
# Concatenate the prompt and completion inputs
input_ids = torch.cat((prompt_input_ids, completion_input_ids), dim=1)
# loss mask 和 input_ids 形状一样,但对应 prompt 的内容是 0 ,对应 completion 的内容是 1。目的也是只mask prompt部分。
# Mask the prompt but not the completion for the loss
loss_mask = torch.cat(
(torch.zeros_like(prompt_attention_mask), completion_attention_mask),
dim=1,
)
# .... 中间一段变化,把一堆变量都塞入 model_kwargs
outputs = model(input_ids, **model_kwargs)
logits = outputs.logits
# 之后就要开始计算 log prob
# 先 shift label, label直接基于 input_ids 变换过来
# Offset the logits by one to align with the labels
labels = torch.roll(input_ids, shifts=-1, dims=1)
loss_mask = torch.roll(loss_mask, shifts=-1, dims=1).bool()
# label 经过 loss_mask 处理,把mask对象变为 0,不能为 -100,因为 -100 不是合法的索引
# Compute the log probabilities of the labels
labels[~loss_mask] = 0 # dummy token; we'll ignore the losses on these tokens later
per_token_logps = selective_log_softmax(logits, labels)
per_token_logps[~loss_mask] = 0
per_token_logps = torch.roll(per_token_logps, shifts=1, dims=1)
# 在 seq 维度取sum,因为都是 log
all_logps = per_token_logps[:, 1:].sum(-1)
# 因为数据是前部分是 chosen, 后部分是 rejected, 直接分开就行
output["chosen_logps"] = all_logps[:num_examples]
output["rejected_logps"] = all_logps[num_examples:]

DPO loss

loss 函数是 DPOTrainer.dpo_loss,传参分别是策略函数和参考函数的上计算 chosen 和 rejected 的logprob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 这里就是 log(\pi/\pi_ref),因为传入的就是 logprob,所以这里直接相减就行
logratios = chosen_logps - rejected_logps
ref_logratios = ref_chosen_logps - ref_rejected_logps
logits = logratios - ref_logratios
# 之后乘以beta,再 log sigmoid 处理下就变成 loss
if self.loss_type == "sigmoid":
losses = (
-F.logsigmoid(self.beta * logits) * (1 - self.label_smoothing)
- F.logsigmoid(-self.beta * logits) * self.label_smoothing
)

# 返回 chosen 和 rejected 的奖励平均值,和 loss
chosen_rewards = self.beta * (chosen_logps.to(device) - ref_chosen_logps.to(device)).detach()
rejected_rewards = self.beta * (rejected_logps.to(device) - ref_rejected_logps.to(device)).detach()

return losses, chosen_rewards, rejected_rewards
Comments