"deploy/operator/internal/consts/consts.go" did not exist on "403344e53492bef1c5ba844912b80533e2fffcd7"
entrypoint.rs 3.33 KB
Newer Older
1
2
3
4
5
6
7
8
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! The entrypoint module provides tools to build a Dynamo runner:
//! - Create an [`EngineConfig`] describing the engine (potentially auto-discovered) to execute.
//! - Connect it to an Input.

pub mod input;
9
pub use input::{build_routed_pipeline, build_routed_pipeline_with_preprocessor};
10

11
12
use std::future::Future;
use std::pin::Pin;
13
14
15
16
17
18
use std::sync::Arc;

use dynamo_runtime::pipeline::RouterMode;

use crate::{
    backend::ExecutionContext, engines::StreamingEngine, kv_router::KvRouterConfig,
19
20
    local_model::LocalModel, model_card::ModelDeploymentCard,
    types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine,
21
22
};

23
24
25
26
27
28
29
30
31
32
/// Callback type for engine factory (async)
pub type EngineFactoryCallback = Arc<
    dyn Fn(
            ModelDeploymentCard,
        ) -> Pin<
            Box<dyn Future<Output = anyhow::Result<OpenAIChatCompletionsStreamingEngine>> + Send>,
        > + Send
        + Sync,
>;

33
34
35
36
#[derive(Debug, Clone, Default)]
pub struct RouterConfig {
    pub router_mode: RouterMode,
    pub kv_router_config: KvRouterConfig,
37
38
39
40
    /// Threshold for active decode blocks utilization (0.0-1.0)
    pub active_decode_blocks_threshold: Option<f64>,
    /// Threshold for active prefill tokens utilization (literal token count)
    pub active_prefill_tokens_threshold: Option<u64>,
41
    pub enforce_disagg: bool,
42
43
44
45
46
47
48
}

impl RouterConfig {
    /// Creates a config with the given routing mode and KV router settings.
    ///
    /// Both utilization thresholds start unset (`None`) and `enforce_disagg` starts as
    /// `false`. Use the `with_*` builder methods to change them.
    pub fn new(router_mode: RouterMode, kv_router_config: KvRouterConfig) -> Self {
        Self {
            router_mode,
            kv_router_config,
            active_decode_blocks_threshold: None,
            active_prefill_tokens_threshold: None,
            enforce_disagg: false,
        }
    }

    /// Sets the active decode blocks utilization threshold (0.0-1.0), or clears it with `None`.
    #[must_use]
    pub fn with_active_decode_blocks_threshold(mut self, threshold: Option<f64>) -> Self {
        self.active_decode_blocks_threshold = threshold;
        self
    }

    /// Sets the active prefill tokens threshold (literal token count), or clears it with `None`.
    #[must_use]
    pub fn with_active_prefill_tokens_threshold(mut self, threshold: Option<u64>) -> Self {
        self.active_prefill_tokens_threshold = threshold;
        self
    }

    /// Sets the `enforce_disagg` flag.
    #[must_use]
    pub fn with_enforce_disagg(mut self, enforce_disagg: bool) -> Self {
        self.enforce_disagg = enforce_disagg;
        self
    }
}

71
#[derive(Clone)]
72
pub enum EngineConfig {
73
    /// Remote networked engines that we discover via etcd
74
75
76
77
    Dynamic {
        model: Box<LocalModel>,
        engine_factory: Option<EngineFactoryCallback>,
    },
78

79
80
    /// A Text engine receives text, does it's own tokenization and prompt formatting.
    InProcessText {
81
82
83
84
        engine: Arc<dyn StreamingEngine>,
        model: Box<LocalModel>,
    },

85
86
    /// A Tokens engine receives tokens, expects to be wrapped with pre/post processors that handle tokenization.
    InProcessTokens {
87
88
        engine: ExecutionContext,
        model: Box<LocalModel>,
89
        is_prefill: bool,
90
91
92
93
    },
}

impl EngineConfig {
94
    pub fn local_model(&self) -> &LocalModel {
95
96
        use EngineConfig::*;
        match self {
97
            Dynamic { model, .. } => model,
98
99
            InProcessText { model, .. } => model,
            InProcessTokens { model, .. } => model,
100
101
        }
    }
102
103
104
105
106
107
108

    pub fn engine_factory(&self) -> Option<&EngineFactoryCallback> {
        match self {
            EngineConfig::Dynamic { engine_factory, .. } => engine_factory.as_ref(),
            _ => None,
        }
    }
109
}