Unverified Commit 0aa0768f authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Remove llama.cpp engine (#3499)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 552146f2
......@@ -2091,18 +2091,6 @@ dependencies = [
"uuid 1.18.1",
]
[[package]]
name = "dynamo-engine-llamacpp"
version = "0.5.1"
dependencies = [
"async-stream",
"dynamo-llm",
"dynamo-runtime",
"llama-cpp-2",
"tokio",
"tracing",
]
[[package]]
name = "dynamo-engine-mistralrs"
version = "0.5.1"
......@@ -2241,7 +2229,6 @@ dependencies = [
"async-trait",
"clap 4.5.48",
"dynamo-async-openai",
"dynamo-engine-llamacpp",
"dynamo-engine-mistralrs",
"dynamo-llm",
"dynamo-runtime",
......@@ -2434,26 +2421,6 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "enumflags2"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1027f7680c853e056ebcec683615fb6fbbc07dbaa13b4d5d9442b146ded4ecef"
dependencies = [
"enumflags2_derive",
]
[[package]]
name = "enumflags2_derive"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67c78a4d8fdf9953a5c9d458f9efe940fd97a0cab0941c075a813ac594733827"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "env_filter"
version = "0.1.3"
......@@ -2677,15 +2644,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959"
[[package]]
name = "find_cuda_helper"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9f9e65c593dd01ac77daad909ea4ad17f0d6d1776193fc8ea766356177abdad"
dependencies = [
"glob",
]
[[package]]
name = "fixedbitset"
version = "0.5.7"
......@@ -4284,33 +4242,6 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
[[package]]
name = "llama-cpp-2"
version = "0.1.122"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d574d1f43b31c9e3d0e3bf596a31fa628e4b66894eb5db4dfe78e861c7f74275"
dependencies = [
"enumflags2",
"llama-cpp-sys-2",
"thiserror 1.0.69",
"tracing",
"tracing-core",
]
[[package]]
name = "llama-cpp-sys-2"
version = "0.1.122"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e09bdf53b6f486ecaeb96b08cd8a9d9df162f2aafa37efb5b40cf421a419c755"
dependencies = [
"bindgen 0.72.1",
"cc",
"cmake",
"find_cuda_helper",
"glob",
"walkdir",
]
[[package]]
name = "llguidance"
version = "1.0.0"
......
......@@ -120,7 +120,7 @@ dynamo-build:
COPY deploy/ deploy/
ENV CARGO_TARGET_DIR=/workspace/target
RUN cargo build --release --locked --features llamacpp,cuda && \
RUN cargo build --release --locked --features cuda && \
cargo doc --no-deps
RUN cd /workspace/lib/bindings/python && \
......
......@@ -101,7 +101,7 @@ docker compose -f deploy/docker-compose.yml up -d
## 2. Select an engine
We publish Python wheels specialized for each of our supported engines: vllm, sglang, trtllm, and llama.cpp. The examples that follow use SGLang; continue reading for other engines.
We publish Python wheels specialized for each of our supported engines: vllm, sglang, and trtllm. The examples that follow use SGLang; continue reading for other engines.
```
uv venv venv
......
# llama.cpp engine for Dynamo
Usage:
```
# Install ai-dynamo llama.cpp backend (CPU Mode)
pip install "ai-dynamo[llama_cpp]"
# [Optional] To build llama.cpp for CUDA (needs a recent pip)
pip install --force-reinstall -r requirements.gpu.txt
python -m dynamo.llama_cpp --model-path /data/models/Qwen3-0.6B-Q8_0.gguf [args]
```
## Request Migration
You can enable [request migration](/docs/architecture/request_migration.md) to handle worker failures gracefully. Use the `--migration-limit` flag to specify how many times a request can be migrated to another worker:
```bash
python3 -m dynamo.llama_cpp ... --migration-limit=3
```
This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](/docs/architecture/request_migration.md) documentation for details on how this works.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Resolve the package version, preferring the `_version` module that sits next
# to this file.
try:
    from ._version import __version__
except Exception:
    # `_version` could not be imported: fall back to the metadata of the
    # installed "ai-dynamo" distribution.
    try:
        from importlib.metadata import version as _pkg_version
        __version__ = _pkg_version("ai-dynamo")
    except Exception:
        # Neither source is available; use a placeholder so that importing the
        # package never fails just because the version is unknown.
        __version__ = "0.0.0+unknown"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from dynamo.llama_cpp.main import main
# Allows `python -m dynamo.llama_cpp ...` to start the worker.
if __name__ == "__main__":
    main()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Usage: `python -m dynamo.llama_cpp --model-path /data/models/Qwen3-0.6B-Q8_0.gguf [args]`
import argparse
import logging
import os
import sys
from typing import Optional
import uvloop
from llama_cpp import Llama
from dynamo.llm import ModelInput, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
from . import __version__
# Namespace used to build the default endpoint. Read once, at import time, from
# the DYN_NAMESPACE environment variable; "dynamo" when it is not set.
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
# Endpoint used when `--endpoint` is not passed on the command line.
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate"
# Called at import time, before any argument parsing happens.
configure_dynamo_logging()
class Config:
    """Command line parameters or defaults"""
    # The three parts of the `--endpoint` value
    # ("dyn://namespace.component.endpoint"), split by `cmd_line_args`.
    namespace: str
    component: str
    endpoint: str
    # Value of `--model-path`.
    model_path: str
    # Value of `--model-name`, or None when the flag was omitted or empty.
    model_name: Optional[str]
    # Value of `--context-length`; the flag defaults to None when not given.
    context_length: int
    # Value of `--migration-limit` (defaults to 0).
    migration_limit: int
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
    """Register the model with Dynamo, load it with llama.cpp and serve requests.

    Steps, in order: parse the command line, create the component service,
    register the model on the endpoint, build the `Llama` engine, then serve
    `RequestHandler.generate` on that endpoint.
    """
    config = cmd_line_args()

    namespace = runtime.namespace(config.namespace)
    component = namespace.component(config.component)
    await component.create_service()
    endpoint = component.endpoint(config.endpoint)

    # NOTE(review): the handler reads request["messages"], yet the model is
    # registered with ModelInput.Tokens - confirm that this input type still
    # delivers chat messages to the handler.
    await register_llm(
        ModelInput.Tokens,
        ModelType.Chat,
        endpoint,
        config.model_path,
        config.model_name,
        migration_limit=config.migration_limit,
    )

    # Engine construction. For more parameters see:
    # https://llama-cpp-python.readthedocs.io/en/latest/api-reference/#high-level-api
    llama_kwargs = dict(
        model_path=config.model_path,
        n_gpu_layers=-1,  # GPU if we can
        n_threads=16,  # Otherwise give it some CPU
    )
    # Only override the context size when the user asked for one.
    if config.context_length:
        llama_kwargs["n_ctx"] = config.context_length

    handler = RequestHandler(Llama(**llama_kwargs))
    await endpoint.serve_endpoint(handler.generate)
class RequestHandler:
    """Serves chat-completion requests from a llama-cpp-python style engine.

    The engine's streaming API is a synchronous generator. Every call into it
    is run on a dedicated worker thread so that token generation does not
    block the asyncio event loop.
    """

    def __init__(self, engine):
        # Local import so this class does not depend on a file-level import.
        from concurrent.futures import ThreadPoolExecutor

        self.engine_client = engine
        # Exactly one worker thread: engine calls stay strictly sequential (as
        # they were when they ran inline on the event loop), they just no
        # longer run on the loop's own thread.
        self._executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="llama_cpp"
        )

    async def generate(self, request):
        """Yield each streamed chunk for `request["messages"]`.

        Exceptions raised by the engine (including a missing "messages" key)
        propagate to the caller.
        """
        import asyncio

        loop = asyncio.get_running_loop()
        gen = await loop.run_in_executor(
            self._executor,
            lambda: self.engine_client.create_chat_completion(
                request["messages"], stream=True
            ),
        )
        # StopIteration cannot be delivered through a Future, so exhaustion is
        # signalled with a private sentinel instead.
        exhausted = object()
        while True:
            res = await loop.run_in_executor(self._executor, next, gen, exhausted)
            if res is exhausted:
                break
            logging.debug(f"res: {res}")
            yield res
def cmd_line_args():
    """Parse the command line and return a populated `Config`.

    Logs an error and exits with status 1 when `--endpoint`, after removing
    "dyn://", does not consist of exactly three dot-separated parts.
    """
    parser = argparse.ArgumentParser(
        description="llama.cpp server integrated with Dynamo LLM."
    )
    parser.add_argument(
        "--version", action="version", version=f"Dynamo Backend llama.cpp {__version__}"
    )

    # One (flag, argparse keyword arguments) entry per option, in help order.
    options = [
        (
            "--model-path",
            dict(type=str, required=True, help="Path to a local GGUF file."),
        ),
        (
            "--endpoint",
            dict(
                type=str,
                default=DEFAULT_ENDPOINT,
                help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
            ),
        ),
        (
            "--model-name",
            dict(
                type=str,
                default="",
                help="Name to serve the model under. Defaults to deriving it from model path.",
            ),
        ),
        (
            "--context-length",
            dict(
                type=int,
                default=None,
                help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
            ),
        ),
        (
            "--migration-limit",
            dict(
                type=int,
                default=0,
                help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
            ),
        ),
    ]
    for flag, spec in options:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    # Split "dyn://namespace.component.endpoint" into its three parts.
    endpoint_parts = args.endpoint.replace("dyn://", "", 1).split(".")
    if len(endpoint_parts) != 3:
        logging.error(
            f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
        )
        sys.exit(1)
    namespace, component, endpoint = endpoint_parts

    config = Config()
    config.model_path = args.model_path
    # An empty name is stored as None, which becomes an `Option` on the Rust side.
    config.model_name = args.model_name if args.model_name else None
    config.namespace = namespace
    config.component = component
    config.endpoint = endpoint
    config.context_length = args.context_length
    config.migration_limit = args.migration_limit
    return config
def main():
    """Process entry point: run the decorated `worker` under uvloop."""
    entrypoint = worker()
    uvloop.run(entrypoint)


if __name__ == "__main__":
    main()
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
llama-cpp-python -C cmake.args="-DGGML_CUDA=on"
uvloop
......@@ -2004,7 +2004,7 @@ class DynamoFrameworkInfo(NodeInfo):
Returns:
List of framework component module names
Example: ['dynamo.frontend', 'dynamo.planner', 'dynamo.vllm', 'dynamo.sglang', 'dynamo.llama_cpp']
Example: ['dynamo.frontend', 'dynamo.planner', 'dynamo.vllm', 'dynamo.sglang']
Note: Scans components/src/dynamo/... directory for modules with __init__.py files.
"""
......
# Dynamo Run
`dynamo-run` is a Rust binary that lets you easily run a model, explore the Dynamo components, and demonstrates the Rust API. It supports the `mistral.rs` and `llama.cpp` engines. `mistralrs` is the default engine.
`dynamo-run` is a Rust binary that lets you easily run a model, explore the Dynamo components, and demonstrates the Rust API. It supports the `mistral.rs` engine, as well as the testing engines `echo` and `mocker`.
It is primarily for development and rapid prototyping. For production use we recommend the Python wrapped components, see the main project README.
......@@ -16,7 +16,6 @@ To adjust verbosity, use `-v` to enable debug logging or `-vv` to enable full tr
```bash
dynamo-run in=http out=mistralrs <model> -v # enables debug logging
dynamo-run in=text out=llamacpp <model> -vv # enables full trace logging
```
### Use model from Hugging Face
......@@ -303,7 +302,7 @@ The default delay is 10ms, which produces approximately 100 tokens per second.
`dynamo-run` can take a jsonl file full of prompts and evaluate them all:
```
dynamo-run in=batch:prompts.jsonl out=llamacpp <model>
dynamo-run in=batch:prompts.jsonl out=mistralrs <model>
```
The input file should look like this:
......
......@@ -14,14 +14,11 @@ description = "Dynamo Run CLI"
[features]
# Build with `--no-default-features` to disable these defaults
default = ["mistralrs", "llamacpp"]
default = ["mistralrs"]
mistralrs = ["dep:dynamo-engine-mistralrs"]
llamacpp = ["dep:dynamo-engine-llamacpp"]
cuda = ["dynamo-engine-llamacpp/cuda", "dynamo-engine-mistralrs/cuda"]
metal = ["dynamo-engine-llamacpp/metal", "dynamo-engine-mistralrs/metal"]
vulkan = ["dynamo-engine-llamacpp/vulkan"]
openmp = ["dynamo-engine-llamacpp/openmp"]
cuda = ["dynamo-engine-mistralrs/cuda"]
metal = ["dynamo-engine-mistralrs/metal"]
tokio-console = ["dynamo-runtime/tokio-console"]
......@@ -29,7 +26,6 @@ tokio-console = ["dynamo-runtime/tokio-console"]
dynamo-llm = { workspace = true }
dynamo-runtime = { workspace = true }
dynamo-engine-llamacpp = { path = "../../lib/engines/llamacpp", optional = true }
dynamo-engine-mistralrs = { path = "../../lib/engines/mistralrs", optional = true }
anyhow = { workspace = true }
......
......@@ -35,7 +35,7 @@ fn has_cuda_toolkit() -> bool {
}
fn is_cuda_engine() -> bool {
has_feature("mistralrs") || has_feature("llamacpp")
has_feature("mistralrs")
}
#[cfg(target_os = "macos")]
......
......@@ -8,7 +8,6 @@ use clap::ValueEnum;
use dynamo_llm::entrypoint::RouterConfig;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::local_model::LocalModel;
use dynamo_llm::mocker::protocols::MockEngineArgs;
use dynamo_runtime::pipeline::RouterMode as RuntimeRouterMode;
......@@ -52,14 +51,6 @@ pub struct Flags {
#[arg(short = 'v', action = clap::ArgAction::Count, default_value_t = 0)]
pub verbosity: u8,
/// llamacpp only
///
/// The path to the tokenizer and model config because:
/// - our engine is a 'core' engine in that we do the tokenization, so we need the vocab
/// - TODO: we don't yet extract that from the GGUF. Once we do we can remove this flag.
#[arg(long)]
pub model_config: Option<PathBuf>,
/// If using `out=dyn` with multiple instances, this says how to route the requests.
///
/// Mostly interesting for KV-aware routing.
......@@ -145,12 +136,7 @@ pub struct Flags {
impl Flags {
/// For each Output variant, check if it would be able to run.
/// This takes validation out of the main engine creation path.
pub fn validate(
&self,
local_model: &LocalModel,
in_opt: &Input,
out_opt: &Output,
) -> anyhow::Result<()> {
pub fn validate(&self, in_opt: &Input, out_opt: &Output) -> anyhow::Result<()> {
match in_opt {
Input::Endpoint(_) => {}
_ => {
......@@ -194,14 +180,6 @@ impl Flags {
Output::Echo => {}
#[cfg(feature = "mistralrs")]
Output::MistralRs => {}
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => {
if !local_model.path().is_file() {
anyhow::bail!(
"--model-path should refer to a GGUF file. llama_cpp does not support safetensors."
);
}
}
Output::Mocker => {
// nothing to check here
}
......
......@@ -5,7 +5,6 @@ use anyhow::Context as _;
use dynamo_llm::entrypoint::EngineConfig;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::CancellationToken;
use dynamo_runtime::distributed::DistributedConfig;
use dynamo_runtime::{DistributedRuntime, Runtime};
......@@ -35,7 +34,6 @@ pub async fn run(
.or(flags.model_path_flag.clone()),
)
.model_name(flags.model_name.clone())
.model_config(flags.model_config.clone())
.kv_cache_block_size(flags.kv_cache_block_size)
// Only set if user provides. Usually loaded from tokenizer_config.json
.context_length(flags.context_length)
......@@ -72,20 +70,12 @@ pub async fn run(
print_cuda(&out_opt);
// Now that we know the output we're targeting, check if we expect it to work
flags.validate(&local_model, &in_opt, &out_opt)?;
flags.validate(&in_opt, &out_opt)?;
// Make an engine from the local_model, flags and output.
let engine_config = engine_for(
runtime.primary_token(),
out_opt,
flags.clone(),
local_model,
rt.clone(),
)
.await?;
//
// Run in from an input
//
let engine_config = engine_for(out_opt, flags.clone(), local_model, rt.clone()).await?;
// Run it from an input
dynamo_llm::entrypoint::input::run_input(rt, in_opt, engine_config).await?;
Ok(())
......@@ -94,7 +84,6 @@ pub async fn run(
/// Create the engine matching `out_opt`
/// Note validation happens in Flags::validate. In here assume everything is going to work.
async fn engine_for(
cancel_token: CancellationToken,
out_opt: Output,
flags: Flags,
local_model: LocalModel,
......@@ -120,12 +109,6 @@ async fn engine_for(
model: Box::new(local_model),
is_static: flags.static_worker,
}),
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => Ok(EngineConfig::StaticCore {
engine: dynamo_engine_llamacpp::make_engine(cancel_token, &local_model).await?,
model: Box::new(local_model),
is_static: flags.static_worker,
}),
Output::Mocker => {
let Either::Right(drt) = rt else {
panic!("Mocker requires a distributed runtime to run.");
......@@ -146,18 +129,16 @@ async fn engine_for(
}
}
/// If the user will benefit from CUDA/Metal/Vulkan, remind them to build with it.
/// If the user will benefit from CUDA or Metal, remind them to build with it.
/// If they have it, celebrate!
// Only mistralrs and llamacpp need to be built with CUDA.
// Only mistralrs needs to be built with CUDA.
// The Python engines only need it at runtime.
#[cfg(any(feature = "mistralrs", feature = "llamacpp"))]
#[cfg(feature = "mistralrs")]
fn print_cuda(output: &Output) {
// These engines may be compiled in, but are they the chosen one?
match output {
#[cfg(feature = "mistralrs")]
Output::MistralRs => {}
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => {}
_ => {
return;
}
......@@ -171,15 +152,11 @@ fn print_cuda(output: &Output) {
{
tracing::info!("Metal on");
}
#[cfg(feature = "vulkan")]
{
tracing::info!("Vulkan on");
}
#[cfg(not(any(feature = "cuda", feature = "metal", feature = "vulkan")))]
tracing::info!("CPU mode. Rebuild with `--features cuda|metal|vulkan` for better performance");
#[cfg(not(any(feature = "cuda", feature = "metal")))]
tracing::info!("CPU mode. Rebuild with `--features cuda|metal` for better performance");
}
#[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
#[cfg(not(feature = "mistralrs"))]
fn print_cuda(_output: &Output) {}
fn default_engine_for(_local_model: &LocalModel) -> Output {
......
......@@ -21,10 +21,6 @@ pub enum Output {
#[cfg(feature = "mistralrs")]
MistralRs,
#[cfg(feature = "llamacpp")]
/// Run inference using llama.cpp
LlamaCpp,
Mocker,
}
......@@ -36,9 +32,6 @@ impl TryFrom<&str> for Output {
#[cfg(feature = "mistralrs")]
"mistralrs" => Ok(Output::MistralRs),
#[cfg(feature = "llamacpp")]
"llamacpp" | "llama_cpp" => Ok(Output::LlamaCpp),
"mocker" => Ok(Output::Mocker),
"echo" | "echo_full" => Ok(Output::Echo),
......@@ -60,9 +53,6 @@ impl fmt::Display for Output {
#[cfg(feature = "mistralrs")]
Output::MistralRs => "mistralrs",
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => "llamacpp",
Output::Mocker => "mocker",
Output::Echo => "echo",
......@@ -81,12 +71,6 @@ impl Output {
{
out.push(Output::MistralRs.to_string());
}
#[cfg(feature = "llamacpp")]
{
out.push(Output::LlamaCpp.to_string());
}
out
}
}
......@@ -16,9 +16,6 @@
# - "sglang", "vllm", "trtllm", "echo": An LLM worker.
#
# Must be in a virtualenv with the Dynamo bindings (or wheel) installed.
#
# There is no provided llama.cpp engine here, but there is one in components/llama_cpp/. It would be
# easy enough to copy the few Python lines from there to here and add an `out=llama_cpp`.
import argparse
import asyncio
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
[package]
name = "dynamo-engine-llamacpp"
version.workspace = true
edition.workspace = true
description.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
keywords.workspace = true
[features]
default = []
cuda = ["llama-cpp-2/cuda"]
metal = ["llama-cpp-2/metal"]
vulkan = ["llama-cpp-2/vulkan"]
openmp = ["llama-cpp-2/openmp"]
[dependencies]
dynamo-runtime = { workspace = true }
dynamo-llm = { workspace = true }
async-stream = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
# default-features = false to disable openmp because that needs libgomp1 installed
llama-cpp-2 = { version = "0.1.108", default-features = false }
# llamacpp engine for dynamo
The [`llama-cpp-2`](https://crates.io/crates/llama-cpp-2) project used to build a Dynamo backend
requires that clang is installed on your system. See [bindgen user guide](https://rust-lang.github.io/rust-bindgen/requirements.html)
for more details.
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::{
num::NonZeroU32,
path::Path,
sync::{Arc, Mutex, Once, OnceLock},
};
use async_stream::stream;
use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
use dynamo_runtime::pipeline::error as pipeline_error;
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn, async_trait};
use dynamo_runtime::protocols::annotated::Annotated;
use dynamo_runtime::{CancellationToken, ErrorContext, Result};
use llama_cpp_2::{
LogOptions,
context::{LlamaContext, params::LlamaContextParams},
llama_backend::LlamaBackend,
llama_batch::LlamaBatch,
model::{LlamaModel, params::LlamaModelParams},
sampling::LlamaSampler,
token::LlamaToken,
};
use dynamo_llm::protocols::common::llm_backend::LLMEngineOutput;
use dynamo_llm::protocols::common::preprocessor::PreprocessedRequest;
use dynamo_llm::{backend::ExecutionContext, local_model::LocalModel};
/// Output-token budget for a request that does not set `max_tokens`.
/// `run_request` also uses it as an upper bound: a larger `max_tokens` is
/// clamped to this value. The prompt length is not subtracted from it.
const DEFAULT_MAX_TOKENS: u32 = 8192;

// Set at the end of `LlamacppEngine::new`, after the contexts have been created.
// NOTE(review): presumably stored here so the backend outlives the contexts - confirm.
static LLAMA_BACKEND: tokio::sync::OnceCell<LlamaBackend> = tokio::sync::OnceCell::const_new();

// The loaded model. Set once in `LlamacppEngine::new`; a second `set` returns an
// error there. Being a static, it can hand out the `'static` borrow that
// `ContextWrapper` stores.
pub(crate) static LLAMA_MODEL: tokio::sync::OnceCell<LlamaModel> =
    tokio::sync::OnceCell::const_new();

// Number of llama.cpp contexts created at start-up. `worker` waits for a free
// one before starting a request, so this is also the number of requests that
// can be decoding at the same time.
const NUM_CONTEXTS: usize = 3;

// One slot per context, filled in `LlamacppEngine::new` and indexed by the
// positions that travel through the `ctx_set` / `ctx_get` channel.
static LLAMA_CONTEXTS: [OnceLock<Mutex<ContextWrapper>>; NUM_CONTEXTS] =
    [OnceLock::new(), OnceLock::new(), OnceLock::new()];

// Guards the one-time redirection of llama.cpp logs to `tracing`.
static LLAMA_CPP_LOG_REDIRECT: Once = Once::new();
// Newtype to simplify LlamaContext lifetime: the context borrows the model held
// in the `LLAMA_MODEL` static, so the borrow can be spelled `'static`.
#[derive(Debug)]
struct ContextWrapper(LlamaContext<'static>);

// SAFETY (as relied upon in this file): every wrapper is stored behind a `Mutex`
// in `LLAMA_CONTEXTS` and is only reached through that lock, so one thread at a
// time touches the underlying context.
// NOTE(review): this assumes a llama.cpp context may be used from a thread other
// than the one that created it - confirm against the llama.cpp documentation.
unsafe impl Send for ContextWrapper {} // LlamaContext has a NonNull which is !Send
unsafe impl Sync for ContextWrapper {} // LlamaContext has a NonNull which is !Sync
/// Builds the llama.cpp engine for `model` and returns it as a shared
/// [`ExecutionContext`].
///
/// # Errors
/// Returns whatever `LlamacppEngine::new` reports.
pub async fn make_engine(
    cancel_token: CancellationToken,
    model: &LocalModel,
) -> pipeline_error::Result<ExecutionContext> {
    let shared: ExecutionContext = Arc::new(LlamacppEngine::new(cancel_token, model).await?);
    Ok(shared)
}
/// One unit of work handed from `LlamacppEngine::generate` to the `worker` task.
struct WorkRequest {
    /// The pre-processed request; `run_request` reads its `token_ids` and
    /// `stop_conditions.max_tokens`.
    request: PreprocessedRequest,
    /// Where `run_request` sends each generated output; `generate` streams
    /// from the receiving end.
    response_channel: tokio::sync::mpsc::Sender<Annotated<LLMEngineOutput>>,
}
/// Engine handle: forwards requests to the background `worker` task over a channel.
struct LlamacppEngine {
    /// Cloned into the worker at construction and into every response stream.
    cancel_token: CancellationToken,
    /// Sending side of the work queue consumed by `worker`.
    req_tx: tokio::sync::mpsc::Sender<WorkRequest>,
}
impl LlamacppEngine {
    /// Loads the model, creates `NUM_CONTEXTS` llama.cpp contexts and spawns the
    /// `worker` task that serves requests.
    ///
    /// The model, the contexts and the backend are stored in this module's
    /// statics.
    ///
    /// # Errors
    /// Fails if the backend cannot be initialised, the model cannot be loaded,
    /// a context cannot be created, or the model/backend statics were already set.
    async fn new(
        cancel_token: CancellationToken,
        model_config: &LocalModel,
    ) -> pipeline_error::Result<Self> {
        // Route llama.cpp's own logging through `tracing`, once per process.
        LLAMA_CPP_LOG_REDIRECT.call_once(|| {
            let log_options = LogOptions::default().with_logs_enabled(true);
            llama_cpp_2::send_logs_to_tracing(log_options);
        });

        let backend = LlamaBackend::init()?;
        LLAMA_MODEL.set(load_model(&backend, model_config.path())?)?;

        // Use the configured context length when there is one.
        let context_length = model_config.card().context_length;
        let llama_ctx_params = if context_length > 0 {
            LlamaContextParams::default().with_n_ctx(NonZeroU32::new(context_length))
        } else {
            // Context length defaults to 512 currently
            LlamaContextParams::default()
        };

        // The channel carries the indices of the context slots that are free.
        let (ctx_set, ctx_get) = tokio::sync::mpsc::channel(NUM_CONTEXTS);
        for (slot, ctx_holder) in LLAMA_CONTEXTS.iter().enumerate() {
            let llama_ctx = LLAMA_MODEL
                .get()
                .unwrap() // Safety: it was set a few lines above
                .new_context(&backend, llama_ctx_params.clone())
                .with_context(|| "unable to create the llama_context")?;
            let _ = ctx_holder.set(Mutex::new(ContextWrapper(llama_ctx)));
            let _ = ctx_set.send(slot).await;
        }
        LLAMA_BACKEND.set(backend)?;

        let (req_tx, req_rx) = tokio::sync::mpsc::channel(2);
        tokio::task::spawn(worker(cancel_token.clone(), req_rx, ctx_get, ctx_set));

        Ok(Self {
            cancel_token,
            req_tx,
        })
    }
}
/// Loads the model file at `model_path`.
///
/// Builds compiled with the `cuda` or `vulkan` feature request 1000 GPU
/// layers; every other build uses the default model parameters.
///
/// # Errors
/// Returns the load failure wrapped with the context "unable to load model".
fn load_model(backend: &LlamaBackend, model_path: &Path) -> Result<LlamaModel> {
    let gpu_build = cfg!(any(feature = "cuda", feature = "vulkan"));

    let mut model_params = LlamaModelParams::default();
    if gpu_build {
        model_params = model_params.with_n_gpu_layers(1000);
    }

    let loaded = LlamaModel::load_from_file(backend, model_path, &model_params);
    loaded.with_context(|| "unable to load model")
}
// Engine entry point: queues the request for the `worker` task and returns a
// stream of whatever `run_request` sends back for it.
#[async_trait]
impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
    for LlamacppEngine
{
    // Returns an error if the request cannot be queued (the `send` on `req_tx` fails).
    async fn generate(
        &self,
        request: SingleIn<PreprocessedRequest>,
    ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
        let (request, context) = request.into_parts();
        let ctx = context.context();
        let request_id = ctx.id().to_string();

        // Per-request channel: `run_request` sends into `tx`, the stream below reads `rx`.
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let work_request = WorkRequest {
            request,
            response_channel: tx,
        };
        // Waits while the work queue is full.
        self.req_tx.send(work_request).await?;

        let cancel_token = self.cancel_token.clone();
        let output = stream! {
            loop {
                tokio::select! {
                    // Engine-wide cancellation ends the stream.
                    _ = cancel_token.cancelled() => {
                        tracing::trace!(request_id, "LlamacppEngine.generate stopped by cancel token");
                        break;
                    }
                    from_llamacpp = rx.recv() => {
                        match from_llamacpp {
                            Some(out) => {
                                yield out;
                            },
                            // The sender was dropped, i.e. `run_request` is done with this request.
                            None => {
                                tracing::trace!(request_id, "generate: response channel closed");
                                break;
                            }
                        }
                    }
                }
            }
        };
        Ok(ResponseStream::new(Box::pin(output), ctx))
    }
}
// Dispatcher, spawned as a tokio task by `LlamacppEngine::new`.
//
// For every queued `WorkRequest` it waits for a free context slot and then runs
// the request on tokio's blocking pool. The slot index is sent back through
// `ctx_set` once the request has finished, whether or not it succeeded.
// The loop ends on cancellation or when the request channel is closed.
async fn worker(
    cancel_token: CancellationToken,
    mut req_rx: tokio::sync::mpsc::Receiver<WorkRequest>,
    mut ctx_get: tokio::sync::mpsc::Receiver<usize>,
    ctx_set: tokio::sync::mpsc::Sender<usize>,
) {
    loop {
        let received = tokio::select! {
            _ = cancel_token.cancelled() => {
                break;
            }
            next = req_rx.recv() => {
                next
            }
        };
        let work_request = match received {
            Some(work_request) => work_request,
            None => {
                tracing::error!("llamacpp work request sender channel closed. Worker exit");
                break;
            }
        };

        // will block if there are already NUM_CONTEXTS requests in flight
        let ctx_pos = match ctx_get.recv().await {
            Some(pos) => pos,
            None => unreachable!("We don't close ctx_set"),
        };

        let request_cancel = cancel_token.clone();
        let release_slot = ctx_set.clone();
        tokio::task::spawn_blocking(move || {
            let mut ctx = LLAMA_CONTEXTS[ctx_pos].get().unwrap().lock().unwrap();
            if let Err(err) = run_request(request_cancel, work_request, &mut ctx) {
                tracing::error!("run_request error: {err:#}");
            }
            // Hand the slot back so the next request can use it.
            let _ = release_slot.blocking_send(ctx_pos);
        });
    }
}
fn run_request(
cancel_token: CancellationToken,
work_request: WorkRequest,
llama_context: &mut ContextWrapper,
) -> Result<()> {
let tokens_list: Vec<LlamaToken> = work_request
.request
.token_ids
.into_iter()
.map(|u| LlamaToken::new(u as i32))
.collect();
let limit = DEFAULT_MAX_TOKENS; // - prompt_tokens;
let max_output_tokens = std::cmp::min(
work_request
.request
.stop_conditions
.max_tokens
.unwrap_or(limit),
limit,
);
// we use this object to submit token data for decoding
let mut batch = LlamaBatch::new(std::cmp::max(512, max_output_tokens as usize), 1);
let last_index: i32 = (tokens_list.len() - 1) as i32;
for (i, token) in (0_i32..).zip(tokens_list.into_iter()) {
// llama_decode will output logits only for the last token of the prompt
let is_last = i == last_index;
batch
.add(token, i, &[0], is_last)
.with_context(|| format!("Failed adding token pos {i} to batch"))?;
}
// "decode" means "run forward pass"
llama_context
.0
.decode(&mut batch)
.with_context(|| "llama_decode failed on first pass")?;
let mut sampler = LlamaSampler::greedy();
let mut n_cur = batch.n_tokens() as u32;
let mut used_output_tokens = 0;
while !cancel_token.is_cancelled() {
// sample the next token
let n_tokens = batch.n_tokens();
let token = sampler.sample(&llama_context.0, n_tokens - 1);
sampler.accept(token);
// is it an end of stream?
// This is probably safe for concurrent access
if LLAMA_MODEL.get().unwrap().is_eog_token(token) {
work_request
.response_channel
.blocking_send(Annotated::from_data(LLMEngineOutput::stop()))
.with_context(|| "Failed sending stop to response_channel")?;
break;
}
let engine_out = LLMEngineOutput {
// todo - propagate mdcsum
token_ids: vec![token.0 as u32],
tokens: None,
text: None,
//text: if output.text.is_empty() { None } else { Some(output.text) },
cum_log_probs: None, // TODO output.cumulative_logprob.map(|v| v as f64),
log_probs: None, // TODO output.logprobs
top_logprobs: None,
finish_reason: None,
index: None,
disaggregated_params: None,
extra_args: None,
};
work_request
.response_channel
.blocking_send(Annotated::from_data(engine_out))
.with_context(|| "Failed forwarding engine output to response_channel")?;
batch.clear();
if let Err(err) = batch.add(token, n_cur as i32, &[0], true) {
let err_msg = format!(
"batch add error, probably insufficient space in buffer, aborting request. {err}."
);
tracing::error!(err_msg);
let _ = work_request
.response_channel
.blocking_send(Annotated::from_data(LLMEngineOutput::error(err_msg)));
break;
}
n_cur += 1;
used_output_tokens += 1;
if used_output_tokens > max_output_tokens {
let _ = work_request
.response_channel
.blocking_send(Annotated::from_data(LLMEngineOutput::length()));
break;
}
llama_context
.0
.decode(&mut batch)
.with_context(|| "llama_decode failed during loop")?;
}
if cancel_token.is_cancelled() {
let _ = work_request
.response_channel
.blocking_send(Annotated::from_data(LLMEngineOutput::stop()));
}
// Clean context for next use
llama_context.0.clear_kv_cache();
llama_context.0.reset_timings();
Ok(())
}
......@@ -63,11 +63,6 @@ sglang = [
"sglang[all]==0.5.3rc0",
]
llama_cpp = [
"uvloop",
"llama-cpp-python",
]
[dependency-groups]
docs = [
# Core Sphinx
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment