# Copyright 2025 Individual Contributor: Thibaut Barroyer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
from functools import partial

import ray

from verl import DataProto
from verl.trainer.ppo.reward import compute_reward, get_custom_reward_fn

from .reward_score import _default_compute_score


def load_reward_manager(config, tokenizer, num_examine, **reward_kwargs):
    """
    Load and initialize a reward manager based on the configuration.

    Args:
        config: PPO trainer configuration object containing reward_model fields.
        tokenizer: Tokenizer object used for processing text.
        num_examine: Number of batches of decoded responses to print to the console for inspection.
        **reward_kwargs: Additional keyword arguments for the reward manager.

    Returns:
        An instance of the specified reward manager class.
    """
    from verl.workers.reward_manager import get_reward_manager_cls

    # The list of pre-defined reward managers is defined in `verl/workers/reward_manager/`:
    # naive: NaiveRewardManager
    # prime: PrimeRewardManager
    # batch: BatchRewardManager
    # dapo: DAPORewardManager
    # Note(haibin.lin): For custom reward managers, please make sure they are imported and
    # registered via `verl.workers.reward_manager.register`
    # By default reward_manager is set to naive (NaiveRewardManager)
    reward_manager_name = config.reward_model.get("reward_manager", "naive")
    reward_manager_cls = get_reward_manager_cls(reward_manager_name)

    # Try to get a custom reward function based on the configuration
    compute_score = get_custom_reward_fn(config)
    final_compute_score = compute_score

    if compute_score is None:
        sandbox_config = config.reward_model.get("sandbox_fusion")
        sandbox_url = sandbox_config.get("url") if sandbox_config else None
        if sandbox_url:
            sandbox_manager = multiprocessing.Manager()
            # Create a semaphore to control concurrent access to the sandbox.
            # A Manager-backed semaphore is used so it can be shared safely
            # across the worker processes that execute the scoring calls.
            _concurrent_semaphore = sandbox_manager.Semaphore(sandbox_config.get("max_concurrent", 64))
            final_compute_score = partial(
                _default_compute_score,
                sandbox_fusion_url=sandbox_url,
                concurrent_semaphore=_concurrent_semaphore,
            )
        else:
            final_compute_score = _default_compute_score

    # Instantiate and return the reward manager with the specified parameters
    return reward_manager_cls(
        tokenizer=tokenizer,
        num_examine=num_examine,
        compute_score=final_compute_score,
        reward_fn_key=config.data.reward_fn_key,
        **reward_kwargs,
    )


@ray.remote(num_cpus=1)
def compute_reward_async(data: DataProto, config, tokenizer):
    """
    Load the reward manager and compute the reward for a batch of data.

    This is meant to be run in a separate Ray worker so that reward
    computation can overlap with other work in the training loop.
    """
    reward_fn = load_reward_manager(config, tokenizer, num_examine=0, **config.reward_model.get("reward_kwargs", {}))
    return compute_reward(data, reward_fn)
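

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's public surface):
# how a trainer might dispatch the Ray task above to overlap reward scoring
# with other work. The `batch`, `config`, and `tokenizer` objects below are
# assumed to come from the surrounding PPO training loop, and the unpacking
# assumes `compute_reward` returns a (reward_tensor, reward_extra_infos)
# pair, as it does in verl at the time of writing.
#
#     # Launch reward computation on a separate Ray worker (non-blocking).
#     reward_future = compute_reward_async.remote(data=batch, config=config, tokenizer=tokenizer)
#
#     ...  # e.g., compute old log-probs or values while rewards are scored
#
#     # Block only when the rewards are actually needed.
#     reward_tensor, reward_extra_infos = ray.get(reward_future)
# ---------------------------------------------------------------------------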