service.rs 12.7 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use kvbm_physical::manager::SerializedLayout;

use super::{
    Arc, ConnectRemoteMessage, DirectWorker, ExecuteRemoteOnboardForInstanceMessage,
    LocalTransferMessage, ObjectGetBlocksMessage, ObjectHasBlocksMessage, ObjectHasBlocksResponse,
    ObjectPutBlocksMessage, ObjectPutGetBlocksResponse, RemoteOffloadMessage, RemoteOnboardMessage,
    Result, TransferOptions, WorkerTransfers,
};
use crate::object::ObjectBlockOps;

use bytes::Bytes;
use derive_builder::Builder;

use ::velo::{Handler, Messenger};

/// Service that exposes a [`DirectWorker`] through `kvbm.worker.*` message handlers.
///
/// It holds the shared `Messenger` the handlers are registered on and the shared
/// `DirectWorker` they delegate to. [`VeloWorkerService::new`] registers every handler
/// before returning the service.
///
/// `#[derive(Builder)]` additionally generates an owned-pattern builder for callers that
/// want to pass a pre-built `DirectWorker` (for example when the caller manages layout
/// registration itself).
///
/// NOTE(review): only `new` calls `register_handlers()` in this file; nothing visible here
/// hooks the derived builder's `build()` into handler registration. Confirm whether
/// builder-constructed services are expected to serve requests.
///
/// NOTE(review): earlier docs mentioned passing a pre-built TransferManager so that the
/// service creates the DirectWorker, but this struct has no such field - confirm whether
/// that path still exists elsewhere.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct VeloWorkerService {
    /// Messenger on which all worker handlers are registered.
    messenger: Arc<Messenger>,
    /// Worker that every handler delegates to.
    worker: Arc<DirectWorker>,
}

impl VeloWorkerService {
    /// Creates the service and registers all worker handlers on `messenger`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while registering a handler; in that case the
    /// service is not returned.
    pub fn new(messenger: Arc<Messenger>, worker: Arc<DirectWorker>) -> Result<Self> {
        let this = Self { messenger, worker };
        // Hand the service back only once every handler has been registered.
        this.register_handlers().map(|()| this)
    }

    /// Returns a reference to the shared [`DirectWorker`] backing this service.
    ///
    /// The `Arc` is borrowed, not cloned; clone it at the call site if an owned handle
    /// is needed.
    ///
    /// Intended uses, per the original author notes:
    /// - Registering additional layouts after service creation
    /// - Exporting metadata for handshake
    /// - Accessing the TransferManager
    pub fn worker(&self) -> &Arc<DirectWorker> {
        &self.worker
    }

    /// Register all worker handlers with Nova
    fn register_handlers(&self) -> Result<()> {
        self.register_local_transfer_handler()?;
        self.register_remote_onboard_handler()?;
        self.register_remote_offload_handler()?;
        self.register_import_metadata_handler()?;
        self.register_export_metadata_handler()?;
        self.register_connect_remote_handler()?;
        self.register_execute_remote_onboard_for_instance_handler()?;
        // Object storage handlers
        self.register_object_has_blocks_handler()?;
        self.register_object_put_blocks_handler()?;
        self.register_object_get_blocks_handler()?;
        Ok(())
    }

    fn register_local_transfer_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        // Use unary_handler_async for explicit response (client waits for transfer completion)
        let handler = Handler::unary_handler_async("kvbm.worker.local_transfer", move |ctx| {
            let worker = worker.clone();

            async move {
                // Deserialize the message
                let message: LocalTransferMessage = serde_json::from_slice(&ctx.payload)?;

                // Convert options and resolve bounce buffer if present
                let bounce_buffer_parts = message.options.bounce_buffer_parts();
                let mut options: TransferOptions = message.options.into();
                if let Some((handle, block_ids)) = bounce_buffer_parts {
                    options.bounce_buffer = Some(worker.create_bounce_buffer(handle, block_ids)?);
                }

                let notification = worker.execute_local_transfer(
                    message.src,
                    message.dst,
                    Arc::from(message.src_block_ids),
                    Arc::from(message.dst_block_ids),
                    options,
                )?;

                // Await the transfer completion
                notification.await?;

                // Return empty response to signal success
                Ok(Some(Bytes::new()))
            }
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    fn register_remote_onboard_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        // Use unary_handler_async for explicit response (works with unary client)
        let handler = Handler::unary_handler_async("kvbm.worker.remote_onboard", move |ctx| {
            let worker = worker.clone();

            async move {
                let message: RemoteOnboardMessage = serde_json::from_slice(&ctx.payload)?;

                // Convert options and resolve bounce buffer if present
                let bounce_buffer_parts = message.options.bounce_buffer_parts();
                let mut options: TransferOptions = message.options.into();
                if let Some((handle, block_ids)) = bounce_buffer_parts {
                    options.bounce_buffer = Some(worker.create_bounce_buffer(handle, block_ids)?);
                }

                let notification = worker.execute_remote_onboard(
                    message.src,
                    message.dst,
                    Arc::from(message.dst_block_ids),
                    options,
                )?;

                notification.await?;

                Ok(Some(Bytes::new()))
            }
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    fn register_remote_offload_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        // Use unary_handler_async for explicit response (works with unary client)
        let handler = Handler::unary_handler_async("kvbm.worker.remote_offload", move |ctx| {
            let worker = worker.clone();

            async move {
                let message: RemoteOffloadMessage = serde_json::from_slice(&ctx.payload)?;

                // Convert options and resolve bounce buffer if present
                let bounce_buffer_parts = message.options.bounce_buffer_parts();
                let mut options: TransferOptions = message.options.into();
                if let Some((handle, block_ids)) = bounce_buffer_parts {
                    options.bounce_buffer = Some(worker.create_bounce_buffer(handle, block_ids)?);
                }

                let notification = worker.execute_remote_offload(
                    message.src,
                    Arc::from(message.src_block_ids),
                    message.dst,
                    options,
                )?;

                notification.await?;

                Ok(Some(Bytes::new()))
            }
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    fn register_import_metadata_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler = Handler::unary_handler("kvbm.worker.import_metadata", move |ctx| {
            let metadata = SerializedLayout::from_bytes(ctx.payload.to_vec());
            let handles = worker.import_metadata(metadata)?;
            Ok(Some(Bytes::from(serde_json::to_vec(&handles)?)))
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    fn register_export_metadata_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler = Handler::unary_handler("kvbm.worker.export_metadata", move |_ctx| {
            let response = worker.export_metadata()?;
            Ok(Some(Bytes::from(response.as_bytes().to_vec())))
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    /// Register handler for connect_remote - stores remote instance metadata in local worker
    fn register_connect_remote_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler = Handler::unary_handler("kvbm.worker.connect_remote", move |ctx| {
            let message: ConnectRemoteMessage = serde_json::from_slice(&ctx.payload)?;

            // Deserialize metadata (SerializedLayout stored as raw bytes)
            let metadata: Vec<SerializedLayout> = message
                .metadata
                .into_iter()
                .map(SerializedLayout::from_bytes)
                .collect();

            // Call DirectWorker.connect_remote()
            worker.connect_remote(message.instance_id, metadata)?;

            // Return empty response to signal success
            Ok(Some(Bytes::new()))
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    /// Register handler for execute_remote_onboard_for_instance - pulls from remote using instance ID
    fn register_execute_remote_onboard_for_instance_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler =
            Handler::unary_handler_async("kvbm.worker.remote_onboard_for_instance", move |ctx| {
                let worker = worker.clone();
                async move {
                    let message: ExecuteRemoteOnboardForInstanceMessage =
                        serde_json::from_slice(&ctx.payload)?;

                    // Convert options and resolve bounce buffer if present
                    let bounce_buffer_parts = message.options.bounce_buffer_parts();
                    let mut options: TransferOptions = message.options.into();
                    if let Some((handle, block_ids)) = bounce_buffer_parts {
                        options.bounce_buffer =
                            Some(worker.create_bounce_buffer(handle, block_ids)?);
                    }

                    let notification = worker.execute_remote_onboard_for_instance(
                        message.instance_id,
                        message.remote_logical_type,
                        message.src_block_ids,
                        message.dst,
                        Arc::from(message.dst_block_ids),
                        options,
                    )?;

                    notification.await?;
                    Ok(Some(Bytes::new()))
                }
            })
            .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    // ========================================================================
    // Object Storage Handlers
    // ========================================================================

    /// Register handler for object_has_blocks - check if blocks exist in object storage
    fn register_object_has_blocks_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler = Handler::unary_handler_async("kvbm.worker.object_has_blocks", move |ctx| {
            let worker = worker.clone();

            async move {
                let message: ObjectHasBlocksMessage = serde_json::from_slice(&ctx.payload)?;

                // Call DirectWorker's ObjectBlockOps implementation
                let results = worker.has_blocks(message.keys).await;

                let response = ObjectHasBlocksResponse { results };
                Ok(Some(Bytes::from(serde_json::to_vec(&response)?)))
            }
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    /// Register handler for object_put_blocks - upload blocks to object storage
    fn register_object_put_blocks_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler = Handler::unary_handler_async("kvbm.worker.object_put_blocks", move |ctx| {
            let worker = worker.clone();

            async move {
                let message: ObjectPutBlocksMessage = serde_json::from_slice(&ctx.payload)?;

                // Call DirectWorker's ObjectBlockOps implementation
                // DirectWorker resolves logical handle to physical layout internally
                let results = worker
                    .put_blocks(message.keys, message.layout, message.block_ids)
                    .await;

                let response = ObjectPutGetBlocksResponse::from_results(results);
                Ok(Some(Bytes::from(serde_json::to_vec(&response)?)))
            }
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }

    /// Register handler for object_get_blocks - download blocks from object storage
    fn register_object_get_blocks_handler(&self) -> Result<()> {
        let worker = self.worker.clone();

        let handler = Handler::unary_handler_async("kvbm.worker.object_get_blocks", move |ctx| {
            let worker = worker.clone();

            async move {
                let message: ObjectGetBlocksMessage = serde_json::from_slice(&ctx.payload)?;

                // Call DirectWorker's ObjectBlockOps implementation
                // DirectWorker resolves logical handle to physical layout internally
                let results = worker
                    .get_blocks(message.keys, message.layout, message.block_ids)
                    .await;

                let response = ObjectPutGetBlocksResponse::from_results(results);
                Ok(Some(Bytes::from(serde_json::to_vec(&response)?)))
            }
        })
        .build();

        self.messenger.register_handler(handler)?;
        Ok(())
    }
}