Unverified Commit c8adbe6f authored by atchernych's avatar atchernych Committed by GitHub
Browse files

fix: Load deployment card from ModelExpress for EPP (#3793)


Signed-off-by: default avatarAnna Tchernych <atchernych@nvidia.com>
parent 8642c4bd
......@@ -18,11 +18,11 @@ ENV CXX=g++
# C/C++ toolchain for cgo, and libstdc++ for link-time
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc g++ \
libc6-dev \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
build-essential \
gcc g++ \
libc6-dev \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
ARG COMMIT_SHA=unknown
ARG BUILD_REF
......@@ -56,13 +56,18 @@ FROM ${BASE_IMAGE} AS runtime
# Minimal runtime deps; include libstdc++ runtime for -lstdc++
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
libstdc++6 \
&& rm -rf /var/lib/apt/lists/* \
&& groupadd -r nonroot && useradd -r -g nonroot nonroot
ca-certificates \
libstdc++6 \
&& rm -rf /var/lib/apt/lists/* \
&& groupadd -r nonroot && useradd -r -g nonroot -m -d /home/nonroot nonroot \
&& mkdir -p /home/nonroot/.cache/huggingface/hub \
&& chown -R nonroot:nonroot /home/nonroot
WORKDIR /
COPY --from=builder /epp /epp
# Set HOME so ModelExpress can find the cache directory
ENV HOME=/home/nonroot
USER nonroot:nonroot
ENTRYPOINT ["/epp"]
......@@ -69,8 +69,8 @@ if [[ ! -f "${DYNAMO_LIB_DIR}/libdynamo_llm_capi.a" ]]; then
exit 1
fi
if [[ ! -f "${GAIE_DIR}/Dockerfile.epp" ]]; then
echo "Docker.epp file copy failed!"
if [[ ! -f "${GAIE_DIR}/Dockerfile.dynamo" ]]; then
echo "Docker.dynamo file copy failed!"
exit 1
fi
......
......@@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use dynamo_llm::kv_router::{
indexer::compute_block_hash_for_seq, protocols::*, publisher::KvEventPublisher,
};
use dynamo_runtime::{DistributedRuntime, Worker, storage::key_value_store::Key};
use dynamo_runtime::{DistributedRuntime, Worker};
static WK: OnceCell<Worker> = OnceCell::new();
static DRT: AsyncOnceCell<DistributedRuntime> = AsyncOnceCell::new();
// [FIXME] shouldn't the publisher be instance passing between API calls?
......@@ -329,18 +329,15 @@ pub extern "C" fn dynamo_kv_event_publish_removed(
/* ------------------------------------------------------------------------
* Worker selection pipeline
* ------------------------------------------------------------------------ */
use std::{pin::Pin, sync::Arc};
use std::pin::Pin;
const GENERATE_ENDPOINT: &str = "generate";
use anyhow::Context;
use dynamo_runtime::{
Runtime, distributed::DistributedConfig, slug::Slug, traits::DistributedRuntimeProvider,
};
use dynamo_runtime::{Runtime, distributed::DistributedConfig, traits::DistributedRuntimeProvider};
use dynamo_llm::discovery::ModelManager;
use dynamo_llm::entrypoint::build_routed_pipeline;
use dynamo_llm::kv_router::KvRouter;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::model_card::ModelDeploymentCard;
use dynamo_llm::protocols::openai::nvext::NvExt;
......@@ -351,7 +348,6 @@ use dynamo_llm::types::{
},
};
use dynamo_runtime::{
component::Client,
engine::AsyncEngineStream,
pipeline::{ManyOut, RouterMode, ServiceEngine, SingleIn},
};
......@@ -902,41 +898,7 @@ pub async fn query_worker_selection_and_annotate(
Ok((worker_id, tokens, original_request))
}
/// Build a worker selection pipeline
/// The router handles query_instance_id annotations and returns worker_instance_id and token_data annotations.
pub async fn build_worker_selection_pipeline_chat(
card: &ModelDeploymentCard,
client: &Client,
router_mode: RouterMode,
busy_threshold: Option<f64>,
chooser: Option<Arc<KvRouter>>,
) -> anyhow::Result<
ServiceEngine<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>,
> {
let hf_tokenizer = card
.tokenizer_hf()
.with_context(|| "Failed to load HF tokenizer")?;
let engine = build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(
card,
client,
router_mode,
busy_threshold,
chooser,
hf_tokenizer,
None,
)
.await?;
Ok(engine)
}
/// Helper function to create worker selection pipeline for OpenAI Chat Completion requests
/// Create a worker selection pipeline for OpenAI Chat Completion requests
///
/// This is a concrete implementation that works specifically with NvCreateChatCompletionRequest
/// and is designed for use with C bindings. Uses the "generate" endpoint by default.
......@@ -974,22 +936,32 @@ pub async fn create_worker_selection_pipeline_chat(
.component(component_name)?;
let client = component.endpoint(GENERATE_ENDPOINT).client().await?;
let model_slug = Slug::from_string(model_name);
let card = match ModelDeploymentCard::load_from_store(
&Key::from_raw(model_slug.to_string()),
component.drt(),
)
.await
{
Ok(Some(card)) => card,
Ok(None) => anyhow::bail!("ModelDeploymentCard not found for model: {}", model_name),
Err(err) => anyhow::bail!(
"Error fetching ModelDeploymentCard from storage under key {model_slug}. {err}"
),
};
// Discover the model card by searching all instances with this model name
tracing::debug!("Looking for model: {}", model_name);
tracing::debug!("Namespace: {}", namespace);
use dynamo_llm::discovery::ModelWatcher;
let model_manager = std::sync::Arc::new(ModelManager::new());
let watcher = ModelWatcher::new(
component.drt().clone(),
model_manager.clone(),
router_mode,
kv_router_config,
busy_threshold,
);
let cards = watcher
.cards_for_model(model_name, Some(namespace), false)
.await
.with_context(|| format!("Failed to discover model: {}", model_name))?;
tracing::debug!("Found {} cards for model {}", cards.len(), model_name);
let card = cards.into_iter().next().ok_or_else(|| {
tracing::error!("No ModelDeploymentCard found for model: {}", model_name);
anyhow::anyhow!("ModelDeploymentCard not found for model: {}", model_name)
})?;
let chooser = if router_mode == RouterMode::KV {
let model_manager = std::sync::Arc::new(ModelManager::new());
Some(
model_manager
.kv_chooser_for(&component, card.kv_cache_block_size, kv_router_config)
......@@ -999,5 +971,52 @@ pub async fn create_worker_selection_pipeline_chat(
None
};
build_worker_selection_pipeline_chat(&card, &client, router_mode, busy_threshold, chooser).await
// Download model config files from HuggingFace for EPP
// The backend's card has NATS URLs which aren't accessible from EPP
tracing::debug!(
"Downloading model config files for EPP: {}",
card.display_name
);
let local_path = dynamo_llm::hub::from_hf(&card.display_name, true)
.await
.with_context(|| {
format!(
"Failed to download model config files for: {}",
card.display_name
)
})?;
// Load a fresh card from local files, then copy runtime config from original card
tracing::debug!("Loading ModelDeploymentCard from local path...");
let mut card_with_local_files = ModelDeploymentCard::load_from_disk(&local_path, None)
.with_context(|| format!("Failed to load card from disk: {:?}", local_path))?;
// Copy runtime settings from the backend's card
tracing::debug!("Copying runtime config from backend card...");
card_with_local_files.runtime_config = card.runtime_config.clone();
card_with_local_files.kv_cache_block_size = card.kv_cache_block_size;
card_with_local_files.context_length = card.context_length;
// Load the tokenizer from the downloaded files
tracing::debug!("Loading tokenizer from local files...");
let hf_tokenizer = card_with_local_files
.tokenizer_hf()
.with_context(|| format!("Failed to load tokenizer for: {}", card.display_name))?;
let engine = build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(
&card_with_local_files,
&client,
router_mode,
busy_threshold,
chooser,
hf_tokenizer,
None,
)
.await?;
Ok(engine)
}
......@@ -21,10 +21,6 @@ use crate::local_model::runtime_config::ModelRuntimeConfig;
use crate::model_type::{ModelInput, ModelType};
use anyhow::{Context, Result};
use derive_builder::Builder;
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::storage::key_value_store::{
EtcdStore, Key, KeyValueStore, KeyValueStoreManager,
};
use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned};
use serde::{Deserialize, Serialize};
use tokenizers::Tokenizer as HfTokenizer;
......@@ -373,30 +369,6 @@ impl ModelDeploymentCard {
matches!(self.model_input, ModelInput::Tokens)
}
/// Load a ModelDeploymentCard from storage the DistributedRuntime is configured to use.
/// Card should be fully local and ready to use when the call returns.
pub async fn load_from_store(
mdc_key: &Key,
drt: &DistributedRuntime,
) -> anyhow::Result<Option<Self>> {
let Some(etcd_client) = drt.etcd_client() else {
// Should be impossible because we only get here on an etcd event
anyhow::bail!("Missing etcd_client");
};
let store: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client));
let card_store = Arc::new(KeyValueStoreManager::new(store));
let Some(mut card) = card_store
.load::<ModelDeploymentCard>(ROOT_PATH, mdc_key)
.await?
else {
return Ok(None);
};
card.download_config().await?;
Ok(Some(card))
}
/// Download the files this card needs to work: config.json, tokenizer.json, etc.
pub async fn download_config(&mut self) -> anyhow::Result<()> {
if self.has_local_files() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment