// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use super::*; use crate::block_manager::pool::{AsyncBlockPoolController, BlockPoolStatus}; use futures::stream; use serde_json::Value; impl ControllerHandler { pub fn new(block_manager: KvBlockManager) -> Arc { Arc::new(Self { block_manager }) } fn get_pool_controller( &self, cache_level: &CacheLevel, ) -> Result<&dyn AsyncBlockPoolController> { match cache_level { CacheLevel::G1 => Ok(self .block_manager .device() .ok_or_else(|| anyhow::anyhow!("Device pool not found"))?), CacheLevel::G2 => Ok(self .block_manager .host() .ok_or_else(|| anyhow::anyhow!("Host pool not found"))?), CacheLevel::G3 => Ok(self .block_manager .disk() .ok_or_else(|| anyhow::anyhow!("Disk pool not found"))?), } } async fn reset_pool(&self, cache_level: &CacheLevel) -> Result<()> { Ok(self.get_pool_controller(cache_level)?.reset().await?) } async fn handle_status(&self, cache_level: &CacheLevel) -> Result { let pool_controller = self.get_pool_controller(cache_level)?; Ok(pool_controller.status().await?) } async fn handle_pool_reset(&self, cache_level: &CacheLevel) -> Result<()> { self.reset_pool(cache_level).await } async fn handle_blocks_reset( &self, cache_level: &CacheLevel, sequence_hashes: Vec, ) -> Result { let pool_controller = self.get_pool_controller(cache_level)?; Ok(pool_controller.reset_blocks(&sequence_hashes).await?) } async fn handle_reset_all(&self) -> Result<()> { for cache_level in &[CacheLevel::G1, CacheLevel::G2, CacheLevel::G3] { if let Ok(pool_controller) = self.get_pool_controller(cache_level) { pool_controller.reset().await?; } } Ok(()) } } #[async_trait] impl AsyncEngine for ControllerHandler { async fn generate(&self, input: HandlerInput) -> Result { let (data, ctx) = input.into_parts(); let annotated = match data { ControlMessage::Status(cache_level) => { // handle status make_response(self.handle_status(&cache_level).await) } ControlMessage::ResetPool(cache_level) => { // handle reset make_unit_response(self.handle_pool_reset(&cache_level).await) } ControlMessage::ResetBlocks(request) => { // handle reset blocks make_response( self.handle_blocks_reset(&request.cache_level, request.sequence_hashes) .await, ) } ControlMessage::ResetAll => { // hadnle reset all make_unit_response(self.handle_reset_all().await) } }; let stream = stream::once(async move { annotated }); Ok(ResponseStream::new(Box::pin(stream), ctx.context())) } } fn make_unit_response(response: Result<()>) -> Annotated { match response { Ok(()) => Annotated::from_data(serde_json::Value::Null), Err(e) => Annotated::from_error(e.to_string()), } } fn make_response(response: Result) -> Annotated { match response { Ok(response) => match serde_json::to_value(response) { Ok(values) => Annotated::from_data(values), Err(e) => Annotated::from_error(e.to_string()), }, Err(e) => Annotated::from_error(e.to_string()), } }