Commit 670661f6 authored by Graham King, committed by GitHub

feat: Allow passing any arguments to vllm and sglang engines (#368)

Put the arguments in a JSON file:
```
{
    "dtype": "half",
    "trust_remote_code": true
}
```

Pass it like this:
```
dynamo-run out=sglang ~/llm_models/Llama-3.2-3B-Instruct --extra-engine-args sglang_extra.json
```
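
The same flag works with `out=vllm`; for example (the `vllm_extra.json` filename here is just illustrative):
```
dynamo-run out=vllm ~/llm_models/Llama-3.2-3B-Instruct --extra-engine-args vllm_extra.json
```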

Requested here https://github.com/ai-dynamo/dynamo/issues/290 (`dtype`) and here https://github.com/ai-dynamo/dynamo/issues/360 (`trust_remote_code`).
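
Values from the JSON file take precedence over the defaults that `dynamo-run` sets. A minimal sketch of the merge semantics, mirroring the `arg_map = {**arg_map, **json_map}` logic in the new `sglang_inc.py`/`vllm_inc.py` helpers (the default values shown are illustrative, not the real ones):
```
import json

# Defaults chosen by dynamo-run (illustrative values only)
arg_map = {
    "model_path": "~/llm_models/Llama-3.2-3B-Instruct",
    "log_level": "debug",
}

# Load the --extra-engine-args file; on a key clash its values win
with open("sglang_extra.json") as f:
    json_map = json.load(f)

arg_map = {**arg_map, **json_map}  # json_map gets precedence
# -> {"model_path": "...", "log_level": "debug", "dtype": "half", "trust_remote_code": True}
```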
parent a03dd474
@@ -165,14 +165,16 @@ Any example above using `out=sglang` will work, but our sglang backend is also m
 Node 1:
 ```
-dynamo-run in=http out=sglang --model-path ~/llm_models/DeepSeek-R1-Distill-Llama-70B/ --tensor-parallel-size 8 --num-nodes 2 --node-rank 0 --dist-init-addr 10.217.98.122:9876
+dynamo-run in=http out=sglang --model-path ~/llm_models/DeepSeek-R1-Distill-Llama-70B/ --tensor-parallel-size 8 --num-nodes 2 --node-rank 0 --leader-addr 10.217.98.122:9876
 ```
 Node 2:
 ```
-dynamo-run in=none out=sglang --model-path ~/llm_models/DeepSeek-R1-Distill-Llama-70B/ --tensor-parallel-size 8 --num-nodes 2 --node-rank 1 --dist-init-addr 10.217.98.122:9876
+dynamo-run in=none out=sglang --model-path ~/llm_models/DeepSeek-R1-Distill-Llama-70B/ --tensor-parallel-size 8 --num-nodes 2 --node-rank 1 --leader-addr 10.217.98.122:9876
 ```
+To pass extra arguments to the sglang engine see *Extra engine arguments* below.
 ## llama_cpp
 - `cargo build --features llamacpp,cuda`
@@ -225,6 +227,8 @@ Node 2:
 dynamo-run in=none out=vllm ~/llm_models/Llama-3.2-3B-Instruct/ --num-nodes 2 --leader-addr 10.217.98.122:6539 --node-rank 1
 ```
+To pass extra arguments to the vllm engine see *Extra engine arguments* below.
 ## Python bring-your-own-engine
 You can provide your own engine in a Python file. The file must provide a generator with this signature:
@@ -434,3 +438,20 @@ The output looks like this:
 The input defaults to `in=text`. The output will default to `mistralrs` engine. If not available whatever engine you have compiled in (so depending on `--features`).
+## Extra engine arguments
+The vllm and sglang backends support passing any argument the engine accepts.
+Put the arguments in a JSON file:
+```
+{
+    "dtype": "half",
+    "trust_remote_code": true
+}
+```
+Pass it like this:
+```
+dynamo-run out=sglang ~/llm_models/Llama-3.2-3B-Instruct --extra-engine-args sglang_extra.json
+```
@@ -13,6 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::collections::HashMap;
 use std::path::PathBuf;
 use std::str::FromStr;
@@ -106,6 +107,11 @@ pub struct Flags {
     #[arg(long, hide = true, value_parser = parse_sglang_flags)]
     pub internal_sglang_process: Option<SgLangFlags>,
+    /// Additional engine-specific arguments from a JSON file.
+    /// Contains a mapping of parameter names to values.
+    #[arg(long)]
+    pub extra_engine_args: Option<PathBuf>,
     /// Everything after a `--`.
     /// These are the command line arguments to the python engine when using `pystr` or `pytok`.
     #[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
@@ -146,9 +152,27 @@ impl Flags {
             out.push("--leader-addr".to_string());
             out.push(leader.to_string());
         }
+        if let Some(extra_engine_args) = self.extra_engine_args.as_ref() {
+            out.push("--extra-engine-args".to_string());
+            out.push(extra_engine_args.display().to_string());
+        }
         out.extend(self.last.clone());
         out
     }
+
+    /// Load extra engine arguments from a JSON file
+    /// Returns a HashMap of parameter names to values
+    pub fn load_extra_engine_args(
+        &self,
+    ) -> anyhow::Result<Option<HashMap<String, serde_json::Value>>> {
+        if let Some(path) = &self.extra_engine_args {
+            let file_content = std::fs::read_to_string(path)?;
+            let args: HashMap<String, serde_json::Value> = serde_json::from_str(&file_content)?;
+            Ok(Some(args))
+        } else {
+            Ok(None)
+        }
+    }
 }
 #[derive(Debug, Clone, Copy)]
...
@@ -257,6 +257,7 @@ pub async fn run(
         node_conf,
         flags.tensor_parallel_size,
         flags.base_gpu_id,
+        flags.extra_engine_args,
     )
     .await?;
     extra = Some(Box::pin(async move {
@@ -310,6 +311,7 @@ pub async fn run(
         &sock_prefix,
         node_conf,
         flags.tensor_parallel_size,
+        flags.extra_engine_args,
     )
     .await?;
     extra = Some(Box::pin(async move {
...
@@ -32,7 +32,7 @@ Example:
 const ZMQ_SOCKET_PREFIX: &str = "dyn";
-const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>|none] out=[mistralrs|sglang|llamacpp|vllm|trtllm|echo_full|echo_core|pystr:<engine.py>|pytok:<engine.py>] [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0]";
+const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>|none] out=[mistralrs|sglang|llamacpp|vllm|trtllm|echo_full|echo_core|pystr:<engine.py>|pytok:<engine.py>] [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json]";
 fn main() -> anyhow::Result<()> {
     logging::init();
@@ -68,6 +68,7 @@ fn main() -> anyhow::Result<()> {
             sglang_flags.pipe_fd as std::os::fd::RawFd,
             node_config,
             gpu_config,
+            flags.extra_engine_args,
         );
     }
 } else {
@@ -94,6 +95,7 @@ fn main() -> anyhow::Result<()> {
             &model_path,
             node_config,
             flags.tensor_parallel_size,
+            flags.extra_engine_args,
         );
     }
 } else {
...
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use crate::backend::ExecutionContext; use crate::backend::ExecutionContext;
...@@ -40,6 +40,8 @@ pub async fn make_engine( ...@@ -40,6 +40,8 @@ pub async fn make_engine(
tensor_parallel_size: u32, tensor_parallel_size: u32,
// The base GPU ID to start allocating GPUs from // The base GPU ID to start allocating GPUs from
base_gpu_id: u32, base_gpu_id: u32,
// Extra arguments to pass directly as sglang ServerArgs
extra_engine_args: Option<PathBuf>,
) -> pipeline_error::Result<(ExecutionContext, tokio::task::JoinHandle<()>)> { ) -> pipeline_error::Result<(ExecutionContext, tokio::task::JoinHandle<()>)> {
let mut engine = SgLangEngine::new( let mut engine = SgLangEngine::new(
cancel_token, cancel_token,
...@@ -48,6 +50,7 @@ pub async fn make_engine( ...@@ -48,6 +50,7 @@ pub async fn make_engine(
node_conf, node_conf,
tensor_parallel_size, tensor_parallel_size,
base_gpu_id, base_gpu_id,
extra_engine_args,
) )
.await?; .await?;
let sglang_process = engine.take_sglang_worker_handle(); let sglang_process = engine.take_sglang_worker_handle();
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path; use std::path::{Path, PathBuf};
use async_stream::stream; use async_stream::stream;
use async_trait::async_trait; use async_trait::async_trait;
...@@ -39,6 +39,7 @@ impl SgLangEngine { ...@@ -39,6 +39,7 @@ impl SgLangEngine {
node_conf: MultiNodeConfig, node_conf: MultiNodeConfig,
tensor_parallel_size: u32, tensor_parallel_size: u32,
base_gpu_id: u32, base_gpu_id: u32,
extra_engine_args: Option<PathBuf>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let w = super::worker::start( let w = super::worker::start(
cancel_token.clone(), cancel_token.clone(),
...@@ -47,6 +48,7 @@ impl SgLangEngine { ...@@ -47,6 +48,7 @@ impl SgLangEngine {
node_conf, node_conf,
tensor_parallel_size, tensor_parallel_size,
base_gpu_id, base_gpu_id,
extra_engine_args,
) )
.await?; .await?;
let engine = SgLangEngine { let engine = SgLangEngine {
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# This file is included as a string in subprocess.rs. Most work should be done in the Rust caller.
#
import json
import logging
import tempfile
from multiprocessing.connection import Connection
from sglang.srt.entrypoints.engine import _set_envs_and_config
from sglang.srt.managers.scheduler import run_scheduler_process
from sglang.srt.server_args import PortArgs, ServerArgs
logging.basicConfig(
    level="DEBUG",
    force=True,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="[%(asctime)s] %(message)s",
)
# These can all be overridden by --extra-engine-args json file
arg_map = {
    "model_path": f"{model_path}",
    "enable_metrics": False,
    "log_level": "debug",
    "log_requests": True,
    "tp_size": int(tp_size_str),
    # Multi-node
    "dist_init_addr": dist_init_addr if dist_init_addr != "" else None,
    "nnodes": int(nnodes_str),
    "node_rank": int(node_rank_str),
}
json_map = {}
if extra_engine_args != "":
    # extra_engine_args is a filename
    try:
        with open(extra_engine_args) as f:
            json_map = json.load(f)
    except FileNotFoundError:
        logging.debug(f"File {extra_engine_args} not found.")
    except json.JSONDecodeError as e:
        logging.debug(f"Invalid JSON in {extra_engine_args}: {e}")
logging.debug(f"Adding extra engine arguments: {json_map}")
arg_map = {**arg_map, **json_map} # json_map gets precedence
server_args = ServerArgs(**arg_map)
_set_envs_and_config(server_args)
logging.debug(server_args)
ipc_path = f"ipc:///tmp/{socket_id}"
# These must match worker.rs zmq_sockets, which is the other side
port_args = PortArgs(
    # we don't use this one so use anything
    tokenizer_ipc_name=f"ipc://{tempfile.NamedTemporaryFile(delete=False).name}",
    # Us -> sglang
    scheduler_input_ipc_name=f"{ipc_path}_input_socket",
    # sglang -> us
    detokenizer_ipc_name=f"{ipc_path}_output_socket",
    # The port for nccl initialization (torch.dist), which we don't use
    nccl_port=9876,
)
# Rank must be globally unique across nodes
tp_rank = int(tp_rank_str)
# See nvidia-smi for GPU IDs, they run 0,1,2,etc.
# In a single-node setup this is the same as rank
gpu_id = int(gpu_id_str)
pipe_fd_int = int(pipe_fd)
writer = Connection(handle=pipe_fd_int, readable=False, writable=True)
run_scheduler_process(server_args, port_args, gpu_id, tp_rank, None, writer)
...@@ -14,68 +14,16 @@ ...@@ -14,68 +14,16 @@
// limitations under the License. // limitations under the License.
use pyo3::{types::IntoPyDict, Python}; use pyo3::{types::IntoPyDict, Python};
use std::{env, os::fd::RawFd, path::Path}; use std::{
env,
ffi::CString,
os::fd::RawFd,
path::{Path, PathBuf},
};
use crate::engines::MultiNodeConfig; use crate::engines::MultiNodeConfig;
-const PY_START_ENGINE: &std::ffi::CStr = cr#"
-from multiprocessing.connection import Connection
-import signal
-import tempfile
-import logging
-from sglang.srt.server_args import ServerArgs, PortArgs
-import sglang as sgl
-from sglang.srt.managers.scheduler import run_scheduler_process
-from sglang.srt.entrypoints.engine import _set_envs_and_config
-server_args = ServerArgs(
-    model_path=f"{model_path}",
-    enable_metrics = False,
-    log_level = "debug",
-    log_requests = True,
-    tp_size = int(tp_size_str),
-    # Multi-node
-    dist_init_addr = dist_init_addr if dist_init_addr != "" else None,
-    nnodes = int(nnodes_str),
-    node_rank = int(node_rank_str),
-)
-logging.basicConfig(
-    level="DEBUG",
-    force=True,
-    datefmt="%Y-%m-%d %H:%M:%S",
-    format=f"[%(asctime)s] %(message)s",
-)
-_set_envs_and_config(server_args)
-logging.debug(server_args)
-ipc_path = f"ipc:///tmp/{socket_id}";
-# These must match worker.rs zmq_sockets, which is the other side
-port_args = PortArgs(
-    # we don't use this one so use anything
-    tokenizer_ipc_name=f"ipc://{tempfile.NamedTemporaryFile(delete=False).name}",
-    # Us -> sglang
-    scheduler_input_ipc_name=f"{ipc_path}_input_socket",
-    # sglang -> us
-    detokenizer_ipc_name=f"{ipc_path}_output_socket",
-    # The port for nccl initialization (torch.dist), which we don't use
-    nccl_port=9876,
-)
-# Rank must be globally unique across nodes
-tp_rank = int(tp_rank_str)
-# See nvidia-smi for GPU IDs, they run 0,1,2,etc.
-# In a single-node setup this is the same as rank
-gpu_id = int(gpu_id_str)
-pipe_fd_int = int(pipe_fd)
-writer = Connection(handle=pipe_fd_int, readable=False, writable=True)
-run_scheduler_process(server_args, port_args, gpu_id, tp_rank, None, writer)
-"#;
+const PY_START_ENGINE: &str = include_str!("sglang_inc.py");
 /// Start the Python sglang engine that listens on zmq socket
 /// This is called by running `nio --internal-sglang-process
@@ -91,12 +39,17 @@ pub fn run_subprocess(
     node_config: MultiNodeConfig,
     // Multi GPU. Usually Default::default
     gpu_config: super::MultiGPUConfig,
+    // Allow passing any arguments to sglang
+    extra_engine_args: Option<PathBuf>,
 ) -> anyhow::Result<()> {
     pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
     if let Ok(venv) = env::var("VIRTUAL_ENV") {
         let _ = Python::with_gil(|py| crate::engines::fix_venv(venv, py));
     }
     let dir = model_path.display().to_string();
+    let extra_engine_args_str = &extra_engine_args
+        .map(|p| p.display().to_string())
+        .unwrap_or_default();
     Python::with_gil(|py| {
         let locals = [
             ("socket_id", socket_id),
@@ -109,10 +62,11 @@ pub fn run_subprocess(
             ("nnodes_str", &node_config.num_nodes.to_string()),
             ("node_rank_str", &node_config.node_rank.to_string()),
             ("dist_init_addr", &node_config.leader_addr),
+            ("extra_engine_args", extra_engine_args_str),
         ]
         .into_py_dict(py)
         .unwrap();
-        if let Err(err) = py.run(PY_START_ENGINE, None, Some(&locals)) {
+        if let Err(err) = py.run(CString::new(PY_START_ENGINE)?.as_ref(), None, Some(&locals)) {
             anyhow::bail!("sglang engine run error: {err}");
         }
         tracing::info!("sglang subprocess exit");
...
@@ -17,7 +17,7 @@ use std::{
     collections::HashMap,
     env, fmt,
     os::fd::{FromRawFd as _, RawFd},
-    path::Path,
+    path::{Path, PathBuf},
     process::Stdio,
     sync::Arc,
     time::Duration,
@@ -288,6 +288,7 @@ pub async fn start(
     node_conf: MultiNodeConfig,
     tp_size: u32,
     base_gpu_id: u32,
+    extra_engine_args: Option<PathBuf>,
 ) -> anyhow::Result<SgLangWorker> {
     pyo3::prepare_freethreaded_python();
     if let Ok(venv) = env::var("VIRTUAL_ENV") {
@@ -321,8 +322,13 @@ pub async fn start(
             tp_rank,
             gpu_id,
         };
-        let (sglang_process, ready_fd) =
-            start_sglang(model_path, node_conf.clone(), gpu_conf).await?;
+        let (sglang_process, ready_fd) = start_sglang(
+            model_path,
+            node_conf.clone(),
+            gpu_conf,
+            extra_engine_args.clone(),
+        )
+        .await?;
         process_group.push((tp_rank, ready_fd));
         let watcher_join_handle = watch_sglang(cancel_token.clone(), sglang_process);
         // TODO: Do we want to hold on to this?
@@ -442,6 +448,7 @@ async fn start_sglang(
     model_path: &Path,
     node_conf: MultiNodeConfig,
     gpu_conf: MultiGPUConfig,
+    extra_engine_args: Option<PathBuf>,
 ) -> anyhow::Result<(tokio::process::Child, RawFd)> {
     // This pipe is how sglang tells us it's ready
     let mut pipe_fds: [libc::c_int; 2] = [-1, -1];
@@ -462,6 +469,12 @@ async fn start_sglang(
         format!("--num-nodes={}", node_conf.num_nodes),
         format!("--node-rank={}", node_conf.node_rank),
     ];
+    if let Some(extra_engine_args) = extra_engine_args {
+        args.push(format!(
+            "--extra-engine-args={}",
+            extra_engine_args.display()
+        ));
+    };
     if node_conf.num_nodes > 1 {
         if node_conf.leader_addr.is_empty() {
             anyhow::bail!("Missing --leader-addr for multi-node");
...
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
// limitations under the License. // limitations under the License.
use std::future::Future; use std::future::Future;
use std::path::Path; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
...@@ -46,6 +46,8 @@ pub async fn make_leader_engine( ...@@ -46,6 +46,8 @@ pub async fn make_leader_engine(
node_conf: MultiNodeConfig, node_conf: MultiNodeConfig,
// How many GPUs to use // How many GPUs to use
tensor_parallel_size: u32, tensor_parallel_size: u32,
// Path to extra engine args file
extra_engine_args: Option<PathBuf>,
) -> pipeline_error::Result<(ExecutionContext, impl Future<Output = ()>)> { ) -> pipeline_error::Result<(ExecutionContext, impl Future<Output = ()>)> {
let ray_obj = if node_conf.num_nodes > 1 { let ray_obj = if node_conf.num_nodes > 1 {
let r = ray::start_leader(node_conf.leader_addr.parse()?)?; let r = ray::start_leader(node_conf.leader_addr.parse()?)?;
...@@ -64,6 +66,7 @@ pub async fn make_leader_engine( ...@@ -64,6 +66,7 @@ pub async fn make_leader_engine(
model_path, model_path,
node_conf, node_conf,
tensor_parallel_size, tensor_parallel_size,
extra_engine_args,
) )
.await?; .await?;
let vllm_process = engine.take_vllm_worker_handle(); let vllm_process = engine.take_vllm_worker_handle();
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path; use std::path::{Path, PathBuf};
use async_stream::stream; use async_stream::stream;
use async_trait::async_trait; use async_trait::async_trait;
...@@ -38,6 +38,7 @@ impl VllmEngine { ...@@ -38,6 +38,7 @@ impl VllmEngine {
model_path: &Path, model_path: &Path,
node_conf: MultiNodeConfig, node_conf: MultiNodeConfig,
tensor_parallel_size: u32, tensor_parallel_size: u32,
extra_engine_args: Option<PathBuf>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let w = worker::start( let w = worker::start(
cancel_token.clone(), cancel_token.clone(),
...@@ -45,6 +46,7 @@ impl VllmEngine { ...@@ -45,6 +46,7 @@ impl VllmEngine {
model_path, model_path,
node_conf, node_conf,
tensor_parallel_size, tensor_parallel_size,
extra_engine_args,
) )
.await?; .await?;
let engine = VllmEngine { let engine = VllmEngine {
......
...@@ -15,44 +15,12 @@ ...@@ -15,44 +15,12 @@
use pyo3::{types::IntoPyDict, Python}; use pyo3::{types::IntoPyDict, Python};
use std::env; use std::env;
use std::path::Path; use std::ffi::CString;
use std::path::{Path, PathBuf};
use crate::engines::MultiNodeConfig; use crate::engines::MultiNodeConfig;
const PY_START_ENGINE: &std::ffi::CStr = cr#" const PY_START_ENGINE: &str = include_str!("vllm_inc.py");
import multiprocessing
import signal
from vllm.engine.multiprocessing.engine import run_mp_engine
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.usage.usage_lib import UsageContext
engine_args = AsyncEngineArgs(
model=f"{model_path}",
served_model_name=None,
task='generate',
skip_tokenizer_init=True,
seed=0,
max_model_len=8192,
max_seq_len_to_capture=8192,
tensor_parallel_size = int(tp_size_str),
pipeline_parallel_size = int(nnodes_str),
)
ipc_path = f"ipc:///tmp/{socket_id}";
engine_alive = multiprocessing.Value('b', True, lock=False)
# 0.7.3
run_mp_engine(engine_args, UsageContext.OPENAI_API_SERVER, ipc_path, engine_alive)
# 0.8.1
# TODO: In 0.8+ first argument is VllmConfig, not AsyncEngineArgs
# disable_log_stats = False
# disable_log_requests = True
# run_mp_engine(engine_args, UsageContext.OPENAI_API_SERVER, ipc_path, disable_log_stats, disable_log_requests, engine_alive)
"#;
 /// Start the Python vllm engine that listens on zmq socket
 /// This is called by running `<bin> --internal-vllm-process
@@ -62,22 +30,27 @@ pub fn run_subprocess(
     model_path: &Path,
     node_config: MultiNodeConfig,
     tp_size: u32,
+    extra_engine_args: Option<PathBuf>,
 ) -> anyhow::Result<()> {
     pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
     if let Ok(venv) = env::var("VIRTUAL_ENV") {
         let _ = Python::with_gil(|py| crate::engines::fix_venv(venv, py));
     }
     let model_path_str = model_path.display().to_string();
+    let extra_engine_args_str = &extra_engine_args
+        .map(|p| p.display().to_string())
+        .unwrap_or_default();
     Python::with_gil(|py| {
         let locals = [
             ("socket_id", socket_id),
             ("model_path", model_path_str.as_str()),
             ("tp_size_str", &tp_size.to_string()),
             ("nnodes_str", &node_config.num_nodes.to_string()),
+            ("extra_engine_args", extra_engine_args_str),
         ]
         .into_py_dict(py)
         .unwrap();
-        if let Err(err) = py.run(PY_START_ENGINE, None, Some(&locals)) {
+        if let Err(err) = py.run(CString::new(PY_START_ENGINE)?.as_ref(), None, Some(&locals)) {
             anyhow::bail!("vllm engine run error: {err}");
         }
         tracing::info!("vllm subprocess exit");
...
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# This file is included as a string in subprocess.rs. Most work should be done in the Rust caller.
#
import json
import logging
import multiprocessing
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.multiprocessing.engine import run_mp_engine
from vllm.usage.usage_lib import UsageContext
arg_map = {
    "model": f"{model_path}",
    "served_model_name": None,
    "task": "generate",
    "skip_tokenizer_init": True,
    "seed": 0,
    "max_model_len": 8192,
    "max_seq_len_to_capture": 8192,
    "tensor_parallel_size": int(tp_size_str),
    "pipeline_parallel_size": int(nnodes_str),
}
json_map = {}
if extra_engine_args != "":
    # extra_engine_args is a filename
    try:
        with open(extra_engine_args) as f:
            json_map = json.load(f)
    except FileNotFoundError:
        logging.debug(f"File {extra_engine_args} not found.")
    except json.JSONDecodeError as e:
        logging.debug(f"Invalid JSON in {extra_engine_args}: {e}")
logging.debug(f"Adding extra engine arguments: {json_map}")
arg_map = {**arg_map, **json_map} # json_map gets precedence
engine_args = AsyncEngineArgs(**arg_map)
ipc_path = f"ipc:///tmp/{socket_id}"
engine_alive = multiprocessing.Value("b", True, lock=False)
# 0.7.3
run_mp_engine(engine_args, UsageContext.OPENAI_API_SERVER, ipc_path, engine_alive)
# 0.8.1
# TODO: In 0.8+ first argument is VllmConfig, not AsyncEngineArgs
# disable_log_stats = False
# disable_log_requests = True
# run_mp_engine(engine_args, UsageContext.OPENAI_API_SERVER, ipc_path, disable_log_stats, disable_log_requests, engine_alive)
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::{Path, PathBuf};
use std::process::Stdio; use std::process::Stdio;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
...@@ -163,6 +163,7 @@ pub async fn start( ...@@ -163,6 +163,7 @@ pub async fn start(
model_path: &Path, model_path: &Path,
_node_conf: MultiNodeConfig, _node_conf: MultiNodeConfig,
tensor_parallel_size: u32, tensor_parallel_size: u32,
extra_engine_args: Option<PathBuf>,
) -> anyhow::Result<VllmWorker> { ) -> anyhow::Result<VllmWorker> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize" pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
if let Ok(venv) = env::var("VIRTUAL_ENV") { if let Ok(venv) = env::var("VIRTUAL_ENV") {
...@@ -179,7 +180,14 @@ pub async fn start( ...@@ -179,7 +180,14 @@ pub async fn start(
metrics, metrics,
} = zmq_sockets(sock_code)?; } = zmq_sockets(sock_code)?;
let vllm_process = start_vllm(model_path, &py_imports, data, tensor_parallel_size).await?; let vllm_process = start_vllm(
model_path,
&py_imports,
data,
tensor_parallel_size,
extra_engine_args,
)
.await?;
let vllm_join_handle = watch_vllm(cancel_token.clone(), vllm_process); let vllm_join_handle = watch_vllm(cancel_token.clone(), vllm_process);
tokio::spawn(heartbeat_loop(cancel_token.clone(), heartbeat)); tokio::spawn(heartbeat_loop(cancel_token.clone(), heartbeat));
...@@ -304,17 +312,21 @@ async fn start_vllm( ...@@ -304,17 +312,21 @@ async fn start_vllm(
python_imports: &Imports, python_imports: &Imports,
mut data_socket: async_zmq::Dealer<IntoIter<Vec<u8>>, Vec<u8>>, mut data_socket: async_zmq::Dealer<IntoIter<Vec<u8>>, Vec<u8>>,
tensor_parallel_size: u32, tensor_parallel_size: u32,
extra_engine_args: Option<PathBuf>,
) -> anyhow::Result<tokio::process::Child> { ) -> anyhow::Result<tokio::process::Child> {
let vllm_args = [ let mut vllm_args = vec![
"--internal-vllm-process", "--internal-vllm-process".to_string(),
&format!("--model-path={}", model_path.display()), format!("--model-path={}", model_path.display()),
&format!("--tensor-parallel-size={tensor_parallel_size}"), format!("--tensor-parallel-size={tensor_parallel_size}"),
]; ];
if let Some(args_path) = extra_engine_args {
vllm_args.push(format!("--extra-engine-args={}", args_path.display()));
}
let self_path = std::env::current_exe()?; let self_path = std::env::current_exe()?;
let mut proc = tokio::process::Command::new(self_path) let mut proc = tokio::process::Command::new(self_path)
.env("VLLM_LOGGING_LEVEL", "DEBUG") .env("VLLM_LOGGING_LEVEL", "DEBUG")
.args(vllm_args) .args(&vllm_args)
.kill_on_drop(false) .kill_on_drop(false)
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
......
@@ -129,6 +129,7 @@ addopts = [
     "--strict-config",
     "--mypy",
     "--ignore-glob=*model.py",
+    "--ignore-glob=*_inc.py",
     # FIXME: Get relative/generic blob paths to work here
 ]
 xfail_strict = true
@@ -158,6 +159,7 @@ indent-width = 4
 [tool.ruff.lint.extend-per-file-ignores]
 "icp/tests/**/test_*.py" = ["F811", "F401"]
+"*_inc.py" = ["F821"]
 [tool.mypy]
...