sequence.rs 15 KB
Newer Older
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Runtime-specific glue for [`ActiveSequencesMultiWorker`].
//!
//! This module provides the concrete [`SequencePublisher`] and [`SequenceSubscriber`]
//! implementations that wire the runtime-agnostic business logic (in `dynamo_kv_router`)
//! to NATS event transport and Prometheus metrics.

pub use dynamo_kv_router::multi_worker_sequence::{
    ActiveSequencesMultiWorker, SequenceError, SequencePublisher, SequenceRequest,
    SequenceSubscriber,
};
pub use dynamo_kv_router::sequence::{ActiveSequences, RequestId};
15

16
17
18
use anyhow::Result;
use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider;
19
use dynamo_runtime::transports::event_plane::{EventPublisher, EventSubscriber};
20
use std::collections::HashMap;
21
22
use std::sync::Arc;

23
use super::metrics::WORKER_LOAD_METRICS;
24
use super::protocols::{ActiveLoad, ActiveSequenceEvent, WorkerWithDpRank};
25
use crate::kv_router::{ACTIVE_SEQUENCES_SUBJECT, KV_METRICS_SUBJECT};
Yan Ru Pei's avatar
Yan Ru Pei committed
26
use crate::local_model::runtime_config::ModelRuntimeConfig;
27

28
29
/// Concrete [`SequencePublisher`] backed by NATS [`EventPublisher`] and Prometheus gauges.
pub struct RuntimeSequencePublisher {
30
    event_publisher: EventPublisher,
31
    metrics_publisher: Arc<EventPublisher>,
32
33
}

34
35
36
impl SequencePublisher for RuntimeSequencePublisher {
    async fn publish_event(&self, event: &ActiveSequenceEvent) -> anyhow::Result<()> {
        self.event_publisher.publish(event).await
37
38
    }

39
    fn publish_load(&self, load: ActiveLoad) {
40
41
        let publisher = self.metrics_publisher.clone();
        tokio::spawn(async move {
42
            if let Err(e) = publisher.publish(&load).await {
43
                tracing::trace!(
44
45
46
                    "Failed to publish ActiveLoad to NATS for worker (id={}, dp_rank={}): {e:?}",
                    load.worker_id,
                    load.dp_rank
47
48
49
                );
            }
        });
50
51
    }

52
    fn observe_load(
53
        &self,
54
55
56
57
58
59
60
61
62
63
64
65
        worker: &WorkerWithDpRank,
        worker_type: &str,
        blocks: usize,
        tokens: usize,
    ) {
        WORKER_LOAD_METRICS.observe(
            worker.worker_id,
            worker.dp_rank,
            worker_type,
            blocks,
            tokens,
        );
66
    }
67
}
68

69
70
71
72
/// Concrete [`SequenceSubscriber`] backed by NATS typed event stream.
///
/// Wraps a typed event subscriber whose payload type is
/// [`ActiveSequenceEvent`]. [`create_multi_worker_sequences`] builds one of
/// these when replica sync is enabled.
pub struct RuntimeSequenceSubscriber {
    /// Underlying typed subscription; `next_event` pulls items from it.
    inner: dynamo_runtime::transports::event_plane::TypedEventSubscriber<ActiveSequenceEvent>,
}
73

74
75
76
77
78
impl SequenceSubscriber for RuntimeSequenceSubscriber {
    /// Waits for the next item from the underlying typed subscriber.
    ///
    /// Returns `None` when the subscriber yields `None`. Otherwise returns the
    /// decoded event with its envelope discarded, or the subscriber's error
    /// passed through as-is.
    async fn next_event(&mut self) -> Option<anyhow::Result<ActiveSequenceEvent>> {
        let received = self.inner.next().await?;
        Some(received.map(|(_envelope, event)| event))
    }
}
82

83
84
/// Type alias for the runtime-wired multi-worker sequence tracker.
///
/// This is [`ActiveSequencesMultiWorker`] specialised with
/// [`RuntimeSequencePublisher`] as its publisher. It is the type that
/// [`create_multi_worker_sequences`] returns, wrapped in an `Arc`.
pub type ActiveSequencesMulti = ActiveSequencesMultiWorker<RuntimeSequencePublisher>;
85

86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/// Convenience async constructor that creates the NATS publishers/subscribers
/// and returns an `Arc<ActiveSequencesMulti>` with replica sync already running.
pub async fn create_multi_worker_sequences(
    component: Component,
    block_size: usize,
    workers_with_configs: HashMap<u64, ModelRuntimeConfig>,
    replica_sync: bool,
    router_id: u64,
    worker_type: &'static str,
) -> Result<Arc<ActiveSequencesMulti>> {
    let event_publisher =
        EventPublisher::for_component(&component, ACTIVE_SEQUENCES_SUBJECT).await?;
    let metrics_publisher =
        Arc::new(EventPublisher::for_namespace(component.namespace(), KV_METRICS_SUBJECT).await?);

    let publisher = RuntimeSequencePublisher {
        event_publisher,
        metrics_publisher,
    };

106
    let dp_range: HashMap<u64, (u32, u32)> = workers_with_configs
107
        .into_iter()
108
109
110
111
112
113
        .map(|(id, config)| {
            (
                id,
                (config.data_parallel_start_rank, config.data_parallel_size),
            )
        })
114
115
116
117
118
        .collect();

    let multi_worker = ActiveSequencesMultiWorker::new(
        publisher,
        block_size,
119
        dp_range,
120
121
122
123
124
125
126
127
128
129
130
131
132
133
        replica_sync,
        router_id,
        worker_type,
    );

    let arc = Arc::new(multi_worker);

    if replica_sync {
        let subscriber = EventSubscriber::for_component(&component, ACTIVE_SEQUENCES_SUBJECT)
            .await?
            .typed::<ActiveSequenceEvent>();
        let subscriber = RuntimeSequenceSubscriber { inner: subscriber };
        let cancel_token = component.drt().runtime().child_token();
        arc.start_replica_sync(subscriber, cancel_token);
134
    }
135

136
    Ok(arc)
137
138
139
140
141
}

#[cfg(test)]
mod tests {
    use super::*;
    use dynamo_runtime::{DistributedRuntime, Runtime};

    /// Pause taken right after both replicas are created, before any request
    /// is added.
    const SUBSCRIBE_SETTLE: tokio::time::Duration = tokio::time::Duration::from_millis(200);

    /// Pause taken after a batch of mutations, before reading back the state
    /// of a replica.
    const SYNC_SETTLE: tokio::time::Duration = tokio::time::Duration::from_millis(300);

    /// Creates two decode-type trackers with replica sync enabled on the same
    /// component, using router ids 1 and 2 (in that order).
    async fn spawn_replica_pair(
        component: Component,
        block_size: usize,
        workers: HashMap<u64, ModelRuntimeConfig>,
    ) -> Result<(Arc<ActiveSequencesMulti>, Arc<ActiveSequencesMulti>)> {
        let first = create_multi_worker_sequences(
            component.clone(),
            block_size,
            workers.clone(),
            true,
            1,
            crate::discovery::WORKER_TYPE_DECODE,
        )
        .await?;
        let second = create_multi_worker_sequences(
            component,
            block_size,
            workers,
            true,
            2,
            crate::discovery::WORKER_TYPE_DECODE,
        )
        .await?;
        Ok((first, second))
    }

    /// Single-tracker check of block and token accounting when requests share
    /// block hashes.
    #[test]
    fn test_active_sequences_shared_blocks() {
        let mut tracker = ActiveSequences::new(4);
        let first = String::from("request_1");
        let second = String::from("request_2");
        let third = String::from("request_3");

        tracker.add_request(first.clone(), Some(vec![1, 2, 3]), 12, 0, None);
        assert_eq!(tracker.active_blocks(), 3);
        assert_eq!(tracker.active_tokens(), 12);

        tracker.add_request(second.clone(), Some(vec![4]), 4, 0, None);
        assert_eq!(tracker.active_blocks(), 4);
        assert_eq!(tracker.active_tokens(), 16);

        // Every block of the third request is already tracked and its overlap
        // is 4, so both totals are expected to stay where they were.
        tracker.add_request(third.clone(), Some(vec![1, 2, 3, 4]), 16, 4, None);
        assert_eq!(tracker.active_blocks(), 4);
        assert_eq!(tracker.active_tokens(), 16);

        tracker.free(&second);
        assert_eq!(tracker.active_blocks(), 4);
        assert_eq!(tracker.active_tokens(), 12);

        tracker.free(&third);
        assert_eq!(tracker.active_blocks(), 3);
        assert_eq!(tracker.active_tokens(), 12);

        tracker.free(&first);
        assert_eq!(tracker.active_blocks(), 0);
        assert_eq!(tracker.active_tokens(), 0);
    }

    /// Two replicas on one component: requests added on either replica must be
    /// visible on the other, and frees issued from the opposite replica must
    /// bring every worker back to zero. Ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_multi_worker_cross_instance_sync() -> Result<()> {
        dynamo_runtime::logging::init();

        let runtime = Runtime::from_current()?;
        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
        let ns = drt.namespace("test_cross_instance_sync")?;
        let component = ns.component("sequences")?;

        // Worker 0 gets a data-parallel size of 2; worker 1 keeps the config
        // produced by `ModelRuntimeConfig::new()`.
        let mut two_rank_config = ModelRuntimeConfig::new();
        two_rank_config.data_parallel_size = 2;
        let workers = HashMap::from([(0, two_rank_config), (1, ModelRuntimeConfig::new())]);

        let (replica_1, replica_2) = spawn_replica_pair(component, 4, workers).await?;

        tokio::time::sleep(SUBSCRIBE_SETTLE).await;

        let req_0 = String::from("request_0");
        let req_1 = String::from("request_1");
        let req_2 = String::from("request_2");

        replica_1
            .add_request(SequenceRequest {
                request_id: req_0.clone(),
                token_sequence: Some(vec![0, 1, 2]),
                isl: 12,
                overlap: 0,
                expected_output_tokens: None,
                worker: WorkerWithDpRank::new(0, 0),
                lora_name: None,
            })
            .await?;

        replica_1
            .add_request(SequenceRequest {
                request_id: req_1.clone(),
                token_sequence: Some(vec![3, 4]),
                isl: 8,
                overlap: 0,
                expected_output_tokens: None,
                worker: WorkerWithDpRank::new(0, 1),
                lora_name: None,
            })
            .await?;

        replica_2
            .add_request(SequenceRequest {
                request_id: req_2.clone(),
                token_sequence: Some(vec![0, 1, 2, 3]),
                isl: 16,
                overlap: 0,
                expected_output_tokens: None,
                worker: WorkerWithDpRank::new(1, 0),
                lora_name: None,
            })
            .await?;

        tokio::time::sleep(SYNC_SETTLE).await;

        // Phase 1: the first replica must see its own requests and the one
        // added through the second replica.
        let blocks_phase1 = replica_1.active_blocks();
        let tokens_phase1 = replica_1.active_tokens();

        let w0_dp0 = WorkerWithDpRank::new(0, 0);
        let w0_dp1 = WorkerWithDpRank::new(0, 1);
        let w1_dp0 = WorkerWithDpRank::new(1, 0);

        let expected_blocks = [
            (
                &w0_dp0,
                3,
                "Worker 0 dp_rank 0 should have 3 active blocks (from request_0)",
            ),
            (
                &w0_dp1,
                2,
                "Worker 0 dp_rank 1 should have 2 active blocks (from request_1)",
            ),
            (
                &w1_dp0,
                4,
                "Worker 1 dp_rank 0 should have 4 active blocks (from request_2 added by seq_manager_2)",
            ),
        ];
        for (worker, expected, why) in expected_blocks {
            assert_eq!(blocks_phase1[worker], expected, "{why}");
        }

        let expected_tokens = [
            (
                &w0_dp0,
                12,
                "Worker 0 dp_rank 0 should have 12 active tokens",
            ),
            (
                &w0_dp1,
                8,
                "Worker 0 dp_rank 1 should have 8 active tokens",
            ),
            (
                &w1_dp0,
                16,
                "Worker 1 dp_rank 0 should have 16 active tokens (from request_2 added by seq_manager_2)",
            ),
        ];
        for (worker, expected, why) in expected_tokens {
            assert_eq!(tokens_phase1[worker], expected, "{why}");
        }

        // Phase 2: each request is freed through the replica that did not add it.
        replica_1.free(&req_2).await?;

        replica_2.free(&req_0).await?;
        replica_2.free(&req_1).await?;

        tokio::time::sleep(SYNC_SETTLE).await;

        let blocks_phase2 = replica_2.active_blocks();
        let tokens_phase2 = replica_2.active_tokens();

        for worker in [
            WorkerWithDpRank::new(0, 0),
            WorkerWithDpRank::new(0, 1),
            WorkerWithDpRank::new(1, 0),
        ] {
            assert_eq!(
                blocks_phase2[&worker], 0,
                "Worker (id={}, dp_rank={}) should have 0 active blocks after all requests freed",
                worker.worker_id, worker.dp_rank
            );
            assert_eq!(
                tokens_phase2[&worker], 0,
                "Worker (id={}, dp_rank={}) should have 0 active tokens after all requests freed",
                worker.worker_id, worker.dp_rank
            );
        }

        Ok(())
    }

    /// Same cross-replica scenario, but every request is added with
    /// `token_sequence: None`, so only token counts are checked. Ignored by
    /// default.
    #[tokio::test]
    #[ignore]
    async fn test_multi_worker_no_token_sequence_sync() -> Result<()> {
        dynamo_runtime::logging::init();

        let runtime = Runtime::from_current()?;
        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
        let ns = drt.namespace("test_no_token_seq_sync")?;
        let component = ns.component("sequences")?;

        // Three workers (ids 0, 1, 2), each with a freshly constructed config.
        let workers: HashMap<u64, ModelRuntimeConfig> = (0..3)
            .map(|worker_id| (worker_id, ModelRuntimeConfig::new()))
            .collect();

        let (replica_1, replica_2) = spawn_replica_pair(component, 4, workers).await?;

        tokio::time::sleep(SUBSCRIBE_SETTLE).await;

        let req_0 = String::from("request_0");
        let req_1 = String::from("request_1");
        let req_2 = String::from("request_2");

        replica_1
            .add_request(SequenceRequest {
                request_id: req_0.clone(),
                token_sequence: None,
                isl: 12,
                overlap: 0,
                expected_output_tokens: None,
                worker: WorkerWithDpRank::from_worker_id(0),
                lora_name: None,
            })
            .await?;

        replica_1
            .add_request(SequenceRequest {
                request_id: req_1.clone(),
                token_sequence: None,
                isl: 8,
                overlap: 0,
                expected_output_tokens: None,
                worker: WorkerWithDpRank::from_worker_id(1),
                lora_name: None,
            })
            .await?;

        replica_2
            .add_request(SequenceRequest {
                request_id: req_2.clone(),
                token_sequence: None,
                isl: 16,
                overlap: 0,
                expected_output_tokens: None,
                worker: WorkerWithDpRank::from_worker_id(2),
                lora_name: None,
            })
            .await?;

        tokio::time::sleep(SYNC_SETTLE).await;

        // Phase 1: token counts as seen from the first replica.
        let tokens_phase1 = replica_1.active_tokens();

        let worker_0 = WorkerWithDpRank::from_worker_id(0);
        let worker_1 = WorkerWithDpRank::from_worker_id(1);
        let worker_2 = WorkerWithDpRank::from_worker_id(2);

        let expected_tokens = [
            (&worker_0, 12, "Worker 0 should have 12 active tokens"),
            (&worker_1, 8, "Worker 1 should have 8 active tokens"),
            (
                &worker_2,
                16,
                "Worker 2 should have 16 active tokens (from request_2 added by seq_manager_2)",
            ),
        ];
        for (worker, expected, why) in expected_tokens {
            assert_eq!(tokens_phase1[worker], expected, "{why}");
        }

        // Phase 2: complete prefill and free each request through the replica
        // that did not add it.
        replica_1.mark_prefill_completed(&req_2).await?;
        replica_1.free(&req_2).await?;

        replica_2.mark_prefill_completed(&req_0).await?;
        replica_2.mark_prefill_completed(&req_1).await?;
        replica_2.free(&req_0).await?;
        replica_2.free(&req_1).await?;

        tokio::time::sleep(SYNC_SETTLE).await;

        let tokens_phase2 = replica_2.active_tokens();

        for worker_id in 0..=2 {
            let worker = WorkerWithDpRank::from_worker_id(worker_id);
            assert_eq!(
                tokens_phase2[&worker], 0,
                "Worker {} should have 0 active tokens after all requests freed",
                worker_id
            );
        }

        Ok(())
    }
}