# Recipe: One Step Off Policy Async Trainer

**Author:**  `https://github.com/meituan-search`

Last updated: 07/17/2025.

## Introduction

### Background

The reinforcement learning training loop currently implemented in verl is synchronous, following the algorithmic
workflows of established methods such as PPO, GRPO, and DAPO. In each step, training samples are generated by the
latest model, and the model is updated after training completes. While this keeps the training data on-policy and
stabilizes RL training, it suffers from severe efficiency issues: the model update must wait for the longest output
of the generation phase to finish, so GPUs sit idle while long-tail samples are still being generated. The more
severe the long-tail problem in sample generation, the lower the overall training efficiency. For example, in DAPO
32B training, the rollout phase accounts for approximately 70% of the total time, and adding resources does not
shorten the rollout duration.

![DAPO 32B Math Performance](
https://raw.githubusercontent.com/eric-haibin-lin/verl-community/refs/heads/main/docs/dapo_32b_math.png)
> source data: https://wandb.ai/verl-org/DAPO%20Reproduction%20on%20verl/workspace?nw=nwusertongyuxuan361

### Solution

We implemented the **One Step Off Policy Async Trainer** to help alleviate this issue. This approach parallelizes
generation and training by using samples generated in the previous step for the current training step. It also
partitions resources explicitly, allocating dedicated GPUs for generation and automatically assigning the remainder
to training. By reducing the resources allocated to the generation phase, we mitigate GPU idle time during long-tail
sample generation. Throughout this process, the generation parameters lag the training parameters by one step, i.e.,
the policy is one step off.

![One Step Off Policy Diagram](
https://raw.githubusercontent.com/eric-haibin-lin/verl-community/refs/heads/main/docs/one_step_off_policy.png)
> reference: [AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language Reasoning](
> https://arxiv.org/abs/2505.24298)

Our core contributions include:

1. **Parallel Generation and Training**:  
   Samples for the next batch are asynchronously generated while the current batch is being trained.

2. **Resource Isolation**:  
   Unlike `hybrid_engine`, this method requires explicit resource allocation for rollout, with the remaining resources
   automatically assigned to training (a rough Ray placement-group illustration follows this list).

3. **NCCL Parameter Synchronization**:  
   Employs NCCL communication primitives for seamless parameter transfer between generation and training modules.
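
As a rough illustration of the resource-isolation idea only (the recipe itself builds on verl's Ray worker groups and
resource pools, not this code), separate Ray placement groups can keep rollout and training workers on disjoint GPUs.
All names and the 6/2 GPU split below are illustrative:

```python
import os

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Illustrative split mirroring the usage example below: 2 GPUs reserved for
# rollout and 6 GPUs reserved for training on a single 8-GPU node.
rollout_pg = placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK")
trainer_pg = placement_group([{"GPU": 1, "CPU": 1}] * 6, strategy="PACK")
ray.get([rollout_pg.ready(), trainer_pg.ready()])


@ray.remote(num_gpus=1)
class DummyWorker:
    def which_gpus(self):
        # Report which GPUs Ray assigned to this actor.
        return os.environ.get("CUDA_VISIBLE_DEVICES")


# Actors scheduled into different placement groups never contend for the same GPUs,
# so generation can run continuously while training proceeds on its own GPUs.
rollout_workers = [
    DummyWorker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=rollout_pg)
    ).remote()
    for _ in range(2)
]
print(ray.get([w.which_gpus.remote() for w in rollout_workers]))
```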

### Experimental Results

- **Machine Configuration**: 2 nodes, 16 H20 GPUs in total
   - Generation: 4 GPUs
   - Training: 12 GPUs
- **Model**: Qwen2.5-Math-7B
- **Rollout Configuration**:
   - **Max Response Length**: FSDP2: 20,480 tokens; Megatron: 8,192 tokens
- **Algorithm**: DAPO
- **Rollout Engine**: vLLM

| training mode          | engine        | step | gen | wait_prev_gen | generate_sequences | old_log_prob | update_actor | total time    | acc/best@32/mean | acc/maj@32/mean |
|------------------------|---------------|------|-----|---------------|--------------------|--------------|--------------|---------------|------------------|-----------------|
| colocate sync          | VLLM+FSDP2    | 749  | 321 | -             | 247                | 88           | 286          | 19h18m        | 0.5948           | 0.417           |
| one-step-overlap async | VLLM+FSDP2    | 520  | -   | 45            | 458                | 108          | 337          | 15h34m (+23%) | 0.6165           | 0.494           |
| colocate sync          | VLLM+Megatron | 699  | 207 | -             | 162                | 119          | 344          | 18h21m        | 0.605            | 0.4217          |
| one-step-overlap async | VLLM+Megatron | 566  | -   | 59            | 501                | 120          | 347          | 13h06m (+40%) | 0.6569           | 0.4038          |

* colocate sync: step ≈ gen + old_log_prob + update_actor
* one-step-overlap async: step ≈ wait_prev_gen + old_log_prob + update_actor
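
If the per-phase columns are read as average per-step times, the decompositions above roughly check out for the FSDP2
rows: 321 + 88 + 286 = 695 ≈ 749 for colocate sync, and 45 + 108 + 337 = 490 ≈ 520 for one-step-overlap async; the
remaining per-step time presumably goes to reward and advantage computation and other overhead.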

![One Step Off Megatron Performance](
https://raw.githubusercontent.com/eric-haibin-lin/verl-community/refs/heads/main/docs/one_step_off_megatron.png)

> source data: https://wandb.ai/hou-zg-meituan/one-step-off-policy?nw=nwuserhouzg

## Implementation

### One Step Off Policy Async Pipeline

Our **One Step Off Policy Async Pipeline** integrates into the existing training logic at minimal cost and avoids any
additional sample-storage management. The core mechanism uses `_async_gen_next_batch` for asynchronous rollout
generation, while `_create_continuous_iterator` keeps the data stream uninterrupted across epoch boundaries.

```python
# iterator generator: simplifies one-step-off integration into the training loop
def _create_continuous_iterator(self):
    for epoch in range(self.config.trainer.total_epochs):
        iterator = iter(self.train_dataloader)
        for batch_dict in iterator:
            yield epoch, batch_dict


# read the next batch, sync parameters, and launch async sequence generation
def _async_gen_next_batch(self, continuous_iterator):
    # read train data
    try:
        epoch, batch_dict = next(continuous_iterator)
    except StopIteration:
        return None
    batch = DataProto.from_single_dict(batch_dict)
    gen_batch = batch_process(batch)  # preprocess into a generation batch (simplified here)
    # sync weights from actor to rollout
    self.sync_rollout_weights()
    # async generation
    gen_batch_output = self.rollout_wg.async_generate_sequences(gen_batch)
    # wrap the batch and the in-flight generation in a future-like object
    return GenerationBatchFuture(epoch, batch, gen_batch_output)


continuous_iterator = self._create_continuous_iterator()
# run rollout first to achieve one-step-off
batch_data_future = self._async_gen_next_batch(continuous_iterator)

while batch_data_future is not None:
    # wait for the gen_seq result from the previous step
    batch = batch_data_future.get()
    # launch the next async call to generate sequences
    batch_data_future = self._async_gen_next_batch(continuous_iterator)

    # compute advantages
    batch = critic.compute_values(batch)
    batch = reference.compute_log_prob(batch)
    batch = reward.compute_reward(batch)
    batch = compute_advantages(batch)

    # model update
    critic_metrics = critic.update_critic(batch)
    actor_metrics = actor.update_actor(batch)
```
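
For reference, here is a minimal sketch of the kind of wrapper `GenerationBatchFuture` could be. It assumes the rollout
worker group returns a future-like object exposing `.get()` (e.g., verl's `DataProtoFuture`); the actual class in the
recipe may differ:

```python
class GenerationBatchFuture:
    """Hypothetical sketch: pairs a training batch with its in-flight rollout."""

    def __init__(self, epoch, batch, gen_batch_output):
        self.epoch = epoch                        # epoch the batch was drawn from
        self.batch = batch                        # original DataProto batch
        self.gen_batch_output = gen_batch_output  # async handle to the rollout result

    def get(self):
        # Block until generation finishes, then merge the generated sequences
        # back into the original batch for the downstream training stages.
        gen_output = self.gen_batch_output.get()
        return self.batch.union(gen_output)
```

With such a wrapper, the training loop above only blocks inside `batch_data_future.get()`, i.e., on the
`wait_prev_gen` portion reported in the results table.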

### Parameter Synchronization

A key result is that our NCCL-based weight update for the rollout model performs very well: most of the time the
latency is under 300 ms, which is negligible for RLHF.

> **sync_rollout_weights**: The time for synchronizing parameters from actor to rollout is extremely short and can
> almost be ignored because the transfer is implemented with NCCL.

```python
class ActorRolloutRefWorker:
    # actor collects the meta-info (name, shape, dtype) of its parameters for parameter sync
    @register(dispatch_mode=Dispatch.ONE_TO_ALL)
    def get_actor_weights_info(self):
        params = self._get_actor_params()
        ret = []
        for key, tensor in params.items():
            ret.append((key, tensor.size(), tensor.dtype))
        self._weights_info = ret
        return ret

    # rollout stores the meta-info of the actor parameters for parameter sync
    @register(dispatch_mode=Dispatch.ONE_TO_ALL)
    def set_actor_weights_info(self, weights_info):
        self._weights_info = weights_info


class AsyncRayPPOTrainer(RayPPOTrainer):
    def init_workers(self):
        ...
        # rollout obtains the meta-info of the actor parameters for parameter sync
        weights_info = self.actor_wg.get_actor_weights_info()[0]
        self.rollout_wg.set_actor_weights_info(weights_info)

        # create an actor-rollout communication group for parameter sync
        from ray.util.collective import collective

        actor_rollout_workers = self.actor_wg.workers + self.rollout_wg.workers
        collective.create_collective_group(
            actor_rollout_workers,
            len(actor_rollout_workers),
            list(range(0, len(actor_rollout_workers))),
            backend="nccl",
            group_name="actor_rollout",
        )
```

```python
# the driver process calls the actor and rollout worker groups respectively
# to sync parameters over NCCL
def sync_rollout_weights(self):
    self.actor_wg.sync_rollout_weights()
    ray.get(self.rollout_wg.sync_rollout_weights())


# FSDP model parameter sync (worker side)
@register(dispatch_mode=Dispatch.ONE_TO_ALL, blocking=False)
def sync_rollout_weights(self):
    from ray.util.collective import collective

    params = self._get_actor_params() if self._is_actor else None
    if self._is_rollout:
        inference_model = (
            self.rollout.inference_engine.llm_engine.model_executor.driver_worker.worker.model_runner.model
        )
        patch_vllm_moe_model_weight_loader(inference_model)
    # model parameters are broadcast tensor-by-tensor from the actor to the rollout workers
    for key, shape, dtype in self._weights_info:
        tensor = torch.empty(shape, dtype=dtype, device=get_torch_device().current_device())
        if self._is_actor:
            assert key in params
            origin_data = params[key]
            if hasattr(origin_data, "full_tensor"):
                # DTensor shards are gathered into a full tensor before broadcast
                origin_data = origin_data.full_tensor()
            if torch.distributed.get_rank() == 0:
                tensor.copy_(origin_data)
        collective.broadcast(tensor, src_rank=0, group_name="actor_rollout")
        if self._is_rollout:
            inference_model.load_weights([(key, tensor)])
```
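
To check the sub-300 ms claim in your own setup, the driver-side call can be timed directly. The helper below is a
hypothetical sketch, not part of the recipe; it assumes a trainer object exposing `sync_rollout_weights` as defined
above:

```python
import time


def timed_sync_rollout_weights(trainer):
    # Wall-clock latency of one actor -> rollout weight synchronization,
    # as observed from the driver process.
    start = time.perf_counter()
    trainer.sync_rollout_weights()  # returns after the rollout workers have loaded the new weights
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    print(f"sync_rollout_weights took {elapsed_ms:.1f} ms")
    return elapsed_ms
```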

## Usage

### FSDP2 Configuration Example

```shell
# hybrid_engine=False places actor and rollout on separate GPUs;
# trainer.* / rollout.* specify the resources for training and rollout respectively
python3 -m recipe.one_step_off_policy.async_main_ppo \
    --config-path=config \
    --config-name='one_step_off_ppo_trainer.yaml' \
    actor_rollout_ref.actor.strategy=fsdp2 \
    actor_rollout_ref.hybrid_engine=False \
    trainer.nnodes=1 \
    trainer.n_gpus_per_node=6 \
    rollout.nnodes=1 \
    rollout.n_gpus_per_node=2
```

### Megatron Configuration Example

```shell
# hybrid_engine=False places actor and rollout on separate GPUs;
# trainer.* / rollout.* specify the resources for training and rollout respectively
python3 -m recipe.one_step_off_policy.async_main_ppo \
    --config-path=config \
    --config-name='one_step_off_ppo_megatron_trainer.yaml' \
    actor_rollout_ref.actor.strategy=megatron \
    actor_rollout_ref.hybrid_engine=False \
    trainer.nnodes=1 \
    trainer.n_gpus_per_node=6 \
    rollout.nnodes=1 \
    rollout.n_gpus_per_node=2
```
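
Both examples split one node into 6 training GPUs and 2 rollout GPUs; per the node-count note in the guidelines below,
training and rollout can share that node as long as it has at least 6 + 2 = 8 physical GPUs.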

### Configuration Guidelines

1. **GPU Count Relationships**  
   Maintain either of these relationships for even batch distribution (a sanity-check sketch follows this list):
   - `actor_rollout_ref.rollout.n` should be an integer divisor of
     `trainer.n_gpus_per_node * trainer.nnodes`
   - `actor_rollout_ref.rollout.n * data.train_batch_size` should be evenly divisible by
     `trainer.n_gpus_per_node * trainer.nnodes`

   > Rationale: this ensures training samples can be evenly distributed across the training GPUs when only part of the
   > resources is used for generation.

2. **Dynamic Resource Tuning**  
   Adjust `trainer.nnodes`, `trainer.n_gpus_per_node`, `rollout.nnodes`, and `rollout.n_gpus_per_node` based on phase
   durations:
   - **Ideal state**: rollout and training phases have comparable durations
   - **Diagnostic metrics**:
      - Monitor the `wait_prev_gen` duration
      - Analyze the `sequence_length` distribution
   - **Adjustment strategy**:
      - High `wait_prev_gen` + uniform sequence lengths → increase rollout resources
      - High `wait_prev_gen` + long-tail sequences → optimize stopping criteria (more resources won't help)

   > **wait_prev_gen**: The time spent waiting for the previous rollout to finish (the part that is not fully
   > overlapped with training).

   **Resource Configuration Strategies:**
   - **Resource-constrained scenario**: optimize resource utilization by adjusting the GPU allocation ratio,
     keeping the number of nodes equal so that training and rollout share nodes.
      - Configure `trainer.nnodes = rollout.nnodes` with
        `trainer.n_gpus_per_node + rollout.n_gpus_per_node = physical_gpus_per_node`, and control the rollout
        resource allocation by adjusting `n_gpus_per_node`.
   - **Resource-abundant scenario**: optimize performance by adjusting the number of nodes,
     keeping the number of GPUs per node equal so that training and rollout parallelism can scale independently.
      - Configure `trainer.n_gpus_per_node = rollout.n_gpus_per_node` and control the rollout resource allocation by
        adjusting `trainer.nnodes` and `rollout.nnodes`.

   > **Note**: The total number of nodes required is not simply `trainer.nnodes + rollout.nnodes`; it depends on the
   > per-node GPU capacity:
   > - When `trainer.n_gpus_per_node + rollout.n_gpus_per_node <= physical_gpus_per_node`,
   >   the required node count is `max(trainer.nnodes, rollout.nnodes)`
   > - When `trainer.n_gpus_per_node + rollout.n_gpus_per_node > physical_gpus_per_node`,
   >   the required node count is `trainer.nnodes + rollout.nnodes`
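
The divisibility rule and the node-count rule above can be checked mechanically. The sketch below is a hypothetical
aid; the function names and the example numbers (an assumed 8-GPU node) are illustrative, not part of the recipe:

```python
# Hypothetical sanity checks for the guidelines above; names are illustrative.
def batch_evenly_distributable(rollout_n: int, train_batch_size: int, trainer_gpus: int) -> bool:
    # Guideline 1: either rollout.n divides the number of training GPUs, or
    # rollout.n * train_batch_size is divisible by the number of training GPUs.
    return trainer_gpus % rollout_n == 0 or (rollout_n * train_batch_size) % trainer_gpus == 0


def required_nodes(trainer_nnodes: int, trainer_gpn: int, rollout_nnodes: int, rollout_gpn: int,
                   physical_gpn: int) -> int:
    # Note above: trainer and rollout can share nodes only when their combined
    # per-node GPU requests fit on one physical node.
    if trainer_gpn + rollout_gpn <= physical_gpn:
        return max(trainer_nnodes, rollout_nnodes)
    return trainer_nnodes + rollout_nnodes


# Example: 12 training GPUs (as in the experiment section) and the 6 + 2 split
# from the usage examples on an assumed 8-GPU node.
assert batch_evenly_distributable(rollout_n=4, train_batch_size=32, trainer_gpus=12)
assert required_nodes(trainer_nnodes=1, trainer_gpn=6, rollout_nnodes=1, rollout_gpn=2, physical_gpn=8) == 1
```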

## Functional Support

| Category           | Supported                                                                                                          |
|--------------------|--------------------------------------------------------------------------------------------------------------------|
| train engine       | FSDP2 <br/> Megatron                                                                                               |
| rollout engine     | vLLM                                                                                                               |
| AdvantageEstimator | GRPO <br/> GRPO_PASSK <br/> REINFORCE_PLUS_PLUS <br/> RLOO <br/> OPO <br/> REINFORCE_PLUS_PLUS_BASELINE <br/> GPG |
| Reward             | all                                                                                                                |