论文情况概括
http://arxiv.org/abs/2506.23235
论文基本信息 (Bibliographic Information)
- 标题 (Title): Generalist Reward Models: Found Inside Large Language Models (通用奖励模型:发现于大型语言模型内部)
- 作者 (Authors): Yi-Chen Li, Tian Xu, Yang Yu, Xuqin Zhang, Xiong-Hui Chen, Zhongxiang Ling, Ningjing Chao, Lei Yuan, Zhi-Hua Zhou (来自南京大学计算机科学与技术系、人工智能学院及软件新技术全国重点实验室)
- 发表期刊/会议 (Journal/Conference): 这是一篇于 arXiv 发布的预印本 (Preprint),尚未在经同行评审的正式期刊或会议上发表。
- 发表年份 (Publication Year): 2025年 (根据论文中的日期 29 Jun 2025)
- 摘要 (Abstract): 论文的核心论点是,一个强大的通用奖励模型 (Generalist Reward Model) 已经潜在地存在于任何通过标准“下一个词元预测” (Next-token Prediction) 训练的大语言模型 (LLM) 内部。作者从理论上证明,这种“内生奖励” (Endogenous Reward) 等价于通过离线逆强化学习 (Offline Inverse Reinforcement Learning) 学到的奖励函数,因此可以直接从基座模型中提取,无需额外训练或昂贵的人类偏好数据。更重要的是,他们证明了使用这种内生奖励进行强化学习微调,可以得到一个比原始模型错误界更优的策略。实验证实,该方法不仅优于现有的“LLM-as-a-judge”方法,甚至能超越经过显式训练的奖励模型,为模型对齐提供了一个更高效、强大且可扩展的新范式。
整体概括
- 研究背景与动机 (Background & Motivation - Why):
 当前,使大语言模型与人类价值观对齐的主流方法是“基于人类反馈的强化学习” (RLHF),但这一过程极度依赖一个在昂贵的人类偏好数据上训练的奖励模型 (RM)。为了规避高昂的成本,研究界转向“基于 AI 反馈的强化学习” (RLAIF) 或 "LLM-as-a-judge" 框架,但这些方法通常缺乏严格的理论基础,更像是一种启发式方法。这引出了一个根本性问题:一个高质量的奖励信号是否必须从外部获取?本文正是为了解决现有对齐方法成本高、理论基础薄弱的痛点,探索是否存在一种更根本、更高效的方式来获取奖励信号。
- 核心贡献/主要发现 (Main Contribution/Findings - What): - 发现内生奖励 (Endogenous Reward): 论文最重要的发现是,一个高质量的通用奖励模型天然地内嵌于任何经过“下一个词元预测”训练的语言模型中,作者将其命名为 内生奖励 (Endogenous Reward) 。
- 建立理论基础: 论文首次从理论上建立了“下一个词元预测”目标与“离线逆强化学习” (Offline IRL) 之间的等价关系。证明了 LLM 的 logits 本身就是 IRL 框架下的一个最优的软 Q 函数 ($Q$-function),从而为直接提取奖励提供了坚实的理论依据。
- 提出无训练奖励提取方法: 基于上述理论,论文提出了一种无需任何额外训练即可从基座模型中直接提取奖励函数的方法。这极大地简化了传统 RLHF 中需要独立训练奖励模型的复杂流程。
- 证明强化学习的有效性: 论文从理论上证明了,使用这种内生奖励对模型进行强化学习微调,能有效缓解模仿学习中普遍存在的“复合误差” (Compounding Errors) 问题。策略的性能差距从与任务长度 $H$ 的二次方 ($O(H^2)$) 相关,优化到了更优的线性相关 ($O(H)$)。据作者所知,这是首个为强化学习应用于 LLM 提供了有效性理论证明的工作。
 
代码
此处代码针对实验1使用内源奖励模型对数据集进行打分的测试
本次实验选择THU-KEG/RM-Bench · Datasets at Hugging Face的chat分类进行测试
File: reward.py
import torch
import torch.nn.functional as F
from unsloth import FastLanguageModel
import warnings
from datasets import load_dataset
from tqdm import tqdm
from typing import List, Dict, Any
import numpy as np
# 忽略不必要的警告
warnings.filterwarnings("ignore")
def compute_accuracy(results: List[Dict[str, Any]]) -> Dict[str, float]:
    """
    根据 RM-Bench 官方文档,计算 hard, normal, 和 easy 准确率。
    结果是一个字典列表,每个字典包含 'score_chosen' 和 'score_rejected' 键。
    每个键对应一个包含3个分数的列表(concise, detailed_plain, detailed_markdown)。
    我们迭代比较这 3x3 的分数矩阵。
    """
    MATRIX_SIZE = 3
    acc_matrix = np.zeros((MATRIX_SIZE, MATRIX_SIZE))
    for result in results:
        # 确保分数列表长度正确
        if len(result.get("score_chosen", [])) != MATRIX_SIZE or len(result.get("score_rejected", [])) != MATRIX_SIZE:
            continue
        for i in range(MATRIX_SIZE):
            for j in range(MATRIX_SIZE):
                if result["score_chosen"][i] > result["score_rejected"][j]:
                    acc_matrix[i][j] += 1
    if not results:
        return {"hard_acc": 0.0, "normal_acc": 0.0, "easy_acc": 0.0, "average_acc": 0.0}
    acc_matrix /= len(results)
    # hard accuracy: 比较风格更简洁的 chosen 和风格更花哨的 rejected (矩阵右上三角)
    upper_right_count = MATRIX_SIZE * (MATRIX_SIZE - 1) / 2
    hard_acc = np.sum(np.triu(acc_matrix, 1)) / upper_right_count if upper_right_count > 0 else 0.0
    # normal accuracy: 比较风格相同的 chosen 和 rejected (矩阵对角线)
    normal_acc = np.mean(np.diag(acc_matrix))
    # easy accuracy: 比较风格更花哨的 chosen 和风格更简洁的 rejected (矩阵左下三角)
    lower_left_count = MATRIX_SIZE * (MATRIX_SIZE - 1) / 2
    easy_acc = np.sum(np.tril(acc_matrix, -1)) / lower_left_count if lower_left_count > 0 else 0.0
    average_acc = np.sum(acc_matrix) / (MATRIX_SIZE * MATRIX_SIZE)
    return {
        "hard_acc": hard_acc,
        "normal_acc": normal_acc,
        "easy_acc": easy_acc,
        "average_acc": average_acc
    }
class EndogenousRewardModel:
    """
    基于论文《Generalist Reward Models: Found Inside Large Language Models》
    实现的内生奖励模型 (Endogenous Reward Model, EndoRM)。
    """
    def __init__(self, model, tokenizer, alpha=1.0):
        self.model = model.eval()
        self.tokenizer = tokenizer
        self.alpha = alpha
        self.vocab_size = model.config.vocab_size
        self.device = model.device
        print(f"内生奖励模型已在设备 '{self.device}' 上为 '{model.config._name_or_path}' 初始化。")
    @torch.no_grad()
    def calculate_reward(self, prompt: str, response: str, gamma: float = 0.95, beta: float = 0.0) -> float:
        """
        计算给定 prompt 和 response 的内生奖励分数。
        Args:
            prompt (str): 输入的提示或问题。
            response (str): 模型生成或待评估的回答。
            gamma (float): 奖励的折扣因子,用于调整时间步长对总奖励的影响。
            beta (float): 奖励的下限因子,确保即使在折扣后奖励也不会太小。
        Returns:
            float: 计算出的总奖励分数。如果序列过长,则返回负无穷大。
        """
        # 步骤 1: 将 prompt 和 response 文本编码为 token IDs
        # prompt_inputs 包含 input_ids 和 attention_mask
        prompt_inputs = self.tokenizer(prompt, return_tensors='pt')
        # response_inputs 只包含 input_ids,不添加特殊 token (如<s>, </s>)
        response_inputs = self.tokenizer(response, return_tensors='pt', add_special_tokens=False)
        # 将 token IDs 移动到模型所在的设备 (例如 'cuda:0')
        prompt_ids = prompt_inputs.input_ids.to(self.device)
        response_ids = response_inputs.input_ids.to(self.device)
        # 获取 prompt 和 response 的长度
        prompt_len = prompt_ids.shape[1]
        response_len = response_ids.shape[1]
        # 如果 response 为空,则奖励为0
        if response_len == 0: return 0.0
        # 检查总长度是否超过模型的最大位置编码,防止因超长序列导致的错误
        if (prompt_len + response_len) > self.model.config.max_position_embeddings:
            print(f"警告:序列长度 {prompt_len + response_len} 超出模型最大长度,跳过此样本。")
            return float('-inf')
        # 步骤 2: 获取模型的 logits
        # 将 prompt 和 response 的 token IDs 拼接成一个完整的序列
        full_ids = torch.cat([prompt_ids, response_ids], dim=1)
        # 将完整序列输入模型,获取每个位置上每个 token 的 logits
        # logits 的形状为 (batch_size, sequence_length, vocab_size)
        logits = self.model(full_ids).logits
        # 我们只关心 response 部分的 logits,用于计算奖励
        # 从 prompt 的最后一个 token 开始,到完整序列的倒数第二个 token 结束
        # 这是因为在自回归模型中,s_h (输入) 的 logit 是在预测 a_h (输出) 时产生的
        response_logits = logits[:, prompt_len - 1:-1, :]
        # 步骤 3: 逐个 token 计算内生奖励
        per_token_rewards = []
        # 遍历 response 中的每一个 token
        for h in range(response_len):
            # 论文核心公式: r_h = Q(s_h, a_h) - V(s_{h+1})
            # 1. 计算 Q(s_h, a_h): 即在状态 s_h 时,选择动作(token) a_h 的价值。
            #    根据论文,这等价于模型在当前步预测的、实际选择的那个 token 的 logit 值。
            chosen_token_id = response_ids[:, h]  # 当前步实际选择的 token
            # 从当前步的 logits 分布中,提取实际选择的 token 的 logit 值
            q_value = response_logits[:, h, :].gather(-1, chosen_token_id.unsqueeze(-1)).squeeze().item()
            # 2. 计算 V(s_{h+1}): 即下一个状态 s_{h+1} 的价值。
            #    根据论文公式 (154),V(s) = alpha * log(sum(exp(Q(s,a)/alpha)))
            #    这通过 logsumexp 技巧来稳定计算。
            if h < response_len - 1:
                # 如果不是最后一个 token,下一个状态的 logits 是可用的
                next_logits = response_logits[:, h + 1, :]
                v_next = self.alpha * torch.logsumexp(next_logits / self.alpha, dim=-1).item()
            else:
                # [cite_start]如果是最后一个 token,根据论文的边界条件 Q(s_{H+1}, ·) = 0 [cite: 166]
                # V(s_{H+1}) = alpha * log(sum(exp(0))) = alpha * log(|V|)
                # 其中 |V| 是词汇表大小 (vocab_size)。
                v_next = self.alpha * torch.log(torch.tensor(self.vocab_size, dtype=torch.float32)).item()
            # 3. 计算当前 token 的奖励
            token_reward = q_value - v_next
            per_token_rewards.append(token_reward)
        # 步骤 4: 计算总奖励
        # 根据论文附录 C 的公式 (639),使用折扣因子 gamma 和下限 beta 来聚合每个 token 的奖励
        total_reward = sum(max(gamma**h, beta) * r_h for h, r_h in enumerate(per_token_rewards))
        return total_reward
# =================================================================
# 主评测流程
# =================================================================
if __name__ == '__main__':
    # 1. 模型和分词器初始化
    model, tokenizer = FastLanguageModel.from_pretrained(
        "unsloth/Qwen3-4B-Base",
        max_seq_length=4096, # 增加最大长度以容纳更长的样本
        dtype=torch.bfloat16,
        load_in_4bit=False,
    )
    model.eval()
    # 2. 实例化内生奖励模型
    endo_rm = EndogenousRewardModel(model, tokenizer, alpha=1.0)
    # 3. 加载并筛选数据集
    print("\n" + "="*50)
    print("正在从 Hugging Face Hub 加载 THU-KEG/RM-Bench 数据集...")
    try:
        # 使用 'train' 分割
        dataset = load_dataset("THU-KEG/RM-Bench", split="train")
        # 筛选出 domain 为 'chat' 的样本
        chat_dataset = dataset.filter(lambda example: example['domain'] == 'chat')
        print(f"数据集加载和筛选成功!将评测 {len(chat_dataset)} 条 'chat' 领域的样本。")
        all_results = []
        # 使用tqdm显示评测进度
        for example in tqdm(chat_dataset, desc="正在评测 'chat' 样本"):
            prompt = example['prompt']
            # 为3个chosen回答计算分数
            scores_chosen = [endo_rm.calculate_reward(prompt, resp) for resp in example['chosen']]
            # 为3个rejected回答计算分数
            scores_rejected = [endo_rm.calculate_reward(prompt, resp) for resp in example['rejected']]
            # 过滤掉计算失败的样本
            if float('-inf') in scores_chosen or float('-inf') in scores_rejected:
                continue
            all_results.append({
                "score_chosen": scores_chosen,
                "score_rejected": scores_rejected
            })
        # 4. 计算最终准确率
        print("\n所有样本评测完成,正在计算最终准确率...")
        final_accuracies = compute_accuracy(all_results)
        # 5. 打印结果
        print("\n" + "="*50)
        print("RM-Bench 'chat' 领域评测结果")
        print("="*50)
        print(f"总评测样本数: {len(all_results)}")
        print(f"平均准确率 (Average Accuracy): {final_accuracies['average_acc']:.2%}")
        print(f"简单准确率 (Easy Accuracy):   {final_accuracies['easy_acc']:.2%}")
        print(f"普通准确率 (Normal Accuracy): {final_accuracies['normal_acc']:.2%}")
        print(f"困难准确率 (Hard Accuracy):   {final_accuracies['hard_acc']:.2%}")
        print("="*50)
    except Exception as e:
        print(f"评测过程中出现错误: {e}")
        print("请检查网络连接、依赖库或模型最大长度设置。")结果如下:
总评测样本数: 129
平均准确率 (Average Accuracy): 84.93%
简单准确率 (Easy Accuracy):   89.66%
普通准确率 (Normal Accuracy): 91.47%
困难准确率 (Hard Accuracy):   73.64%
