subprocess.rs 3.24 KB
Newer Older
Graham King's avatar
Graham King committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pyo3::{types::IntoPyDict, Python};
17
use std::env;
18
19
use std::ffi::CString;
use std::path::{Path, PathBuf};
Graham King's avatar
Graham King committed
20

21
use dynamo_llm::engines::MultiNodeConfig;
22

23
const PY_START_ENGINE: &str = include_str!("vllm_inc.py");
Graham King's avatar
Graham King committed
24
25
26
27
28
29
30

/// Start the Python vllm engine that listens on zmq socket
/// This is called by running `<bin> --internal-vllm-process
/// This does not return until vllm exits.
pub fn run_subprocess(
    socket_id: &str,
    model_path: &Path,
31
32
    node_config: MultiNodeConfig,
    tp_size: u32,
33
    extra_engine_args: Option<PathBuf>,
34
    with_kv_routing: bool,
Graham King's avatar
Graham King committed
35
) -> anyhow::Result<()> {
36
37
38
    if with_kv_routing {
        set_kv_routing_vars()?;
    }
Graham King's avatar
Graham King committed
39
    pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
40
    if let Ok(venv) = env::var("VIRTUAL_ENV") {
41
        let _ = Python::with_gil(|py| crate::fix_venv(venv, py));
42
    }
Graham King's avatar
Graham King committed
43
    let model_path_str = model_path.display().to_string();
44
45
46
    let extra_engine_args_str = &extra_engine_args
        .map(|p| p.display().to_string())
        .unwrap_or_default();
Graham King's avatar
Graham King committed
47
48
49
50
    Python::with_gil(|py| {
        let locals = [
            ("socket_id", socket_id),
            ("model_path", model_path_str.as_str()),
51
52
            ("tp_size_str", &tp_size.to_string()),
            ("nnodes_str", &node_config.num_nodes.to_string()),
53
            ("extra_engine_args", extra_engine_args_str),
54
            ("enable_prefix_caching", &with_kv_routing.to_string()),
Graham King's avatar
Graham King committed
55
56
57
        ]
        .into_py_dict(py)
        .unwrap();
58
        if let Err(err) = py.run(CString::new(PY_START_ENGINE)?.as_ref(), None, Some(&locals)) {
Graham King's avatar
Graham King committed
59
60
61
62
63
64
            anyhow::bail!("vllm engine run error: {err}");
        }
        tracing::info!("vllm subprocess exit");
        Ok(())
    })
}
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

// These environment variables trigger our vllm patch to emit KV routing events
fn set_kv_routing_vars() -> anyhow::Result<()> {
    let exe = env::current_exe()?;
    let exe_dir = exe
        .parent()
        .ok_or(anyhow::anyhow!("Current binary has no directory"))?;
    let mut lib = PathBuf::from(exe_dir);
    lib.set_file_name("libdynamo_llm_capi.so");
    let vars = [
        // Path to the C API Library
        ("VLLM_KV_CAPI_PATH", lib.display().to_string()),
        // Identifiers to publish KV related information
        ("VLLM_KV_NAMESPACE", "dynamo".to_string()),
        ("VLLM_KV_COMPONENT", "vllm".to_string()),
        // Worker ID used for identifying workers in distributed settings
        ("VLLM_WORKER_ID", "0".to_string()),
    ];
    for (kvar, default_v) in vars {
        if env::var(kvar).is_err() {
            env::set_var(kvar, default_v);
        }
    }
    Ok(())
}