common.rs 13.2 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
// SPDX-License-Identifier: Apache-2.0

4
use std::pin::Pin;
5
use std::time::Duration;
6

7
use crate::{
8
    backend::{Backend, ExecutionContext},
9
    discovery::{KvWorkerMonitor, ModelManager, ModelWatcher},
10
    engines::StreamingEngineAdapter,
11
    entrypoint::{EngineConfig, RouterConfig},
12
    http::service::metrics::Metrics,
13
14
15
    kv_router::{
        DirectRoutingRouter, KvPushRouter, KvRouter, PrefillRouter, metrics::RouterRequestMetrics,
    },
16
    migration::Migration,
17
    model_card::ModelDeploymentCard,
18
    namespace::NamespaceFilter,
19
    preprocessor::{OpenAIPreprocessor, prompt::PromptFormatter},
20
    protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
21
    request_template::RequestTemplate,
22
    types::{
23
        Annotated,
24
25
26
27
28
29
        openai::chat_completions::{
            NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
            OpenAIChatCompletionsStreamingEngine,
        },
    },
};
30

31
use anyhow::Context as _;
32
use dynamo_kv_router::config::min_initial_workers_from_env;
33
use dynamo_runtime::{
34
    DistributedRuntime,
35
    component::Client,
36
    engine::{AsyncEngineStream, Data},
37
38
39
40
    pipeline::{
        Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
        ServiceEngine, ServiceFrontend, SingleIn, Source,
    },
41
42
43
};
use std::sync::Arc;

44
45
46
47
pub struct PreparedEngine {
    pub service_name: String,
    pub engine: OpenAIChatCompletionsStreamingEngine,
    pub inspect_template: bool,
48
49
50
51
    pub card: Option<ModelDeploymentCard>,
    pub request_template: Option<RequestTemplate>,
}

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/// Waits until `client` reports at least `min_initial_workers` available instances.
///
/// * `0` returns immediately.
/// * `1` delegates to `Client::wait_for_instances`.
/// * Larger values watch the client's instance-availability channel and re-check the
///   instance count after every update.
///
/// # Errors
///
/// For counts above one, this fails in two cases. Both error messages include the endpoint
/// and the number of instances available at that moment.
///
/// * No availability update arrives within 120 seconds of the previous check.
/// * The watch channel closes before enough instances appear.
///
/// For a count of one, errors from `wait_for_instances` are propagated.
async fn wait_for_min_initial_workers(
    client: &Client,
    min_initial_workers: usize,
) -> anyhow::Result<()> {
    // Idle timeout: it bounds the gap between successive availability updates, not the
    // total wait. Each observed change restarts the clock.
    const UPDATE_TIMEOUT: Duration = Duration::from_secs(120);

    if min_initial_workers == 0 {
        return Ok(());
    }

    if min_initial_workers == 1 {
        client.wait_for_instances().await?;
        return Ok(());
    }

    let mut watcher = client.instance_avail_watcher();
    loop {
        // The borrow guard is dropped at the end of this statement, before any `.await`.
        let available = watcher.borrow_and_update().len();
        if available >= min_initial_workers {
            return Ok(());
        }

        match tokio::time::timeout(UPDATE_TIMEOUT, watcher.changed()).await {
            // Availability changed; loop around and re-count.
            Ok(Ok(_)) => {}
            Ok(Err(_)) => anyhow::bail!(
                "instance watcher closed before {} workers appeared for endpoint {} \
                 ({} available)",
                min_initial_workers,
                client.endpoint.id(),
                available
            ),
            Err(_) => anyhow::bail!(
                "timed out waiting for {} initial workers for endpoint {} \
                 ({} available, no change for {}s)",
                min_initial_workers,
                client.endpoint.id(),
                available,
                UPDATE_TIMEOUT.as_secs()
            ),
        }
    }
}

91
92
93
94
95
96
97
98
impl PreparedEngine {
    pub fn has_tokenizer(&self) -> bool {
        if let Some(card) = self.card.as_ref() {
            card.has_tokenizer()
        } else {
            false
        }
    }
99
100
}

101
/// Turns an EngineConfig into an OpenAI chat-completions and completions supported StreamingEngine.
102
pub async fn prepare_engine(
103
    distributed_runtime: DistributedRuntime,
104
    engine_config: EngineConfig,
105
) -> anyhow::Result<PreparedEngine> {
106
    match engine_config {
107
108
109
        EngineConfig::Dynamic {
            model: local_model, ..
        } => {
110
            let model_manager = Arc::new(ModelManager::new());
111
112
            // Create metrics for migration tracking (not exposed via /metrics in Dynamic engine mode)
            let metrics = Arc::new(Metrics::new());
113
            let watch_obj = Arc::new(ModelWatcher::new(
114
                distributed_runtime.clone(),
115
                model_manager.clone(),
116
                RouterConfig::default(),
117
                local_model.migration_limit(),
118
                None,
119
                metrics,
120
            ));
121
122
123
124
125
126
127
            let discovery = distributed_runtime.discovery();
            let discovery_stream = discovery
                .list_and_watch(
                    dynamo_runtime::discovery::DiscoveryQuery::AllModels,
                    Some(distributed_runtime.primary_token().clone()),
                )
                .await?;
128
            let inner_watch_obj = watch_obj.clone();
129
130
131
132
            let namespace_filter = NamespaceFilter::from_namespace_and_prefix(
                local_model.namespace(),
                local_model.namespace_prefix(),
            );
133
            let _watcher_task = tokio::spawn(async move {
134
135
136
                inner_watch_obj
                    .watch(discovery_stream, namespace_filter)
                    .await;
137
            });
138
            tracing::info!("Waiting for remote model..");
139

140
141
142
143
144
            // TODO: We use the first model to appear, usually we have only one
            // We should add slash commands to text input `/model <name>` to choose,
            // '/models` to list, and notifications when models are added / removed.

            let model_service_name = watch_obj.wait_for_chat_model().await;
145
            tracing::info!("Connected to {model_service_name}");
146
            let engine = model_manager.get_chat_completions_engine(&model_service_name)?;
147
            Ok(PreparedEngine {
148
                service_name: model_service_name,
149
150
                engine,
                inspect_template: false,
151
152
                card: None,
                request_template: local_model.request_template(),
153
            })
154
        }
155
        EngineConfig::InProcessText { engine, model, .. } => {
156
            let service_name = model.service_name().to_string();
157
            tracing::debug!("Model: {service_name} with engine pre-processing");
158
            let engine = Arc::new(StreamingEngineAdapter::new(engine));
159
160
161
162
            Ok(PreparedEngine {
                service_name,
                engine,
                inspect_template: false,
163
164
                request_template: model.request_template(),
                card: Some(model.into_card()),
165
            })
166
        }
167
        EngineConfig::InProcessTokens {
168
            engine: inner_engine,
169
            model,
170
            ..
171
        } => {
172
173
174
            let pipeline = build_pipeline::<
                NvCreateChatCompletionRequest,
                NvCreateChatCompletionStreamResponse,
Nikita's avatar
Nikita committed
175
            >(model.card(), inner_engine, model.card().tokenizer()?)
176
            .await?;
177

178
            let service_name = model.service_name().to_string();
179
180
181
182
183
            tracing::debug!("Model: {service_name} with Dynamo pre-processing");
            Ok(PreparedEngine {
                service_name,
                engine: pipeline,
                inspect_template: true,
184
185
                request_template: model.request_template(),
                card: Some(model.into_card()),
186
            })
187
188
189
        }
    }
}
190
191
192
193

pub async fn build_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    engine: ExecutionContext,
Nikita's avatar
Nikita committed
194
    tokenizer: crate::tokenizers::Tokenizer,
195
196
197
198
199
) -> anyhow::Result<Arc<ServiceFrontend<SingleIn<Req>, ManyOut<Annotated<Resp>>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
200
201
202
203
204
            Context<Req>,
            Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
            Context<PreprocessedRequest>,
            Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
        >,
205
206
{
    let frontend = ServiceFrontend::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
207
208
    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
    let preprocessor =
Nikita's avatar
Nikita committed
209
        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, tokenizer.clone())?
210
            .into_operator();
Nikita's avatar
Nikita committed
211
    let backend = Backend::from_tokenizer(tokenizer).into_operator();
212
213
214
215
216
217
218
219
220
221
222
    let engine = ServiceBackend::from_engine(engine);

    Ok(frontend
        .link(preprocessor.forward_edge())?
        .link(backend.forward_edge())?
        .link(engine)?
        .link(backend.backward_edge())?
        .link(preprocessor.backward_edge())?
        .link(frontend)?)
}

223
#[allow(clippy::too_many_arguments)]
224
225
226
pub async fn build_routed_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
227
    model_manager: Arc<crate::discovery::ModelManager>,
228
    router_mode: RouterMode,
229
    worker_monitor: Option<KvWorkerMonitor>,
230
    chooser: Option<Arc<KvRouter>>,
Nikita's avatar
Nikita committed
231
    tokenizer: crate::tokenizers::Tokenizer,
232
    prefill_chooser: Option<Arc<PrefillRouter>>,
233
    enforce_disagg: bool,
234
    migration_limit: u32,
235
    metrics: Arc<Metrics>,
236
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
237
238
239
240
241
242
243
244
245
246
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
            Context<Req>,
            Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
            Context<PreprocessedRequest>,
            Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
        >,
{
247
248
    let PromptFormatter::OAI(formatter) =
        PromptFormatter::from_mdc(card).context("PromptFormatter.from_mdc")?;
249
    let preprocessor =
Nikita's avatar
Nikita committed
250
        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, tokenizer.clone())
251
            .context("OpenAIPreprocessor.new_with_parts")?;
252
253
254
    build_routed_pipeline_with_preprocessor(
        card,
        client,
255
        model_manager,
256
        router_mode,
257
        worker_monitor,
258
259
        chooser,
        preprocessor,
Nikita's avatar
Nikita committed
260
        tokenizer,
261
        prefill_chooser,
262
        enforce_disagg,
263
        migration_limit,
264
        metrics,
265
266
267
268
    )
    .await
}

269
#[allow(clippy::too_many_arguments)]
270
271
272
pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
273
    model_manager: Arc<crate::discovery::ModelManager>,
274
    router_mode: RouterMode,
275
    worker_monitor: Option<KvWorkerMonitor>,
276
277
    chooser: Option<Arc<KvRouter>>,
    preprocessor: Arc<OpenAIPreprocessor>,
Nikita's avatar
Nikita committed
278
    tokenizer: crate::tokenizers::Tokenizer,
279
    prefill_chooser: Option<Arc<PrefillRouter>>,
280
    enforce_disagg: bool,
281
    migration_limit: u32,
282
    metrics: Arc<Metrics>,
283
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
284
285
286
287
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
288
289
290
291
292
            Context<Req>,
            Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
            Context<PreprocessedRequest>,
            Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
        >,
293
294
{
    let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
295
    let preprocessor_op = preprocessor.into_operator();
Nikita's avatar
Nikita committed
296
    let backend = Backend::from_tokenizer(tokenizer).into_operator();
297
    let migration = Migration::from_mdc(card, migration_limit, metrics).into_operator();
298
    let min_initial_workers = min_initial_workers_from_env()?;
299

300
301
302
303
304
305
306
307
308
309
    // For KV routing, use the client from the chooser to ensure shared state
    let router_client = if router_mode == RouterMode::KV {
        let Some(ref chooser) = chooser else {
            anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
        };
        chooser.client().clone()
    } else {
        client.clone()
    };

310
311
    wait_for_min_initial_workers(&router_client, min_initial_workers).await?;

312
    // Get threshold value and wrap monitor for PushRouter
313
314
315
316
    // Note: PushRouter uses active_decode_blocks_threshold for its internal logic
    let threshold_value = worker_monitor
        .as_ref()
        .map(|m| m.active_decode_blocks_threshold());
317
318
    let monitor_arc =
        worker_monitor.map(|m| Arc::new(m) as Arc<dyn dynamo_runtime::pipeline::WorkerLoadMonitor>);
319

320
321
    let router =
        PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
322
            router_client,
323
            router_mode,
324
325
            threshold_value,
            monitor_arc,
326
327
        )
        .await?;
328

329
330
331
332
333
334
    // Eagerly register router request metrics so they appear as zeros even in
    // non-KV modes (Direct, Random, RoundRobin) where KvPushRouter is never created.
    // In KV mode, KvPushRouter::new() also calls from_component() (idempotent via
    // OnceLock), which covers the standalone router path as well.
    RouterRequestMetrics::from_component(client.endpoint.component());

335
    let service_backend = match router_mode {
336
337
338
        RouterMode::Direct => {
            ServiceBackend::from_engine(Arc::new(DirectRoutingRouter::new(router)))
        }
339
340
341
342
        RouterMode::Random
        | RouterMode::RoundRobin
        | RouterMode::PowerOfTwoChoices
        | RouterMode::LeastLoaded => ServiceBackend::from_engine(Arc::new(router)),
343
344
345
346
        RouterMode::KV => {
            let Some(chooser) = chooser else {
                anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
            };
347
            ServiceBackend::from_engine(Arc::new(KvPushRouter::new(router, chooser)))
348
349
350
        }
    };

351
    // Use the provided prefill chooser, or create a disabled one if not provided
352
    let prefill_chooser = prefill_chooser
353
        .unwrap_or_else(|| PrefillRouter::disabled(model_manager, router_mode, enforce_disagg));
354
355
356
    let prefill_op = prefill_chooser.into_operator();

    // Link with prefill chooser including backward edge for response flow
357
    let engine = frontend
358
        .link(preprocessor_op.forward_edge())?
359
        .link(migration.forward_edge())?
360
        .link(backend.forward_edge())?
361
        .link(prefill_op.forward_edge())?
362
        .link(service_backend)?
363
        .link(prefill_op.backward_edge())?
364
        .link(backend.backward_edge())?
365
        .link(migration.backward_edge())?
366
        .link(preprocessor_op.backward_edge())?
367
        .link(frontend)?;
368

369
370
    Ok(engine)
}