DPO数据集实战从零构建你的第一个偏好学习数据集附代码示例如果你最近在尝试微调自己的大语言模型特别是想让模型输出更符合人类的偏好那么你很可能已经听说过DPODirect Preference Optimization直接偏好优化。与传统的监督微调SFT不同DPO不需要一个“标准答案”它只需要知道在同一个问题下哪个回答更好chosen哪个更差rejected。这种“对比学习”的思路听起来很优雅但当你真正动手时第一个拦路虎往往就是数据从哪里来格式到底怎么弄网上能找到的公开偏好数据集比如Anthropic的HH-RLHF虽然质量高但要么领域不匹配要么数据量太大个人难以处理。更常见的情况是你手头只有一堆原始的对话日志、客服记录或者你自己用不同模型生成的一堆回答。如何把这些“原材料”变成DPO能吃的“精粮”是决定微调成败的关键一步。今天我们就抛开理论直接上手。我会带你从零开始一步步将原始的、杂乱的对话数据清洗、配对、转换成标准的DPO数据集格式。整个过程会穿插具体的Python代码、数据处理的坑点以及我实际项目中总结出的配对策略。目标很明确让你看完就能动手做出一个真正能用于训练、且质量可控的偏好数据集。1. 理解DPO数据不止是“好”与“坏”的标签在开始写代码之前我们必须先彻底搞懂DPO到底需要什么样的数据。很多人以为DPO数据就是简单的三元组(prompt, chosen_response, rejected_response)。这没错但魔鬼藏在细节里。这个“好”与“坏”的对比背后蕴含的偏好信号必须清晰、一致且有意义。注意一个常见的误解是chosen必须是“完全正确”的答案而rejected必须是“完全错误”的。实际上DPO的强大之处在于它能学习相对偏好。chosen只需要比rejected更好即可这种“更好”可以体现在更详细、更安全、更有帮助、更符合格式要求等任何一个维度上。让我们看一个来自真实客服场景的例子。原始数据可能长这样{ conversation_id: 101, turns: [ { role: user, content: 我的订单号是123456为什么还没发货 }, { role: assistant, content: 已查询您的订单正在配货中预计明天发出。, model_source: model_a, user_feedback: null }, { role: assistant, content: 请提供订单号以便查询。, model_source: model_b, user_feedback: thumbs_down } ] }在这个例子中同一个用户问题下我们有两个来自不同模型或同一模型不同参数的回复。model_a的回复直接给出了查询结果和预期而model_b的回复反而向用户索要已知信息。同时我们还有一条隐含的反馈信号user_feedback: thumbs_down。这构成了一个完美的DPO数据候选我们可以将model_a的回复标记为chosen将model_b的回复标记为rejected。DPO数据的核心结构可以归纳为下表字段描述数据类型必须示例prompt用户的输入/问题。字符串是“我的订单号是123456为什么还没发货”chosen被选中的、更优的助手回复。字符串 或 消息列表是“已查询您的订单正在配货中预计明天发出。”rejected被拒绝的、较差的助手回复。字符串 或 消息列表是“请提供订单号以便查询。”source(可选)数据来源标识。字符串否“customer_service_log_2024”metadata(可选)其他元数据如评分、模型版本等。字典否{chosen_score: 4.5, rejected_score: 1.0}这里有一个关键点chosen和rejected字段在Hugging Face的DPOTrainer等主流工具中通常期望是经过聊天模板格式化后的完整对话字符串或者是一个包含角色信息的消息字典列表。而prompt则通常是未格式化的用户输入。在后续的数据处理中我们需要根据所选基座模型的对话模板如ChatML、Llama3等进行正确的格式化。2. 数据收集与清洗从原始日志到候选对假设你手头有一批数据可能是多模型生成结果用不同模型或同一模型不同温度参数对同一批prompt生成多个回复。带人工反馈的日志生产环境中用户对模型回复进行了点赞thumbs_up或点踩thumbs_down。规则筛选的SFT数据你有高质量的SFT数据作为chosen候选同时用规则或弱模型生成了一些质量较低的回复作为rejected候选。我们的第一步是将这些原始数据加载并清洗。以下代码演示了如何处理一个包含多轮对话和反馈的JSONL文件import json from typing import List, Dict, Any, Optional from dataclasses import dataclass dataclass class RawConversation: 原始对话数据单元 conversation_id: str user_query: str # 用户当前轮次的问题 assistant_responses: List[Dict[str, Any]] # 多个助手回复每个回复是一个字典 # 示例: [{text: 回复A, model: gpt-4, score: 0.9}, {text: 回复B, model: claude-3, score: 0.6}] def load_and_parse_raw_data(file_path: str) - List[RawConversation]: 加载并解析原始JSONL格式的对话数据。 假设每行是一个独立的对话片段包含用户查询和多个候选回复。 conversations [] with open(file_path, r, encodingutf-8) as f: for line_num, line in enumerate(f): line line.strip() if not line: continue try: data json.loads(line) # 根据你的原始数据格式调整这里的解析逻辑 conv RawConversation( conversation_iddata.get(id, fline_{line_num}), user_querydata[query], # 假设字段名为query assistant_responsesdata[responses] # 假设字段名为responses是一个列表 ) conversations.append(conv) except (json.JSONDecodeError, KeyError) as e: print(f警告跳过第{line_num}行解析错误: {e}) continue print(f成功加载 {len(conversations)} 条对话记录。) return conversations # 示例调用 raw_data load_and_parse_raw_data(your_raw_data.jsonl)数据清洗是重中之重。低质量的数据对会严重干扰DPO训练甚至导致模型退化。我们需要制定一套清洗规则长度过滤剔除过短如字符数5或过长可能包含异常截断的回复。重复检测如果chosen和rejected的文本内容高度相似如重复率超过90%这对数据就失去了对比意义应剔除。关键词过滤剔除包含明显有害、歧视性或毫无意义占位符如“我不知道”、“未找到信息”的回复除非你的目标就是让模型学会避免这些。语言一致性确保prompt和回复的语言一致。例如中文提问配英文回答除非特定场景否则可能是不良数据。下面是一个结合了以上规则的清洗函数import re from difflib import SequenceMatcher def should_filter_response(text: str, prompt_lang: str zh) - bool: 判断一个回复是否应该被过滤掉 # 1. 长度过滤 if len(text.strip()) 10: return True # 2. 占位符或无效内容过滤可根据实际情况扩充列表 invalid_patterns [ r^未找到.*信息$, r^抱歉.*无法回答, r^I dont know.*, r^NaN$, r^$ # 空字符串 ] for pattern in invalid_patterns: if re.search(pattern, text, re.IGNORECASE): return True # 3. 简单语言检测示例生产环境可用langdetect库 # 这里假设prompt_lang是已知的如果回复中大量出现另一种语言的字符可能过滤 # 此处简化处理实际项目需要更严谨的语言检测 return False def text_similarity(text1: str, text2: str) - float: 计算两段文本的简单相似度0-1 return SequenceMatcher(None, text1, text2).ratio() def clean_and_pair_responses(conversations: List[RawConversation], similarity_threshold: float 0.8) - List[Dict[str, Any]]: 清洗回复并为每个prompt生成候选回复对(chosen, rejected)。 这里采用一个简单策略根据预设的评分或模型优先级选择最优和最差回复。 paired_data [] for conv in conversations: valid_responses [] for resp in conv.assistant_responses: resp_text resp.get(text, ) if should_filter_response(resp_text): continue valid_responses.append(resp) if len(valid_responses) 2: continue # 至少需要两个有效回复才能构成对比对 # 假设每个回复字典里有一个score字段表示质量评分 # 如果没有可以用模型版本、反馈类型等作为代理指标 valid_responses.sort(keylambda x: x.get(score, 0), reverseTrue) chosen_candidate valid_responses[0] rejected_candidate valid_responses[-1] # 检查chosen和rejected是否过于相似 if text_similarity(chosen_candidate[text], rejected_candidate[text]) similarity_threshold: continue # 跳过相似度过高的对 paired_data.append({ prompt: conv.user_query, chosen: chosen_candidate[text], rejected: rejected_candidate[text], metadata: { chosen_source: chosen_candidate.get(model, unknown), rejected_source: rejected_candidate.get(model, unknown), chosen_score: chosen_candidate.get(score), rejected_score: rejected_candidate.get(score), conversation_id: conv.conversation_id } }) print(f清洗配对后得到 {len(paired_data)} 条有效DPO候选数据。) return paired_data # 执行清洗与配对 cleaned_pairs clean_and_pair_responses(raw_data)3. 配对策略设计如何定义“更好”上一步我们用了最简单的策略按评分排序取最高分作为chosen最低分作为rejected。但在实际项目中定义“更好”往往更复杂也更有讲究。不同的配对策略会引导模型学习不同的偏好。以下是几种常见的策略评分绝对值差只选择评分差距大于某个阈值如chosen_score - rejected_score 1.0的对。这确保了偏好信号足够强。基于排名的配对对于每个prompt如果有N个回复可以生成C(N, 2)个配对即所有两两组合并为每个对标注谁是chosen排名更高的。这能极大扩充数据量但需要可靠的排序信息。基于反馈类型的配对如果有明确的正面点赞和负面点踩反馈那么“有点赞无点踩”的回复 vs “有点踩无点赞”的回复就是天然的高质量配对。合成rejected数据当你只有高质量的chosen数据如精挑的SFT数据时可以人工构造或使用一个弱基线模型如未调优的模型、早期版本模型来为每个prompt生成一个rejected回复。这种方法的关键在于确保rejected回复是合理但次优的而不是完全胡言乱语。让我们实现一个更健壮的、基于评分差距和排名的配对策略from itertools import combinations def generate_rank_based_pairs(conversations: List[RawConversation], min_score_gap: float 0.5, max_pairs_per_prompt: int 3) - List[Dict[str, Any]]: 基于排名的配对策略。 对于每个对话将其所有有效回复按评分排序然后为排名靠前的和靠后的回复生成配对。 同时控制每个prompt生成的对数避免数据不平衡。 all_pairs [] for conv in conversations: valid_responses [] for resp in conv.assistant_responses: resp_text resp.get(text, ) if should_filter_response(resp_text): continue # 确保回复有评分没有则跳过或赋予默认值 if score not in resp: continue valid_responses.append(resp) if len(valid_responses) 2: continue # 按评分降序排序 valid_responses.sort(keylambda x: x[score], reverseTrue) generated_pairs 0 # 生成排名靠前与靠后回复之间的配对 # 例如取前2名和后2名两两组合 top_k min(2, len(valid_responses) // 2) top_indices list(range(top_k)) bottom_indices list(range(len(valid_responses) - top_k, len(valid_responses))) for i in top_indices: for j in bottom_indices: if i j: continue chosen_resp valid_responses[i] rejected_resp valid_responses[j] score_gap chosen_resp[score] - rejected_resp[score] if score_gap min_score_gap: continue # 评分差距太小偏好信号弱 # 再次检查相似度 if text_similarity(chosen_resp[text], rejected_resp[text]) 0.85: continue all_pairs.append({ prompt: conv.user_query, chosen: chosen_resp[text], rejected: rejected_resp[text], metadata: { chosen_rank: i, rejected_rank: j, score_gap: score_gap, chosen_source: chosen_resp.get(model, unknown), rejected_source: rejected_resp.get(model, unknown), } }) generated_pairs 1 if generated_pairs max_pairs_per_prompt: break if generated_pairs max_pairs_per_prompt: break print(f基于排名策略生成了 {len(all_pairs)} 条DPO数据对。) return all_pairs # 使用排名策略生成数据 rank_based_pairs generate_rank_based_pairs(raw_data, min_score_gap0.3)4. 格式转换与分词适配训练框架现在我们有了结构化的(prompt, chosen, rejected)三元组列表。接下来我们需要将其转换成特定大语言模型LLM所需的输入格式。不同的模型使用不同的对话模板Chat Template例如ChatML格式(Used by many models like Mistral, Zephyr):|im_start|user {prompt}|im_end| |im_start|assistant {response}|im_end|Llama 3 Instruct格式:|begin_of_text||start_header_id|user|end_header_id| {prompt}|eot_id||start_header_id|assistant|end_header_id| {response}|eot_id|自定义模板你的基座模型可能有自己的模板。此外DPO训练时损失函数通常只计算助手回复部分的token。因此我们需要生成loss_mask在计算损失时掩蔽掉prompt部分和特殊token。幸运的是Hugging Face的transformers库和trl库提供了工具来简化这个过程。下面的代码展示了如何使用transformers的tokenizer和chat template来格式化数据并准备DPO训练所需的输入ID和attention mask。from transformers import AutoTokenizer import torch from torch.utils.data import Dataset class DPODataset(Dataset): 自定义DPO数据集类负责加载配对数据、应用聊天模板、并完成tokenization。 def __init__(self, paired_data: List[Dict], tokenizer: AutoTokenizer, max_length: int 1024): self.data paired_data self.tokenizer tokenizer self.max_length max_length # 确保tokenizer有pad_token if self.tokenizer.pad_token is None: self.tokenizer.pad_token self.tokenizer.eos_token def __len__(self): return len(self.data) def _format_conversation(self, prompt: str, response: str) - str: 使用模型的聊天模板格式化单轮对话。 # 构建消息列表 messages [ {role: user, content: prompt}, {role: assistant, content: response}, ] # 应用聊天模板。add_generation_promptFalse 表示不添加让模型开始生成的提示符如|start_header_id|assistant formatted_text self.tokenizer.apply_chat_template( messages, tokenizeFalse, add_generation_promptFalse ) return formatted_text def __getitem__(self, idx): item self.data[idx] prompt item[prompt] chosen_response item[chosen] rejected_response item[rejected] # 1. 格式化文本 chosen_text self._format_conversation(prompt, chosen_response) rejected_text self._format_conversation(prompt, rejected_response) # 2. Tokenization chosen_encodings self.tokenizer( chosen_text, truncationTrue, max_lengthself.max_length, paddingmax_length, # 训练时通常需要统一长度 return_tensorspt, ) rejected_encodings self.tokenizer( rejected_text, truncationTrue, max_lengthself.max_length, paddingmax_length, return_tensorspt, ) # 3. 提取input_ids和attention_mask chosen_input_ids chosen_encodings[input_ids].squeeze(0) # [seq_len] chosen_attention_mask chosen_encodings[attention_mask].squeeze(0) rejected_input_ids rejected_encodings[input_ids].squeeze(0) rejected_attention_mask rejected_encodings[attention_mask].squeeze(0) # 4. 构建loss labelsDPO通常使用input_ids作为labels并通过mask控制loss计算范围 # 注意对于DPO我们通常需要自己计算哪些位置需要计算loss。 # 一种常见做法是loss只计算assistant回复对应的token。 # 这里我们返回input_ids和attention_mask在训练循环中或自定义collate_fn里生成loss mask。 # 更复杂的实现可以在这里直接生成loss mask。 return { chosen_input_ids: chosen_input_ids, chosen_attention_mask: chosen_attention_mask, rejected_input_ids: rejected_input_ids, rejected_attention_mask: rejected_attention_mask, prompt: prompt, # 保留原始prompt用于调试 } # 初始化tokenizer和数据集 model_name meta-llama/Llama-3.2-1B-Instruct # 以Llama 3.2 1B为例 tokenizer AutoTokenizer.from_pretrained(model_name) # 如果tokenizer没有定义chat_template可能需要手动设置 # tokenizer.chat_template ... dataset DPODataset(rank_based_pairs, tokenizer, max_length512) # 查看一条数据样例 sample dataset[0] print(Prompt:, sample[prompt][:100], ...) print(Chosen input_ids shape:, sample[chosen_input_ids].shape) print(Chosen attention_mask sum (非pad长度):, sample[chosen_attention_mask].sum().item())对于loss mask的生成一个更精细的实现是在__getitem__方法中根据tokenizer解码后的token识别出assistant回复开始的位置然后生成一个与之对应的mask。这里提供一个简化版的思路def _generate_loss_mask(self, input_ids: torch.Tensor) - torch.Tensor: 生成loss mask仅对assistant回复部分的token为1其他为0。 这是一个简化实现假设经过apply_chat_template后assistant回复的开始有特定标记。 实际实现需要根据具体的聊天模板进行调整。 # 将input_ids解码成token字符串列表 tokens self.tokenizer.convert_ids_to_tokens(input_ids.tolist()) loss_mask torch.zeros_like(input_ids, dtypetorch.long) # 寻找assistant回复开始的标记。例如在Llama3模板中可能是|start_header_id|assistant对应的token。 # 这里只是一个示例逻辑你需要根据实际tokenizer和模板调整。 assistant_start_marker self.tokenizer.encode(|start_header_id|assistant, add_special_tokensFalse) # 在input_ids中寻找该标记序列的开始位置 seq_len len(input_ids) marker_len len(assistant_start_marker) for i in range(seq_len - marker_len 1): if all(input_ids[ij] assistant_start_marker[j] for j in range(marker_len)): # 找到开始位置从这个标记结束之后的位置开始计算loss loss_start_idx i marker_len # 一直掩蔽到序列结束或到下一个特殊标记如eos_token loss_mask[loss_start_idx:] 1 # 需要排除padding部分attention_mask为0的位置 # 我们将在collate_fn或训练时结合attention_mask处理 break return loss_mask然后在__getitem__返回的字典中加入chosen_loss_mask和rejected_loss_mask。5. 数据质量评估与拆分在投入训练之前我们必须对构建好的数据集进行质量评估和拆分。直接使用全部数据训练可能导致过拟合或评估不准确。质量评估维度多样性检查prompt的语义多样性避免过于集中。可以计算prompt的嵌入向量用sentence-transformers并做聚类分析。偏好强度计算数据集中chosen和rejected在某个维度如长度、情感、特定关键词上的平均差异。差异过小的配对可能提供噪声信号。长度分布确保chosen和rejected的长度分布合理没有系统性偏差如chosen总是更长。import numpy as np from collections import Counter def analyze_dataset(dataset_pairs: List[Dict]): 对构建好的DPO数据集进行基本分析 chosen_lens [] rejected_lens [] prompt_lens [] for pair in dataset_pairs: prompt_lens.append(len(pair[prompt])) chosen_lens.append(len(pair[chosen])) rejected_lens.append(len(pair[rejected])) print( 数据集分析报告 ) print(f总样本数: {len(dataset_pairs)}) print(fPrompt平均长度: {np.mean(prompt_lens):.1f} 字符) print(fChosen回复平均长度: {np.mean(chosen_lens):.1f} 字符) print(fRejected回复平均长度: {np.mean(rejected_lens):.1f} 字符) print(fChosen比Rejected长的样本占比: {np.mean([c r for c, r in zip(chosen_lens, rejected_lens)])*100:.1f}%) # 检查是否有完全相同的chosen和rejected清洗后应极少 duplicate_count sum(1 for p in dataset_pairs if p[chosen] p[rejected]) print(fChosen与Rejected完全相同的样本数: {duplicate_count} (应接近0)) analyze_dataset(rank_based_pairs)数据集拆分通常按照80/10/10或90/5/5的比例拆分为训练集、验证集和测试集。验证集用于在训练过程中监控模型是否在“学会偏好”而不是单纯记忆。测试集用于最终评估。from sklearn.model_selection import train_test_split def split_dataset(all_pairs: List[Dict], train_ratio: float 0.8, val_ratio: float 0.1, seed: int 42): 拆分数据集为训练集、验证集、测试集。 test_ratio 1 - train_ratio - val_ratio # 首先按conversation_id或其他分组键进行分层确保同一对话的数据不在不同集合中 # 这里简化处理直接随机拆分 train_val, test train_test_split(all_pairs, test_size(1 - train_ratio - val_ratio), random_stateseed) train, val train_test_split(train_val, test_sizeval_ratio/(train_ratioval_ratio), random_stateseed) print(f训练集: {len(train)} 条) print(f验证集: {len(val)} 条) print(f测试集: {len(test)} 条) return train, val, test train_pairs, val_pairs, test_pairs split_dataset(rank_based_pairs, train_ratio0.8, val_ratio0.1)最后将处理好的数据集保存为标准格式如JSONL方便后续加载和复现。def save_to_jsonl(data: List[Dict], file_path: str): 将数据保存为JSONL格式 with open(file_path, w, encodingutf-8) as f: for item in data: # 只保存核心字段metadata可以简化或保留 record { prompt: item[prompt], chosen: item[chosen], rejected: item[rejected], # metadata: item.get(metadata, {}) # 可选保存 } f.write(json.dumps(record, ensure_asciiFalse) \n) print(f数据已保存至: {file_path}) save_to_jsonl(train_pairs, dpo_train.jsonl) save_to_jsonl(val_pairs, dpo_val.jsonl) save_to_jsonl(test_pairs, dpo_test.jsonl)走到这里你已经拥有了一个完全由自己构建、清洗、配对并格式化的DPO数据集。这个数据集紧密贴合你的业务场景和数据分布远比直接使用通用数据集更能提升微调效果。接下来你就可以用Hugging Face的DPOTrainer或类似的训练框架加载这个数据集开始你的偏好优化之旅了。记住数据是模型的天花板在DPO中清晰、一致、高质量的偏好信号是让模型理解“什么更好”的关键。