"vllm/vscode:/vscode.git/clone" did not exist on "1c046447a6d1ac3c99b9f453796f0d355d673deb"
events.rs 11.2 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use super::block::registry::RegistrationHandle;

8
9
10
use crate::block_manager::kv_consolidator::EventSource;
use crate::block_manager::kv_consolidator::KvEventConsolidator;

Ryan Olson's avatar
Ryan Olson committed
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/// The [EventManager] is not responsible for managing the history of the blocks, nor for
/// tracking which events have been published.
///
/// The [EventManager] is only responsible for issuing events on state changes. In this case,
/// there are two states:
///
/// - Store: a dynamo event plane message will be published which defines the registration/storing
///   of the block. Details include, but are not limited to, the sequence/prefix hash, the local block
///   hash, the sequence position of the block, the block size, and the storage location/class which
///   the block is stored in.
///
/// - Remove: a dynamo event plane message will be published which defines the removal of the block
///   from the cache. This message will include enough information to identify the block within a
///   storage hierarchy; minimally, the sequence hash and the storage location/class.
///
/// The [RegistrationHandle] obtained when a block is registered is an RAII object
/// which will trigger a `Remove` event on being dropped.
///
/// This trait declares no methods of its own; it combines the [EventPublisher] and
/// [EventReleaseManager] bounds (plus `Send + Sync`) under a single name.
pub trait EventManager: EventPublisher + EventReleaseManager + Send + Sync {
    // fn register_block(&self, token_block: &TokenBlock) -> PublishHandle;
    // fn publisher(&self) -> Publisher;
}

/// Publishes registration events for batches of registered blocks.
pub trait EventPublisher: Send + Sync {
    /// Publishes the given registration handles as one batch.
    ///
    /// How the batch is turned into events is left to the implementor.
    fn publish(&self, handles: Vec<Arc<RegistrationHandle>>);
}

/// Receives notification that a registered block is being released.
pub trait EventReleaseManager: Send + Sync {
    /// Handles the release of the block identified by `registration_handle`.
    ///
    /// Only a borrow is provided; the implementor does not take ownership of the handle.
    fn block_release(&self, registration_handle: &RegistrationHandle);
}

/// A handle to a registered block.
///
/// Ensures that the register event is published before the release event by
/// holding an [Arc] to the [RegistrationHandle], which in turn issues the
/// release event when it is dropped.
///
/// Ownership of the [PublishHandle] can be transferred to a [Publisher] object,
/// which is responsible for coordinating the publication of multiple
/// registration events. A [PublishHandle] that is dropped while it still holds
/// its publisher publishes its own registration event.
pub struct PublishHandle {
    /// Shared handle to the registered block.
    handle: Arc<RegistrationHandle>,
    /// Publisher used when this handle is dropped; `None` once disarmed.
    publisher: Option<Arc<dyn EventPublisher>>,
}

impl PublishHandle {
    /// Wraps `handle` in an [Arc] and arms the new [PublishHandle] with `publisher`.
    pub fn new(handle: RegistrationHandle, publisher: Arc<dyn EventPublisher>) -> Self {
        Self {
            handle: Arc::new(handle),
            publisher: Some(publisher),
        }
    }

    /// Returns another shared reference to the underlying [RegistrationHandle].
    ///
    /// Despite the name, nothing is removed from `self`; the [Arc] is cloned.
    pub fn remove_handle(&self) -> Arc<RegistrationHandle> {
        Arc::clone(&self.handle)
    }

    /// Clears the stored publisher, leaving `None` in its place.
    fn disarm(&mut self) {
        drop(self.publisher.take());
    }
}

impl Drop for PublishHandle {
    fn drop(&mut self) {
        if let Some(publisher) = self.publisher.take() {
            publisher.publish(vec![self.handle.clone()]);
        }
    }
}

/// Responsible for publishing multiple registration events.
///
/// Because [EventPublisher::publish] takes a list of shared [RegistrationHandles][RegistrationHandle],
/// the [EventPublisher] logic can optimize the number of events published
/// by consolidating multiple registration events with additional sequence logic.
///
/// The behavior of the [EventPublisher] is left entirely up to the implementor.
///
/// NOTE(review): this type derives [Clone] and also publishes its pending handles when
/// dropped, so cloning a [Publisher] that holds pending handles means each copy will
/// publish them. Confirm that this is intended.
#[derive(Clone)]
pub struct Publisher {
    /// Registration handles collected so far and not yet published.
    handles: Vec<Arc<RegistrationHandle>>,
    /// Destination to which the collected handles are published.
    publisher: Arc<dyn EventPublisher>,
}

impl Publisher {
    /// Creates a [Publisher] with no pending handles that publishes through `publisher`.
    pub fn new(publisher: Arc<dyn EventPublisher>) -> Self {
        let handles = Vec::new();
        Self { handles, publisher }
    }

    /// Takes over the registration carried by `publish_handle`.
    ///
    /// The registration handle is queued for the next [Publisher::publish] call and the
    /// incoming [PublishHandle] is disarmed so that it does not publish when it is dropped
    /// at the end of this function. A shared reference to the registration handle is returned.
    pub fn take_handle(&mut self, mut publish_handle: PublishHandle) -> Arc<RegistrationHandle> {
        let registration = publish_handle.remove_handle();
        publish_handle.disarm();
        self.handles.push(Arc::clone(&registration));
        registration
    }

    /// Publishes all pending handles as one batch and leaves the pending list empty.
    ///
    /// Does nothing when there are no pending handles.
    pub fn publish(&mut self) {
        if self.handles.is_empty() {
            return;
        }
        let pending = std::mem::take(&mut self.handles);
        self.publisher.publish(pending);
    }
}

impl Drop for Publisher {
    /// Flushes any handles that are still pending when the [Publisher] goes away.
    fn drop(&mut self) {
        // `publish` is a no-op when there is nothing pending.
        self.publish();
    }
}

// Implementation notes:
//
// - Remove events are per block. I think we will want to leverage a task to collect drop/remove
//   events so that we can batch them together.
//
// - Registration events can be batched by the nature of the [EventManager::register_blocks] call.

/// A unit type used as an event manager that carries no state.
pub struct NullEventManager;

impl NullEventManager {
    /// Creates a new [NullEventManager] wrapped in an [Arc].
    pub fn new() -> Arc<Self> {
        Arc::new(NullEventManager)
    }
}

// `EventManager` declares no methods of its own, so the impl body is empty.
impl EventManager for NullEventManager {}

impl EventPublisher for NullEventManager {
    /// Ignores the handles; nothing is published.
    fn publish(&self, _handles: Vec<Arc<RegistrationHandle>>) {}
}

impl EventReleaseManager for NullEventManager {
    /// Ignores the release; no event is issued.
    fn block_release(&self, _registration_handle: &RegistrationHandle) {}
}

147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
/// Event manager that sends KVBM events to the kv event consolidator
pub struct DynamoEventManager {
    /// Handle through which store and remove events are sent to the consolidator.
    consolidator_handle: Arc<crate::block_manager::kv_consolidator::KvEventConsolidatorHandle>,
    /// `Some` when the consolidator was created by `new_with_config`, `None` otherwise.
    // Never read after construction; presumably held only so that the consolidator stays
    // alive as long as this manager does — TODO confirm.
    #[allow(dead_code)]
    _consolidator: Option<Arc<crate::block_manager::kv_consolidator::KvEventConsolidator>>,
}

impl DynamoEventManager {
    /// Create a new DynamoEventManager with a consolidator handle
    pub fn new(
        consolidator_handle: Arc<crate::block_manager::kv_consolidator::KvEventConsolidatorHandle>,
    ) -> Arc<Self> {
        Arc::new(Self {
            consolidator_handle,
            _consolidator: None,
        })
    }

    /// Create a new DynamoEventManager with kv event consolidator configuration
    ///
    /// This creates and manages the kv event consolidator internally.
    /// The kv event consolidator will be started asynchronously.
    pub async fn new_with_config(
        config: crate::block_manager::kv_consolidator::KvEventConsolidatorConfig,
    ) -> anyhow::Result<Arc<Self>> {
        let mut kv_event_consolidator = KvEventConsolidator::new(config)?;
        kv_event_consolidator.start().await?;
        let handle = kv_event_consolidator.get_handle();

        Ok(Arc::new(Self {
            consolidator_handle: Arc::new(handle),
            _consolidator: Some(Arc::new(kv_event_consolidator)),
        }))
    }

    /// Send store events to the kv event consolidator
    ///
    /// Called when KVBM registers/stores blocks. Sends events to the kv event consolidator
    /// which will deduplicate them with vLLM events.
    ///
    fn publish_store_events(&self, handles: Vec<Arc<RegistrationHandle>>) {
        if handles.is_empty() {
            return;
        }

        tracing::debug!(
            "DynamoEventManager::publish_store_events called with {} blocks",
            handles.len()
        );

        // Send each block to the consolidator
        let kv_event_consolidator = self.consolidator_handle.clone();

        if let Ok(rt) = tokio::runtime::Handle::try_current() {
            rt.spawn(async move {
                for handle in handles {
                    // Extract block metadata from RegistrationHandle
                    let block_hash = handle.sequence_hash().to_string();
                    let parent_hash = handle.parent_sequence_hash().map(|h| h.to_string());

                    // Extract block_size and tokens from RegistrationHandle
                    let block_size = handle.block_size(); // usize
                    let tokens: Vec<u32> = handle.tokens().iter().copied().collect();

                    tracing::debug!(
                        "DynamoEventManager sending store event to kv event consolidator: block_hash={}, block_size={}, tokens={}",
                        block_hash,
                        block_size,
                        tokens.len()
                    );

                    // Send to consolidator with EventSource::Kvbm
                    kv_event_consolidator
                        .handle_store(
                            block_hash,
                            EventSource::Kvbm,
                            tokens,
                            parent_hash,
                            block_size,
                            None, // lora_id
                            None, // tier
                            None, // data_parallel_rank
                        )
                        .await;
                }
            });
        } else {
            tracing::error!(
                "No Tokio runtime in context; dropping store events for {} blocks",
                handles.len()
            );
        }
    }

    /// Send remove event to the kv event consolidator
    ///
    /// Called when a RegistrationHandle is dropped (block evicted from KVBM).
    fn publish_remove_event(&self, registration_handle: &RegistrationHandle) {
        let block_hash = registration_handle.sequence_hash().to_string();

        tracing::debug!(
            "DynamoEventManager::publish_remove_event called: block_hash={}",
            block_hash
        );

        let kv_event_consolidator = self.consolidator_handle.clone();

        if let Ok(rt) = tokio::runtime::Handle::try_current() {
            rt.spawn(async move {
                kv_event_consolidator
                    .handle_remove(&block_hash, EventSource::Kvbm)
                    .await;
            });
        } else {
            tracing::error!(
                "No Tokio runtime in context; dropping remove event for block {}",
                block_hash
            );
        }
    }
}

impl std::fmt::Debug for DynamoEventManager {
    /// Writes a fixed label; no field contents are included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("DynamoEventManager(kv_event_consolidator)")
    }
}

// `EventManager` declares no methods of its own, so the impl body is empty.
impl EventManager for DynamoEventManager {}

impl EventPublisher for DynamoEventManager {
    /// Forwards the batch to `publish_store_events`.
    fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
        self.publish_store_events(handles);
    }
}

impl EventReleaseManager for DynamoEventManager {
    /// Forwards the release to `publish_remove_event`.
    fn block_release(&self, registration_handle: &RegistrationHandle) {
        self.publish_remove_event(registration_handle);
    }
}

Ryan Olson's avatar
Ryan Olson committed
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
#[cfg(test)]
pub mod tests {
    use super::*;

    use crate::tokens::SequenceHash;

    /// Event kinds recorded by [MockEventManager], each carrying a block's sequence hash.
    #[derive(Debug, PartialEq, Eq)]
    pub enum EventType {
        Register(SequenceHash),
        Remove(SequenceHash),
    }

    /// Event manager for tests that forwards every event batch over an unbounded channel.
    pub struct MockEventManager {
        tx: tokio::sync::mpsc::UnboundedSender<Vec<EventType>>,
    }

    impl MockEventManager {
        /// Creates the mock together with the receiving end of its event channel.
        pub fn new() -> (
            Arc<Self>,
            tokio::sync::mpsc::UnboundedReceiver<Vec<EventType>>,
        ) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            let manager = Arc::new(Self { tx });
            (manager, rx)
        }

        /// Creates a [Publisher] that publishes through this mock.
        pub fn publisher(self: &Arc<Self>) -> Publisher {
            // Bind the concrete `Arc<Self>` first so the conversion to
            // `Arc<dyn EventPublisher>` happens at the call below.
            let this: Arc<Self> = Arc::clone(self);
            Publisher::new(this)
        }
    }

    impl EventManager for MockEventManager {}

    impl EventPublisher for MockEventManager {
        /// Sends one batch containing a `Register` event per handle.
        ///
        /// Panics if the receiving end of the channel has been closed.
        fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
            let mut events = Vec::with_capacity(handles.len());
            for handle in &handles {
                events.push(EventType::Register(handle.sequence_hash()));
            }
            self.tx.send(events).unwrap();
        }
    }

    impl EventReleaseManager for MockEventManager {
        /// Sends a batch containing a single `Remove` event for the released block.
        ///
        /// Panics if the receiving end of the channel has been closed.
        fn block_release(&self, registration_handle: &RegistrationHandle) {
            let removed = EventType::Remove(registration_handle.sequence_hash());
            self.tx.send(vec![removed]).unwrap();
        }
    }
}