# Copyright 2024 Bytedance Ltd. and/or its affiliates # Copyright 2023-2024 SGLang Team # Copyright 2025 ModelBest Inc. and/or its affiliates # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import logging import os import re from collections import defaultdict from typing import Optional import datasets import numpy as np import torch from omegaconf import DictConfig, ListConfig from torch.utils.data import Dataset from transformers import PreTrainedTokenizer, ProcessorMixin import verl.utils.torch_functional as verl_F from verl.utils.model import compute_position_id_with_mask logger = logging.getLogger(__name__) def collate_fn(data_list: list[dict]) -> dict: """ Collate a batch of sample dicts into batched tensors and arrays. Args: data_list: List of dicts mapping feature names to torch.Tensor or other values. Returns: Dict where tensor entries are stacked into a torch.Tensor of shape (batch_size, \*dims) and non-tensor entries are converted to np.ndarray of dtype object with shape (batch_size,). """ tensors = defaultdict(list) non_tensors = defaultdict(list) for data in data_list: for key, val in data.items(): if isinstance(val, torch.Tensor): tensors[key].append(val) else: non_tensors[key].append(val) for key, val in tensors.items(): tensors[key] = torch.stack(val, dim=0) for key, val in non_tensors.items(): non_tensors[key] = np.array(val, dtype=object) return {**tensors, **non_tensors} class RLHFDataset(Dataset): """ Load and preprocess RLHF data from Parquet files. - Caches files locally. - Reads into a HuggingFace Dataset and tokenizes prompts. - Optionally handles images/videos via a ProcessorMixin. - Filters prompts over a max length. - Supports resuming from checkpoints. Args: data_files (str or list): Path(s) to Parquet file(s). tokenizer (PreTrainedTokenizer): For the tokenization of text to token IDs. config (DictConfig): Options like cache_dir, prompt_key, max_prompt_length, truncation, etc. processor (ProcessorMixin, optional): Multimodal preprocessor for images/videos. """ def __init__( self, data_files: str | list[str], tokenizer: PreTrainedTokenizer, config: DictConfig, processor: Optional[ProcessorMixin] = None, ): if not isinstance(data_files, list | ListConfig): data_files = [data_files] self.data_files = copy.deepcopy(data_files) self.original_data_files = copy.deepcopy(data_files) # use for resume self.tokenizer = tokenizer self.processor = processor self.config = config self.cache_dir = os.path.expanduser(config.get("cache_dir", "~/.cache/verl/rlhf")) self.prompt_key = config.get("prompt_key", "prompt") self.image_key = config.get("image_key", "images") self.video_key = config.get("video_key", "videos") self.max_prompt_length = config.get("max_prompt_length", 1024) self.return_raw_chat = config.get("return_raw_chat", False) self.return_full_prompt = config.get("return_full_prompt", False) self.truncation = config.get("truncation", "error") self.filter_overlong_prompts = config.get("filter_overlong_prompts", True) self.num_workers = config.get("filter_overlong_prompts_workers", max(1, os.cpu_count() // 4)) self.num_workers = min(self.num_workers, os.cpu_count()) self.use_shm = config.get("use_shm", False) self.chat_template_func = config.get("chat_template_func", None) self.need_tools_kwargs = config.get("need_tools_kwargs", False) self.filter_prompts = config.get("filter_prompts", True) self.serialize_dataset = False self.return_multi_modal_inputs = config.get("return_multi_modal_inputs", True) self._download() self._read_files_and_tokenize() def _download(self, use_origin_parquet=False): from verl.utils.fs import copy_to_local data_files = self.data_files if not use_origin_parquet else self.original_data_files for i, parquet_file in enumerate(data_files): self.data_files[i] = copy_to_local(src=parquet_file, cache_dir=self.cache_dir, use_shm=self.use_shm) def _read_files_and_tokenize(self): dataframes = [] for parquet_file in self.data_files: # read parquet files and cache dataframe = datasets.load_dataset("parquet", data_files=parquet_file)["train"] dataframes.append(dataframe) self.dataframe: datasets.Dataset = datasets.concatenate_datasets(dataframes) print(f"dataset len: {len(self.dataframe)}") self.dataframe = self.maybe_filter_out_long_prompts(self.dataframe) def maybe_filter_out_long_prompts(self, dataframe: datasets.Dataset = None): # filter out too long prompts if self.filter_overlong_prompts: tokenizer = self.tokenizer processor = self.processor prompt_key = self.prompt_key image_key = self.image_key video_key = self.video_key if processor is not None: from verl.utils.dataset.vision_utils import process_image, process_video def doc2len(doc) -> int: messages = self._build_messages(doc) raw_prompt = self.processor.apply_chat_template( messages, add_generation_prompt=True, tokenize=False ) images = [process_image(image) for image in doc[image_key]] if image_key in doc else None videos = [process_video(video) for video in doc[video_key]] if video_key in doc else None return len(processor(text=[raw_prompt], images=images, videos=videos)["input_ids"][0]) else: def doc2len(doc) -> int: return len(tokenizer.apply_chat_template(doc[prompt_key], add_generation_prompt=True)) dataframe = dataframe.filter( lambda doc: doc2len(doc) <= self.max_prompt_length, num_proc=self.num_workers, desc=f"Filtering prompts longer than {self.max_prompt_length} tokens", ) print(f"filter dataset len: {len(dataframe)}") return dataframe def resume_dataset_state(self): self.serialize_dataset = not hasattr(self, "original_data_files") # resume dataframe if not it's serialized in data.pt if not self.serialize_dataset: self._download(use_origin_parquet=True) # download and resume from original parquet files self._read_files_and_tokenize() else: print(r"old dataloader ckpt file is used, please train from scratch for better ckpt performance") def __len__(self): return len(self.dataframe) def _build_messages(self, example: dict): messages: list = example.pop(self.prompt_key) if self.image_key in example or self.video_key in example: for message in messages: content = message["content"] content_list = [] segments = re.split("(|