Commit 995f71cc authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat(pystr): Pass command line arguments (#123)

Command line arguments are passed to the python engine like this:
```
dynamo-run out=pystr:my_python_engine.py -- -n 42 --custom-arg Orange --yes
```

The python engine receives the arguments in `sys.argv`. The argument list will include some standard ones as well as anything after the `--`.

This input:
```
dynamo-run out=pystr:my_engine.py /opt/models/Llama-3.2-3B-Instruct/ --model-name llama_3.2 --tensor-parallel-size 4 -- -n 1
```

is read like this:
```
async def generate(request):
    .. as before ..

if __name__ == "__main__":
    print(f"MAIN: {sys.argv}")
```

and produces this output:
```
MAIN: ['my_engine.py', '--model-path', '/opt/models/Llama-3.2-3B-Instruct/', '--model-name', 'llama_3.2', '--http-port', '8080', '--tensor-parallel-size', '4', '--base-gpu-id', '0', '--num-nodes', '1', '--node-rank', '0', '-n', '1']
```

This allows quick iteration on the engine setup. Note how the `-n 1` given after the `--` is included. The flags `--leader-addr` and `--model-config` will also be added if they were provided to `dynamo-run`.
parent 666cf87b
...@@ -211,6 +211,34 @@ async def generate(request): ...@@ -211,6 +211,34 @@ async def generate(request):
yield {"id":"1","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop"}],"created":1841762283,"model":"Llama-3.2-1B-Instruct","system_fingerprint":"local","object":"chat.completion.chunk"} yield {"id":"1","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop"}],"created":1841762283,"model":"Llama-3.2-1B-Instruct","system_fingerprint":"local","object":"chat.completion.chunk"}
``` ```
Command line arguments are passed to the python engine like this:
```
dynamo-run out=pystr:my_python_engine.py -- -n 42 --custom-arg Orange --yes
```
The python engine receives the arguments in `sys.argv`. The argument list will include some standard ones as well as anything after the `--`.
This input:
```
dynamo-run out=pystr:my_engine.py /opt/models/Llama-3.2-3B-Instruct/ --model-name llama_3.2 --tensor-parallel-size 4 -- -n 1
```
is read like this:
```
async def generate(request):
.. as before ..
if __name__ == "__main__":
print(f"MAIN: {sys.argv}")
```
and produces this output:
```
MAIN: ['my_engine.py', '--model-path', '/opt/models/Llama-3.2-3B-Instruct/', '--model-name', 'llama_3.2', '--http-port', '8080', '--tensor-parallel-size', '4', '--base-gpu-id', '0', '--num-nodes', '1', '--node-rank', '0', '-n', '1']
```
This allows quick iteration on the engine setup. Note how the `-n 1` given after the `--` is included. The flags `--leader-addr` and `--model-config` will also be added if they were provided to `dynamo-run`.
### Dynamo does the pre-processing ### Dynamo does the pre-processing
If the Python engine wants to receive and return tokens - the prompt templating and tokenization is already done - run it like this: If the Python engine wants to receive and return tokens - the prompt templating and tokenization is already done - run it like this:
...@@ -250,6 +278,8 @@ async def generate(request): ...@@ -250,6 +278,8 @@ async def generate(request):
yield {"token_ids":[13]} yield {"token_ids":[13]}
``` ```
`pytok` supports the same ways of passing command line arguments as `pystr` - `initialize` or `main` with `sys.argv`.
## trtllm ## trtllm
TensorRT-LLM. Requires `clang` and `libclang-dev`. TensorRT-LLM. Requires `clang` and `libclang-dev`.
......
...@@ -93,8 +93,7 @@ pub struct Flags { ...@@ -93,8 +93,7 @@ pub struct Flags {
/// Internal use only. /// Internal use only.
// Start the python vllm engine sub-process. // Start the python vllm engine sub-process.
#[arg(long)] #[arg(long, hide = true, default_value = "false")]
#[clap(hide = true, default_value = "false")]
pub internal_vllm_process: bool, pub internal_vllm_process: bool,
/// Internal use only. /// Internal use only.
...@@ -104,9 +103,52 @@ pub struct Flags { ...@@ -104,9 +103,52 @@ pub struct Flags {
/// - the node rank (0 for first host, 1 for second host, etc) /// - the node rank (0 for first host, 1 for second host, etc)
/// - the workers' rank (globally unique) /// - the workers' rank (globally unique)
/// - the GPU to use (locally unique) /// - the GPU to use (locally unique)
#[arg(long)] #[arg(long, hide = true, value_parser = parse_sglang_flags)]
#[clap(hide = true, value_parser = parse_sglang_flags)]
pub internal_sglang_process: Option<SgLangFlags>, pub internal_sglang_process: Option<SgLangFlags>,
/// Everything after a `--`.
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
pub last: Vec<String>,
}
impl Flags {
/// Convert the flags back to a command line. Including only the non-null values, but
/// include the defaults. Includes the canonicalized model path and normalized model name.
///
/// Used to pass arguments to python engines via `pystr` and `pytok`.
/// Reconstruct a command line from these flags: always-present flags first
/// (using their defaults when the user gave nothing), then optional flags,
/// then everything the user wrote after `--`.
///
/// `path` is the canonicalized model path and `name` the normalized model
/// name; both are injected so python engines see resolved values.
///
/// Used to pass arguments to python engines via `pystr` and `pytok`.
pub fn as_vec(&self, path: &str, name: &str) -> Vec<String> {
    // Flags that are always emitted; clap supplies defaults for the
    // numeric ones (tensor-parallel-size=1, base-gpu-id=0, num-nodes=1,
    // node-rank=0) so they are never absent.
    let always: [(&str, String); 7] = [
        ("--model-path", path.to_string()),
        ("--model-name", name.to_string()),
        ("--http-port", self.http_port.to_string()),
        ("--tensor-parallel-size", self.tensor_parallel_size.to_string()),
        ("--base-gpu-id", self.base_gpu_id.to_string()),
        ("--num-nodes", self.num_nodes.to_string()),
        ("--node-rank", self.node_rank.to_string()),
    ];
    let mut args: Vec<String> = always
        .into_iter()
        .flat_map(|(flag, value)| [flag.to_string(), value])
        .collect();
    // Optional flags: only forwarded when the user provided them.
    if let Some(config) = &self.model_config {
        args.push("--model-config".to_string());
        args.push(config.display().to_string());
    }
    if let Some(leader) = &self.leader_addr {
        args.push("--leader-addr".to_string());
        args.push(leader.to_string());
    }
    // Raw passthrough of everything after `--` on the dynamo-run command line.
    args.extend(self.last.iter().cloned());
    args
}
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
......
...@@ -82,7 +82,8 @@ pub async fn run( ...@@ -82,7 +82,8 @@ pub async fn run(
// Turn relative paths into absolute paths // Turn relative paths into absolute paths
let model_path = flags let model_path = flags
.model_path_pos .model_path_pos
.or(flags.model_path_flag) .clone()
.or(flags.model_path_flag.clone())
.and_then(|p| { .and_then(|p| {
if p.exists() { if p.exists() {
p.canonicalize().ok() p.canonicalize().ok()
...@@ -93,6 +94,7 @@ pub async fn run( ...@@ -93,6 +94,7 @@ pub async fn run(
// Serve the model under the name provided, or the name of the GGUF file or HF repo. // Serve the model under the name provided, or the name of the GGUF file or HF repo.
let model_name = flags let model_name = flags
.model_name .model_name
.clone()
.or_else(|| { .or_else(|| {
model_path model_path
.as_ref() .as_ref()
...@@ -338,8 +340,9 @@ pub async fn run( ...@@ -338,8 +340,9 @@ pub async fn run(
let Some(model_name) = model_name else { let Some(model_name) = model_name else {
anyhow::bail!("Provide model service name as `--model-name <this>`"); anyhow::bail!("Provide model service name as `--model-name <this>`");
}; };
let py_args = flags.as_vec(&path_str, &model_name);
let p = std::path::PathBuf::from(path_str); let p = std::path::PathBuf::from(path_str);
let engine = python::make_string_engine(cancel_token.clone(), &p).await?; let engine = python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticFull { EngineConfig::StaticFull {
service_name: model_name, service_name: model_name,
engine, engine,
...@@ -354,8 +357,9 @@ pub async fn run( ...@@ -354,8 +357,9 @@ pub async fn run(
let Some(model_name) = model_name else { let Some(model_name) = model_name else {
unreachable!("If we have a card we must have a model name"); unreachable!("If we have a card we must have a model name");
}; };
let py_args = flags.as_vec(&path_str, &model_name);
let p = std::path::PathBuf::from(path_str); let p = std::path::PathBuf::from(path_str);
let engine = python::make_token_engine(cancel_token.clone(), &p).await?; let engine = python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticCore { EngineConfig::StaticCore {
service_name: model_name.clone(), service_name: model_name.clone(),
engine, engine,
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
use std::ffi::CStr; use std::ffi::CStr;
use std::{path::Path, sync::Arc}; use std::{path::Path, sync::Arc};
use anyhow::Context;
use dynamo_runtime::pipeline::error as pipeline_error; use dynamo_runtime::pipeline::error as pipeline_error;
pub use dynamo_runtime::{ pub use dynamo_runtime::{
error, error,
...@@ -43,12 +44,11 @@ const PY_IMPORT: &CStr = cr#" ...@@ -43,12 +44,11 @@ const PY_IMPORT: &CStr = cr#"
import importlib.util import importlib.util
import sys import sys
module_name = file_path.split("/")[-1].replace(".py", "") spec = importlib.util.spec_from_file_location("__main__", file_path)
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module sys.argv = sys_argv
sys.modules["__main__"] = module
spec.loader.exec_module(module) spec.loader.exec_module(module)
"#; "#;
...@@ -56,10 +56,11 @@ spec.loader.exec_module(module) ...@@ -56,10 +56,11 @@ spec.loader.exec_module(module)
pub async fn make_string_engine( pub async fn make_string_engine(
cancel_token: CancellationToken, cancel_token: CancellationToken,
py_file: &Path, py_file: &Path,
py_args: Vec<String>,
) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> { ) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
pyo3::prepare_freethreaded_python(); pyo3::prepare_freethreaded_python();
let engine = new_engine(cancel_token, py_file).await?; let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine); let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
Ok(engine) Ok(engine)
} }
...@@ -68,10 +69,11 @@ pub async fn make_string_engine( ...@@ -68,10 +69,11 @@ pub async fn make_string_engine(
pub async fn make_token_engine( pub async fn make_token_engine(
cancel_token: CancellationToken, cancel_token: CancellationToken,
py_file: &Path, py_file: &Path,
py_args: Vec<String>,
) -> pipeline_error::Result<ExecutionContext> { ) -> pipeline_error::Result<ExecutionContext> {
pyo3::prepare_freethreaded_python(); pyo3::prepare_freethreaded_python();
let engine = new_engine(cancel_token, py_file).await?; let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: ExecutionContext = Arc::new(engine); let engine: ExecutionContext = Arc::new(engine);
Ok(engine) Ok(engine)
} }
...@@ -86,13 +88,30 @@ pub struct PythonServerStreamingEngine { ...@@ -86,13 +88,30 @@ pub struct PythonServerStreamingEngine {
async fn new_engine( async fn new_engine(
cancel_token: CancellationToken, cancel_token: CancellationToken,
py_file: &Path, py_file: &Path,
py_args: Vec<String>,
) -> anyhow::Result<PythonServerStreamingEngine> { ) -> anyhow::Result<PythonServerStreamingEngine> {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(move || run_asyncio(tx)); tokio::task::spawn_blocking(move || run_asyncio(tx));
let event_loop = rx.await?; let event_loop = rx.await?;
let user_module = python_file_to_module(py_file)?; let user_module =
let generator = Python::with_gil(|py| user_module.getattr(py, "generate").unwrap()); python_file_to_module(py_file, py_args).with_context(|| py_file.display().to_string())?;
let generator = Python::with_gil(|py| {
/* Leave commented, `initialize` may be needed to match Triton
if let Ok(initialize) = user_module.getattr(py, "initialize") {
initialize
.call1(py, (py_args,))
.inspect_err(|err| {
println!();
err.display(py);
})
.with_context(|| "Failed calling python engine's initialize(args)")?;
};
*/
user_module
.getattr(py, "generate")
.with_context(|| "generate")
})?;
Ok(PythonServerStreamingEngine::new( Ok(PythonServerStreamingEngine::new(
cancel_token, cancel_token,
Arc::new(generator), Arc::new(generator),
...@@ -127,16 +146,25 @@ fn run_asyncio(tx: Sender<Arc<PyObject>>) { ...@@ -127,16 +146,25 @@ fn run_asyncio(tx: Sender<Arc<PyObject>>) {
}); });
} }
fn python_file_to_module(p: &Path) -> Result<PyObject> { fn python_file_to_module(p: &Path, mut py_args: Vec<String>) -> Result<PyObject> {
if let Some(filename) = p.file_name() {
py_args.insert(0, filename.to_string_lossy().to_string());
};
let module: PyObject = Python::with_gil(|py| { let module: PyObject = Python::with_gil(|py| {
let globals = [("file_path", p.display().to_string())] let py_file_path: PyObject = p.display().to_string().into_pyobject(py).unwrap().into();
let py_sys_argv: PyObject = py_args.into_pyobject(py).unwrap().into();
let globals = [("file_path", py_file_path), ("sys_argv", py_sys_argv)]
.into_py_dict(py) .into_py_dict(py)
.unwrap(); .context("into_py_dict")?;
let locals = PyDict::new(py); let locals = PyDict::new(py);
py.run(PY_IMPORT, Some(&globals), Some(&locals)).unwrap(); py.run(PY_IMPORT, Some(&globals), Some(&locals))
let module = locals.get_item("module").unwrap().unwrap(); .context("PY_IMPORT")?;
module.extract().unwrap() let module = locals
}); .get_item("module")
.unwrap()
.context("get module after import")?;
module.extract().context("extract")
})?;
Ok(module) Ok(module)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment