Unverified Commit 6a1a801c authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat: Support static workers, run without etcd. (#2281)

parent 0802ecd9
......@@ -8,9 +8,19 @@
# - Auto-discovery: Watches etcd for engine/worker registration (via `register_llm`).
# - Pre-processor: Prompt templating and tokenization.
# - Router, defaulting to round-robin (TODO: Add flags to enable KV routing).
#
# Pass `--interactive` or `-i` for text chat instead of HTTP server.
#
# For static mode (no etcd auto-discovery):
# - python -m dynamo.frontend --model-name Qwen3-0.6B-Q8_0.gguf --model-path ~/llms/Qwen3-0.6B --static-endpoint dynamo.backend.generate
# Worker example:
# - cd lib/bindings/python/examples/hello_world
# - python server_sglang_static.py
import argparse
import asyncio
import os
import re
import uvloop
......@@ -26,6 +36,36 @@ from dynamo.llm import (
from dynamo.runtime import DistributedRuntime
def validate_static_endpoint(value):
    """Validate a `--static-endpoint` command line value.

    The value must consist of exactly three dot-separated words. Each word
    starts with an ASCII letter or underscore and continues with ASCII
    letters, digits or underscores (e.g. `dynamo.backend.generate`).

    Args:
        value: The raw string supplied on the command line.

    Returns:
        The value, unchanged, when it is well formed.

    Raises:
        argparse.ArgumentTypeError: If the value does not have that shape.
    """
    # Use fullmatch instead of match with a trailing "$": "$" also matches just
    # before a final newline, so "a.b.c\n" used to be accepted.
    if not re.fullmatch(
        r"[a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*",
        value,
    ):
        raise argparse.ArgumentTypeError(
            f"static-endpoint must be three words separated by dots, got: {value}"
        )
    return value
def validate_model_name(value):
    """Validate a `--model-name` command line value.

    Args:
        value: The raw value supplied on the command line.

    Returns:
        The name with leading and trailing whitespace removed.

    Raises:
        argparse.ArgumentTypeError: If the value is not a string, or is empty
            once surrounding whitespace is removed.
    """
    # Anything that is not a string is treated the same as an empty name.
    trimmed = value.strip() if isinstance(value, str) else ""
    if not trimmed:
        raise argparse.ArgumentTypeError(
            f"model-name must be a non-empty string, got: {value}"
        )
    return trimmed
def validate_model_path(value):
    """Validate a `--model-path` command line value.

    Args:
        value: Path supplied on the command line.

    Returns:
        The path, unchanged, when it names an existing directory.

    Raises:
        argparse.ArgumentTypeError: If the path is not an existing directory.
    """
    if os.path.isdir(value):
        return value
    raise argparse.ArgumentTypeError(
        f"model-path must be a valid directory on disk, got: {value}"
    )
def parse_args():
parser = argparse.ArgumentParser(
description="Dynamo Frontend: HTTP+Pre-processor+Router",
......@@ -72,13 +112,35 @@ def parse_args():
help=" KV Router. Disable KV events.",
)
parser.set_defaults(use_kv_events=True)
parser.add_argument(
"--static-endpoint",
type=validate_static_endpoint,
help="Static endpoint in format: word.word.word (e.g., dynamo.backend.generate)",
)
parser.add_argument(
"--model-name",
type=validate_model_name,
help="Model name as a string (e.g., 'Llama-3.2-1B-Instruct')",
)
parser.add_argument(
"--model-path",
type=validate_model_path,
help="Path to model directory on disk (e.g., /tmp/model_cache/lama3.2_1B/)",
)
return parser.parse_args()
flags = parser.parse_args()
if flags.static_endpoint and (not flags.model_name or not flags.model_path):
parser.error("--static-endpoint requires both --model-name and --model-path")
return flags
async def async_main():
runtime = DistributedRuntime(asyncio.get_running_loop(), False)
flags = parse_args()
is_static = bool(flags.static_endpoint) # true if the string has a value
runtime = DistributedRuntime(asyncio.get_running_loop(), is_static)
if flags.router_mode == "kv":
router_mode = RouterMode.KV
......@@ -100,8 +162,20 @@ async def async_main():
"router_config": RouterConfig(router_mode, kv_router_config),
}
# out=dyn
e = EntrypointArgs(EngineType.Dynamic, **kwargs)
if flags.static_endpoint:
kwargs["endpoint_id"] = flags.static_endpoint
if flags.model_name:
kwargs["model_name"] = flags.model_name
if flags.model_path:
kwargs["model_path"] = flags.model_path
if is_static:
# out=dyn://<static_endpoint>
engine_type = EngineType.Static
else:
# out=auto, most common
engine_type = EngineType.Dynamic
e = EntrypointArgs(engine_type, **kwargs)
engine = await make_engine(runtime, e)
try:
......
......@@ -93,7 +93,7 @@ You will need [etcd](https://etcd.io/) and [nats](https://nats.io) with jetstrea
**Node 1:** OpenAI compliant HTTP server, optional pre-processing, worker discovery:
```
dynamo-run in=http out=dyn
dynamo-run in=http out=auto
```
**Node 2:** Vllm engine. Receives and returns requests over the network:
......@@ -141,7 +141,18 @@ Example 4: Multiple component in a pipeline.
In the P/D disaggregated setup you would have `deepseek-distill-llama8b.prefill.generate` (possibly multiple instances of this) and `deepseek-distill-llama8b.decode.generate`.
For output it is always only `out=dyn`. This tells Dynamo to auto-discover the instances, group them by model, and load balance appropriately (depending on `--router-mode` flag). The old syntax of `dyn://...` is still accepted for backwards compatibility.
For output it is always only `out=auto`. This tells Dynamo to auto-discover the instances, group them by model, and load balance appropriately (depending on `--router-mode` flag). The exception is static workers, see that section.
### Static workers without etcd
Normally in the distributed system the frontend uses etcd to discover workers. The option exists to have a static endpoint without etcd.
```
Node 1: dynamo-run in=http out=dyn://dynamo.backend.generate --model-name Qwen3-0.6B-Q8_0.gguf --model-path ~/llms/Qwen3-0.6B
Node 2: dynamo-run in=dyn://dynamo.backend.generate out=llamacpp ~/llms/Qwen3-0.6B-Q8_0.gguf --static-worker --context-length 4096
```
Note how `out=` points to a single endpoint, which must match the worker. The model's name and config (to do pre-processing) are usually discovered by the frontend via etcd. Now we must pass them in (`--model-name` and `--model-path`).
### KV-aware routing
......@@ -194,7 +205,7 @@ dynamo-run in=dyn://dynamo.endpoint.generate out=vllm /data/llms/Qwen/Qwen3-4B
**Start the ingress node**
```
dynamo-run in=http out=dyn --router-mode kv
dynamo-run in=http out=auto --router-mode kv
```
The only difference from the distributed system above is `--router-mode kv`. The patched vllm announces when a KV block is created or removed. The Dynamo router finds the worker with the best match for those KV blocks and directs the traffic to that node.
......@@ -569,7 +580,7 @@ And below are arguments that are mocker-specific:
```bash
echo '{"speedup_ratio": 10.0}' > mocker_args.json
dynamo-run in=dyn://dynamo.mocker.generate out=mocker --model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 --extra-engine-args mocker_args.json
dynamo-run in=http out=dyn --router-mode kv
dynamo-run in=http out=auto --router-mode kv
```
### Extra engine arguments
......
......@@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use clap::ValueEnum;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::RouterConfig;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::local_model::LocalModel;
......@@ -127,6 +128,12 @@ pub struct Flags {
#[arg(long, value_parser = clap::value_parser!(u32).range(0..1024))]
pub migration_limit: Option<u32>,
/// Make this a static worker.
/// Do not connect to or advertise self on etcd.
/// in=dyn://x.y.z only
#[arg(long, default_value = "false")]
pub static_worker: bool,
/// Everything after a `--`.
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
......@@ -136,9 +143,23 @@ pub struct Flags {
impl Flags {
/// For each Output variant, check if it would be able to run.
/// This takes validation out of the main engine creation path.
pub fn validate(&self, local_model: &LocalModel, out_opt: &Output) -> anyhow::Result<()> {
pub fn validate(
&self,
local_model: &LocalModel,
in_opt: &Input,
out_opt: &Output,
) -> anyhow::Result<()> {
match in_opt {
Input::Endpoint(_) => {}
_ => {
if self.static_worker {
anyhow::bail!("'--static-worker true' only applies to in=dyn://x.y.z");
}
}
}
match out_opt {
Output::Dynamic => {
Output::Auto => {
if self.context_length.is_some() {
anyhow::bail!("'--context-length' flag should only be used on the worker node, not on the ingress");
}
......@@ -149,6 +170,19 @@ impl Flags {
anyhow::bail!("'--migration-limit' flag should only be used on the worker node, not on the ingress");
}
}
Output::Static(_) => {
if self.model_name.is_none()
|| self
.model_path_pos
.as_ref()
.or(self.model_path_flag.as_ref())
.is_none()
{
anyhow::bail!(
"out=dyn://<path> requires --model-name and --model-path, which are the name and path on disk of the model we expect to serve."
);
}
}
Output::EchoFull => {}
Output::EchoCore => {
if !local_model.card().has_tokenizer() {
......
......@@ -5,6 +5,7 @@ use anyhow::Context as _;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::distributed::DistributedConfig;
use dynamo_runtime::CancellationToken;
use dynamo_runtime::{DistributedRuntime, Runtime};
......@@ -50,9 +51,13 @@ pub async fn run(
if let Input::Endpoint(path) = &in_opt {
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let dst_config = DistributedConfig::from_settings(flags.static_worker);
let distributed_runtime = DistributedRuntime::new(runtime.clone(), dst_config).await?;
rt = Either::Right(distributed_runtime);
};
if let Some(Output::Static(path)) = &out_opt {
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));
}
let local_model = builder.build().await?;
......@@ -64,7 +69,7 @@ pub async fn run(
print_cuda(&out_opt);
// Now that we know the output we're targeting, check if we expect it to work
flags.validate(&local_model, &out_opt)?;
flags.validate(&local_model, &in_opt, &out_opt)?;
// Make an engine from the local_model, flags and output.
let engine_config = engine_for(
......@@ -94,24 +99,35 @@ async fn engine_for(
rt: Either<Runtime, DistributedRuntime>,
) -> anyhow::Result<EngineConfig> {
match out_opt {
Output::Dynamic => Ok(EngineConfig::Dynamic(Box::new(local_model))),
Output::Auto => {
// Auto-discover backends
Ok(EngineConfig::Dynamic(Box::new(local_model)))
}
Output::Static(_) => {
// A single static backend, no etcd
Ok(EngineConfig::StaticRemote(Box::new(local_model)))
}
Output::EchoFull => Ok(EngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
is_static: flags.static_worker,
}),
Output::EchoCore => Ok(EngineConfig::StaticCore {
engine: dynamo_llm::engines::make_engine_core(),
model: Box::new(local_model),
is_static: flags.static_worker,
}),
#[cfg(feature = "mistralrs")]
Output::MistralRs => Ok(EngineConfig::StaticFull {
engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
model: Box::new(local_model),
is_static: flags.static_worker,
}),
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => Ok(EngineConfig::StaticCore {
engine: dynamo_engine_llamacpp::make_engine(cancel_token, &local_model).await?,
model: Box::new(local_model),
is_static: flags.static_worker,
}),
Output::Mocker => {
let Either::Right(drt) = rt else {
......@@ -127,6 +143,7 @@ async fn engine_for(
Ok(EngineConfig::StaticCore {
engine,
model: Box::new(local_model),
is_static: flags.static_worker,
})
}
}
......
......@@ -24,7 +24,7 @@ Example:
- OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf
"#;
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0] [--migration-limit=0] [--verbosity (-v|-vv)]";
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|auto|dyn://<path> [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--static-worker] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0] [--migration-limit=0] [--verbosity (-v|-vv)]";
fn main() -> anyhow::Result<()> {
// Set log level based on verbosity flag
......@@ -134,5 +134,5 @@ fn is_in_dynamic(in_opt: &Input) -> bool {
}
fn is_out_dynamic(out_opt: &Option<Output>) -> bool {
matches!(out_opt, Some(Output::Dynamic))
matches!(out_opt, Some(Output::Auto) | Some(Output::Static(_)))
}
......@@ -12,7 +12,14 @@ pub enum Output {
EchoCore,
/// Listen for models on nats/etcd, add/remove dynamically
Dynamic,
Auto,
/// Static remote: The dyn://namespace.component.endpoint name of a remote worker we expect to
/// exist. THIS DISABLES AUTO-DISCOVERY. Only this endpoint will be connected.
/// `--model-name` and `--model-path` must also be set.
///
/// A static remote setup avoids having to run etcd.
Static(String),
#[cfg(feature = "mistralrs")]
/// Run inference on a model in a GGUF file using mistralrs w/ candle
......@@ -40,15 +47,11 @@ impl TryFrom<&str> for Output {
"echo_full" => Ok(Output::EchoFull),
"echo_core" => Ok(Output::EchoCore),
"dyn" => Ok(Output::Dynamic),
"dyn" | "auto" => Ok(Output::Auto),
// Deprecated, should only use `out=dyn`
endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
tracing::warn!(
"out=dyn://<path> is deprecated, the path is not used. Please use 'out=dyn'"
);
//let path = endpoint_path.strip_prefix(ENDPOINT_SCHEME).unwrap();
Ok(Output::Dynamic)
let path = endpoint_path.strip_prefix(ENDPOINT_SCHEME).unwrap();
Ok(Output::Static(path.to_string()))
}
e => Err(anyhow::anyhow!("Invalid out= option '{e}'")),
......@@ -69,7 +72,8 @@ impl fmt::Display for Output {
Output::EchoFull => "echo_full",
Output::EchoCore => "echo_core",
Output::Dynamic => "dyn",
Output::Auto => "auto",
Output::Static(endpoint) => &format!("{ENDPOINT_SCHEME}{endpoint}"),
};
write!(f, "{s}")
}
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Static version of server_sglang.py - see there for most details.
#
# The key differences between this and `server_sglang.py` are:
# - We do not call register_llm to advertise ourselves in etcd. There is no etcd.
# - The frontend must know up-front all the details for the model: name, pre-processor path, and type.
#
# Window 1: `python server_sglang_static.py`. Wait for log "Starting endpoint".
# Window 2: `dynamo-run out=dyn://dynamo.backend.generate --model-name "Qwen/Qwen3-0.6B" --model-path <hf_path> --model-type Backend`
import argparse
import asyncio
import sys
import sglang
import uvloop
from sglang.srt.server_args import ServerArgs
from dynamo.runtime import DistributedRuntime, dynamo_worker
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
DEFAULT_TEMPERATURE = 0.7
class Config:
    """Settings for this worker, filled in from the command line or defaults."""

    # The three dot-separated parts of the endpoint string, in order.
    namespace: str
    component: str
    endpoint: str
    # Value of `--model`: a path on disk or a HuggingFace model identifier.
    model: str
class RequestHandler:
    """
    Request handler for the generate endpoint
    """

    def __init__(self, engine):
        # Object exposing `async_generate(input_ids=..., sampling_params=..., stream=...)`.
        self.engine_client = engine

    async def generate(self, request):
        """Stream newly generated token ids for one pre-tokenized request.

        Args:
            request: Mapping that provides `token_ids`,
                `sampling_options["temperature"]` and
                `stop_conditions["max_tokens"]`.

        Yields:
            `{"token_ids": [...]}` holding only the tokens produced since the
            previous yield. When the engine reports a finish reason, the item is
            `{"token_ids": [], "finish_reason": <type>}` instead.
        """
        # print(f"Received request: {request}")
        # Fall back to the default only when no temperature was supplied.
        # An explicit 0 is a valid setting and must not be replaced by the
        # default, which a plain `or` would do because 0 is falsy.
        temperature = request["sampling_options"]["temperature"]
        if temperature is None:
            temperature = DEFAULT_TEMPERATURE
        sampling_params = {
            "temperature": temperature,
            # sglang defaults this to 128
            "max_new_tokens": request["stop_conditions"]["max_tokens"],
        }
        num_output_tokens_so_far = 0
        gen = await self.engine_client.async_generate(
            input_ids=request["token_ids"], sampling_params=sampling_params, stream=True
        )
        async for res in gen:
            # res is a dict; the slicing below treats `output_ids` as the full
            # list of tokens generated so far, so only the new tail is forwarded.
            finish_reason = res["meta_info"]["finish_reason"]
            if finish_reason:
                # Don't forward the stop token
                out = {"token_ids": [], "finish_reason": finish_reason["type"]}
                next_total_toks = num_output_tokens_so_far
            else:
                next_total_toks = len(res["output_ids"])
                out = {"token_ids": res["output_ids"][num_output_tokens_so_far:]}
            yield out
            num_output_tokens_so_far = next_total_toks
@dynamo_worker(static=True)
async def worker(runtime: DistributedRuntime):
    """Entry point: read the command line, then start serving on `runtime`."""
    config = cmd_line_args()
    await init(runtime, config)
async def init(runtime: DistributedRuntime, config: Config):
    """
    Create the component's service, start the SGLang engine and serve the endpoint.

    The namespace, component and endpoint names, and the model to load, all
    come from `config`.
    """
    namespace = runtime.namespace(config.namespace)
    component = namespace.component(config.component)
    await component.create_service()
    endpoint = component.endpoint(config.endpoint)

    # The engine is started without its own tokenizer; the handler passes it
    # token ids (`input_ids`) rather than text.
    server_args = ServerArgs(
        model_path=config.model,
        skip_tokenizer_init=True,
    )
    handler = RequestHandler(sglang.Engine(server_args=server_args))

    # The server shuts down gracefully (open TCP streams are allowed to finish)
    # after the lease is revoked.
    # NOTE(review): this is the static worker; confirm a lease exists in this mode.
    await endpoint.serve_endpoint(handler.generate)
def cmd_line_args():
    """Parse the command line into a `Config`.

    Recognised flags:
        --endpoint: `dyn://namespace.component.endpoint` or
            `namespace.component.endpoint`. Defaults to `DEFAULT_ENDPOINT`.
        --model: Path on disk or HuggingFace identifier. Defaults to
            `DEFAULT_MODEL`.

    Returns:
        A `Config` with `model`, `namespace`, `component` and `endpoint` set.

    Exits the process with status 1, after printing a message to stderr, when
    the endpoint is not made of exactly three non-empty dot-separated parts.
    """
    parser = argparse.ArgumentParser(
        description="SGLang server integrated with Dynamo runtime."
    )
    parser.add_argument(
        "--endpoint",
        type=str,
        default=DEFAULT_ENDPOINT,
        help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
    )
    parser.add_argument(
        "--model",
        type=str,
        default=DEFAULT_MODEL,
        help=f"Path to disk model or HuggingFace model identifier to load. Default: {DEFAULT_MODEL}",
    )
    args = parser.parse_args()

    # Drop the scheme only when it really is a prefix. Replacing the first
    # occurrence anywhere would silently accept values like "a.dyn://b.c".
    scheme = "dyn://"
    endpoint_str = args.endpoint
    if endpoint_str.startswith(scheme):
        endpoint_str = endpoint_str[len(scheme) :]

    endpoint_parts = endpoint_str.split(".")
    # Require exactly three parts, none of them empty ("a..b" splits into three
    # parts but has no component name).
    if len(endpoint_parts) != 3 or not all(endpoint_parts):
        print(
            f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'.",
            file=sys.stderr,
        )
        sys.exit(1)

    config = Config()
    config.model = args.model
    config.namespace, config.component, config.endpoint = endpoint_parts
    return config
if __name__ == "__main__":
    # Install uvloop first so that the loop created by asyncio.run() is a uvloop loop.
    uvloop.install()
    asyncio.run(worker())
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import uvloop
from dynamo.runtime import DistributedRuntime, dynamo_worker
class RequestHandler:
    """
    Handler for the generate endpoint that echoes the request back
    """

    async def generate(self, request):
        """Print the request, then yield each of its elements in order."""
        print(f"Received request: {request}")
        for piece in request:
            yield piece
@dynamo_worker(static=True)
async def worker(runtime: DistributedRuntime):
    """Entry point: serve the example endpoint in the `dynamo` namespace."""
    namespace = "dynamo"
    await init(runtime, namespace)
async def init(runtime: DistributedRuntime, ns: str):
    """
    Create the `backend` component in namespace `ns` and serve its `generate` endpoint
    A `Component` can serve multiple endpoints
    """
    backend = runtime.namespace(ns).component("backend")
    await backend.create_service()

    generate_endpoint = backend.endpoint("generate")
    print("Started server instance")

    handler = RequestHandler()
    await generate_endpoint.serve_endpoint(handler.generate)
if __name__ == "__main__":
    # Install uvloop first so that the loop created by asyncio.run() is a uvloop loop.
    uvloop.install()
    asyncio.run(worker())
......@@ -23,6 +23,7 @@ pub enum EngineType {
Echo = 1,
Dynamic = 2,
Mocker = 3,
Static = 4,
}
#[pyclass]
......@@ -178,9 +179,11 @@ async fn select_engine(
RsEngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
is_static: false,
}
}
EngineType::Dynamic => RsEngineConfig::Dynamic(Box::new(local_model)),
EngineType::Static => RsEngineConfig::StaticRemote(Box::new(local_model)),
EngineType::Mocker => {
let mocker_args = if let Some(extra_args_path) = args.extra_engine_args {
MockEngineArgs::from_json_file(&extra_args_path).map_err(|e| {
......@@ -209,6 +212,7 @@ async fn select_engine(
RsEngineConfig::StaticCore {
engine,
model: Box::new(local_model),
is_static: false,
}
}
};
......
......@@ -18,16 +18,20 @@ use dynamo_runtime::{
use crate::{
backend::Backend,
kv_router::{KvPushRouter, KvRouterConfig},
migration::Migration,
entrypoint,
kv_router::KvRouterConfig,
model_type::ModelType,
preprocessor::{OpenAIPreprocessor, PreprocessedEmbeddingRequest, PreprocessedRequest},
protocols::common::llm_backend::{EmbeddingsEngineOutput, LLMEngineOutput},
protocols::openai::chat_completions::{
preprocessor::{OpenAIPreprocessor, PreprocessedEmbeddingRequest},
protocols::{
common::llm_backend::EmbeddingsEngineOutput,
openai::{
chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
},
protocols::openai::completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
protocols::openai::embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
},
},
};
use super::{ModelEntry, ModelManager, MODEL_ROOT_PATH};
......@@ -197,93 +201,36 @@ impl ModelWatcher {
// function. Needs checking carefully, possibly we need to store it in state.
let _cache_dir = Some(card.move_from_nats(self.drt.nats_client()).await?);
// Chat Completions
let frontend = SegmentSource::<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
client.clone(),
self.router_mode,
)
.await?;
let service_backend = match self.router_mode {
RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
ServiceBackend::from_engine(Arc::new(router))
}
RouterMode::KV => {
let chooser = self
.manager
let kv_chooser = if self.router_mode == RouterMode::KV {
Some(
self.manager
.kv_chooser_for(
&model_entry.name,
&component,
card.kv_cache_block_size,
self.kv_router_config,
)
.await?;
let kv_push_router = KvPushRouter::new(router, chooser);
ServiceBackend::from_engine(Arc::new(kv_push_router))
}
.await?,
)
} else {
None
};
let chat_engine = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(migration.forward_edge())?
.link(service_backend)?
.link(migration.backward_edge())?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
let chat_engine =
entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, &client, self.router_mode, kv_chooser.clone())
.await?;
self.manager
.add_chat_completions_model(&model_entry.name, chat_engine)?;
// Completions
let frontend = SegmentSource::<
SingleIn<NvCreateCompletionRequest>,
ManyOut<Annotated<NvCreateCompletionResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
client,
self.router_mode,
)
.await?;
let service_backend = match self.router_mode {
RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
ServiceBackend::from_engine(Arc::new(router))
}
RouterMode::KV => {
let chooser = self
.manager
.kv_chooser_for(
&model_entry.name,
&component,
card.kv_cache_block_size,
self.kv_router_config,
)
let completions_engine =
entrypoint::build_routed_pipeline::<
NvCreateCompletionRequest,
NvCreateCompletionResponse,
>(&card, &client, self.router_mode, kv_chooser)
.await?;
let kv_push_router = KvPushRouter::new(router, chooser);
ServiceBackend::from_engine(Arc::new(kv_push_router))
}
};
let completions_engine = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(migration.forward_edge())?
.link(service_backend)?
.link(migration.backward_edge())?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
self.manager
.add_completions_model(&model_entry.name, completions_engine)?;
}
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::sync::Arc;
......
......@@ -6,6 +6,7 @@
//! - Connect it to an Input
pub mod input;
pub use input::build_routed_pipeline;
use std::sync::Arc;
......@@ -33,19 +34,24 @@ impl RouterConfig {
#[derive(Clone)]
pub enum EngineConfig {
/// Remote networked engines
/// Remote networked engines that we discover via etcd
Dynamic(Box<LocalModel>),
/// Remote networked engines that we know about at startup
StaticRemote(Box<LocalModel>),
/// A Full service engine does its own tokenization and prompt formatting.
StaticFull {
engine: Arc<dyn StreamingEngine>,
model: Box<LocalModel>,
is_static: bool,
},
/// A core engine expects to be wrapped with pre/post processors that handle tokenization.
StaticCore {
engine: ExecutionContext,
model: Box<LocalModel>,
is_static: bool,
},
}
......@@ -54,6 +60,7 @@ impl EngineConfig {
use EngineConfig::*;
match self {
Dynamic(lm) => lm,
StaticRemote(lm) => lm,
StaticFull { model, .. } => model,
StaticCore { model, .. } => model,
}
......
......@@ -16,6 +16,7 @@ use std::{
pub mod batch;
mod common;
pub use common::build_routed_pipeline;
pub mod endpoint;
pub mod http;
pub mod text;
......
......@@ -7,10 +7,12 @@ use crate::{
backend::{Backend, ExecutionContext},
discovery::{ModelManager, ModelWatcher, MODEL_ROOT_PATH},
engines::StreamingEngineAdapter,
entrypoint::EngineConfig,
entrypoint::{self, EngineConfig},
kv_router::{KvPushRouter, KvRouter},
migration::Migration,
model_card::ModelDeploymentCard,
preprocessor::OpenAIPreprocessor,
protocols::common::llm_backend::{BackendOutput, PreprocessedRequest},
protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
request_template::RequestTemplate,
types::{
openai::chat_completions::{
......@@ -21,8 +23,13 @@ use crate::{
},
};
use dynamo_runtime::{
component::Client,
distributed::DistributedConfig,
engine::{AsyncEngineStream, Data},
pipeline::{Context, ManyOut, Operator, ServiceBackend, ServiceFrontend, SingleIn, Source},
pipeline::{
Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
ServiceEngine, ServiceFrontend, SingleIn, Source,
},
DistributedRuntime, Runtime,
};
use std::sync::Arc;
......@@ -78,6 +85,7 @@ pub async fn prepare_engine(
// '/models` to list, and notifications when models are added / removed.
let model_service_name = watch_obj.wait_for_chat_model().await;
tracing::info!("Connected to {model_service_name}");
let engine = model_manager.get_chat_completions_engine(&model_service_name)?;
Ok(PreparedEngine {
service_name: model_service_name,
......@@ -87,7 +95,56 @@ pub async fn prepare_engine(
request_template: local_model.request_template(),
})
}
EngineConfig::StaticFull { engine, model } => {
EngineConfig::StaticRemote(local_model) => {
// For now we only do ModelType.Backend
// For batch/text we only do Chat Completions
// The card should have been loaded at 'build' phase earlier
let card = local_model.card();
let router_mode = local_model.router_config().router_mode;
let dst_config = DistributedConfig::from_settings(true);
let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?;
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
let client = component.endpoint(&endpoint_id.name).client().await?;
let kv_chooser = if router_mode == RouterMode::KV {
let model_manager = Arc::new(ModelManager::new());
Some(
model_manager
.kv_chooser_for(
local_model.display_name(),
&component,
card.kv_cache_block_size,
Some(local_model.router_config().kv_router_config),
)
.await?,
)
} else {
None
};
let chat_engine = entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(card, &client, router_mode, kv_chooser.clone())
.await?;
let service_name = local_model.service_name().to_string();
tracing::info!("Static connecting to {service_name}");
Ok(PreparedEngine {
service_name,
engine: chat_engine,
inspect_template: false,
request_template: local_model.request_template(),
card: Some(local_model.into_card()),
})
}
EngineConfig::StaticFull { engine, model, .. } => {
let service_name = model.service_name().to_string();
tracing::debug!("Model: {service_name} with engine pre-processing");
let engine = Arc::new(StreamingEngineAdapter::new(engine));
......@@ -102,6 +159,7 @@ pub async fn prepare_engine(
EngineConfig::StaticCore {
engine: inner_engine,
model,
..
} => {
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
......@@ -152,6 +210,56 @@ where
.link(frontend)?)
}
/// Build a request pipeline that pre-processes a request and routes it to remote workers.
///
/// Requests pass through the pre-processor, the backend and the migration operators
/// before reaching the router; responses travel back through the same operators in
/// reverse order.
///
/// `chooser` is only consulted when `router_mode` is [`RouterMode::KV`]; for every
/// other mode the plain push router is used.
///
/// # Errors
///
/// Fails if any operator or the router cannot be constructed, if linking the
/// segments fails, or if `router_mode` is [`RouterMode::KV`] and `chooser` is `None`.
pub async fn build_routed_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
    router_mode: RouterMode,
    chooser: Option<Arc<KvRouter>>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
        Context<Req>,
        Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
        Context<PreprocessedRequest>,
        Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
    >,
{
    let source = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
    let preprocess_op = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
    let backend_op = Backend::from_mdc(card.clone()).await?.into_operator();
    let migration_op = Migration::from_mdc(card.clone()).await?.into_operator();
    let push_router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
        client.clone(),
        router_mode,
    )
    .await?;

    // KV routing wraps the push router with the chooser; every other mode uses it as is.
    let sink = match router_mode {
        RouterMode::KV => match chooser {
            Some(kv_router) => {
                ServiceBackend::from_engine(Arc::new(KvPushRouter::new(push_router, kv_router)))
            }
            None => anyhow::bail!("RouterMode::KV requires KVRouter to not be null"),
        },
        RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
            ServiceBackend::from_engine(Arc::new(push_router))
        }
    };

    // Forward edges on the way to the workers, backward edges on the way back.
    let pipeline = source
        .link(preprocess_op.forward_edge())?
        .link(backend_op.forward_edge())?
        .link(migration_op.forward_edge())?
        .link(sink)?
        .link(migration_op.backward_edge())?
        .link(backend_op.backward_edge())?
        .link(preprocess_op.backward_edge())?
        .link(source)?;
    Ok(pipeline)
}
#[cfg(test)]
mod tests {
use super::*;
......
......@@ -42,14 +42,20 @@ pub async fn run(
let (rt_fut, card): (Pin<Box<dyn Future<Output = _> + Send + 'static>>, _) = match engine_config
{
EngineConfig::StaticFull { engine, mut model } => {
EngineConfig::StaticFull {
engine,
mut model,
is_static,
} => {
let engine = Arc::new(StreamingEngineAdapter::new(engine));
let ingress_chat = Ingress::<
Context<NvCreateChatCompletionRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<NvCreateChatCompletionStreamResponse>>>>,
>::for_engine(engine)?;
if !is_static {
model.attach(&endpoint, ModelType::Chat).await?;
}
let fut_chat = endpoint.endpoint_builder().handler(ingress_chat).start();
(Box::pin(fut_chat), Some(model.card().clone()))
......@@ -57,6 +63,7 @@ pub async fn run(
EngineConfig::StaticCore {
engine: inner_engine,
mut model,
is_static,
} => {
// Pre-processing is done ingress-side, so it should be already done.
let frontend = SegmentSource::<
......@@ -74,11 +81,16 @@ pub async fn run(
.link(frontend)?;
let ingress = Ingress::for_pipeline(pipeline)?;
if !is_static {
model.attach(&endpoint, ModelType::Backend).await?;
}
let fut = endpoint.endpoint_builder().handler(ingress).start();
(Box::pin(fut), Some(model.card().clone()))
}
EngineConfig::StaticRemote(_) => {
panic!("StaticRemote definitions are only for the frontend end node.");
}
EngineConfig::Dynamic(_) => {
unreachable!("An endpoint input will never have a Dynamic engine");
}
......
......@@ -6,18 +6,16 @@ use std::sync::Arc;
use crate::{
discovery::{ModelManager, ModelWatcher, MODEL_ROOT_PATH},
engines::StreamingEngineAdapter,
entrypoint::{input::common, EngineConfig},
entrypoint::{self, input::common, EngineConfig},
http::service::service_v2,
kv_router::KvRouterConfig,
types::{
openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
},
openai::completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
types::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
},
};
use dynamo_runtime::pipeline::RouterMode;
use dynamo_runtime::transports::etcd;
use dynamo_runtime::{distributed::DistributedConfig, pipeline::RouterMode};
use dynamo_runtime::{DistributedRuntime, Runtime};
/// Build and run an HTTP service
......@@ -55,7 +53,50 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
}
}
}
EngineConfig::StaticFull { engine, model } => {
EngineConfig::StaticRemote(local_model) => {
let card = local_model.card();
let router_mode = local_model.router_config().router_mode;
let dst_config = DistributedConfig::from_settings(true);
let distributed_runtime = DistributedRuntime::new(runtime.clone(), dst_config).await?;
let manager = http_service.model_manager();
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
let client = component.endpoint(&endpoint_id.name).client().await?;
let kv_chooser = if router_mode == RouterMode::KV {
Some(
manager
.kv_chooser_for(
local_model.display_name(),
&component,
card.kv_cache_block_size,
Some(local_model.router_config().kv_router_config),
)
.await?,
)
} else {
None
};
let chat_engine = entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(card, &client, router_mode, kv_chooser.clone())
.await?;
manager.add_chat_completions_model(local_model.display_name(), chat_engine)?;
let completions_engine = entrypoint::build_routed_pipeline::<
NvCreateCompletionRequest,
NvCreateCompletionResponse,
>(card, &client, router_mode, kv_chooser)
.await?;
manager.add_completions_model(local_model.display_name(), completions_engine)?;
}
EngineConfig::StaticFull { engine, model, .. } => {
let engine = Arc::new(StreamingEngineAdapter::new(engine));
let manager = http_service.model_manager();
manager.add_completions_model(model.service_name(), engine.clone())?;
......@@ -64,6 +105,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
EngineConfig::StaticCore {
engine: inner_engine,
model,
..
} => {
let manager = http_service.model_manager();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment