Commit 418ae5e8 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat: Add `tio` your friendly cmd line uncle to run triton-llm services (#174)

This provides a simple example of how to write a triton-llm engine, and how to connect it to the OpenAI HTTP server.

This is the tool previously called `nio` and `llmctl`.

- **Inputs**: Text and HTTP.
- **Engines**: Echo, which streams your prompt back with a slight delay.

Build: `cargo build`

Prerequisites: `nats-server` and `etcd` must be running locally, even though they are not yet used by `tio`.

Run with text input:
```
./target/debug/tio in=text out=echo_full --model-name test
```

Run with the triton-llm HTTP server:
```
./target/debug/tio in=http out=echo_full --http-port 8080 --model-name Echo-0B
```

List models:
```
curl localhost:8080/v1/models | jq
```

This will output:
```
{
  "object": "list",
  "data": [
    {
      "id": "Echo-0B",
      "object": "object",
      "created": 1739400430,
      "owned_by": "nvidia"
    }
  ]
}
```

#### What's next

As triton-distributed gains features, `tio` will be able to grow:
- When we get the pre-processor we can have token-in token-out engines. 
- When we get a pull-router we can have `in=nats` and `out=nats`.
- When we get discovery we can have dynamic engines.
parent 2fd6592f
...@@ -51,15 +51,21 @@ jobs: ...@@ -51,15 +51,21 @@ jobs:
key: ${{ runner.os }}-cargo-tools-${{ github.head_ref || github.ref_name }}-${{ hashFiles('**/Cargo.lock') }} key: ${{ runner.os }}-cargo-tools-${{ github.head_ref || github.ref_name }}-${{ hashFiles('**/Cargo.lock') }}
- name: Set up Rust Toolchain Components - name: Set up Rust Toolchain Components
run: rustup component add rustfmt clippy run: rustup component add rustfmt clippy
- name: Run Cargo Check - name: Run Cargo Check on runtime
working-directory: runtime/rust working-directory: runtime/rust
run: cargo check --locked run: cargo check --locked
- name: Run Cargo Check on tio
working-directory: apps/tio
run: cargo check --locked
- name: Verify Code Formatting - name: Verify Code Formatting
working-directory: runtime/rust working-directory: runtime/rust
run: cargo fmt -- --check run: cargo fmt -- --check
- name: Run Clippy Checks - name: Run Clippy Checks on runtime
working-directory: runtime/rust working-directory: runtime/rust
run: cargo clippy --no-deps --all-targets -- -D warnings run: cargo clippy --no-deps --all-targets -- -D warnings
- name: Run Clippy Checks on tio
working-directory: apps/tio
run: cargo clippy --no-deps --all-targets -- -D warnings
- name: Install and Run cargo-deny - name: Install and Run cargo-deny
working-directory: runtime/rust working-directory: runtime/rust
run: | run: |
......
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "tio"
version = "0.1.0"
edition = "2021"
authors = ["NVIDIA"]
homepage = "https://github.com/triton-inference-server/triton_distributed"

[dependencies]
# Ad-hoc error handling for the binary (`anyhow::Result`, `bail!`)
anyhow = "1"
# `stream!` macro used by the echo engine to build its response stream
async-stream = { version = "0.3" }
# `#[async_trait]` for the AsyncEngine impl
async-trait = { version = "0.1" }
# CLI flag parsing (`--http-port`, `--model-name`)
clap = { version = "4.5", features = ["derive", "env"] }
# Interactive prompt with history for `in=text`
dialoguer = { version = "0.11", default-features = false, features = ["editor", "history"] }
futures = { version = "0.3" }
futures-util = "0.3"
# `isatty()` check to detect piped stdin in the text input
libc = { version = "0.2" }
# NOTE(review): optional netlink deps with no feature gate or usage visible in
# this chunk — confirm what enables them and whether they are still needed.
netlink-packet-route = { version = "0.19", optional = true }
rtnetlink = { version = "0.14", optional = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec", "net"] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] }
# Local workspace crates: the distributed runtime and the LLM protocol/HTTP layer
triton-distributed = { path = "../../../../runtime/rust" }
triton-llm = { path = "../../../../llm/rust/triton-llm" }
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Input sources: the ways a request can reach the engine.
/// OpenAI-compatible HTTP server input (`in=http`).
pub mod http;
/// Interactive terminal / piped-stdin input (`in=text`).
pub mod text;
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use triton_distributed::runtime::CancellationToken;
use triton_llm::http::service::service_v2;
use crate::EngineConfig;
/// Build and run an HTTP service
///
/// Registers the engine's model under `service_name` and serves the OpenAI
/// chat/completions endpoints on `http_port` until `cancel_token` fires.
pub async fn run(
    cancel_token: CancellationToken,
    http_port: u16,
    engine_config: EngineConfig,
) -> anyhow::Result<()> {
    // Unpack the engine configuration. `StaticFull` is currently the only
    // variant, so this `let` destructure is irrefutable.
    let EngineConfig::StaticFull {
        service_name,
        engine,
        ..
    } = engine_config;

    // Stand up the HTTP front-end on the requested port.
    let service = service_v2::HttpService::builder()
        .port(http_port)
        .enable_chat_endpoints(true)
        .enable_cmpl_endpoints(true)
        .build()?;

    // Register the engine so it appears in /v1/models and handles requests.
    service
        .model_manager()
        .add_chat_completions_model(&service_name, engine)?;

    // Serve until cancelled.
    service.run(cancel_token).await
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::StreamExt;
use std::io::{ErrorKind, Read, Write};
use triton_distributed::{pipeline::Context, runtime::CancellationToken};
use triton_llm::{
protocols::openai::chat_completions::MessageRole,
types::openai::chat_completions::{
ChatCompletionRequest, OpenAIChatCompletionsStreamingEngine,
},
};
use crate::EngineConfig;
/// Max response tokens for each single query. Must be less than model context size.
const MAX_TOKENS: i32 = 8192;

/// Output of `isatty` if the fd is indeed a TTY
const IS_A_TTY: i32 = 1;

/// Read prompts from the terminal (or piped stdin) and stream replies back.
pub async fn run(
    cancel_token: CancellationToken,
    engine_config: EngineConfig,
) -> anyhow::Result<()> {
    // `StaticFull` is the only variant, so destructure it directly.
    let EngineConfig::StaticFull {
        service_name,
        engine,
    } = engine_config;
    tracing::info!("Model: {service_name}");
    // Full-service engines do their own prompt formatting, so there is no
    // template to inspect.
    let inspect_template = false;
    // TODO: Acquire an etcd lease, we are running
    main_loop(cancel_token, &service_name, engine, inspect_template).await
}
async fn main_loop(
cancel_token: CancellationToken,
service_name: &str,
engine: OpenAIChatCompletionsStreamingEngine,
inspect_template: bool,
) -> anyhow::Result<()> {
tracing::info!("Ctrl-c to exit");
let theme = dialoguer::theme::ColorfulTheme::default();
let mut initial_prompt = if unsafe { libc::isatty(libc::STDIN_FILENO) == IS_A_TTY } {
None
} else {
// Something piped in, use that as initial prompt
let mut input = String::new();
std::io::stdin().read_to_string(&mut input).unwrap();
Some(input)
};
let mut history = dialoguer::BasicHistory::default();
let mut messages = vec![];
while !cancel_token.is_cancelled() {
// User input
let prompt = match initial_prompt.take() {
Some(p) => p,
None => {
let input_ui = dialoguer::Input::<String>::with_theme(&theme)
.history_with(&mut history)
.with_prompt("User");
match input_ui.interact_text() {
Ok(prompt) => prompt,
Err(dialoguer::Error::IO(err)) => {
match err.kind() {
ErrorKind::Interrupted => {
// Ctrl-C
// Unfortunately I could not make dialoguer handle Ctrl-d
}
k => {
tracing::info!("IO error: {k}");
}
}
break;
}
}
}
};
messages.push((MessageRole::user, prompt.clone()));
// Request
let mut req_builder = ChatCompletionRequest::builder();
req_builder
.model(service_name)
.stream(true)
.max_tokens(MAX_TOKENS);
if inspect_template {
// This makes the pre-processor ignore stop tokens
req_builder.min_tokens(8192);
}
for (role, msg) in &messages {
match role {
MessageRole::user => {
req_builder.add_user_message(msg);
}
MessageRole::assistant => {
req_builder.add_assistant_message(msg);
}
x => panic!("Only 'user' and 'assistant' messages are supported, not {x}"),
}
}
let req = req_builder.build()?;
// Call the model
let mut stream = engine.generate(Context::new(req)).await?;
// Stream the output to stdout
let mut stdout = std::io::stdout();
let mut assistant_message = String::new();
while let Some(item) = stream.next().await {
let data = item.data.as_ref().unwrap();
let entry = data.choices.first();
let chat_comp = entry.as_ref().unwrap();
if let Some(c) = &chat_comp.delta.content {
let _ = stdout.write(c.as_bytes());
let _ = stdout.flush();
assistant_message += c;
}
if chat_comp.finish_reason.is_some() {
tracing::trace!("finish reason: {:?}", chat_comp.finish_reason.unwrap());
break;
}
}
println!();
messages.push((MessageRole::assistant, assistant_message));
}
println!();
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use triton_distributed::runtime::CancellationToken;
use triton_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
mod input;
mod opt;
mod output;
// Re-export the CLI option enums so `main.rs` can parse the in=/out= values.
pub use opt::{Input, Output};
/// Required options depend on the in and out choices
// NOTE: clap derives the `--help` text from the doc comment on each field.
#[derive(clap::Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Flags {
    /// HTTP port. `in=http` only
    #[arg(long, default_value = "8080")]
    pub http_port: u16,

    /// The name of the model we are serving
    /// Later that will come from the HF repo name, and still later from etcd during discovery
    #[arg(long)]
    pub model_name: String,
}
/// Which engine to run, bundled with everything that engine needs.
pub enum EngineConfig {
    /// A Full service engine does its own tokenization and prompt formatting.
    StaticFull {
        /// Model name exposed to clients (e.g. listed by `/v1/models`).
        service_name: String,
        /// The streaming chat-completions engine implementation.
        engine: OpenAIChatCompletionsStreamingEngine,
    },
}
pub async fn run(
in_opt: Input,
out_opt: Output,
flags: Flags,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
// Create the engine matching `out`
let engine_config = match out_opt {
Output::EchoFull => EngineConfig::StaticFull {
service_name: flags.model_name,
engine: output::echo_full::make_engine_full(),
},
};
match in_opt {
Input::Http => {
crate::input::http::run(cancel_token.clone(), flags.http_port, engine_config).await?;
}
Input::Text => {
crate::input::text::run(cancel_token.clone(), engine_config).await?;
}
}
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use clap::Parser;
use triton_distributed::logging;
/// Free text printed below the usage line for `-h` / `--help`.
const HELP: &str = r#"
triton-llm service runner stub
"#;

/// One-line usage summary; also embedded in argument-error messages.
const USAGE: &str = "USAGE: tio in=[http|text] out=[echo_full] [--http-port 8080]";
/// Set up logging and the process-wide Worker, then hand off to the async entry point.
fn main() -> anyhow::Result<()> {
    logging::init();
    // max_worker_threads and max_blocking_threads come from env vars or a
    // config file.
    let rt_config = triton_distributed::RuntimeConfig::from_settings()?;
    // One Worker per process; it wraps a Runtime which holds two tokio runtimes.
    triton_distributed::Worker::from_config(rt_config)?.execute(tio_wrapper)
}
async fn tio_wrapper(runtime: triton_distributed::Runtime) -> anyhow::Result<()> {
let mut in_opt = None;
let mut out_opt = None;
let args: Vec<String> = env::args().skip(1).collect();
if args.is_empty() || args[0] == "-h" || args[0] == "--help" {
println!("{USAGE}");
println!("{HELP}");
return Ok(());
}
for arg in env::args().skip(1).take(2) {
let Some((in_out, val)) = arg.split_once('=') else {
anyhow::bail!("Argument missing '='. {USAGE}");
};
match in_out {
"in" => {
in_opt = Some(val.try_into()?);
}
"out" => {
out_opt = Some(val.try_into()?);
}
_ => {
anyhow::bail!("Invalid argument, must start with 'in' or 'out. {USAGE}");
}
}
}
let (Some(in_opt), Some(out_opt)) = (in_opt, out_opt) else {
anyhow::bail!("Missing 'in' or 'out'. {USAGE}");
};
// Clap skips the first argument expecting it to be the binary name, so add it back
let nio_flags =
tio::Flags::try_parse_from(["tio".to_string()].into_iter().chain(env::args().skip(3)))?;
// etcd and nats addresses, from env vars ETCD_ENDPOINTS and NATS_SERVER with localhost
// defaults
let dt_config = triton_distributed::distributed::DistributedConfig::from_settings();
// Wraps the Runtime (which wraps two tokio runtimes) and adds etcd and nats clients
let d_runtime = triton_distributed::DistributedRuntime::new(runtime, dt_config).await?;
tio::run(
in_opt,
out_opt,
nio_flags,
d_runtime.runtime().primary_token(),
)
.await
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
/// Where requests come from (the `in=` CLI option).
pub enum Input {
    /// Run an OpenAI compatible HTTP server
    Http,
    /// Read prompt from stdin
    Text,
}

impl TryFrom<&str> for Input {
    type Error = anyhow::Error;

    /// Parse the value of an `in=` command-line argument.
    fn try_from(s: &str) -> anyhow::Result<Self> {
        if s == "http" {
            Ok(Input::Http)
        } else if s == "text" {
            Ok(Input::Text)
        } else {
            Err(anyhow::anyhow!("Invalid in= option '{s}'"))
        }
    }
}

impl fmt::Display for Input {
    /// Render the same token that `TryFrom<&str>` accepts.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            Input::Http => "http",
            Input::Text => "text",
        })
    }
}
/// Which engine handles requests (the `out=` CLI option).
pub enum Output {
    /// Accept un-preprocessed requests, echo the prompt back as the response
    EchoFull,
}

impl TryFrom<&str> for Output {
    type Error = anyhow::Error;

    /// Parse the value of an `out=` command-line argument.
    fn try_from(s: &str) -> anyhow::Result<Self> {
        if s == "echo_full" {
            Ok(Output::EchoFull)
        } else {
            Err(anyhow::anyhow!("Invalid out= option '{s}'"))
        }
    }
}

impl fmt::Display for Output {
    /// Render the same token that `TryFrom<&str>` accepts.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Single-variant enum: the rendering is unconditional.
        f.write_str("echo_full")
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Echo engine: streams the prompt back; useful for testing inputs end-to-end.
pub mod echo_full;
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{sync::Arc, time::Duration};
use async_stream::stream;
use async_trait::async_trait;
use triton_distributed::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
use triton_distributed::pipeline::{Error, ManyOut, SingleIn};
use triton_distributed::protocols::annotated::Annotated;
use triton_llm::protocols::openai::chat_completions::FinishReason;
use triton_llm::protocols::openai::chat_completions::{
ChatCompletionRequest, ChatCompletionResponseDelta, Content,
};
use triton_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
/// How long to sleep between echoed tokens.
/// 50ms gives us 20 tok/s.
const TOKEN_ECHO_DELAY: Duration = Duration::from_millis(50);

/// Engine that accepts un-preprocessed requests and echos the prompt back as the response
/// Useful for testing ingress such as service-http.
struct EchoEngineFull {}

/// Construct the echo engine behind the trait-object type that the HTTP and
/// text inputs expect.
pub fn make_engine_full() -> OpenAIChatCompletionsStreamingEngine {
    Arc::new(EchoEngineFull {})
}
#[async_trait]
impl
    AsyncEngine<
        SingleIn<ChatCompletionRequest>,
        ManyOut<Annotated<ChatCompletionResponseDelta>>,
        Error,
    > for EchoEngineFull
{
    /// Echo the last message's text back, one character per streamed delta,
    /// then emit a final delta with `FinishReason::stop`.
    async fn generate(
        &self,
        incoming_request: SingleIn<ChatCompletionRequest>,
    ) -> Result<ManyOut<Annotated<ChatCompletionResponseDelta>>, Error> {
        // Split the request payload from its pipeline context.
        let (request, context) = incoming_request.transfer(());
        let deltas = request.response_generator();
        let ctx = context.context();
        // Echo only the most recent message.
        // NOTE(review): panics on an empty message list — confirm upstream
        // validation guarantees at least one message.
        let req = request.messages.into_iter().last().unwrap();
        let prompt = match req.content {
            Content::Text(prompt) => prompt,
            _ => {
                anyhow::bail!("Invalid request content field, expected Content::Text");
            }
        };
        // Lazily yield one Annotated delta per character, with a small delay
        // to simulate token-by-token generation.
        let output = stream! {
            let mut id = 1;
            for c in prompt.chars() {
                // we are returning characters not tokens, so speed up some
                tokio::time::sleep(TOKEN_ECHO_DELAY/2).await;
                let delta = deltas.create_choice(0, Some(c.to_string()), None, None);
                yield Annotated{ id: Some(id.to_string()), data: Some(delta), event: None, comment: None };
                id += 1;
            }
            // Terminal delta: no content, finish_reason = stop.
            let stop_delta = deltas.create_choice(0, None, Some(FinishReason::stop), None);
            yield Annotated { id: Some(id.to_string()), data: Some(stop_delta), event: None, comment: None };
        };
        Ok(ResponseStream::new(Box::pin(output), ctx))
    }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment