Unverified Commit e7bcf651 authored by Richard Huo's avatar Richard Huo Committed by GitHub
Browse files

feat: make the kvbm v1's MAX_CONCURRENT_TRANSFERS and MAX_TRANSFER_BATCH_SIZE...

feat: make the kvbm v1's MAX_CONCURRENT_TRANSFERS and MAX_TRANSFER_BATCH_SIZE configurable from env var (#7527)
parent aa3d2859
......@@ -19,7 +19,7 @@ use crate::block_manager::{
transfer::{TransferContext, WriteTo, WriteToStrategy},
},
connector::scheduler::{SchedulingDecision, TransferSchedulerClient},
offload::MAX_TRANSFER_BATCH_SIZE,
offload::max_transfer_batch_size,
storage::{DeviceStorage, DiskStorage, Local, PinnedStorage},
};
......@@ -40,7 +40,7 @@ pub struct ConnectorTransferBatcher {
impl ConnectorTransferBatcher {
pub fn new() -> Self {
Self {
max_batch_size: MAX_TRANSFER_BATCH_SIZE,
max_batch_size: max_transfer_batch_size(),
}
}
......
......@@ -16,7 +16,7 @@ use crate::block_manager::{
},
connector::scheduler::TransferSchedulerClient,
layout::LayoutType,
offload::{MAX_CONCURRENT_TRANSFERS, MAX_TRANSFER_BATCH_SIZE},
offload::{max_concurrent_transfers, max_transfer_batch_size},
storage::{DeviceAllocator, DeviceStorage, DiskAllocator, PinnedAllocator, torch::TorchTensor},
};
......@@ -115,8 +115,8 @@ async fn perform_allocation_and_build_handler(
let agent = build_agent(worker_id, leader_meta.num_disk_blocks > 0)?;
let pool_config = PoolConfig {
enable_pool: true,
max_concurrent_transfers: MAX_CONCURRENT_TRANSFERS,
max_transfer_batch_size: MAX_TRANSFER_BATCH_SIZE,
max_concurrent_transfers: max_concurrent_transfers(),
max_transfer_batch_size: max_transfer_batch_size(),
num_outer_components: device_layout.config().outer_dim,
num_layers: device_layout.config().num_layers,
};
......
......@@ -54,6 +54,7 @@ use tokio_util::sync::CancellationToken;
use anyhow::Result;
use std::any::Any;
use std::env;
use std::collections::BTreeSet;
......@@ -69,8 +70,50 @@ use derive_builder::Builder;
use derive_getters::Getters;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub const MAX_CONCURRENT_TRANSFERS: usize = 4;
pub const MAX_TRANSFER_BATCH_SIZE: usize = 16;
/// Fallback returned by `max_concurrent_transfers()` when
/// `DYN_KVBM_MAX_CONCURRENT_TRANSFERS` is unset or holds an unusable value.
const DEFAULT_MAX_CONCURRENT_TRANSFERS: usize = 4;
/// Fallback returned by `max_transfer_batch_size()` when
/// `DYN_KVBM_MAX_TRANSFER_BATCH_SIZE` is unset or holds an unusable value.
const DEFAULT_MAX_TRANSFER_BATCH_SIZE: usize = 16;
/// Returns the configured limit on concurrent transfers.
///
/// The value is looked up in the `DYN_KVBM_MAX_CONCURRENT_TRANSFERS`
/// environment variable through `read_usize_env`, with
/// `DEFAULT_MAX_CONCURRENT_TRANSFERS` supplied as the fallback. The lookup is
/// performed on every call; nothing is cached here.
pub fn max_concurrent_transfers() -> usize {
    let fallback = DEFAULT_MAX_CONCURRENT_TRANSFERS;
    read_usize_env("DYN_KVBM_MAX_CONCURRENT_TRANSFERS", fallback)
}
/// Returns the configured maximum transfer batch size.
///
/// The value is looked up in the `DYN_KVBM_MAX_TRANSFER_BATCH_SIZE`
/// environment variable through `read_usize_env`, with
/// `DEFAULT_MAX_TRANSFER_BATCH_SIZE` supplied as the fallback. The lookup is
/// performed on every call; nothing is cached here.
pub fn max_transfer_batch_size() -> usize {
    let fallback = DEFAULT_MAX_TRANSFER_BATCH_SIZE;
    read_usize_env("DYN_KVBM_MAX_TRANSFER_BATCH_SIZE", fallback)
}
/// Reads a strictly positive `usize` from the environment variable `name`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, does
/// not parse as a `usize`, or parses to `0`. Every case except "unset" emits a
/// `tracing::warn!`, so a misconfigured value never falls back silently.
///
/// Surrounding whitespace is ignored before parsing, so values such as
/// `"8\n"` (easily produced by shell substitution or YAML block scalars) are
/// accepted instead of being discarded in favour of the default.
fn read_usize_env(name: &str, default: usize) -> usize {
    let value = match env::var(name) {
        Ok(value) => value,
        // An unset variable is the normal case: fall back without noise.
        Err(env::VarError::NotPresent) => return default,
        // The variable IS set but cannot be decoded; tell the operator rather
        // than pretending it was never provided.
        Err(env::VarError::NotUnicode(raw)) => {
            tracing::warn!(
                env_var = name,
                value = ?raw,
                default,
                "Environment variable is not valid Unicode; using default"
            );
            return default;
        }
    };

    match value.trim().parse::<usize>() {
        Ok(parsed) if parsed > 0 => parsed,
        // Zero parses fine but is rejected: both settings must be positive.
        Ok(_) => {
            tracing::warn!(
                env_var = name,
                value = %value,
                default,
                "Environment variable must be > 0; using default"
            );
            default
        }
        Err(err) => {
            tracing::warn!(
                env_var = name,
                value = %value,
                default,
                error = %err,
                "Failed to parse environment variable as usize; using default"
            );
            default
        }
    }
}
/// Configuration for creating an OffloadManager
pub struct OffloadManagerConfig {
......@@ -145,10 +188,19 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
let cuda_ctx = Cuda::device_or_create(0)?;
let max_concurrent_transfers = max_concurrent_transfers();
let max_transfer_batch_size = max_transfer_batch_size();
tracing::info!(
max_concurrent_transfers,
max_transfer_batch_size,
"Configured offload transfer settings"
);
let pool_config = PoolConfig {
enable_pool: true,
max_concurrent_transfers: MAX_CONCURRENT_TRANSFERS,
max_transfer_batch_size: MAX_TRANSFER_BATCH_SIZE,
max_concurrent_transfers,
max_transfer_batch_size,
num_outer_components: config.model_config.outer_dim,
num_layers: config.model_config.num_layers,
};
......@@ -179,11 +231,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
Arc::new(TransferBatcher::new(
LocalTransferManager::new(
device_offload_transfer_ctx,
MAX_CONCURRENT_TRANSFERS,
max_concurrent_transfers,
&config.async_rt_handle,
config.cancellation_token.clone(),
)?,
MAX_TRANSFER_BATCH_SIZE,
max_transfer_batch_size,
&config.async_rt_handle,
config.cancellation_token.clone(),
)),
......@@ -225,11 +277,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
Arc::new(TransferBatcher::new(
LocalTransferManager::new(
transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS,
max_concurrent_transfers,
&config.async_rt_handle,
config.cancellation_token.clone(),
)?,
MAX_TRANSFER_BATCH_SIZE,
max_transfer_batch_size,
&config.async_rt_handle,
config.cancellation_token.clone(),
)),
......@@ -256,11 +308,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
Arc::new(TransferBatcher::new(
LocalTransferManager::new(
transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS,
max_concurrent_transfers,
&config.async_rt_handle,
config.cancellation_token.clone(),
)?,
MAX_TRANSFER_BATCH_SIZE,
max_transfer_batch_size,
&config.async_rt_handle,
config.cancellation_token.clone(),
)),
......@@ -282,11 +334,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
Arc::new(TransferBatcher::new(
LocalTransferManager::new(
transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS,
max_concurrent_transfers,
&config.async_rt_handle,
config.cancellation_token.clone(),
)?,
MAX_TRANSFER_BATCH_SIZE,
max_transfer_batch_size,
&config.async_rt_handle,
config.cancellation_token.clone(),
)),
......@@ -313,11 +365,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
Arc::new(TransferBatcher::new(
LocalTransferManager::new(
transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS,
max_concurrent_transfers,
&config.async_rt_handle,
config.cancellation_token.clone(),
)?,
MAX_TRANSFER_BATCH_SIZE,
max_transfer_batch_size,
&config.async_rt_handle,
config.cancellation_token.clone(),
)),
......@@ -1599,9 +1651,9 @@ mod tests {
#[tokio::test]
async fn test_transfer_batcher() -> Result<()> {
let (offload_manager, device_pool, _, disk_pool) = build_pools(
2 * MAX_TRANSFER_BATCH_SIZE + 1,
2 * max_transfer_batch_size() + 1,
None,
Some(2 * MAX_TRANSFER_BATCH_SIZE + 1),
Some(2 * max_transfer_batch_size() + 1),
None,
)?;
......@@ -1610,7 +1662,7 @@ mod tests {
let mut disk_blocks = Vec::new();
for i in 0..2 * MAX_TRANSFER_BATCH_SIZE + 1 {
for i in 0..2 * max_transfer_batch_size() + 1 {
let disk_block = completed_block(disk_pool, [i as u32; 4]).await?;
populate_block(&disk_block, i as u8)?;
disk_blocks.push(disk_block);
......@@ -1621,7 +1673,7 @@ mod tests {
let device_blocks = offload_manager
.onboard(immutable_disk_blocks.clone(), None)
.await??;
assert_eq!(device_blocks.len(), 2 * MAX_TRANSFER_BATCH_SIZE + 1);
assert_eq!(device_blocks.len(), 2 * max_transfer_batch_size() + 1);
for (i, device_block) in device_blocks.iter().enumerate() {
let blocks = device_pool
......@@ -2634,4 +2686,50 @@ mod tests {
Ok(())
}
// ============================================================================
// ENVIRONMENT CONFIGURATION TESTS
// ============================================================================
/// With both variables unset, each accessor yields its compiled-in default.
#[test]
fn test_config_defaults() {
    let cleared = vec![
        ("DYN_KVBM_MAX_CONCURRENT_TRANSFERS", None::<&str>),
        ("DYN_KVBM_MAX_TRANSFER_BATCH_SIZE", None::<&str>),
    ];

    temp_env::with_vars(cleared, || {
        let concurrent = max_concurrent_transfers();
        let batch = max_transfer_batch_size();

        assert_eq!(concurrent, DEFAULT_MAX_CONCURRENT_TRANSFERS);
        assert_eq!(batch, DEFAULT_MAX_TRANSFER_BATCH_SIZE);
    });
}
/// Valid positive integers in the environment override the defaults.
#[test]
fn test_config_custom_values() {
    let overrides = vec![
        ("DYN_KVBM_MAX_CONCURRENT_TRANSFERS", Some("64")),
        ("DYN_KVBM_MAX_TRANSFER_BATCH_SIZE", Some("128")),
    ];

    temp_env::with_vars(overrides, || {
        let concurrent = max_concurrent_transfers();
        let batch = max_transfer_batch_size();

        assert_eq!(concurrent, 64);
        assert_eq!(batch, 128);
    });
}
/// A non-numeric value and a zero value are both rejected in favour of the
/// defaults.
#[test]
fn test_config_invalid_values_fallback() {
    let invalid = vec![
        ("DYN_KVBM_MAX_CONCURRENT_TRANSFERS", Some("not_a_number")),
        ("DYN_KVBM_MAX_TRANSFER_BATCH_SIZE", Some("0")),
    ];

    temp_env::with_vars(invalid, || {
        // Each lookup should log a tracing::warn and return its default.
        let concurrent = max_concurrent_transfers();
        let batch = max_transfer_batch_size();

        assert_eq!(concurrent, DEFAULT_MAX_CONCURRENT_TRANSFERS);
        assert_eq!(batch, DEFAULT_MAX_TRANSFER_BATCH_SIZE);
    });
}
}
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment