Commit 12714d90 authored by Graham King, committed by GitHub

feat: Python bring-your-own-engine with our tokenizer (#47)

Instead of using `out=pystr:<my.py>` we can now do this:
```
dynemo-run out=pytok:/home/graham/my_python_engine.py --model-path <hf-repo-checkout>
```

That engine will receive and respond with tokens. Here's an example engine file:
```
import asyncio

async def generate(request):
    yield {"token_ids":[791]}
    await asyncio.sleep(0.1)
    yield {"token_ids":[6864]}
    await asyncio.sleep(0.1)
    yield {"token_ids":[315]}
    await asyncio.sleep(0.1)
    yield {"token_ids":[9822]}
    await asyncio.sleep(0.1)
    yield {"token_ids":[374]}
    await asyncio.sleep(0.1)
    yield {"token_ids":[12366]}
    await asyncio.sleep(0.1)
    yield {"token_ids":[13]}
```

Also reduce duplication by making the bindings engine use the llm lib engine.
parent d752a1a2
@@ -156,21 +156,29 @@ Node 2:
dynemo-run in=none out=vllm ~/llm_models/Llama-3.2-3B-Instruct/ --num-nodes 2 --leader-addr 10.217.98.122:6539 --node-rank 1
```
## python
## Python bring-your-own-engine
You can provide your own engine in a Python file. The file must provide a generator with this signature:
```
async def generate(request):
```
Build: `cargo build --release --features python`
### Python does the pre-processing
If the Python engine wants to receive and return strings (it does the prompt templating and tokenization itself), run it like this:
```
dynemo-run out=pystr:/home/user/my_python_engine.py --name <model-name>
```
- The `request` parameter is a map: an OpenAI-compatible create chat completion request (https://platform.openai.com/docs/api-reference/chat/create).
- The function must `yield` a series of maps conforming to the create chat completion stream response format (example below).
- The `--name` flag is the name we serve the model under when using an HTTP front end.
The file is loaded once at startup and kept in memory.
- Build: `cargo build --release --features python`
- Run: `dynemo-run out=pystr:/home/user/my_python_engine.py --name <model-name>`
Example engine:
```
import asyncio
@@ -193,6 +201,44 @@ async def generate(request):
yield {"id":"1","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop"}],"created":1841762283,"model":"Llama-3.2-1B-Instruct","system_fingerprint":"local","object":"chat.completion.chunk"}
```
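The example above is collapsed by the diff view. For reference, a minimal self-contained string engine might look like the sketch below; it assumes the `chat.completion.chunk` map layout shown in the final yield above, and the `id`, `created`, and `model` values are illustrative only:
```
import asyncio

async def generate(request):
    # Stream a short fixed reply as OpenAI-style chat.completion.chunk maps.
    for word in ["Hello", " world"]:
        yield {
            "id": "1",
            "choices": [{"index": 0, "delta": {"content": word, "role": "assistant"}, "finish_reason": None}],
            "created": 1841762283,
            "model": "Llama-3.2-1B-Instruct",
            "system_fingerprint": "local",
            "object": "chat.completion.chunk",
        }
        await asyncio.sleep(0.1)
    # Final chunk: empty delta with finish_reason "stop" ends the stream.
    yield {
        "id": "1",
        "choices": [{"index": 0, "delta": {"content": "", "role": "assistant"}, "finish_reason": "stop"}],
        "created": 1841762283,
        "model": "Llama-3.2-1B-Instruct",
        "system_fingerprint": "local",
        "object": "chat.completion.chunk",
    }
```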
### Dynemo does the pre-processing
If the Python engine wants to receive and return tokens (the prompt templating and tokenization are already done for it), run it like this:
```
dynemo-run out=pytok:/home/user/my_python_engine.py --model-path <hf-repo-checkout>
```
- The request parameter is a map that looks like this:
```
{'token_ids': [128000, 128006, 9125, 128007, ... lots more ... ], 'stop_conditions': {'max_tokens': 8192, 'stop': None, 'stop_token_ids_hidden': [128001, 128008, 128009], 'min_tokens': None, 'ignore_eos': None}, 'sampling_options': {'n': None, 'best_of': None, 'presence_penalty': None, 'frequency_penalty': None, 'repetition_penalty': None, 'temperature': None, 'top_p': None, 'top_k': None, 'min_p': None, 'use_beam_search': None, 'length_penalty': None, 'seed': None}, 'eos_token_ids': [128001, 128008, 128009], 'mdc_sum': 'f1cd44546fdcbd664189863b7daece0f139a962b89778469e4cffc9be58ccc88', 'annotations': []}
```
- The `generate` function must `yield` a series of maps that look like this:
```
{"token_ids":[791],"tokens":None,"text":None,"cum_log_probs":None,"log_probs":None,"finish_reason":None}
```
- The command-line flag `--model-path` must point to a Hugging Face repo checkout containing `tokenizer.json`. The `--name` flag is optional; if not provided, we use the HF repo name (directory name) as the model name.
Example engine:
```
import asyncio
async def generate(request):
yield {"token_ids":[791]}
await asyncio.sleep(0.1)
yield {"token_ids":[6864]}
await asyncio.sleep(0.1)
yield {"token_ids":[315]}
await asyncio.sleep(0.1)
yield {"token_ids":[9822]}
await asyncio.sleep(0.1)
yield {"token_ids":[374]}
await asyncio.sleep(0.1)
yield {"token_ids":[12366]}
await asyncio.sleep(0.1)
yield {"token_ids":[13]}
```
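For illustration only, here is a sketch of an engine that actually reads the request map described above: it echoes the prompt tokens back while honouring `max_tokens`. The field names follow the request example shown earlier; a real engine would run a model here instead:
```
import asyncio

async def generate(request):
    # stop_conditions.max_tokens may be None; fall back to a small limit.
    max_tokens = request["stop_conditions"]["max_tokens"] or 16
    # Echo the prompt tokens back one at a time as a stand-in for real generation.
    for token_id in request["token_ids"][:max_tokens]:
        yield {"token_ids": [token_id]}
        await asyncio.sleep(0.01)
```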
## trtllm
@@ -228,7 +274,7 @@ The `--model-path` you give to `dynemo-run` must contain the `config.json` (TRT-
+ Execute
TRT-LLM is a C++ library that must have been previously built and installed. It needs a lot of memory to compile. Gitlab builds a container you can try:
```
sudo docker run --gpus all -it -v /home/graham:/outside-home gitlab-master.nvidia.com:5005/dl/ai-services/libraries/rust/nim-nvllm/tensorrt_llm_runtime:85fa4a6f
sudo docker run --gpus all -it -v /home/user:/outside-home gitlab-master.nvidia.com:5005/dl/ai-services/libraries/rust/nim-nvllm/tensorrt_llm_runtime:85fa4a6f
```
Copy the trt-llm engine, the model's `.json` files (for the model deployment card) and the `nio` binary built for the correct glibc (container is Ubuntu 22.04 currently) into that container.
@@ -47,6 +47,10 @@ const ENDPOINT_SCHEME: &str = "dyn://";
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";
/// How we identify a python token endpoint
#[cfg(feature = "python")]
const PYTHON_TOK_SCHEME: &str = "pytok:";
pub enum EngineConfig {
/// A remote networked engine we don't know about yet
/// We don't have the pre-processor yet so this is only text requests. Type will change later.
@@ -345,12 +349,29 @@ pub async fn run(
anyhow::bail!("Provide model service name as `--model-name <this>`");
};
let p = std::path::PathBuf::from(path_str);
let engine = python::make_string_engine(&p).await?;
let engine = python::make_string_engine(cancel_token.clone(), &p).await?;
EngineConfig::StaticFull {
service_name: model_name,
engine,
}
}
#[cfg(feature = "python")]
Output::PythonTok(path_str) => {
use dynemo_llm::engines::python;
let Some(card) = maybe_card.clone() else {
anyhow::bail!("Could not find tokenizer. Pass flag --model-path <path>");
};
let Some(model_name) = model_name else {
unreachable!("If we have a card we must have a model name");
};
let p = std::path::PathBuf::from(path_str);
let engine = python::make_token_engine(cancel_token.clone(), &p).await?;
EngineConfig::StaticCore {
service_name: model_name.clone(),
engine,
card: Box::new(card),
}
}
};
match in_opt {
@@ -41,7 +41,7 @@ const DEFAULT_OUT: Output = Output::EchoFull;
const ZMQ_SOCKET_PREFIX: &str = "dyn";
const USAGE: &str = "USAGE: dynemo-run in=[http|text|dyn://<path>|none] out=[mistralrs|sglang|llamacpp|vllm|trtllm|echo_full|echo_core|pystr:<engine.py>] [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0]";
const USAGE: &str = "USAGE: dynemo-run in=[http|text|dyn://<path>|none] out=[mistralrs|sglang|llamacpp|vllm|trtllm|echo_full|echo_core|pystr:<engine.py>|pytok:<engine.py>] [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0]";
fn main() -> anyhow::Result<()> {
logging::init();
@@ -92,10 +92,15 @@ pub enum Output {
/// Run inference using trtllm
TrtLLM,
/// Run inference using a user supplied python file that accepts and return
/// strings (meaning it does it's own pre-processing).
/// Run inference using a user supplied python file that accepts and returns
/// strings. It does its own pre-processing.
#[cfg(feature = "python")]
PythonStr(String),
/// Run inference using a user supplied python file that accepts and returns
/// tokens. We do the pre-processing.
#[cfg(feature = "python")]
PythonTok(String),
}
impl TryFrom<&str> for Output {
@@ -134,6 +139,14 @@ impl TryFrom<&str> for Output {
Ok(Output::PythonStr(path.to_string()))
}
#[cfg(feature = "python")]
python_tok_gen if python_tok_gen.starts_with(crate::PYTHON_TOK_SCHEME) => {
let path = python_tok_gen
.strip_prefix(crate::PYTHON_TOK_SCHEME)
.unwrap();
Ok(Output::PythonTok(path.to_string()))
}
e => Err(anyhow::anyhow!("Invalid out= option '{e}'")),
}
}
@@ -164,6 +177,9 @@ impl fmt::Display for Output {
#[cfg(feature = "python")]
Output::PythonStr(path) => path,
#[cfg(feature = "python")]
Output::PythonTok(path) => path,
};
write!(f, "{s}")
}
@@ -984,6 +984,8 @@ dependencies = [
"minijinja-contrib",
"prometheus",
"pyo3",
"pyo3-async-runtimes",
"pythonize",
"regex",
"semver",
"serde",
@@ -30,7 +30,7 @@ crate-type = ["cdylib"]
[dependencies]
dynemo-llm = { path = "../../llm" }
dynemo-llm = { path = "../../llm", features = ["python"] }
dynemo-runtime = { path = "../../runtime" }
futures = "0.3"
@@ -15,23 +15,16 @@
use std::sync::Arc;
use dynemo_llm::engines::python::PythonServerStreamingEngine;
use dynemo_runtime::CancellationToken;
pub use dynemo_runtime::{
error,
pipeline::{
async_trait, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream,
SingleIn,
},
pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn},
protocols::annotated::Annotated,
Error, Result,
};
pub use serde::{Deserialize, Serialize};
use pyo3::prelude::*;
use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
/// Add bindings from this crate to the provided module
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
@@ -39,22 +32,9 @@ pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
Ok(())
}
#[derive(Debug, thiserror::Error)]
enum ResponseProcessingError {
#[error("python exception: {0}")]
PythonException(String),
#[error("deserialize error: {0}")]
DeserializeError(String),
#[error("gil offload error: {0}")]
OffloadError(String),
}
// todos:
// - [ ] enable context cancellation
// - this will likely require a change to the function signature python calling arguments
// - [ ] rename `PythonAsyncEngine` to `PythonServerStreamingEngine` to be more descriptive
// - [ ] other `AsyncEngine` implementations will have a similar pattern, i.e. one AsyncEngine
// implementation per struct
@@ -87,10 +67,7 @@ enum ResponseProcessingError {
/// ```
#[pyclass]
#[derive(Clone)]
pub struct PythonAsyncEngine {
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
}
pub struct PythonAsyncEngine(PythonServerStreamingEngine);
#[pymethods]
impl PythonAsyncEngine {
@@ -106,10 +83,12 @@ impl PythonAsyncEngine {
/// and we would not know until runtime.
#[new]
pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult<Self> {
Ok(PythonAsyncEngine {
generator: Arc::new(generator),
event_loop: Arc::new(event_loop),
})
let cancel_token = CancellationToken::new();
Ok(PythonAsyncEngine(PythonServerStreamingEngine::new(
cancel_token,
Arc::new(generator),
Arc::new(event_loop),
)))
}
}
@@ -120,144 +99,6 @@
Resp: Data + for<'de> Deserialize<'de>,
{
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
// Create a context
let (request, context) = request.transfer(());
let ctx = context.context();
let id = context.id().to_string();
tracing::trace!("processing request: {}", id);
// Clone the PyObject to move into the thread
// Create a channel to communicate between the Python thread and the Rust async context
let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128);
let generator = self.generator.clone();
let event_loop = self.event_loop.clone();
// Acquiring the GIL is similar to acquiring a standard lock/mutex
// Performing this in an tokio async task could block the thread for an undefined amount of time
// To avoid this, we spawn a blocking task to acquire the GIL and perform the operations needed
// while holding the GIL.
//
// Under low GIL contention, we wouldn't need to do this.
// However, under high GIL contention, this can lead to significant performance degradation.
//
// Since we cannot predict the GIL contention, we will always use the blocking task and pay the
// cost. The Python GIL is the gift that keeps on giving -- performance hits...
let stream = tokio::task::spawn_blocking(move || {
Python::with_gil(|py| {
let py_request = pythonize(py, &request)?;
let gen = generator.call1(py, (py_request,))?;
let locals = TaskLocals::new(event_loop.bind(py).clone());
pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py))
})
})
.await??;
let stream = Box::pin(stream);
// process the stream
// any error thrown in the stream will be caught and complete the processing task
// errors are captured by a task that is watching the processing task
// the error will be emitted as an annotated error
let request_id = id.clone();
tokio::spawn(async move {
tracing::debug!(
request_id,
"starting task to process python async generator stream"
);
let mut stream = stream;
let mut count = 0;
while let Some(item) = stream.next().await {
count += 1;
tracing::trace!(
request_id,
"processing the {}th item from python async generator",
count
);
let mut done = false;
let response = match process_item::<Resp>(item).await {
Ok(response) => response,
Err(e) => {
done = true;
let msg = match &e {
ResponseProcessingError::DeserializeError(e) => {
// tell the python async generator to stop generating
// right now, this is impossible as we are not passing the context to the python async generator
// todo: add task-local context to the python async generator
ctx.stop_generating();
let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e);
tracing::error!(request_id, "{}", msg);
msg
}
ResponseProcessingError::PythonException(e) => {
let msg = format!("a python exception was caught while processing the async generator: {}", e);
tracing::warn!(request_id, "{}", msg);
msg
}
ResponseProcessingError::OffloadError(e) => {
let msg = format!("critical error: failed to offload the python async generator to a new thread: {}", e);
tracing::error!(request_id, "{}", msg);
msg
}
};
Annotated::from_error(msg)
}
};
if tx.send(response).await.is_err() {
tracing::trace!(
request_id,
"error forwarding annotated response to channel; channel is closed"
);
break;
}
if done {
tracing::debug!(
request_id,
"early termination of python async generator stream task"
);
break;
}
}
tracing::debug!(
request_id,
"finished processing python async generator stream"
);
});
let stream = ReceiverStream::new(rx);
Ok(ResponseStream::new(Box::pin(stream), context.context()))
self.0.generate(request).await
}
}
async fn process_item<Resp>(
item: Result<Py<PyAny>, PyErr>,
) -> Result<Annotated<Resp>, ResponseProcessingError>
where
Resp: Data + for<'de> Deserialize<'de>,
{
let item = item.map_err(|e| ResponseProcessingError::PythonException(e.to_string()))?;
let response = tokio::task::spawn_blocking(move || {
Python::with_gil(|py| depythonize::<Resp>(&item.into_bound(py)))
})
.await
.map_err(|e| ResponseProcessingError::OffloadError(e.to_string()))?
.map_err(|e| ResponseProcessingError::DeserializeError(e.to_string()))?;
let response = Annotated::from_data(response);
Ok(response)
}
@@ -24,7 +24,7 @@ pub use dynemo_runtime::{
SingleIn,
},
protocols::annotated::Annotated,
Error, Result,
CancellationToken, Error, Result,
};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyDict};
@@ -35,6 +35,7 @@ use tokio::sync::mpsc;
use tokio::sync::oneshot::Sender;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use crate::backend::ExecutionContext;
use crate::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
/// Python snippet to import a file as a module
@@ -53,34 +54,63 @@ spec.loader.exec_module(module)
/// An engine that takes and returns strings, feeding them to an engine written in Python
pub async fn make_string_engine(
cancel_token: CancellationToken,
py_file: &Path,
) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
pyo3::prepare_freethreaded_python();
let engine = PythonStringEngine::new(py_file).await?;
let engine = new_engine(cancel_token, py_file).await?;
let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
Ok(engine)
}
struct PythonStringEngine {
_user_module: PyObject,
generator: Arc<Py<PyAny>>,
event_loop: Arc<Py<PyAny>>,
/// An engine that takes and returns tokens.
pub async fn make_token_engine(
cancel_token: CancellationToken,
py_file: &Path,
) -> pipeline_error::Result<ExecutionContext> {
pyo3::prepare_freethreaded_python();
let engine = new_engine(cancel_token, py_file).await?;
let engine: ExecutionContext = Arc::new(engine);
Ok(engine)
}
#[derive(Clone)]
pub struct PythonServerStreamingEngine {
_cancel_token: CancellationToken,
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
}
impl PythonStringEngine {
async fn new(py_file: &Path) -> anyhow::Result<Self> {
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(move || run_asyncio(tx));
let event_loop = rx.await?;
let user_module = python_file_to_module(py_file)?;
let generator = Python::with_gil(|py| user_module.getattr(py, "generate").unwrap());
Ok(PythonStringEngine {
_user_module: user_module,
generator: Arc::new(generator),
async fn new_engine(
cancel_token: CancellationToken,
py_file: &Path,
) -> anyhow::Result<PythonServerStreamingEngine> {
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(move || run_asyncio(tx));
let event_loop = rx.await?;
let user_module = python_file_to_module(py_file)?;
let generator = Python::with_gil(|py| user_module.getattr(py, "generate").unwrap());
Ok(PythonServerStreamingEngine::new(
cancel_token,
Arc::new(generator),
event_loop,
))
}
impl PythonServerStreamingEngine {
pub fn new(
cancel_token: CancellationToken,
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
) -> Self {
PythonServerStreamingEngine {
_cancel_token: cancel_token,
generator,
event_loop,
})
}
}
}
@@ -123,7 +153,8 @@ enum ResponseProcessingError {
}
#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error> for PythonStringEngine
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
for PythonServerStreamingEngine
where
Req: Data + Serialize,
Resp: Data + for<'de> Deserialize<'de>,