// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use super::*; use crate::object::ObjectBlockOps; use futures::future::BoxFuture; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::OnceLock; #[derive(Clone)] pub struct VeloWorkerClient { messenger: Arc, remote: InstanceId, g1_handle: Arc>, g2_handle: Arc>, g3_handle: Arc>, /// Track which remote instances we've connected to for has_remote_metadata() connected_instances: Arc>>, } impl WorkerTransfers for VeloWorkerClient { fn execute_local_transfer( &self, src: LogicalLayoutHandle, dst: LogicalLayoutHandle, src_block_ids: Arc<[BlockId]>, dst_block_ids: Arc<[BlockId]>, options: TransferOptions, ) -> Result { // Create a single local event for this operation let event = self.messenger.events().new_event()?; let awaiter = self.messenger.events().awaiter(event.handle())?; // Convert to serializable options // TODO: Extract bounce buffer handle if present in options.bounce_buffer let options = SerializableTransferOptions { layer_range: options.layer_range, nixl_write_notification: options.nixl_write_notification, bounce_buffer_handle: None, bounce_buffer_block_ids: None, }; // Create the message let message = LocalTransferMessage { src, dst, src_block_ids: src_block_ids.to_vec(), dst_block_ids: dst_block_ids.to_vec(), options, }; let bytes = Bytes::from(serde_json::to_vec(&message)?); // Spawn a task for the remote instance let nova = self.messenger.clone(); let remote_instance = self.remote; // Use unary (not am_sync) to wait for transfer completion self.messenger.tracker().spawn_on( async move { let result = nova .unary("kvbm.worker.local_transfer")? .raw_payload(bytes) .instance(remote_instance) .send() .await; match result { Ok(_) => event.trigger(), Err(e) => event.poison(e.to_string()), } }, self.messenger.runtime(), ); Ok(TransferCompleteNotification::from_awaiter(awaiter)) } fn execute_remote_onboard( &self, src: RemoteDescriptor, dst: LogicalLayoutHandle, dst_block_ids: Arc<[BlockId]>, options: TransferOptions, ) -> Result { let event = self.messenger.events().new_event()?; let awaiter = self.messenger.events().awaiter(event.handle())?; let options = SerializableTransferOptions { layer_range: options.layer_range, nixl_write_notification: options.nixl_write_notification, bounce_buffer_handle: None, bounce_buffer_block_ids: None, }; let message = RemoteOnboardMessage { src, dst, dst_block_ids: dst_block_ids.to_vec(), options, }; let bytes = Bytes::from(serde_json::to_vec(&message)?); let nova = self.messenger.clone(); let remote_instance = self.remote; self.messenger.tracker().spawn_on( async move { // Use unary instead of am_sync for explicit response handling let result = nova .unary("kvbm.worker.remote_onboard")? .raw_payload(bytes) .instance(remote_instance) .send() .await; match result { Ok(_) => event.trigger(), Err(e) => event.poison(e.to_string()), } }, self.messenger.runtime(), ); Ok(TransferCompleteNotification::from_awaiter(awaiter)) } fn execute_remote_offload( &self, src: LogicalLayoutHandle, src_block_ids: Arc<[BlockId]>, dst: RemoteDescriptor, options: TransferOptions, ) -> Result { let event = self.messenger.events().new_event()?; let awaiter = self.messenger.events().awaiter(event.handle())?; let options = SerializableTransferOptions { layer_range: options.layer_range, nixl_write_notification: options.nixl_write_notification, bounce_buffer_handle: None, bounce_buffer_block_ids: None, }; let message = RemoteOffloadMessage { src, dst, src_block_ids: src_block_ids.to_vec(), options, }; let bytes = Bytes::from(serde_json::to_vec(&message)?); let nova = self.messenger.clone(); let remote_instance = self.remote; self.messenger.tracker().spawn_on( async move { // Use unary instead of am_sync for explicit response handling let result = nova .unary("kvbm.worker.remote_offload")? .raw_payload(bytes) .instance(remote_instance) .send() .await; match result { Ok(_) => event.trigger(), Err(e) => event.poison(e.to_string()), } }, self.messenger.runtime(), ); Ok(TransferCompleteNotification::from_awaiter(awaiter)) } fn connect_remote( &self, instance_id: InstanceId, metadata: Vec, ) -> Result { // Serialize metadata to bytes (SerializedLayout uses bincode internally) let serialized_metadata: Vec> = metadata.iter().map(|m| m.as_bytes().to_vec()).collect(); let message = ConnectRemoteMessage { instance_id, metadata: serialized_metadata, }; let bytes = Bytes::from(serde_json::to_vec(&message)?); // Create event for completion tracking let event = self.messenger.events().new_event()?; let awaiter = self.messenger.events().awaiter(event.handle())?; let nova = self.messenger.clone(); let remote_instance = self.remote; let connected = self.connected_instances.clone(); let target_instance = instance_id; self.messenger.tracker().spawn_on( async move { let result = nova .unary("kvbm.worker.connect_remote")? .raw_payload(bytes) .instance(remote_instance) .send() .await; match result { Ok(_) => { // Track that we've connected to this instance connected.write().insert(target_instance); event.trigger() } Err(e) => event.poison(e.to_string()), } }, self.messenger.runtime(), ); Ok(ConnectRemoteResponse::from_awaiter(awaiter)) } fn has_remote_metadata(&self, instance_id: InstanceId) -> bool { // Check if we've successfully connected to this instance self.connected_instances.read().contains(&instance_id) } fn execute_remote_onboard_for_instance( &self, instance_id: InstanceId, remote_logical_type: LogicalLayoutHandle, src_block_ids: Vec, dst: LogicalLayoutHandle, dst_block_ids: Arc<[BlockId]>, options: TransferOptions, ) -> Result { let message = ExecuteRemoteOnboardForInstanceMessage { instance_id, remote_logical_type, src_block_ids, dst, dst_block_ids: dst_block_ids.to_vec(), options: SerializableTransferOptions::from(options), }; let bytes = Bytes::from(serde_json::to_vec(&message)?); // Create event for completion tracking let event = self.messenger.events().new_event()?; let awaiter = self.messenger.events().awaiter(event.handle())?; let nova = self.messenger.clone(); let remote_instance = self.remote; self.messenger.tracker().spawn_on( async move { let result = nova .unary("kvbm.worker.remote_onboard_for_instance")? .raw_payload(bytes) .instance(remote_instance) .send() .await; match result { Ok(_) => event.trigger(), Err(e) => event.poison(e.to_string()), } }, self.messenger.runtime(), ); Ok(TransferCompleteNotification::from_awaiter(awaiter)) } } impl Worker for VeloWorkerClient { fn g1_handle(&self) -> Option { self.g1_handle.get().copied() } fn g2_handle(&self) -> Option { self.g2_handle.get().copied() } fn g3_handle(&self) -> Option { self.g3_handle.get().copied() } fn export_metadata(&self) -> Result { // Use unary (not typed_unary) to avoid JSON serialization of bincode data let unary_result = self .messenger .unary("kvbm.worker.export_metadata")? .instance(self.remote) .send(); // Wrap UnaryResult to convert Bytes to SerializedLayout let future = async move { let bytes = unary_result.await?; Ok(SerializedLayout::from_bytes(bytes.to_vec())) }; Ok(SerializedLayoutResponse::from_boxed(Box::pin(future))) } fn import_metadata(&self, metadata: SerializedLayout) -> Result { // Use raw_payload to avoid JSON serialization of bincode data let unary_result = self .messenger .unary("kvbm.worker.import_metadata")? .raw_payload(Bytes::from(metadata.as_bytes().to_vec())) .instance(self.remote) .send(); // Response is JSON-serialized Vec let future = async move { let bytes = unary_result.await?; serde_json::from_slice(&bytes).map_err(|e| { anyhow::anyhow!("Failed to deserialize import_metadata response: {}", e) }) }; Ok(ImportMetadataResponse::from_boxed(Box::pin(future))) } } impl VeloWorkerClient { /// Create a new VeloWorkerClient for communicating with a remote worker. pub fn new(messenger: Arc, remote: InstanceId) -> Self { Self { messenger, remote, g1_handle: Arc::new(OnceLock::new()), g2_handle: Arc::new(OnceLock::new()), g3_handle: Arc::new(OnceLock::new()), connected_instances: Arc::new(RwLock::new(HashSet::new())), } } /// Configure layout handles from serialized metadata. /// /// Call this after worker initialization when handles are known from WorkerLayoutResponse. /// This allows the VeloWorkerClient to provide layout handles like DirectWorker does. /// /// # Arguments /// * `metadata` - SerializedLayout from WorkerLayoutResponse.metadata /// /// # Example /// ```ignore /// let response: WorkerLayoutResponse = worker.initialize(config).await?; /// worker_client.configure_layout_handles(&response.metadata)?; /// ``` pub fn configure_layout_handles(&self, metadata: &SerializedLayout) -> Result<()> { let unpacked = metadata.unpack()?; for desc in &unpacked.layouts { match desc.logical_type { LogicalLayoutHandle::G1 => { self.g1_handle.set(desc.handle).ok(); } LogicalLayoutHandle::G2 => { self.g2_handle.set(desc.handle).ok(); } LogicalLayoutHandle::G3 => { self.g3_handle.set(desc.handle).ok(); } _ => {} } } Ok(()) } /// Get the layout configuration from the remote worker. /// /// This calls the `kvbm.worker.get_layout_config` handler on the remote worker. /// Used by the leader during Phase 3 to gather G1 layout configs from all workers /// and validate they match before creating G2/G3 layouts. /// /// # Returns /// A typed unary result that resolves to the layout configuration pub fn get_layout_config(&self) -> Result<::velo::TypedUnaryResult> { let instance = self.remote; let awaiter = self .messenger .typed_unary::("kvbm.worker.get_layout_config")? .instance(instance) .send(); Ok(awaiter) } /// Configure additional layouts (G2, G3) on the remote worker. /// /// This calls the `kvbm.worker.configure_layouts` handler on the remote worker. /// The worker will create host/pinned cache (G2) and optionally disk cache (G3) /// based on the provided configuration. /// /// # Arguments /// * `config` - Leader-provided configuration specifying block counts and backends /// /// # Returns /// A typed unary result that resolves to the worker's response with updated metadata pub fn configure_layouts( &self, config: LeaderLayoutConfig, ) -> Result<::velo::TypedUnaryResult> { let instance = self.remote; let awaiter = self .messenger .typed_unary::("kvbm.worker.configure_layouts")? .payload(config)? .instance(instance) .send(); Ok(awaiter) } } impl ObjectBlockOps for VeloWorkerClient { fn has_blocks( &self, keys: Vec, ) -> BoxFuture<'static, Vec<(SequenceHash, Option)>> { let message = ObjectHasBlocksMessage { keys: keys.clone() }; let bytes = match serde_json::to_vec(&message) { Ok(b) => Bytes::from(b), Err(_) => { return Box::pin(async move { keys.into_iter().map(|k| (k, None)).collect() }); } }; let nova = self.messenger.clone(); let remote = self.remote; Box::pin(async move { let result = nova .unary("kvbm.worker.object_has_blocks") .ok() .map(|u| u.raw_payload(bytes).instance(remote).send()); match result { Some(unary_result) => match unary_result.await { Ok(response_bytes) => { match serde_json::from_slice::(&response_bytes) { Ok(response) => response.results, Err(_) => keys.into_iter().map(|k| (k, None)).collect(), } } Err(_) => keys.into_iter().map(|k| (k, None)).collect(), }, None => keys.into_iter().map(|k| (k, None)).collect(), } }) } fn put_blocks( &self, keys: Vec, src_layout: LogicalLayoutHandle, block_ids: Vec, ) -> BoxFuture<'static, Vec>> { // For remote workers, we send the logical layout handle - they resolve it locally let message = ObjectPutBlocksMessage { keys: keys.clone(), layout: src_layout, block_ids, }; let bytes = match serde_json::to_vec(&message) { Ok(b) => Bytes::from(b), Err(_) => return Box::pin(async move { keys.into_iter().map(Err).collect() }), }; let nova = self.messenger.clone(); let remote = self.remote; Box::pin(async move { let result = nova .unary("kvbm.worker.object_put_blocks") .ok() .map(|u| u.raw_payload(bytes).instance(remote).send()); match result { Some(unary_result) => match unary_result.await { Ok(response_bytes) => { match serde_json::from_slice::(&response_bytes) { Ok(response) => response.into_results(), Err(_) => keys.into_iter().map(Err).collect(), } } Err(_) => keys.into_iter().map(Err).collect(), }, None => keys.into_iter().map(Err).collect(), } }) } fn get_blocks( &self, keys: Vec, dst_layout: LogicalLayoutHandle, block_ids: Vec, ) -> BoxFuture<'static, Vec>> { // For remote workers, we send the logical layout handle - they resolve it locally let message = ObjectGetBlocksMessage { keys: keys.clone(), layout: dst_layout, block_ids, }; let bytes = match serde_json::to_vec(&message) { Ok(b) => Bytes::from(b), Err(_) => return Box::pin(async move { keys.into_iter().map(Err).collect() }), }; let nova = self.messenger.clone(); let remote = self.remote; Box::pin(async move { let result = nova .unary("kvbm.worker.object_get_blocks") .ok() .map(|u| u.raw_payload(bytes).instance(remote).send()); match result { Some(unary_result) => match unary_result.await { Ok(response_bytes) => { match serde_json::from_slice::(&response_bytes) { Ok(response) => response.into_results(), Err(_) => keys.into_iter().map(Err).collect(), } } Err(_) => keys.into_iter().map(Err).collect(), }, None => keys.into_iter().map(Err).collect(), } }) } }