"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "dd382e0fe377b839189cd27db93f11a6cfe35250"
Unverified commit ac60a0e1, authored by Graham King, committed by GitHub
Browse files

chore: Missed cleanup, delete batch mode (#7873)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent f849e1a6
...@@ -1936,7 +1936,6 @@ dependencies = [ ...@@ -1936,7 +1936,6 @@ dependencies = [
"futures-util", "futures-util",
"galil-seiferas", "galil-seiferas",
"hf-hub", "hf-hub",
"humantime",
"image", "image",
"indicatif 0.18.4", "indicatif 0.18.4",
"insta", "insta",
......
...@@ -1584,7 +1584,6 @@ dependencies = [ ...@@ -1584,7 +1584,6 @@ dependencies = [
"futures-util", "futures-util",
"galil-seiferas", "galil-seiferas",
"hf-hub", "hf-hub",
"humantime",
"image", "image",
"json-five", "json-five",
"minijinja", "minijinja",
......
...@@ -1597,7 +1597,6 @@ dependencies = [ ...@@ -1597,7 +1597,6 @@ dependencies = [
"futures-util", "futures-util",
"galil-seiferas", "galil-seiferas",
"hf-hub", "hf-hub",
"humantime",
"image", "image",
"json-five", "json-five",
"memfile", "memfile",
......
...@@ -68,7 +68,6 @@ either = { workspace = true } ...@@ -68,7 +68,6 @@ either = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
futures-util = { workspace = true } futures-util = { workspace = true }
hf-hub = { workspace = true } hf-hub = { workspace = true }
humantime = { workspace = true } # input/batch
rand = { workspace = true } rand = { workspace = true }
oneshot = { workspace = true } oneshot = { workspace = true }
parking_lot = { workspace = true } parking_lot = { workspace = true }
......
...@@ -10,11 +10,9 @@ ...@@ -10,11 +10,9 @@
use std::{ use std::{
fmt, fmt,
io::{IsTerminal as _, Read as _}, io::{IsTerminal as _, Read as _},
path::PathBuf,
str::FromStr, str::FromStr,
}; };
pub mod batch;
mod common; mod common;
pub use common::{build_routed_pipeline, build_routed_pipeline_with_preprocessor}; pub use common::{build_routed_pipeline, build_routed_pipeline_with_preprocessor};
pub mod endpoint; pub mod endpoint;
...@@ -24,8 +22,6 @@ pub mod text; ...@@ -24,8 +22,6 @@ pub mod text;
use dynamo_runtime::protocols::ENDPOINT_SCHEME; use dynamo_runtime::protocols::ENDPOINT_SCHEME;
const BATCH_PREFIX: &str = "batch:";
/// The various ways of connecting prompts to an engine /// The various ways of connecting prompts to an engine
#[derive(PartialEq)] #[derive(PartialEq)]
pub enum Input { pub enum Input {
...@@ -41,9 +37,6 @@ pub enum Input { ...@@ -41,9 +37,6 @@ pub enum Input {
/// Pull requests from a namespace/component/endpoint path. /// Pull requests from a namespace/component/endpoint path.
Endpoint(String), Endpoint(String),
/// Batch mode. Run all the prompts, write the outputs, exit.
Batch(PathBuf),
// Run an KServe compatible gRPC server // Run an KServe compatible gRPC server
Grpc, Grpc,
} }
...@@ -68,10 +61,6 @@ impl TryFrom<&str> for Input { ...@@ -68,10 +61,6 @@ impl TryFrom<&str> for Input {
endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => { endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
Ok(Input::Endpoint(endpoint_path.to_string())) Ok(Input::Endpoint(endpoint_path.to_string()))
} }
batch_patch if batch_patch.starts_with(BATCH_PREFIX) => {
let path = batch_patch.strip_prefix(BATCH_PREFIX).unwrap();
Ok(Input::Batch(PathBuf::from(path)))
}
e => Err(anyhow::anyhow!("Invalid in= option '{e}'")), e => Err(anyhow::anyhow!("Invalid in= option '{e}'")),
} }
} }
...@@ -85,7 +74,6 @@ impl fmt::Display for Input { ...@@ -85,7 +74,6 @@ impl fmt::Display for Input {
Input::Text => "text", Input::Text => "text",
Input::Stdin => "stdin", Input::Stdin => "stdin",
Input::Endpoint(path) => path, Input::Endpoint(path) => path,
Input::Batch(path) => &path.display().to_string(),
}; };
write!(f, "{s}") write!(f, "{s}")
} }
...@@ -136,9 +124,6 @@ pub async fn run_input( ...@@ -136,9 +124,6 @@ pub async fn run_input(
std::io::stdin().read_to_string(&mut prompt).unwrap(); std::io::stdin().read_to_string(&mut prompt).unwrap();
text::run(drt, Some(prompt), engine_config).await?; text::run(drt, Some(prompt), engine_config).await?;
} }
Input::Batch(path) => {
batch::run(drt, path, engine_config).await?;
}
Input::Endpoint(path) => { Input::Endpoint(path) => {
endpoint::run(drt, path, engine_config).await?; endpoint::run(drt, path, engine_config).await?;
} }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::preprocessor::OpenAIPreprocessor;
use crate::request_template::RequestTemplate;
use crate::types::openai::chat_completions::{
NvCreateChatCompletionRequest, OpenAIChatCompletionsStreamingEngine,
};
use anyhow::Context as _;
use dynamo_protocols::types::{ChatCompletionMessageContent, FinishReason};
use dynamo_runtime::{DistributedRuntime, pipeline::Context, runtime::CancellationToken};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::cmp;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use crate::entrypoint::EngineConfig;
use crate::entrypoint::input::common;
/// Max tokens in each response.
/// Used as the `max_completion_tokens` fallback when no request template is configured.
/// TODO: For batch mode this should be the full context size of the model
const MAX_TOKENS: u32 = 8192;
/// File name of the results file. `run` derives the output path by replacing the
/// file name of the input path with this value, so it lands next to the input.
const OUTPUT_FILENAME: &str = "output.jsonl";
/// One line of the batch input (JSON Lines) and, once evaluated, one line of the output.
///
/// Only `text` has to be present in the input; the remaining fields are filled in by
/// `run` / `evaluate` before the entry is serialized to the output file.
#[derive(Serialize, Deserialize, Default, Debug)]
struct Entry {
    /// The prompt, sent to the engine as a single user message.
    /// This is the only field the input files are expected to carry.
    text: String,
    /// The concatenated text returned by the engine; set after a successful evaluation.
    response: Option<String>,
    /// Number of token ids produced by tokenizing `text`.
    /// Stays 0 when no preprocessor is available or tokenizing fails.
    #[serde(default)]
    tokens_in: usize,
    /// Number of token ids produced by tokenizing the response.
    /// Stays 0 when no preprocessor is available or tokenizing fails.
    #[serde(default)]
    tokens_out: usize,
    /// Wall-clock time spent evaluating this entry, in milliseconds.
    #[serde(default)]
    elapsed_ms: usize,
    /// Finish reason of the last processed stream chunk; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    finish_reason: Option<FinishReason>,
    /// Zero-based index of the entry among the non-empty input lines.
    /// Never read from or written to JSON (`serde(skip)`); only used for logging.
    #[serde(skip, default)]
    request_id: usize,
}
pub async fn run(
distributed_runtime: DistributedRuntime,
input_jsonl: PathBuf,
engine_config: EngineConfig,
) -> anyhow::Result<()> {
let cancel_token = distributed_runtime.primary_token();
// Check if the path exists and is a directory
if !input_jsonl.exists() || !input_jsonl.is_file() {
anyhow::bail!(
"Missing or not a file: {}. Should be a JSON Lines file.",
input_jsonl.display()
);
}
let mut prepared_engine = common::prepare_engine(distributed_runtime, engine_config).await?;
let pre_processor = if prepared_engine.has_tokenizer() {
Some(OpenAIPreprocessor::new(
prepared_engine.card.take().unwrap(),
)?)
} else {
None
};
let (done_entries_tx, done_entries_rx) = tokio::sync::mpsc::channel(64);
let dw_cancel_token = cancel_token.clone();
let mut output_file = input_jsonl.clone();
output_file.set_file_name(OUTPUT_FILENAME);
tokio::spawn(async move {
if let Err(err) = output_writer(dw_cancel_token, done_entries_rx, &output_file).await {
tracing::error!(%err, "Failed writing output to {}", output_file.display());
}
});
let service_name_ref = Arc::new(prepared_engine.service_name);
let tokens_in = Arc::new(AtomicU64::new(0));
let tokens_out = Arc::new(AtomicU64::new(0));
let mut handles = vec![];
let mut num_entries = 0;
let input_file = tokio::fs::File::open(&input_jsonl)
.await
.with_context(|| input_jsonl.display().to_string())?;
let buffered_input = tokio::io::BufReader::new(input_file);
tracing::info!("Timer start.");
let start = Instant::now();
let mut lines = buffered_input.lines();
let template: Option<Arc<RequestTemplate>> = prepared_engine.request_template.map(Arc::new);
while let Ok(Some(line)) = lines.next_line().await {
if cancel_token.is_cancelled() {
break;
}
if line.is_empty() {
continue;
}
let request_id = num_entries;
num_entries += 1;
let mut entry: Entry = match serde_json::from_str(&line) {
Ok(entry) => entry,
Err(err) => {
anyhow::bail!("Error parsing entry: '{line}'. {err}");
}
};
entry.request_id = request_id;
let engine = prepared_engine.engine.clone();
let pre_processor = pre_processor.clone();
let tokens_in = tokens_in.clone();
let tokens_out = tokens_out.clone();
let done_entries_tx = done_entries_tx.clone();
let service_name_ref = service_name_ref.clone();
let template_clone = template.clone();
let handle = tokio::spawn(async move {
let local_start = Instant::now();
let response = match evaluate(
request_id,
service_name_ref.as_str(),
engine,
&mut entry,
template_clone,
)
.await
{
Ok(r) => r,
Err(err) => {
tracing::error!(%err, entry.text, "Failed evaluating prompt");
return;
}
};
let local_elapsed = Instant::now() - local_start;
entry.elapsed_ms = local_elapsed.as_millis() as usize;
if let Some(pre) = pre_processor {
// Note this does not include the prompt template. Probably TODO
entry.tokens_in = match pre.tokenize(&entry.text) {
Ok(encoding) => encoding.token_ids().len(),
Err(err) => {
tracing::warn!(%err, entry.text, "Failed tokenizing prompt");
0
}
};
entry.tokens_out = match pre.tokenize(&response) {
Ok(encoding) => encoding.token_ids().len(),
Err(err) => {
tracing::warn!(%err, response, "Failed tokenizing response");
0
}
};
tokens_in.fetch_add(entry.tokens_in as u64, Ordering::Relaxed);
tokens_out.fetch_add(entry.tokens_out as u64, Ordering::Relaxed);
}
entry.response = Some(response);
let _ = done_entries_tx.send(entry).await;
});
handles.push(handle);
}
tokio::select! {
_ = cancel_token.cancelled() => {
// Don't print stats
return Ok(());
}
_ = futures::future::join_all(handles) => {
}
}
let elapsed = Instant::now() - start;
let elapsed_clean = Duration::from_millis(elapsed.as_millis() as u64);
let tokens_in = Arc::into_inner(tokens_in).unwrap().into_inner();
let tokens_out = Arc::into_inner(tokens_out).unwrap().into_inner();
tokio::time::sleep(Duration::from_millis(1)).await; // Let output_writer finish stdout write
tracing::info!(
"Ran {} files in {}. Tokens in: {} ({}/s). Tokens out: {} ({}/s)",
num_entries,
humantime::format_duration(elapsed_clean),
tokens_in,
tokens_in / cmp::max(elapsed.as_secs(), 1),
tokens_out,
tokens_out / cmp::max(elapsed.as_secs(), 1),
);
cancel_token.cancel(); // stop everything else
Ok(())
}
/// Sends one prompt through the engine and returns the concatenated response text.
///
/// The prompt in `entry.text` is wrapped in a single user message and streamed through
/// `engine`. Model, completion-token limit and temperature come from `template` when one
/// is given; otherwise `service_name`, `MAX_TOKENS` and `0.7` are used. While streaming,
/// `entry.finish_reason` is updated from each processed chunk, and streaming stops at the
/// first chunk that carries a finish reason.
async fn evaluate(
    request_id: usize,
    service_name: &str,
    engine: OpenAIChatCompletionsStreamingEngine,
    entry: &mut Entry,
    template: Option<Arc<RequestTemplate>>,
) -> anyhow::Result<String> {
    use dynamo_protocols::types::{
        ChatCompletionRequestMessage, ChatCompletionRequestUserMessage,
        ChatCompletionRequestUserMessageContent, CreateChatCompletionRequestArgs,
    };

    // Request parameters: prefer the template, fall back to the defaults.
    let model = match template.as_ref() {
        Some(t) => t.model.clone(),
        None => service_name.to_string(),
    };
    let max_completion_tokens = template
        .as_ref()
        .map_or(MAX_TOKENS, |t| t.max_completion_tokens);
    let temperature = template.as_ref().map_or(0.7, |t| t.temperature);

    let prompt = ChatCompletionRequestMessage::User(ChatCompletionRequestUserMessage {
        content: ChatCompletionRequestUserMessageContent::Text(entry.text.clone()),
        name: None,
    });
    let inner = CreateChatCompletionRequestArgs::default()
        .messages(vec![prompt])
        .model(model)
        .stream(true)
        .max_completion_tokens(max_completion_tokens)
        .temperature(temperature)
        .build()?;
    let request = NvCreateChatCompletionRequest {
        inner,
        common: Default::default(),
        nvext: None,
        chat_template_args: None,
        media_io_kwargs: None,
        unsupported_fields: Default::default(),
    };

    let mut stream = engine.generate(Context::new(request)).await?;
    let mut collected = String::new();
    while let Some(item) = stream.next().await {
        // Items without data are either errors or annotations; handle them and move on.
        let Some(data) = item.data.as_ref() else {
            match item.event.as_deref() {
                Some("error") => {
                    tracing::error!(request_id, "the error case");
                    // There's only one error but we loop in case that changes
                    for err in item.comment.unwrap_or_default() {
                        tracing::error!(request_id, "Engine error: {err}");
                    }
                }
                Some(annotation) => {
                    tracing::debug!(request_id, "Annotation. {annotation}: {:?}", item.comment);
                }
                None => {
                    unreachable!("Event from engine with no data, no error, no annotation.");
                }
            }
            continue;
        };

        // Normal case: a chunk of the chat completion.
        let Some(choice) = data.inner.choices.first() else {
            continue;
        };
        // Only text deltas are collected. Multimodal parts are skipped for now in batch
        // processing. (ayushag) TODO: Handle multimodal content in batch mode
        if let Some(ChatCompletionMessageContent::Text(text)) = &choice.delta.content {
            collected.push_str(text);
        }
        entry.finish_reason = choice.finish_reason;
        if let Some(reason) = choice.finish_reason.as_ref() {
            tracing::trace!(request_id, "finish reason: {:?}", reason);
            break;
        }
    }
    Ok(collected)
}
/// Writes finished entries to `output_file`, one JSON object per line.
///
/// Creates (or truncates) `output_file`, then appends every [`Entry`] received on
/// `entries_rx` until the channel is closed or `cancel_token` is cancelled. The file is
/// flushed before returning normally: a `tokio::fs::File` performs its writes in the
/// background, so without an explicit flush the last write may still be pending (and its
/// error unobserved) when the file is dropped.
///
/// # Errors
///
/// Returns an error if the file cannot be created, an entry cannot be serialized, or a
/// write or the final flush fails.
async fn output_writer(
    cancel_token: CancellationToken,
    mut entries_rx: tokio::sync::mpsc::Receiver<Entry>,
    output_file: &Path,
) -> anyhow::Result<()> {
    let mut num_completed = 0;
    let mut f = tokio::fs::File::create(output_file).await?;
    loop {
        let entry = tokio::select! {
            _ = cancel_token.cancelled() => {
                break;
            }
            maybe_entry = entries_rx.recv() => {
                match maybe_entry {
                    Some(entry) => entry,
                    // All senders are gone: nothing more will arrive.
                    None => {
                        break;
                    }
                }
            }
        };
        let mut s = serde_json::to_string(&entry)?;
        s.push('\n');
        f.write_all(s.as_bytes()).await?;
        num_completed += 1;
        // TODO: Progress bar. We'd have to count the lines in the input first,
        // and the input may be large
        tracing::info!(entry.request_id, entry.tokens_out, "Saved {num_completed}");
    }
    // Make sure everything written so far has actually reached the file.
    f.flush().await?;
    Ok(())
}
...@@ -45,7 +45,6 @@ pub struct PreparedEngine { ...@@ -45,7 +45,6 @@ pub struct PreparedEngine {
pub service_name: String, pub service_name: String,
pub engine: OpenAIChatCompletionsStreamingEngine, pub engine: OpenAIChatCompletionsStreamingEngine,
pub inspect_template: bool, pub inspect_template: bool,
pub card: Option<ModelDeploymentCard>,
pub request_template: Option<RequestTemplate>, pub request_template: Option<RequestTemplate>,
} }
...@@ -88,16 +87,6 @@ async fn wait_for_min_initial_workers( ...@@ -88,16 +87,6 @@ async fn wait_for_min_initial_workers(
} }
} }
impl PreparedEngine {
pub fn has_tokenizer(&self) -> bool {
if let Some(card) = self.card.as_ref() {
card.has_tokenizer()
} else {
false
}
}
}
/// Turns an EngineConfig into an OpenAI chat-completions and completions supported StreamingEngine. /// Turns an EngineConfig into an OpenAI chat-completions and completions supported StreamingEngine.
pub async fn prepare_engine( pub async fn prepare_engine(
distributed_runtime: DistributedRuntime, distributed_runtime: DistributedRuntime,
...@@ -148,7 +137,6 @@ pub async fn prepare_engine( ...@@ -148,7 +137,6 @@ pub async fn prepare_engine(
service_name: model_service_name, service_name: model_service_name,
engine, engine,
inspect_template: false, inspect_template: false,
card: None,
request_template: local_model.request_template(), request_template: local_model.request_template(),
}) })
} }
...@@ -161,7 +149,6 @@ pub async fn prepare_engine( ...@@ -161,7 +149,6 @@ pub async fn prepare_engine(
engine, engine,
inspect_template: false, inspect_template: false,
request_template: model.request_template(), request_template: model.request_template(),
card: Some(model.into_card()),
}) })
} }
EngineConfig::InProcessTokens { EngineConfig::InProcessTokens {
...@@ -182,7 +169,6 @@ pub async fn prepare_engine( ...@@ -182,7 +169,6 @@ pub async fn prepare_engine(
engine: pipeline, engine: pipeline,
inspect_template: true, inspect_template: true,
request_template: model.request_template(), request_template: model.request_template(),
card: Some(model.into_card()),
}) })
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment