"tests/fault_tolerance/vscode:/vscode.git/clone" did not exist on "031dc589700824597bbcb44153452abf85727cc7"
sequence.rs 49.9 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// SPDX-License-Identifier: Apache-2.0

//! KV Cache Sequence Management for LLM Inference
//!
//! This module provides efficient management of token sequences and their associated KV cache blocks
//! for distributed LLM inference. It implements a shared block system where multiple requests can
//! reuse the same KV cache blocks for common token prefixes, significantly reducing memory usage.
//!
//! # Key Components
//!
//! - [`ActiveSequences`]: Single-threaded sequence manager that tracks active requests and their
//!   token sequences, managing shared KV cache blocks efficiently.
//!
//! - [`ActiveSequencesMultiWorker`]: Multi-threaded extension that distributes sequence management
//!   across multiple worker threads, enabling parallel processing of requests while maintaining
//!   consistency.
//!
//! # Architecture
//!
//! The system uses a block-based approach where token sequences are divided into fixed-size blocks.
//! Each block is identified by a hash of its contents, allowing for deduplication when multiple
//! requests share common prefixes (e.g., system prompts, few-shot examples).

25
use crate::kv_router::indexer::OverlapScores;
26
use crate::tokens::SequenceHash;
27
28
use anyhow::Result;
use dashmap::DashMap;
29
use derive_getters::Getters;
30
31
use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider;
32
use dynamo_runtime::traits::events::{EventPublisher, EventSubscriber};
33
use futures::StreamExt;
34
use std::collections::{HashMap, HashSet};
35
use std::rc::{Rc, Weak};
36
use std::sync::Arc;
37
38
use std::time::Duration;
use tokio::time::Instant;
39
40
use uuid::Uuid;

41
42
43
44
use super::protocols::{
    ActiveLoad, ActiveSequenceEvent, ActiveSequenceEventData, WorkerWithDpRank,
};
use crate::kv_router::{ACTIVE_SEQUENCES_SUBJECT, KV_METRICS_SUBJECT};
Yan Ru Pei's avatar
Yan Ru Pei committed
45
use crate::local_model::runtime_config::ModelRuntimeConfig;
46
use dynamo_runtime::CancellationToken;
47

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/// Errors that can occur during sequence management operations.
///
/// Returned by the async methods of [`ActiveSequencesMultiWorker`].
#[derive(Debug, thiserror::Error)]
pub enum SequenceError {
    /// The target worker (worker ID plus data-parallel rank) is not registered.
    #[error("Worker {worker:?} not found")]
    WorkerNotFound { worker: WorkerWithDpRank },

    /// A request with this ID is already tracked; `worker` is where it currently lives.
    #[error("Request {request_id} already exists (assigned to worker {worker:?})")]
    DuplicateRequest {
        request_id: String,
        worker: WorkerWithDpRank,
    },

    /// No worker assignment is known for this request ID.
    #[error("Request {request_id} not found")]
    RequestNotFound { request_id: String },

    /// Publishing a replica-sync event failed; built automatically from [`anyhow::Error`].
    #[error("Failed to publish event: {0}")]
    PublishFailed(#[from] anyhow::Error),

    /// A worker thread's command or reply channel was closed.
    #[error("Failed to send command to worker: channel closed")]
    WorkerChannelClosed,
}

70
71
72
/// Length of one expiry window (5 minutes). A request that is still active after a full
/// window has elapsed is treated as stale and forcibly freed.
const EXPIRY_DURATION: Duration = Duration::from_secs(5 * 60);

// TODO: use the common request_id if it exists in the repo
/// Identifier of an in-flight request.
pub type RequestId = String;

/// A multi-request sequence manager that handles multiple active sequences with shared KV cache
#[derive(Debug, Getters)]
pub struct ActiveSequences {
79
    active_seqs: HashMap<RequestId, Vec<(SequenceHash, Rc<()>)>>,
80

81
82
    prefill_tokens: HashMap<RequestId, usize>,

83
    unique_blocks: HashMap<SequenceHash, Weak<()>>,
84
85
86
87

    #[getter(copy)]
    block_size: usize,

88
89
    #[getter(copy)]
    active_tokens: usize,
90
91
92
93
94
95

    /// Timer for when to force expiry of stale requests
    expiry_timer: Instant,

    /// Set of request IDs to check for expiry
    expiry_requests: HashSet<RequestId>,
96
97
98
99
100
101
102
103
104
105
}

impl ActiveSequences {
    /// Create a new SharedSequenceManager instance
    pub fn new(block_size: usize) -> Self {
        // TODO: make this not a hard req
        assert!(block_size > 1, "block_size must be greater than 1");

        Self {
            active_seqs: HashMap::new(),
106
            prefill_tokens: HashMap::new(),
107
108
            unique_blocks: HashMap::new(),
            block_size,
109
            active_tokens: 0,
110
111
            expiry_timer: Instant::now() + EXPIRY_DURATION,
            expiry_requests: HashSet::new(),
112
113
114
        }
    }

115
116
117
118
119
    fn touch_block(&mut self, block: &SequenceHash) -> Rc<()> {
        if let Some(weak) = self.unique_blocks.get(block)
            && let Some(rc) = weak.upgrade()
        {
            return rc;
120
121
        }

122
123
124
125
        let rc = Rc::new(());
        self.unique_blocks.insert(*block, Rc::downgrade(&rc));
        rc
    }
126

127
128
129
130
    fn try_remove_block(&mut self, block: &SequenceHash) {
        if let Some(weak) = self.unique_blocks.get(block)
            && weak.strong_count() == 0
        {
131
132
133
134
            self.unique_blocks.remove(block);
        }
    }

135
136
137
138
    pub fn active_blocks(&self) -> usize {
        self.unique_blocks.len()
    }

139
    /// Add a new request with its initial tokens
140
    /// Returns the set of expired request IDs that were removed during cleanup
141
142
143
    pub fn add_request(
        &mut self,
        request_id: RequestId,
144
        token_sequence: Option<Vec<SequenceHash>>,
145
        isl: usize,
146
        overlap: u32,
147
    ) -> HashSet<RequestId> {
148
        // Check for double-add and log error, returning early
149
        if self.active_seqs.contains_key(&request_id) {
150
151
            tracing::error!("Request {request_id} is already active. Ignoring duplicate add.");
            return HashSet::new();
152
153
        }

154
155
156
        // Lazily check and clean up expired requests, capturing removed IDs
        let removed_requests = self.force_expiry();

157
        let prefill_tokens = self.new_tokens(isl, overlap);
158
159
160
161
        self.prefill_tokens
            .insert(request_id.clone(), prefill_tokens);
        self.active_tokens += prefill_tokens;

162
        if let Some(sequence) = token_sequence {
163
164
165
166
167
168
            let sequence_with_refs: Vec<(SequenceHash, Rc<()>)> = sequence
                .iter()
                .map(|block| (*block, self.touch_block(block)))
                .collect();
            self.active_seqs
                .insert(request_id.clone(), sequence_with_refs);
169
170
171
        } else {
            // dummy empty sequence
            self.active_seqs.insert(request_id.clone(), Vec::new());
172
173
        }

174
        removed_requests
175
176
    }

177
178
179
180
181
    /// Mark prefill as completed for a request, removing it from prefill_tokens tracking
    pub fn mark_prefill_completed(&mut self, request_id: &RequestId) {
        if let Some(tokens) = self.prefill_tokens.remove(request_id) {
            self.active_tokens = self
                .active_tokens
182
                .checked_sub(tokens)
183
184
185
186
187
                .expect("active_tokens underflow");
        }
    }

    pub fn new_tokens(&self, isl: usize, overlap: u32) -> usize {
188
189
190
191
192
193
194
195
196
        let cached_tokens = (overlap as usize) * self.block_size;
        isl.checked_sub(cached_tokens)
            .unwrap_or_else(|| {
                tracing::error!(
                    "prefill_tokens < 0 with ISL {isl} < cached_tokens {cached_tokens} (overlap {overlap} * block_size {}), returning 0",
                    self.block_size
                );
                0
            })
197
198
199
200
    }

    pub fn potential_blocks_and_tokens(
        &self,
201
        token_sequence: Option<&[SequenceHash]>,
202
        isl: usize,
203
204
        overlap: u32,
    ) -> (usize, usize) {
205
        let potential_blocks = if let Some(token_seq) = token_sequence {
206
            self.new_blocks(token_seq) + self.active_blocks()
207
        } else {
208
            self.active_blocks()
209
        };
210
        let potential_tokens = self.new_tokens(isl, overlap) + self.active_tokens;
211
212
213
        (potential_blocks, potential_tokens)
    }

214
    /// Match a request against existing blocks and return the number of new blocks that would be added
215
216
    pub fn new_blocks(&self, token_sequence: &[SequenceHash]) -> usize {
        token_sequence
217
218
219
220
221
222
223
            .iter()
            .filter(|block| !self.unique_blocks.contains_key(block))
            .count()
    }

    /// Return the total number of blocks that would be used if the token sequence was added
    /// This is the sum of new blocks that would be added plus the current active blocks
224
    pub fn potential_blocks(&self, token_sequence: &[SequenceHash]) -> usize {
225
        self.new_blocks(token_sequence) + self.active_blocks()
226
227
228
229
    }

    /// Free all blocks associated with a request
    pub fn free(&mut self, request_id: &RequestId) -> usize {
230
        self.mark_prefill_completed(request_id);
231

232
233
        self.expiry_requests.remove(request_id);

234
235
236
237
238
        // Remove from active_seqs and get the token sequence
        let token_seq = match self.active_seqs.remove(request_id) {
            Some(seq) => seq,
            None => {
                tracing::warn!("Trying to free non-existent request {request_id}");
239
                return self.active_blocks();
240
            }
241
242
        };

243
244
245
246
        // Drop each Rc reference, then clean up the corresponding weak reference
        for (block_hash, rc) in token_seq {
            drop(rc);
            self.try_remove_block(&block_hash);
247
248
        }

249
        self.active_blocks()
250
    }
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273

    /// Force expiry of stale requests if the timer has elapsed
    /// Returns the set of expired request IDs that were removed
    pub fn force_expiry(&mut self) -> HashSet<RequestId> {
        let now = Instant::now();

        // Early return if timer hasn't expired yet
        if now < self.expiry_timer {
            return HashSet::new();
        }

        // Process expired requests - drain to avoid clone
        let expired_requests: HashSet<RequestId> = self.expiry_requests.drain().collect();
        for request_id in &expired_requests {
            tracing::warn!("Force expiring stale request: {}", request_id);
            self.free(request_id);
        }

        self.expiry_timer = now + EXPIRY_DURATION;
        self.expiry_requests = self.active_seqs.keys().cloned().collect();

        expired_requests
    }
274
275
276
277
278
}

enum UpdateSequences {
    AddRequest {
        request_id: RequestId,
279
        token_sequence: Option<Vec<SequenceHash>>,
280
        isl: usize,
281
        overlap: u32,
282
        resp_tx: tokio::sync::oneshot::Sender<HashSet<RequestId>>,
283
284
285
286
    },
    Free {
        request_id: RequestId,
    },
287
    MarkPrefillCompleted {
288
289
290
        request_id: RequestId,
    },
    NewBlocks {
291
        token_sequence: Arc<Vec<SequenceHash>>,
292
        resp_tx: tokio::sync::oneshot::Sender<usize>,
293
294
    },
    PotentialBlocks {
295
        token_sequence: Arc<Vec<SequenceHash>>,
296
        resp_tx: tokio::sync::oneshot::Sender<usize>,
297
    },
298
    PotentialBlocksAndTokens {
299
        token_sequence: Option<Arc<Vec<SequenceHash>>>,
300
        isl: usize,
301
        overlap: u32,
302
        resp_tx: tokio::sync::oneshot::Sender<(usize, usize)>,
303
    },
304
    ActiveBlocks {
305
        resp_tx: tokio::sync::oneshot::Sender<usize>,
306
    },
307
    ActiveTokens {
308
        resp_tx: tokio::sync::oneshot::Sender<usize>,
309
    },
310
311
312
313
314
    Shutdown,
}

/// Multi-worker extension of ActiveSequences that distributes requests across multiple threads
pub struct ActiveSequencesMultiWorker {
Yan Ru Pei's avatar
Yan Ru Pei committed
315
316
317
    senders: Arc<DashMap<WorkerWithDpRank, tokio::sync::mpsc::UnboundedSender<UpdateSequences>>>,
    request_to_worker: Arc<DashMap<RequestId, WorkerWithDpRank>>,
    handles: Arc<DashMap<WorkerWithDpRank, std::thread::JoinHandle<()>>>,
318
    block_size: usize,
319
320
321
    component: Component,
    router_id: Uuid,
    replica_sync: bool,
322
323
324
}

impl ActiveSequencesMultiWorker {
325
326
327
    pub fn new(
        component: Component,
        block_size: usize,
328
        workers_with_configs: HashMap<u64, Option<ModelRuntimeConfig>>,
329
        replica_sync: bool,
330
        router_uuid: String,
331
    ) -> Self {
332
333
        assert!(block_size > 1, "block_size must be greater than 1");

334
335
336
        let senders = Arc::new(DashMap::new());
        let handles = Arc::new(DashMap::new());
        let request_to_worker = Arc::new(DashMap::new());
337
338
339
340
341
342
343
344
        let router_id = Uuid::parse_str(&router_uuid).unwrap_or_else(|e| {
            tracing::warn!(
                "Failed to parse router UUID '{}': {}, using new UUID",
                router_uuid,
                e
            );
            Uuid::new_v4()
        });
345

Yan Ru Pei's avatar
Yan Ru Pei committed
346
347
348
349
350
351
352
353
354
355
356
357
        // Expand workers by their dp_rank
        for (worker_id, config) in workers_with_configs {
            let dp_size = config.as_ref().map(|c| c.data_parallel_size).unwrap_or(1);

            for dp_rank in 0..dp_size {
                let worker = WorkerWithDpRank::new(worker_id, dp_rank);
                // Create a child cancellation token from the component's runtime
                let cancel_token = component.drt().runtime().child_token();
                let (sender, handle) = Self::start_worker(block_size, cancel_token);
                senders.insert(worker, sender);
                handles.insert(worker, handle);
            }
358
359
        }

360
361
362
        let multi_worker = Self {
            senders: senders.clone(),
            request_to_worker: request_to_worker.clone(),
363
364
            handles,
            block_size,
365
366
367
368
369
370
371
372
373
374
375
            component: component.clone(),
            router_id,
            replica_sync,
        };

        // Start the subscription loop only if replica_sync is enabled
        if replica_sync {
            let senders_clone = senders.clone();
            let request_to_worker_clone = request_to_worker.clone();
            let component_clone = component.clone();
            let router_id_clone = router_id;
376
            let cancel_token = component.drt().runtime().child_token();
377

378
            tokio::spawn(async move {
379
                // NATS subscription loop
380
381
382
383
384
                if let Err(e) = Self::subscribe_to_events(
                    senders_clone,
                    request_to_worker_clone,
                    component_clone,
                    router_id_clone,
385
                    cancel_token,
386
387
388
389
390
391
                )
                .await
                {
                    tracing::error!("Error in active sequences events subscription: {}", e);
                }
            });
392
        }
393
394

        multi_worker
395
396
    }

397
398
399
    /// Helper method to start a worker task
    fn start_worker(
        block_size: usize,
400
        cancel_token: CancellationToken,
401
402
    ) -> (
        tokio::sync::mpsc::UnboundedSender<UpdateSequences>,
403
        std::thread::JoinHandle<()>,
404
    ) {
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
        let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();

        let handle = std::thread::spawn(move || {
            // Create a single-threaded tokio runtime
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();

            runtime.block_on(async move {
                let mut active_sequences = ActiveSequences::new(block_size);
                let mut request_rx = request_rx;

                loop {
                    tokio::select! {
                        command = request_rx.recv() => {
                            let Some(command) = command else {
                                break;
                            };

                            match command {
                                UpdateSequences::AddRequest {
                                    request_id,
                                    token_sequence,
                                    isl,
                                    overlap,
                                    resp_tx,
                                } => {
                                    let removed = active_sequences.add_request(request_id, token_sequence, isl, overlap);
                                    let _ = resp_tx.send(removed);
                                }
                                UpdateSequences::Free { request_id } => {
                                    active_sequences.free(&request_id);
                                }
                                UpdateSequences::MarkPrefillCompleted { request_id } => {
                                    active_sequences.mark_prefill_completed(&request_id);
                                }
                                UpdateSequences::NewBlocks {
                                    token_sequence,
                                    resp_tx,
                                } => {
                                    let new_blocks = active_sequences.new_blocks(&token_sequence);
                                    let _ = resp_tx.send(new_blocks);
                                }
                                UpdateSequences::PotentialBlocks {
                                    token_sequence,
                                    resp_tx,
                                } => {
                                    let potential_blocks = active_sequences.potential_blocks(&token_sequence);
                                    let _ = resp_tx.send(potential_blocks);
                                }
                                UpdateSequences::PotentialBlocksAndTokens {
                                    token_sequence,
                                    isl,
                                    overlap,
                                    resp_tx,
                                } => {
                                    let potential_tokens = active_sequences.potential_blocks_and_tokens(
                                        token_sequence.as_ref().map(|v| v.as_slice()),
464
465
                                        isl,
                                        overlap,
466
467
468
469
470
471
472
473
474
475
476
477
478
                                    );
                                    let _ = resp_tx.send(potential_tokens);
                                }
                                UpdateSequences::ActiveBlocks { resp_tx } => {
                                    let active_blocks = active_sequences.active_blocks();
                                    let _ = resp_tx.send(active_blocks);
                                }
                                UpdateSequences::ActiveTokens { resp_tx } => {
                                    let active_tokens = active_sequences.active_tokens();
                                    let _ = resp_tx.send(active_tokens);
                                }
                                UpdateSequences::Shutdown => {
                                    break;
479
480
481
                                }
                            }
                        }
482
483
484
485
486
                        // Handle cancellation
                        _ = cancel_token.cancelled() => {
                            tracing::debug!("Worker task cancelled");
                            break;
                        }
487
                    }
488
                }
489
490
491
            });

            tracing::debug!("ActiveSequences worker task completed");
492
493
494
495
496
497
498
        });

        (request_tx, handle)
    }

    /// Background task to subscribe to active sequence events and update all workers
    async fn subscribe_to_events(
Yan Ru Pei's avatar
Yan Ru Pei committed
499
500
501
502
        senders: Arc<
            DashMap<WorkerWithDpRank, tokio::sync::mpsc::UnboundedSender<UpdateSequences>>,
        >,
        request_to_worker: Arc<DashMap<RequestId, WorkerWithDpRank>>,
503
504
        component: Component,
        router_id: Uuid,
505
        cancel_token: CancellationToken,
506
507
508
509
510
    ) -> Result<()> {
        let mut subscriber = component
            .subscribe_with_type::<ActiveSequenceEvent>(ACTIVE_SEQUENCES_SUBJECT)
            .await?;

511
512
513
514
515
516
517
518
        loop {
            tokio::select! {
                // Handle incoming events
                result = subscriber.next() => {
                    let Some(result) = result else {
                        // Stream ended
                        break;
                    };
519

520
521
522
523
                    let Ok(event) = result else {
                        tracing::error!(
                            "Error receiving active sequence event: {}",
                            result.unwrap_err()
524
                        );
525
526
527
528
529
530
                        continue;
                    };

                    // Skip events emitted by itself
                    if event.router_id == router_id {
                        continue;
531
                    }
532
533
534
535
536
537
538

                    match &event.data {
                        ActiveSequenceEventData::AddRequest {
                            token_sequence,
                            isl,
                            overlap,
                        } => {
Yan Ru Pei's avatar
Yan Ru Pei committed
539
                            request_to_worker.insert(event.request_id.clone(), event.worker);
540

Yan Ru Pei's avatar
Yan Ru Pei committed
541
                            if let Some(sender) = senders.get(&event.worker) {
542
543
544
545
546
547
548
549
550
551
552
                                // For replicated events, we create a dummy response channel since we don't need to handle expired requests
                                let (resp_tx, _) = tokio::sync::oneshot::channel();
                                let _ = sender.send(UpdateSequences::AddRequest {
                                    request_id: event.request_id.clone(),
                                    token_sequence: token_sequence.clone(),
                                    isl: *isl,
                                    overlap: *overlap,
                                    resp_tx,
                                });
                            } else {
                                tracing::warn!(
Yan Ru Pei's avatar
Yan Ru Pei committed
553
554
                                    "Worker {:?} not found, cannot process AddRequest",
                                    event.worker
555
556
557
558
                                );
                            }
                        }
                        ActiveSequenceEventData::Free => {
Yan Ru Pei's avatar
Yan Ru Pei committed
559
560
                            if let Some((_, worker)) = request_to_worker.remove(&event.request_id)
                                && let Some(sender) = senders.get(&worker)
561
562
563
564
565
566
567
                            {
                                let _ = sender.send(UpdateSequences::Free {
                                    request_id: event.request_id.clone(),
                                });
                            }
                        }
                        ActiveSequenceEventData::MarkPrefillCompleted => {
Yan Ru Pei's avatar
Yan Ru Pei committed
568
569
                            if let Some(worker) = request_to_worker.get(&event.request_id)
                                && let Some(sender) = senders.get(&*worker)
570
571
572
573
574
575
                            {
                                let _ = sender.send(UpdateSequences::MarkPrefillCompleted {
                                    request_id: event.request_id.clone(),
                                });
                            }
                        }
576
                    }
577
                }
578
579
580
581
                // Handle cancellation
                _ = cancel_token.cancelled() => {
                    tracing::debug!("Subscription task cancelled");
                    break;
582
583
                }
            }
584
        }
585

586
        Ok(())
587
588
589
    }

    /// Update the set of workers, adding and removing as needed
Yan Ru Pei's avatar
Yan Ru Pei committed
590
591
    pub fn update_workers(
        &self,
592
        new_workers_with_configs: HashMap<u64, Option<ModelRuntimeConfig>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
593
594
    ) {
        let current_workers: HashSet<WorkerWithDpRank> =
595
            self.senders.iter().map(|entry| *entry.key()).collect();
596

Yan Ru Pei's avatar
Yan Ru Pei committed
597
598
599
600
601
602
603
604
605
606
607
        // Expand new workers by their dp_rank
        let mut new_workers: HashSet<WorkerWithDpRank> = HashSet::new();
        for (worker_id, config) in &new_workers_with_configs {
            let dp_size = config.as_ref().map(|c| c.data_parallel_size).unwrap_or(1);

            for dp_rank in 0..dp_size {
                new_workers.insert(WorkerWithDpRank::new(*worker_id, dp_rank));
            }
        }

        let workers_to_remove: Vec<WorkerWithDpRank> =
608
            current_workers.difference(&new_workers).copied().collect();
Yan Ru Pei's avatar
Yan Ru Pei committed
609
        let workers_to_add: Vec<WorkerWithDpRank> =
610
611
            new_workers.difference(&current_workers).copied().collect();

Yan Ru Pei's avatar
Yan Ru Pei committed
612
613
614
        // Remove workers (this will naturally remove all dp ranks for a worker_id)
        for worker in &workers_to_remove {
            tracing::warn!("Removing worker {:?}", worker);
615
616

            // Send shutdown command to the worker
Yan Ru Pei's avatar
Yan Ru Pei committed
617
            if let Some((_, sender)) = self.senders.remove(worker) {
618
619
                let _ = sender.send(UpdateSequences::Shutdown);
            }
Yan Ru Pei's avatar
Yan Ru Pei committed
620
            self.handles.remove(worker);
621
622
623

            // Clean up request_to_worker mappings for this worker
            self.request_to_worker
Yan Ru Pei's avatar
Yan Ru Pei committed
624
                .retain(|_request_id, mapped_worker| mapped_worker != worker);
625
626
627
        }

        // Add new workers
Yan Ru Pei's avatar
Yan Ru Pei committed
628
629
        for worker in &workers_to_add {
            tracing::warn!("Adding worker {:?}", worker);
630

631
632
633
634
            let (sender, handle) = Self::start_worker(
                self.block_size,
                self.component.drt().runtime().child_token(),
            );
Yan Ru Pei's avatar
Yan Ru Pei committed
635
636
            self.senders.insert(*worker, sender);
            self.handles.insert(*worker, handle);
637
638
639
        }
    }

640
641
    pub async fn add_request(
        &self,
642
        request_id: RequestId,
643
        token_sequence: Option<Vec<SequenceHash>>,
644
        isl: usize,
645
        overlap: u32,
Yan Ru Pei's avatar
Yan Ru Pei committed
646
        worker: WorkerWithDpRank,
647
648
    ) -> Result<(), SequenceError> {
        // Check for worker existence
Yan Ru Pei's avatar
Yan Ru Pei committed
649
        if !self.senders.contains_key(&worker) {
650
651
652
653
654
655
656
657
658
            return Err(SequenceError::WorkerNotFound { worker });
        }

        // Check for duplicate request
        if let Some(existing_worker) = self.request_to_worker.get(&request_id) {
            return Err(SequenceError::DuplicateRequest {
                request_id,
                worker: *existing_worker,
            });
659
660
        }

661
662
663
        // Create response channel
        let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

664
665
666
667
        // Publish event only if replica_sync is enabled
        if self.replica_sync {
            let event = ActiveSequenceEvent {
                request_id: request_id.clone(),
Yan Ru Pei's avatar
Yan Ru Pei committed
668
                worker,
669
670
671
672
673
674
675
676
677
678
                data: ActiveSequenceEventData::AddRequest {
                    token_sequence: token_sequence.clone(),
                    isl,
                    overlap,
                },
                router_id: self.router_id,
            };
            self.component
                .publish(ACTIVE_SEQUENCES_SUBJECT, &event)
                .await?;
679
680
        }

Yan Ru Pei's avatar
Yan Ru Pei committed
681
682
        // Update local state with full WorkerWithDpRank
        self.request_to_worker.insert(request_id.clone(), worker);
683

684
        self.senders
Yan Ru Pei's avatar
Yan Ru Pei committed
685
            .get(&worker)
686
            .unwrap()
687
688
689
            .send(UpdateSequences::AddRequest {
                request_id,
                token_sequence,
690
                isl,
691
                overlap,
692
                resp_tx,
693
            })
694
            .map_err(|_| SequenceError::WorkerChannelClosed)?;
695

696
697
698
        // Wait for response and handle removed requests
        let removed_requests = resp_rx
            .await
699
            .map_err(|_| SequenceError::WorkerChannelClosed)?;
700
701
702
703
704
705

        // Remove expired requests from request_to_worker mapping
        for expired_id in &removed_requests {
            self.request_to_worker.remove(expired_id);
        }

706
707
708
        // Publish ActiveLoad metrics for this worker
        self.publish_active_load_for_worker(worker).await;

709
        Ok(())
710
711
    }

712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
    /// Free all blocks associated with a request
    ///
    /// Note: This operation is idempotent. Calling it multiple times for the same request
    /// will log a warning but not return an error (double free is allowed).
    pub async fn free(&self, request_id: &RequestId) -> Result<(), SequenceError> {
        // Check if request exists - if not, it's already been freed (idempotent)
        let Some(worker) = self.request_to_worker.get(request_id).map(|entry| *entry) else {
            tracing::debug!("Request {request_id} not found, already freed (idempotent)");
            return Ok(());
        };

        // Verify worker still exists
        if !self.senders.contains_key(&worker) {
            return Err(SequenceError::WorkerNotFound { worker });
        }
727

728
729
730
731
        // Publish event only if replica_sync is enabled
        if self.replica_sync {
            let event = ActiveSequenceEvent {
                request_id: request_id.clone(),
Yan Ru Pei's avatar
Yan Ru Pei committed
732
                worker,
733
734
735
736
737
738
739
740
741
742
                data: ActiveSequenceEventData::Free,
                router_id: self.router_id,
            };
            self.component
                .publish(ACTIVE_SEQUENCES_SUBJECT, &event)
                .await?;
        }

        // Update local state
        self.senders
Yan Ru Pei's avatar
Yan Ru Pei committed
743
            .get(&worker)
744
            .unwrap()
745
746
747
            .send(UpdateSequences::Free {
                request_id: request_id.clone(),
            })
748
            .map_err(|_| SequenceError::WorkerChannelClosed)?;
749
750

        self.request_to_worker.remove(request_id);
751

752
753
754
        // Publish ActiveLoad metrics for this worker
        self.publish_active_load_for_worker(worker).await;

755
        Ok(())
756
757
    }

758
    /// Mark prefill as completed for a request
759
760
761
762
763
764
765
    ///
    /// Note: Calling this multiple times for the same request is allowed and will be a no-op
    /// after the first call (idempotent).
    pub async fn mark_prefill_completed(
        &self,
        request_id: &RequestId,
    ) -> Result<(), SequenceError> {
Yan Ru Pei's avatar
Yan Ru Pei committed
766
        let worker = self
767
768
            .request_to_worker
            .get(request_id)
769
            .map(|entry| *entry)
770
771
772
773
774
775
776
777
            .ok_or_else(|| SequenceError::RequestNotFound {
                request_id: request_id.clone(),
            })?;

        // Verify worker still exists
        if !self.senders.contains_key(&worker) {
            return Err(SequenceError::WorkerNotFound { worker });
        }
778
779
780
781
782

        // Publish event only if replica_sync is enabled
        if self.replica_sync {
            let event = ActiveSequenceEvent {
                request_id: request_id.clone(),
Yan Ru Pei's avatar
Yan Ru Pei committed
783
                worker,
784
785
786
787
788
789
790
                data: ActiveSequenceEventData::MarkPrefillCompleted,
                router_id: self.router_id,
            };
            self.component
                .publish(ACTIVE_SEQUENCES_SUBJECT, &event)
                .await?;
        }
791

792
793
        // Update local state
        self.senders
Yan Ru Pei's avatar
Yan Ru Pei committed
794
            .get(&worker)
795
            .unwrap()
796
            .send(UpdateSequences::MarkPrefillCompleted {
797
798
                request_id: request_id.clone(),
            })
799
            .map_err(|_| SequenceError::WorkerChannelClosed)?;
800

801
802
803
        // Publish ActiveLoad metrics for this worker
        self.publish_active_load_for_worker(worker).await;

804
        Ok(())
805
806
    }

807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
    /// Query a single worker for its active blocks/tokens and publish an [`ActiveLoad`] on
    /// `KV_METRICS_SUBJECT` in the component's namespace.
    ///
    /// Best effort: each failure (unknown worker, closed channel, dropped response, publish
    /// error) is logged with `tracing::warn!` and the function returns without publishing
    /// (or after the failed publish).
    async fn publish_active_load_for_worker(&self, worker: WorkerWithDpRank) {
        // Issue both queries while holding the map entry, then release it before awaiting.
        // The entry is a read guard on a DashMap shard; holding it across `.await` would block
        // any writer to that shard for as long as this task is suspended.
        let (blocks_rx, tokens_rx) = {
            let Some(sender) = self.senders.get(&worker) else {
                tracing::warn!("Worker {worker:?} not found when publishing ActiveLoad");
                return;
            };

            // Query active blocks
            let (blocks_tx, blocks_rx) = tokio::sync::oneshot::channel();
            if sender
                .send(UpdateSequences::ActiveBlocks { resp_tx: blocks_tx })
                .is_err()
            {
                tracing::warn!("Failed to send ActiveBlocks query to worker {worker:?}");
                return;
            }

            // Query active tokens
            let (tokens_tx, tokens_rx) = tokio::sync::oneshot::channel();
            if sender
                .send(UpdateSequences::ActiveTokens { resp_tx: tokens_tx })
                .is_err()
            {
                tracing::warn!("Failed to send ActiveTokens query to worker {worker:?}");
                return;
            }

            (blocks_rx, tokens_rx)
        };

        // Await both responses (the map guard has already been dropped).
        let (active_blocks, active_tokens) = match tokio::join!(blocks_rx, tokens_rx) {
            (Ok(blocks), Ok(tokens)) => (blocks, tokens),
            _ => {
                tracing::warn!("Failed to receive active blocks/tokens from worker {worker:?}");
                return;
            }
        };

        // Publish ActiveLoad
        let active_load = ActiveLoad {
            worker_id: worker.worker_id,
            dp_rank: worker.dp_rank,
            active_decode_blocks: Some(active_blocks as u64),
            active_prefill_tokens: Some(active_tokens as u64),
        };

        if let Err(e) = self
            .component
            .namespace()
            .publish(KV_METRICS_SUBJECT, &active_load)
            .await
        {
            tracing::warn!("Failed to publish ActiveLoad for worker {worker:?}: {e:?}");
        }
    }

861
862
863
864
865
866
    /// Number of tracked worker entries, one per `(worker_id, dp_rank)` pair.
    ///
    /// A worker running with data parallelism contributes one entry per DP rank, so this can
    /// exceed the number of distinct worker ids.
    pub fn num_workers(&self) -> usize {
        self.senders.len()
    }

    /// Generic method to query all workers with a given command
867
    async fn query_workers<T: Send + 'static>(
868
        &self,
869
        token_sequence: Option<Vec<SequenceHash>>,
870
871
872
873
        command_fn: impl Fn(
            Option<Arc<Vec<SequenceHash>>>,
            tokio::sync::oneshot::Sender<T>,
        ) -> UpdateSequences,
Yan Ru Pei's avatar
Yan Ru Pei committed
874
    ) -> HashMap<WorkerWithDpRank, T> {
875
876
877
878
879
        let mut results = HashMap::new();
        let token_sequence_shared = token_sequence.map(Arc::new);
        let mut receivers = Vec::new();

        // Send queries to all workers in parallel
880
        for entry in self.senders.iter() {
Yan Ru Pei's avatar
Yan Ru Pei committed
881
            let worker = *entry.key();
882
883
            let sender = entry.value();
            let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
Yan Ru Pei's avatar
Yan Ru Pei committed
884
            receivers.push((worker, resp_rx));
885
            if let Err(e) = sender.send(command_fn(token_sequence_shared.clone(), resp_tx)) {
Yan Ru Pei's avatar
Yan Ru Pei committed
886
                tracing::error!("Failed to send command to worker {:?}: {}", worker, e);
887
            }
888
889
890
        }

        // Collect results from all workers
Yan Ru Pei's avatar
Yan Ru Pei committed
891
        for (worker, receiver) in receivers {
892
893
            match tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver).await {
                Ok(Ok(result)) => {
Yan Ru Pei's avatar
Yan Ru Pei committed
894
                    results.insert(worker, result);
895
896
                }
                Ok(Err(_)) => {
Yan Ru Pei's avatar
Yan Ru Pei committed
897
                    tracing::error!("Worker {:?} dropped response channel", worker);
898
899
                }
                Err(_) => {
Yan Ru Pei's avatar
Yan Ru Pei committed
900
                    tracing::error!("Timeout waiting for response from worker {:?}", worker);
901
902
                }
            }
903
904
905
906
907
908
        }

        results
    }

    /// Query all workers for the number of new blocks that would be added by a token sequence
Yan Ru Pei's avatar
Yan Ru Pei committed
909
910
911
912
    pub async fn new_blocks(
        &self,
        token_sequence: Vec<SequenceHash>,
    ) -> HashMap<WorkerWithDpRank, usize> {
913
914
915
916
917
918
919
        self.query_workers(Some(token_sequence), |ts, resp_tx| match ts {
            Some(ts) => UpdateSequences::NewBlocks {
                token_sequence: ts,
                resp_tx,
            },
            None => unreachable!("token_sequence should always be Some for new_blocks"),
        })
920
        .await
921
922
923
    }

    /// Query all workers for the total number of blocks (new + active) that would be used by a token sequence
924
925
926
    pub async fn potential_blocks(
        &self,
        token_sequence: Vec<SequenceHash>,
Yan Ru Pei's avatar
Yan Ru Pei committed
927
    ) -> HashMap<WorkerWithDpRank, usize> {
928
929
930
931
932
933
934
        self.query_workers(Some(token_sequence), |ts, resp_tx| match ts {
            Some(ts) => UpdateSequences::PotentialBlocks {
                token_sequence: ts,
                resp_tx,
            },
            None => unreachable!("token_sequence should always be Some for potential_blocks"),
        })
935
        .await
936
937
    }

938
    /// Query all workers for the potential tokens (new + active) that would be used by a token sequence with overlap
939
    pub async fn potential_blocks_and_tokens(
940
        &self,
941
        token_sequence: Option<Vec<SequenceHash>>,
942
        isl: usize,
943
        overlaps: OverlapScores,
Yan Ru Pei's avatar
Yan Ru Pei committed
944
945
946
947
    ) -> (
        HashMap<WorkerWithDpRank, usize>,
        HashMap<WorkerWithDpRank, usize>,
    ) {
948
949
        let mut potential_blocks = HashMap::new();
        let mut potential_tokens = HashMap::new();
950
        let token_sequence_shared = token_sequence.map(Arc::new);
951
952
        let mut receivers = Vec::new();

953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
        // Iterate through all workers, not just those with overlap
        // This ensures we properly account for active tokens/blocks on all workers
        for sender_entry in self.senders.iter() {
            let worker = *sender_entry.key();
            let sender = sender_entry.value();

            // Get overlap for this worker (defaults to 0 if not in overlaps)
            let overlap = *overlaps.scores.get(&worker).unwrap_or(&0);

            let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
            receivers.push((worker, resp_rx));

            if let Err(e) = sender.send(UpdateSequences::PotentialBlocksAndTokens {
                token_sequence: token_sequence_shared.clone(),
                isl,
                overlap,
                resp_tx,
            }) {
                tracing::error!(
                    "Failed to send potential_tokens command to worker {:?}: {}",
                    worker,
                    e
                );
976
            }
977
978
979
        }

        // Collect results from all workers
Yan Ru Pei's avatar
Yan Ru Pei committed
980
        for (worker, receiver) in receivers {
981
982
            match tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver).await {
                Ok(Ok((blocks, tokens))) => {
Yan Ru Pei's avatar
Yan Ru Pei committed
983
984
                    potential_blocks.insert(worker, blocks);
                    potential_tokens.insert(worker, tokens);
985
986
                }
                Ok(Err(_)) => {
Yan Ru Pei's avatar
Yan Ru Pei committed
987
                    tracing::error!("Worker {:?} dropped response channel", worker);
988
989
                }
                Err(_) => {
Yan Ru Pei's avatar
Yan Ru Pei committed
990
                    tracing::error!("Timeout waiting for response from worker {:?}", worker);
991
992
                }
            }
993
994
995
996
997
        }

        (potential_blocks, potential_tokens)
    }

998
    /// Query all workers for their current number of active blocks
Yan Ru Pei's avatar
Yan Ru Pei committed
999
    pub async fn active_blocks(&self) -> HashMap<WorkerWithDpRank, usize> {
1000
        self.query_workers(None, |_, resp_tx| UpdateSequences::ActiveBlocks { resp_tx })
1001
            .await
1002
    }
1003
1004

    /// Query all workers for their current number of active tokens
Yan Ru Pei's avatar
Yan Ru Pei committed
1005
    pub async fn active_tokens(&self) -> HashMap<WorkerWithDpRank, usize> {
1006
        self.query_workers(None, |_, resp_tx| UpdateSequences::ActiveTokens { resp_tx })
1007
            .await
1008
    }
1009
1010
1011
1012
}

impl Drop for ActiveSequencesMultiWorker {
    fn drop(&mut self) {
1013
1014
1015
        // Send shutdown to all workers
        for entry in self.senders.iter() {
            let _ = entry.value().send(UpdateSequences::Shutdown);
1016
1017
1018
1019
1020
1021
1022
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use dynamo_runtime::{DistributedRuntime, Runtime};
    use std::sync::Arc;

    #[test]
    fn test_active_sequences_shared_blocks() {
        let block_size = 4;
        let mut seq_manager = ActiveSequences::new(block_size);

        seq_manager.add_request("request_1".to_string(), Some(vec![1, 2, 3]), 12, 0);
        assert_eq!(seq_manager.active_blocks(), 3);
        assert_eq!(seq_manager.active_tokens(), 12);

        seq_manager.add_request("request_2".to_string(), Some(vec![4]), 4, 0);
        assert_eq!(seq_manager.active_blocks(), 4);
        assert_eq!(seq_manager.active_tokens(), 16);

        // request_3 reuses all 4 blocks already held (overlap = 4), so the counts stay put.
        seq_manager.add_request("request_3".to_string(), Some(vec![1, 2, 3, 4]), 16, 4);
        assert_eq!(seq_manager.active_blocks(), 4);
        assert_eq!(seq_manager.active_tokens(), 16);

        seq_manager.free(&"request_2".to_string());
        assert_eq!(seq_manager.active_blocks(), 4);
        assert_eq!(seq_manager.active_tokens(), 12);

        seq_manager.free(&"request_3".to_string());
        assert_eq!(seq_manager.active_blocks(), 3);
        assert_eq!(seq_manager.active_tokens(), 12);

        seq_manager.free(&"request_1".to_string());
        assert_eq!(seq_manager.active_blocks(), 0);
        assert_eq!(seq_manager.active_tokens(), 0);
    }

    #[tokio::test]
    #[ignore]
    async fn test_multi_worker_cross_instance_sync() -> Result<()> {
        // Initialize logging once
        dynamo_runtime::logging::init();

        let block_size = 4; // arbitrary block size

        // Create runtime and distributed runtime
        let runtime = Runtime::from_current()?;
        let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;

        // Create namespace and shared component for both seq_managers
        let namespace = distributed.namespace("test_cross_instance_sync")?;
        let component = namespace.component("sequences")?;

        // Create multi-worker sequence managers with:
        // - Worker 0 with dp_size=2 (dp_ranks 0 and 1)
        // - Worker 1 with dp_size=1 (dp_rank 0)
        // This gives us 3 effective workers total to test dp_rank effect
        // Both seq_managers use the same component to ensure event synchronization works
        let mut workers_with_configs = HashMap::new();

        // Create runtime config for worker 0 with dp_size=2
        let mut config_worker_0 = crate::local_model::runtime_config::ModelRuntimeConfig::new();
        config_worker_0.data_parallel_size = 2;
        workers_with_configs.insert(0, Some(config_worker_0));

        // Create runtime config for worker 1 with dp_size=1 (default)
        let config_worker_1 = crate::local_model::runtime_config::ModelRuntimeConfig::new();
        workers_with_configs.insert(1, Some(config_worker_1));

        let seq_manager_1 = Arc::new(ActiveSequencesMultiWorker::new(
            component.clone(),
            block_size,
            workers_with_configs.clone(),
            true,
            Uuid::new_v4().to_string(),
        ));
        let seq_manager_2 = Arc::new(ActiveSequencesMultiWorker::new(
            component,
            block_size,
            workers_with_configs,
            true,
            Uuid::new_v4().to_string(),
        ));

        // Give some time for the subscription loops to start
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // PHASE 1: Add requests using both seq_manager_1 and seq_manager_2

        // Add request_0 to worker 0, dp_rank 0: sequence [0, 1, 2]
        seq_manager_1
            .add_request(
                "request_0".to_string(),
                Some(vec![0, 1, 2]),
                12, // ISL (3 blocks * 4 block_size)
                0,  // no overlap
                WorkerWithDpRank::new(0, 0),
            )
            .await?;

        // Add request_1 to worker 0, dp_rank 1: sequence [3, 4]
        seq_manager_1
            .add_request(
                "request_1".to_string(),
                Some(vec![3, 4]),
                8, // ISL (2 blocks * 4 block_size)
                0, // no overlap
                WorkerWithDpRank::new(0, 1),
            )
            .await?;

        // Add request_2 to worker 1, dp_rank 0: sequence [0, 1, 2, 3] using seq_manager_2
        seq_manager_2
            .add_request(
                "request_2".to_string(),
                Some(vec![0, 1, 2, 3]),
                16, // ISL (4 blocks * 4 block_size)
                0,  // no overlap
                WorkerWithDpRank::new(1, 0),
            )
            .await?;

        // Give some time for synchronization
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        // Query seq_manager_1 to verify it sees all requests including request_2 from seq_manager_2
        let blocks_phase1 = seq_manager_1.active_blocks().await;
        let tokens_phase1 = seq_manager_1.active_tokens().await;

        // Verify that seq_manager_1 sees all requests including request_2 from seq_manager_2
        // We now have:
        // - Worker 0, dp_rank 0: request_0
        // - Worker 0, dp_rank 1: request_1
        // - Worker 1, dp_rank 0: request_2
        let worker_0_dp0 = WorkerWithDpRank::new(0, 0);
        let worker_0_dp1 = WorkerWithDpRank::new(0, 1);
        let worker_1_dp0 = WorkerWithDpRank::new(1, 0);

        assert_eq!(
            blocks_phase1[&worker_0_dp0], 3,
            "Worker 0 dp_rank 0 should have 3 active blocks (from request_0)"
        );
        assert_eq!(
            blocks_phase1[&worker_0_dp1], 2,
            "Worker 0 dp_rank 1 should have 2 active blocks (from request_1)"
        );
        assert_eq!(
            blocks_phase1[&worker_1_dp0], 4,
            "Worker 1 dp_rank 0 should have 4 active blocks (from request_2 added by seq_manager_2)"
        );
        assert_eq!(
            tokens_phase1[&worker_0_dp0], 12,
            "Worker 0 dp_rank 0 should have 12 active tokens"
        );
        assert_eq!(
            tokens_phase1[&worker_0_dp1], 8,
            "Worker 0 dp_rank 1 should have 8 active tokens"
        );
        assert_eq!(
            tokens_phase1[&worker_1_dp0], 16,
            "Worker 1 dp_rank 0 should have 16 active tokens (from request_2 added by seq_manager_2)"
        );

        // PHASE 2: Free requests using opposite sequence managers, verify on seq_manager_2

        // Free request_2 (which was added by seq_manager_2) using seq_manager_1
        seq_manager_1.free(&"request_2".to_string()).await?;

        // Free request_0 and request_1 (which were added by seq_manager_1) using seq_manager_2
        seq_manager_2.free(&"request_0".to_string()).await?;
        seq_manager_2.free(&"request_1".to_string()).await?;

        // Give some time for synchronization
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        // Query seq_manager_2 to verify everything is empty
        let blocks_phase2 = seq_manager_2.active_blocks().await;
        let tokens_phase2 = seq_manager_2.active_tokens().await;

        // Verify phase 2 results - everything should be empty for all 3 workers
        let all_workers = [
            WorkerWithDpRank::new(0, 0),
            WorkerWithDpRank::new(0, 1),
            WorkerWithDpRank::new(1, 0),
        ];

        for worker in all_workers {
            assert_eq!(
                blocks_phase2[&worker], 0,
                "Worker (id={}, dp_rank={}) should have 0 active blocks after all requests freed",
                worker.worker_id, worker.dp_rank
            );
            assert_eq!(
                tokens_phase2[&worker], 0,
                "Worker (id={}, dp_rank={}) should have 0 active tokens after all requests freed",
                worker.worker_id, worker.dp_rank
            );
        }

        Ok(())
    }

    #[tokio::test]
    #[ignore]
    async fn test_multi_worker_no_token_sequence_sync() -> Result<()> {
        // Initialize logging once
        dynamo_runtime::logging::init();

        let block_size = 4; // arbitrary block size

        // Create runtime and distributed runtime
        let runtime = Runtime::from_current()?;
        let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;

        // Create namespace and shared component for both seq_managers
        let namespace = distributed.namespace("test_no_token_seq_sync")?;
        let component = namespace.component("sequences")?;

        // Create multi-worker sequence managers with ALL workers [0, 1, 2]
        // Both use the same component to ensure event synchronization works
        let mut workers_with_configs = HashMap::new();
        workers_with_configs.insert(0, None);
        workers_with_configs.insert(1, None);
        workers_with_configs.insert(2, None);

        let seq_manager_1 = Arc::new(ActiveSequencesMultiWorker::new(
            component.clone(),
            block_size,
            workers_with_configs.clone(),
            true,
            Uuid::new_v4().to_string(),
        ));
        let seq_manager_2 = Arc::new(ActiveSequencesMultiWorker::new(
            component,
            block_size,
            workers_with_configs,
            true,
            Uuid::new_v4().to_string(),
        ));

        // Give some time for the subscription loops to start
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // PHASE 1: Add requests (without token sequences) using both seq_managers

        // Add request_0 to worker 0 with no token sequence
        seq_manager_1
            .add_request(
                "request_0".to_string(),
                None, // No token sequence
                12,   // ISL (12 tokens)
                0,    // no overlap
                WorkerWithDpRank::from_worker_id(0),
            )
            .await?;

        // Add request_1 to worker 1 with no token sequence
        seq_manager_1
            .add_request(
                "request_1".to_string(),
                None, // No token sequence
                8,    // ISL (8 tokens)
                0,    // no overlap
                WorkerWithDpRank::from_worker_id(1),
            )
            .await?;

        // Add request_2 to worker 2 with no token sequence using seq_manager_2
        seq_manager_2
            .add_request(
                "request_2".to_string(),
                None, // No token sequence
                16,   // ISL (16 tokens)
                0,    // no overlap
                WorkerWithDpRank::from_worker_id(2),
            )
            .await?;

        // Give some time for synchronization
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        // Query seq_manager_1 to verify it sees all requests including request_2 from seq_manager_2
        let tokens_phase1 = seq_manager_1.active_tokens().await;

        // Verify that seq_manager_1 sees all requests including request_2 from seq_manager_2
        let worker_0 = WorkerWithDpRank::from_worker_id(0);
        let worker_1 = WorkerWithDpRank::from_worker_id(1);
        let worker_2 = WorkerWithDpRank::from_worker_id(2);

        assert_eq!(
            tokens_phase1[&worker_0], 12,
            "Worker 0 should have 12 active tokens"
        );
        assert_eq!(
            tokens_phase1[&worker_1], 8,
            "Worker 1 should have 8 active tokens"
        );
        assert_eq!(
            tokens_phase1[&worker_2], 16,
            "Worker 2 should have 16 active tokens (from request_2 added by seq_manager_2)"
        );

        // PHASE 2: Free requests using opposite sequence managers, verify on seq_manager_2

        // Mark prefill completed and free request_2 (which was added by seq_manager_2) using seq_manager_1
        seq_manager_1
            .mark_prefill_completed(&"request_2".to_string())
            .await?;
        seq_manager_1.free(&"request_2".to_string()).await?;

        // Mark prefill completed and free requests 0 and 1 (which were added by seq_manager_1) using seq_manager_2
        seq_manager_2
            .mark_prefill_completed(&"request_0".to_string())
            .await?;
        seq_manager_2
            .mark_prefill_completed(&"request_1".to_string())
            .await?;
        seq_manager_2.free(&"request_0".to_string()).await?;
        seq_manager_2.free(&"request_1".to_string()).await?;

        // Give some time for synchronization
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        // Query seq_manager_2 to verify everything is empty
        let tokens_phase2 = seq_manager_2.active_tokens().await;

        // Verify phase 2 results - everything should be empty
        for worker_id in 0..=2 {
            let worker = WorkerWithDpRank::from_worker_id(worker_id);
            assert_eq!(
                tokens_phase2[&worker], 0,
                "Worker {} should have 0 active tokens after all requests freed",
                worker_id
            );
        }

        Ok(())
    }
}