Data Plugins

1. Data Plugins 简介

DataConverterPlugin

1. DataConverterPlugin 简介

DataConverter 负责将非标准格式的数据集转换为 v1 的标准 Messages 格式。这使得用户可以继续使用现有的数据集(如 Alpaca 格式),而无需手动转换。针对自定义格式的数据集,用户也可以通过构建对应的自定义 DataConverter 插件,来负责其数据格式标准化。

当前,LLaMA-Factory 已内置了 Alpaca ConverterPair Converter,这两类数据集可以直接使用对应的 converter 进行标准化,无需自定义转换器。

2. Alpaca Converter 详解

2.1 Alpaca 格式

Alpaca 格式是一种常见的指令微调数据格式:

{
  "system": "You are a helpful assistant.",
  "instruction": "Describe a process of making crepes.",
  "input": "",
  "output": "Making crepes is an easy and delicious process..."
}

2.2 Alpaca Converter 接口定义

class AlpacaSample(TypedDict, total=False):
    """Alpaca 格式数据样本结构

    attr:
        system (str, 可选): 系统提示信息(system prompt),用于设定对话背景或模型行为。
        instruction (str, 可选): 用户指令(user instruction),通常为任务描述。
        input (str, 可选): 额外的输入内容(input text),可与 instruction 拼接。
        output (str, 可选): 模型生成的目标输出(expected response)。
    """
    ...


def alpaca_converter(raw_sample: AlpacaSample) -> SFTSample:
    """将 Alpaca 样本转换为 SFT(Supervised Fine-Tuning)标准样本格式

    `alpaca_converter` 将 Alpaca 数据集中一条样本转换为通用的 `SFTSample` 格式
    该格式用于监督微调(SFT)或多轮对话建模

    转换逻辑:
        - 若存在 `system` 字段,则生成一条系统消息,loss_weight = 0.0
        - 若存在 `instruction` 或 `input` 字段,则合并为一条用户消息,loss_weight = 0.0
        - 若存在 `output` 字段,则生成一条助手机器人回复消息,loss_weight = 1.0

    args:
        raw_sample (AlpacaSample): 原始 Alpaca 数据样本

    return:
        SFTSample: 转换后的标准化样本,格式如下:

            {
                "messages": [
                    {"role": "system", "content": [{"type": "text", "value": "..."}], "loss_weight": 0.0},
                    {"role": "user", "content": [{"type": "text", "value": "..."}], "loss_weight": 0.0},
                    {"role": "assistant", "content": [{"type": "text", "value": "..."}], "loss_weight": 1.0},
                ]
            }

    example:
        >>> raw = {"instruction": "请将以下句子翻译成英文:", "input": "你好", "output": "Hello"}
        >>> alpaca_converter(raw)
        {
            "messages": [
                {"role": "user", "content": [{"type": "text", "value": "请将以下句子翻译成英文:你好"}], "loss_weight": 0.0},
                {"role": "assistant", "content": [{"type": "text", "value": "Hello"}], "loss_weight": 1.0}
            ]
        }
    """

2.3 转换过程

alpaca_converter 函数将 Alpaca 格式转换为标准格式,转换逻辑如下:

def alpaca_converter(raw_sample: AlpacaSample) -> SFTSample:
    messages = []

    # 1. 添加系统提示词(如果存在)
    if "system" in raw_sample:
        messages.append({
            "role": "system",
            "content": [{"type": "text", "value": raw_sample["system"]}],
            "loss_weight": 0.0
        })

    # 2. 添加用户输入(instruction + input)
    if "instruction" in raw_sample or "input" in raw_sample:
        user_content = raw_sample.get("instruction", "") + raw_sample.get("input", "")
        messages.append({
            "role": "user",
            "content": [{"type": "text", "value": user_content}],
            "loss_weight": 0.0
        })

    # 3. 添加模型回复
    if "output" in raw_sample:
        messages.append({
            "role": "assistant",
            "content": [{"type": "text", "value": raw_sample["output"]}],
            "loss_weight": 1.0
        })

    return {"messages": messages}

2.4 转换示例

输入(Alpaca 格式):

{
  "instruction": "What is the capital of France?",
  "input": "",
  "output": "The capital of France is Paris."
}

输出(标准格式):

{
  "messages": [
    {
      "role": "user",
      "content": [{"type": "text", "value": "What is the capital of France?"}],
      "loss_weight": 0.0
    },
    {
      "role": "assistant",
      "content": [{"type": "text", "value": "The capital of France is Paris."}],
      "loss_weight": 1.0
    }
  ]
}

3. 自定义转换器

3.1 创建自定义转换器

如果用户有自己的数据格式,可以轻松添加自定义转换器将其标准化,实现过程可参考如下示例:

# src/llamafactory/v1/plugins/data_plugins/converter.py

from typing import TypedDict, NotRequired
from ...extras.types import SFTSample

# 1. 定义输入格式的类型
class MyCustomSample(TypedDict, total=False):
    question: str
    answer: str
    context: NotRequired[str]

# 2. 实现转换逻辑
def custom_converter(raw_sample: MyCustomSample) -> SFTSample:
    messages = []

    # 构建用户消息
    user_text = raw_sample["question"]
    if "context" in raw_sample:
        user_text = f"Context: {raw_sample['context']}\n\nQuestion: {user_text}"

    messages.append({
        "role": "user",
        "content": [{"type": "text", "value": user_text}],
        "loss_weight": 0.0
    })

    # 构建助手消息
    messages.append({
        "role": "assistant",
        "content": [{"type": "text", "value": raw_sample["answer"]}],
        "loss_weight": 1.0
    })

    return {"messages": messages}

# 3. 注册 custom_converter
#src/llamafactory/v1/plugins/data_plugins/converter.py: CONVERTERS
CONVERTERS = {
    "alpaca": alpaca_converter,
    "custom": custom_converter,  # 添加自定义转换器
}

3.2 使用自定义转换器

在 YAML 配置中指定转换器名称:

my_dataset:
  file_name: custom_data.json
  converter: custom

DataLoaderPlugin

1. DataLoaderPlugin 简介

DataLoaderPlugin 负责从本地文件加载数据集,当前支持如下文件格式:

  • JSON: .json

  • JSONL: .jsonl

  • CSV: .csv

  • Parquet: .parquet

  • Arrow: .arrow

  • Text: .txt

2. DataLoaderPlugin 接口定义

@dataclass
class DataLoaderPlugin:
    """数据加载插件(DataLoaderPlugin)

    负责根据数据集信息(`DatasetInfo`)自动加载本地或远程数据集。  
    支持多种文件格式(如 CSV、JSON、Parquet、Text、Arrow),并可选择是否以流式方式加载。

    通常由 `DataEngine` 调用,用于统一封装数据加载逻辑。
    """

    args: DataArguments
    """数据参数对象,包含数据目录、缓存路径、分片等配置信息。"""


    def _get_builder_name(self, path: str) -> Literal["arrow", "csv", "json", "parquet", "text"]:
        """获取数据集文件格式

        根据输入文件路径自动判断应使用的 HuggingFace `load_dataset` 构建器类型。
        通过文件扩展名推断数据类型,例如 `.csv`、`.jsonl`、`.parquet`、`.txt` 等。

        args:
            path (str): 数据集文件路径,用于识别文件类型。

        return:
            Literal["arrow", "csv", "json", "parquet", "text"]:
                数据构建器名称,用于 `datasets.load_dataset()`。

        example:
            >>> _get_builder_name("data/train.jsonl")
            "json"
        """
        ...


    def auto_load_data(self, dataset_info: DatasetInfo) -> HFDataset:
        """根据传入的 `dataset_info` 自动选择合适的加载方式

        args:
            dataset_info (DatasetInfo): 数据集元信息,通常包含:
                - `file_name`: 数据文件路径
                - `split`: 数据划分(如 "train"、"test");
                - `streaming`: 是否启用流式加载

        return:
            HFDataset: 加载完成的 Hugging Face 数据集对象。

        example:
            >>> plugin = DataLoaderPlugin(args)
            >>> ds = plugin.auto_load_data({"file_name": "~/data.json", "split": "train"})
        """
        ...


    def load_data_from_file(self, filepath: str, split: str, streaming: bool) -> HFDataset:
        """从文件或目录加载数据集

        根据输入路径自动识别文件类型(CSV、JSON、Parquet、Text 等),  
        并通过 `datasets.load_dataset()` 加载数据集。  
        若 `streaming=True`,则将结果转换为迭代式数据集。

        args:
            filepath (str): 文件路径或目录路径。
            split (str): 数据划分名称(如 "train"、"validation")。
            streaming (bool): 是否启用流式加载模式。

        return:
            HFDataset: 加载后的数据集对象。

        example:
            >>> plugin.load_data_from_file("data/train.json", "train", False)
        """
        ...


DataIndexPlugin

1. DataIndexPlugin 简介

DataIndexPlugin 负责调整数据索引,支持通过配置 size, weight 等参数控制数据集样本数量和采样频率。

  • 使用 size 参数 限制使用的样本数量:

my_dataset:
  file_name: large_dataset.json
  size: 1000  # 只使用前 1000 个样本
  • 使用 weight 参数调整数据集在混合数据中的采样频率:

dataset_a:
  file_name: data_a.json
  weight: 1.0

dataset_b:
  file_name: data_b.json
  weight: 2.0  # dataset_b 的样本出现频率是 dataset_a 的 2 倍

说明weight 参数适用于在多个数据集混合训练时,调整不同数据集的的采样频率

  • weight=1.0 时,数据集按原始比例采样

  • weight=2.0 时,该数据集的索引会复制 2 倍,使其样本出现频率翻倍

2. DataIndexPlugin 接口定义

@dataclass
class DataIndexPlugin:
    """数据索引插件(DataIndexPlugin)

    根据 `size` 和 `weight` 调整数据索引列表,控制数据集的样本数量和采样频率  
    通常在多数据集混合训练时使用,以控制不同数据集在总体样本中的占比。

    在 `DataEngine.build_data_index` 中被自动调用,用于实现样本重采样或加权分布。
    """

    def adjust_data_index(
        self, data_index: list[tuple[str, int]], size: Optional[int], weight: Optional[float]
    ) -> list[tuple[str, int]]:
        """调整数据索引列表

        根据 `size` 或 `weight` 参数对输入的数据索引进行采样、扩展或缩减。  
        若两个参数同时存在,将依次执行基于大小和基于权重的调整。

        args:
            data_index (list[tuple[str, int]]):  
                数据索引列表,每个元素为 `(dataset_name, sample_index)`。  
            size (Optional[int]):  
                目标样本数量,若指定则根据该数量裁剪或重复样本。  
            weight (Optional[float]):  
                数据集权重,用于控制数据集在混合训练中的采样比例。

        return:
            list[tuple[str, int]]:  
                调整后的数据索引列表。

        example:
            >>> plugin = DataIndexPlugin()
            >>> adjusted = plugin.adjust_data_index([("ds1", i) for i in range(100)], size=50, weight=None)
            >>> len(adjusted)
            50
        """
        ...


    def adjust_by_size(self, data_index: list[tuple[str, int]], size: int) -> list[tuple[str, int]]:
        """根据目标大小调整数据索引

        通过裁剪或重复样本,使索引总数等于 `size`。  
        常用于统一不同数据集的样本数量。

        args:
            data_index (list[tuple[str, int]]):  
                原始数据索引列表。  
            size (int):  
                目标样本数量。

        return:
            list[tuple[str, int]]:  
                调整后长度等于 `size` 的数据索引列表。

        example:
            >>> plugin.adjust_by_size([("ds1", i) for i in range(10)], 20)
        """
        ...


    def adjust_by_weight(self, data_index: list[tuple[str, int]], weight: float) -> list[tuple[str, int]]:
        """根据权重调整数据索引

        通过加权采样或重复样本,使数据集样本出现频率符合指定权重。  
        常用于多数据源训练中按比例平衡样本。

        args:
            data_index (list[tuple[str, int]]):  
                原始数据索引列表。  
            weight (float):  
                数据集权重(相对比例,可与其他数据集共同归一化)。

        return:
            list[tuple[str, int]]:  
                调整后的加权数据索引列表。

        example:
            >>> plugin.adjust_by_weight([("ds1", i) for i in range(10)], 0.5)
        """
        ...


DataSelectorPlugin

1. DataSelectorPlugin 简介

DataSelectorPluginDataEngine提供基于索引访问数据的功能,由 DataEngine__getitem__ 方法自动调用。

2. DataSelectorPlugin 接口定义

@dataclass
class DataSelectorPlugin:
    """根据索引选择数据集样本。

    配合 `DataEngine` 使用,通过统一的 `data_index` 结构(包含数据集名与样本索引)来实现灵活的数据选择

    """

    data_index: list[tuple[str, int]]
    """数据索引列表,每个元素为 (dataset_name, sample_index)。"""


    def select(self, index: Union[slice, list[int], Any]) -> Union[tuple[str, int], list[tuple[str, int]]]:
        """选择数据集样本

        根据输入类型从 `data_index` 中选择对应的样本索引  
        支持三种索引方式:
            - 切片(slice):返回对应范围内的样本
            - 索引列表(list[int]):返回指定索引处的多个样本
            - 其他类型输入将触发异常。

        args:
            index (Union[slice, list[int], Any]): 数据样本索引
                可以是切片(`slice`)或索引列表

        return:
            Union[tuple[str, int], list[tuple[str, int]]]:
                - 若为单个索引:返回一个 `(dataset_name, sample_index)`
                - 若为多个索引或切片:返回多个样本的列表

        except:
        Raises:
            ValueError: 当输入索引类型不受支持时抛出。
        ...