Unverified Commit 757223b3 authored by OlivierDehaene's avatar OlivierDehaene Committed by GitHub
Browse files

feat: add SchedulerV3 (#1996)

- Refactor code to allow supporting multiple versions of the
generate.proto at the same time
- Add v3/generate.proto (ISO to generate.proto for now but allow for
future changes without impacting v2 backends)
- Add Schedule trait to abstract queuing and batching mechanisms that
will be different in the future
- Add SchedulerV2/V3 impl
parent fec0167a
mod health;
pub(crate) mod v2;
pub(crate) mod v3;
pub(crate) use health::HealthCheck;
use crate::validation::{ValidGenerateRequest, Validation, ValidationError};
use crate::{
ChatTemplateInputs, ChatTemplateVersions, FinishReason, GenerateRequest, HubProcessorConfig,
HubTokenizerConfig, Message, MessageChunk, PrefillToken, Text, TextMessage, Token,
};
use crate::{FunctionRef, FunctionsMap, GrammarType, Properties, Tool, ToolType, Tools};
use futures::future::try_join_all;
use minijinja::{Environment, ErrorKind, Template};
use serde_json::{json, Map, Value};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
use tokio::time::Instant;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;
use tracing::instrument;
pub(crate) trait Scheduler {
fn schedule(
&self,
request: ValidGenerateRequest,
permit: OwnedSemaphorePermit,
) -> Result<GenerateStreamResponse, InferError>;
}
/// Inference struct
#[derive(Clone)]
pub struct Infer {
/// Validation
validation: Validation,
/// Request scheduler
scheduler: Arc<dyn Scheduler + Send + Sync>,
/// Chat template
chat_template: Option<ChatTemplate>,
/// Inference limit
limit_concurrent_requests: Arc<Semaphore>,
}
impl Infer {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
scheduler: Arc<dyn Scheduler + Send + Sync>,
validation: Validation,
max_concurrent_requests: usize,
tokenizer_config: HubTokenizerConfig,
processor_config: HubProcessorConfig,
) -> Self {
let chat_template = tokenizer_config
.chat_template
.or(processor_config.chat_template)
.and_then(|t| match t {
ChatTemplateVersions::Single(template) => Some(template),
ChatTemplateVersions::Multiple(templates) => templates
.into_iter()
.find(|t| t.name == "default")
.map(|t| t.template),
})
.map(|t| {
// .strip() is not supported in minijinja
// .capitalize() is not supported in minijinja but we can use | capitalize
let t = t
.replace(".strip()", " | trim")
.replace(".capitalize()", " | capitalize");
ChatTemplate::new(t, tokenizer_config.bos_token, tokenizer_config.eos_token)
});
// Inference limit with a semaphore
let semaphore = Arc::new(Semaphore::new(max_concurrent_requests));
Self {
validation,
scheduler,
chat_template,
limit_concurrent_requests: semaphore,
}
}
/// Add a new request to the queue and return a stream of InferStreamResponse
#[instrument(skip_all)]
pub(crate) async fn generate_stream(
&self,
request: GenerateRequest,
) -> Result<GenerateStreamResponse, InferError> {
// Limit concurrent requests by acquiring a permit from the semaphore
let permit = self
.clone()
.limit_concurrent_requests
.try_acquire_owned()
.map_err(|err| {
metrics::increment_counter!("tgi_request_failure", "err" => "overloaded");
tracing::error!("{err}");
err
})?;
// Validate request
let valid_request = self.validation.validate(request).await.map_err(|err| {
metrics::increment_counter!("tgi_request_failure", "err" => "validation");
tracing::error!("{err}");
err
})?;
self.scheduler.schedule(valid_request, permit)
}
/// Tokenizer the input
#[instrument(skip_all)]
pub(crate) async fn tokenize(
&self,
request: GenerateRequest,
) -> Result<Option<tokenizers::Encoding>, InferError> {
// Tokenize request
let inputs = request.inputs;
let truncate = request.parameters.truncate;
let encoding = self
.validation
.tokenize(inputs, truncate)
.await
.map_err(|err| {
tracing::error!("Tokenization {err}");
err
})?;
// Return Encoding
Ok(encoding.map(|(encoding, _)| encoding))
}
/// Apply the chat template to the chat request
#[instrument(skip_all)]
pub(crate) fn apply_chat_template(
&self,
messages: Vec<Message>,
grammar_with_prompt: Option<(GrammarType, String)>,
) -> Result<String, InferError> {
self.chat_template
.as_ref()
.ok_or_else(|| InferError::TemplateError(ErrorKind::TemplateNotFound.into()))?
.apply(messages, grammar_with_prompt)
.map_err(|e| {
metrics::increment_counter!("tgi_request_failure", "err" => "template");
tracing::error!("{e}");
e
})
}
/// Add a new request to the queue and return a InferResponse
#[instrument(skip_all)]
pub(crate) async fn generate(
&self,
request: GenerateRequest,
) -> Result<InferResponse, InferError> {
let use_top_tokens = request.parameters.top_n_tokens.is_some_and(|x| x > 0);
// Create stream and keep semaphore permit as long as generate lives
let (_permit, _input_length, mut stream) = self.generate_stream(request).await?;
// Return values
let mut result_prefill = Vec::new();
let mut result_tokens = Vec::new();
let mut result_top_tokens = Vec::new();
let mut result_generated_text = None;
let mut result_start = None;
let mut result_queued = None;
// Iterate on stream
while let Some(response) = stream.next().await {
match response? {
// Add prefill tokens
InferStreamResponse::Prefill(prefill_tokens) => {
result_prefill = prefill_tokens;
}
// Push last token
InferStreamResponse::Intermediate { token, top_tokens } => {
result_tokens.push(token);
result_top_tokens.push(top_tokens);
}
// Final message
// Set return values
InferStreamResponse::End {
token,
generated_text,
start,
queued,
top_tokens,
} => {
result_tokens.push(token);
result_top_tokens.push(top_tokens);
result_generated_text = Some(generated_text);
result_start = Some(start);
result_queued = Some(queued)
}
}
}
// Check that we received a `InferStreamResponse::End` message
if let (Some(generated_text), Some(queued), Some(start)) =
(result_generated_text, result_queued, result_start)
{
Ok(InferResponse {
prefill: result_prefill,
_input_length,
tokens: result_tokens,
generated_text,
queued,
start,
top_tokens: if use_top_tokens {
result_top_tokens
} else {
Vec::new()
},
})
} else {
let err = InferError::IncompleteGeneration;
metrics::increment_counter!("tgi_request_failure", "err" => "incomplete");
tracing::error!("{err}");
Err(err)
}
}
/// Add best_of new requests to the queue and return a InferResponse of the sequence with
/// the highest log probability per token
#[instrument(skip(self, request))]
pub(crate) async fn generate_best_of(
&self,
request: GenerateRequest,
best_of: usize,
) -> Result<(InferResponse, Vec<InferResponse>), InferError> {
// validate best_of parameter separately
let best_of = self.validation.validate_best_of(best_of)?;
// create multiple generate requests
let mut infer_responses: Vec<InferResponse> =
try_join_all((0..best_of).map(|_| self.generate(request.clone()))).await?;
// get the sequence with the highest log probability per token
let mut max_index = 0;
let mut max_logprob: f32 = f32::MIN;
for (i, response) in infer_responses.iter().enumerate() {
// mean logprobs of the generated tokens
let sequence_logprob = response
.tokens
.iter()
.map(|token| token.logprob)
.sum::<f32>()
/ response.tokens.len() as f32;
// set best sequence
if sequence_logprob > max_logprob {
max_index = i;
max_logprob = sequence_logprob;
}
}
let best_response = infer_responses.remove(max_index);
Ok((best_response, infer_responses))
}
}
/// Raise a exception (custom function) used in the chat templates
fn raise_exception(err_text: String) -> Result<String, minijinja::Error> {
Err(minijinja::Error::new(ErrorKind::SyntaxError, err_text))
}
#[derive(Clone)]
struct ChatTemplate {
template: Template<'static, 'static>,
bos_token: Option<String>,
eos_token: Option<String>,
use_default_tool_template: bool,
}
impl ChatTemplate {
fn new(template: String, bos_token: Option<String>, eos_token: Option<String>) -> Self {
let mut env = Box::new(Environment::new());
let template_str = template.into_boxed_str();
env.add_function("raise_exception", raise_exception);
// check if contains the tools variable within the template
let use_default_tool_template =
!template_str.as_ref().replace(' ', "").contains("{{tools}}");
// leaking env and template_str as read-only, static resources for performance.
let template = Box::leak(env)
.template_from_str(Box::leak(template_str))
.unwrap();
Self {
template,
bos_token,
eos_token,
use_default_tool_template,
}
}
fn apply(
&self,
mut messages: Vec<Message>,
grammar_with_prompt: Option<(GrammarType, String)>,
) -> Result<String, InferError> {
if self.use_default_tool_template {
if let Some(last_message) = messages.last_mut() {
if let Some((GrammarType::Json(tools), tool_prompt)) = grammar_with_prompt {
last_message.content.push(MessageChunk::Text(Text {
text: format!("\n---\n{}\n{}", tool_prompt, tools),
}));
}
}
}
let messages: Vec<TextMessage> = messages.into_iter().map(|c| c.into()).collect();
self.template
.render(ChatTemplateInputs {
messages,
bos_token: self.bos_token.as_deref(),
eos_token: self.eos_token.as_deref(),
add_generation_prompt: true,
tools: None,
tools_prompt: None,
})
.map_err(InferError::TemplateError)
}
}
pub struct ToolGrammar {}
impl ToolGrammar {
pub fn apply(
tools: Option<Vec<Tool>>,
tool_choice: Option<ToolType>,
) -> Result<Option<Tools>, InferError> {
if let Some((req_tools, tool_choice)) = tools.zip(tool_choice) {
// let tool_prompt = tool_prompt.unwrap_or_default();
let tools_to_use = match tool_choice {
ToolType::FunctionName(name) => {
vec![req_tools
.iter()
.find(|tool| tool.function.name == *name)
.unwrap_or_else(|| panic!("Tool with name {} not found", name))
.clone()]
}
ToolType::OneOf => req_tools.to_owned(),
};
// adds the error notification function for LLM feedback if required
let mut text_response_properties = Map::new();
text_response_properties.insert(
"error".to_string(),
serde_json::json!({
"type": "string",
"description": "The error or issue to notify"
}),
);
text_response_properties.insert(
"_name".to_string(),
serde_json::json!({
"type": "string",
"const": "notify_error"
}),
);
let functions: HashMap<String, serde_json::Value> = tools_to_use
.iter()
.map(|tool| {
let func = tool.function.clone();
// Clone the existing parameters, which are expected to be a JSON object
let mut params = if let Value::Object(params) = &func.arguments {
params.clone()
} else {
Map::new()
};
// Insert the function's description at the top level, outside of properties
params.insert(
"description".to_string(),
Value::String(func.description.clone().unwrap_or_default()),
);
// Ensure 'properties' exists and is an object
let properties = params
.entry("properties".to_string())
.or_insert_with(|| json!({}))
.as_object_mut()
.unwrap();
// Insert the constant for the function name inside 'properties'
properties.insert(
"_name".to_string(),
json!({
"type": "string",
"const": func.name.clone(),
// "description": "The name of the function"
}),
);
// Check if 'required' exists, and it is an array. If not, create an empty array.
let required = params
.entry("required".to_string())
.or_insert_with(|| json!([]))
.as_array_mut()
.unwrap();
// Add 'name' to the 'required' array if it is not already present
if !required.iter().any(|r| r == "_name") {
required.push(json!("_name"));
}
(func.name, Value::Object(params))
})
.chain([(
"notify_error".to_string(),
serde_json::json!({
"properties": text_response_properties,
"required": ["error", "_name"],
"type": "object"
}),
)])
.collect();
let tools = Tools {
functions_map: FunctionsMap { functions },
properties: Properties {
function: tools_to_use
.iter()
.map(|tool| FunctionRef {
ref_path: format!("#/$functions/{}", tool.function.name.clone()),
})
.chain(std::iter::once(FunctionRef {
ref_path: "#/$functions/notify_error".to_string(),
}))
.collect(),
},
};
return Ok(Some(tools));
}
// Err(InferError::ToolError("No tools provided".to_string()))
Ok(None)
}
}
/// Type alias for generation responses
pub(crate) type GenerateStreamResponse = (
OwnedSemaphorePermit,
u32, // input_length
UnboundedReceiverStream<Result<InferStreamResponse, InferError>>,
);
#[derive(Debug)]
pub(crate) struct GeneratedText {
pub(crate) text: String,
pub(crate) generated_tokens: u32,
pub(crate) finish_reason: FinishReason,
pub(crate) seed: Option<u64>,
}
#[derive(Debug)]
pub(crate) enum InferStreamResponse {
// Optional first message
Prefill(Vec<PrefillToken>),
// Intermediate messages
Intermediate {
token: Token,
top_tokens: Vec<Token>,
},
// Last message
End {
token: Token,
top_tokens: Vec<Token>,
generated_text: GeneratedText,
start: Instant,
queued: Instant,
},
}
#[derive(Debug)]
pub(crate) struct InferResponse {
/// input_length is the input as perceived by the rust tokenizer in the
/// validation pathway. It is redundant with prefill.len() but prefill
/// has data only if the user asked for it. This will always be filled.
pub(crate) _input_length: u32,
pub(crate) prefill: Vec<PrefillToken>,
pub(crate) tokens: Vec<Token>,
pub(crate) generated_text: GeneratedText,
pub(crate) queued: Instant,
pub(crate) start: Instant,
pub(crate) top_tokens: Vec<Vec<Token>>,
}
#[derive(Debug, Error)]
pub enum InferError {
#[error("Request failed during generation: {0}")]
GenerationError(String),
#[error("Model is overloaded")]
Overloaded(#[from] TryAcquireError),
#[error("Input validation error: {0}")]
ValidationError(#[from] ValidationError),
#[error("Incomplete generation")]
IncompleteGeneration,
#[error("Template error: {0}")]
TemplateError(#[from] minijinja::Error),
#[error("Tool error: {0}")]
ToolError(String),
}
impl InferError {
pub(crate) fn error_type(&self) -> &str {
match self {
InferError::GenerationError(_) => "generation",
InferError::Overloaded(_) => "overloaded",
InferError::ValidationError(_) => "validation",
InferError::IncompleteGeneration => "incomplete_generation",
InferError::TemplateError(_) => "template_error",
InferError::ToolError(_) => "tool_error",
}
}
}
mod queue;
mod scheduler;
pub(crate) use scheduler::SchedulerV2;
use crate::infer::{InferError, InferStreamResponse};
use crate::validation::{
ValidGenerateRequest, ValidGrammar, ValidParameters, ValidStoppingParameters,
};
use nohash_hasher::{BuildNoHashHasher, IntMap};
use std::cmp::min;
use std::collections::VecDeque;
use text_generation_client::v2::{
Batch, GrammarType, NextTokenChooserParameters, Request, StoppingCriteriaParameters,
};
use text_generation_client::ChunksToString;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::{info_span, instrument, Span};
/// Queue entry
#[derive(Debug)]
pub(crate) struct Entry {
/// Request
pub request: ValidGenerateRequest,
/// Response sender to communicate between the Infer struct and the batching_task
pub response_tx: mpsc::UnboundedSender<Result<InferStreamResponse, InferError>>,
/// Span that will live as long as entry
pub span: Span,
/// Temporary span used as a guard when logging inference, wait times...
pub temp_span: Option<Span>,
/// Instant when this entry was queued
pub queue_time: Instant,
/// Instant when this entry was added to a batch
pub batch_time: Option<Instant>,
}
/// Request Queue
#[derive(Debug, Clone)]
pub(crate) struct Queue {
/// Channel to communicate with the background queue task
queue_sender: mpsc::UnboundedSender<QueueCommand>,
}
impl Queue {
pub(crate) fn new(
requires_padding: bool,
block_size: u32,
window_size: Option<u32>,
speculate: u32,
) -> Self {
// Create channel
let (queue_sender, queue_receiver) = mpsc::unbounded_channel();
// Launch background queue task
tokio::spawn(queue_task(
requires_padding,
block_size,
window_size,
speculate,
queue_receiver,
));
Self { queue_sender }
}
#[instrument(skip_all)]
pub(crate) fn append(&self, entry: Entry) {
// Send append command to the background task managing the state
// Unwrap is safe here
self.queue_sender
.send(QueueCommand::Append(Box::new(entry), Span::current()))
.unwrap();
}
// Get the next batch
#[instrument(skip(self))]
pub(crate) async fn next_batch(
&self,
min_size: Option<usize>,
max_size: Option<usize>,
prefill_token_budget: u32,
token_budget: u32,
) -> Option<NextBatch> {
// Create response channel
let (response_sender, response_receiver) = oneshot::channel();
// Send next batch command to the background task managing the state
// Unwrap is safe here
self.queue_sender
.send(QueueCommand::NextBatch {
min_size,
max_size,
prefill_token_budget,
token_budget,
response_sender,
span: Span::current(),
})
.unwrap();
// Await on response channel
// Unwrap is safe here
response_receiver.await.unwrap()
}
}
// Background task responsible of the queue state
async fn queue_task(
requires_padding: bool,
block_size: u32,
window_size: Option<u32>,
speculate: u32,
mut receiver: mpsc::UnboundedReceiver<QueueCommand>,
) {
let mut state = State::new(requires_padding, block_size, window_size, speculate);
while let Some(cmd) = receiver.recv().await {
match cmd {
QueueCommand::Append(entry, span) => {
span.in_scope(|| state.append(*entry));
metrics::increment_gauge!("tgi_queue_size", 1.0);
}
QueueCommand::NextBatch {
min_size,
max_size,
prefill_token_budget,
token_budget,
response_sender,
span,
} => span.in_scope(|| {
let next_batch =
state.next_batch(min_size, max_size, prefill_token_budget, token_budget);
response_sender.send(next_batch).unwrap();
metrics::gauge!("tgi_queue_size", state.entries.len() as f64);
}),
}
}
}
/// Queue State
#[derive(Debug)]
struct State {
/// Queue entries organized in a Vec
entries: VecDeque<(u64, Entry)>,
/// Id of the next entry
next_id: u64,
/// Id of the next batch
next_batch_id: u64,
/// Whether the model is using padding
requires_padding: bool,
/// Paged Attention block size
block_size: u32,
/// Sliding window
window_size: Option<u32>,
/// Speculation amount
speculate: u32,
}
impl State {
fn new(
requires_padding: bool,
block_size: u32,
window_size: Option<u32>,
speculate: u32,
) -> Self {
Self {
entries: VecDeque::with_capacity(128),
next_id: 0,
next_batch_id: 0,
requires_padding,
block_size,
window_size,
speculate,
}
}
/// Append an entry to the queue
fn append(&mut self, mut entry: Entry) {
// Create a span that will live as long as the entry is in the queue waiting to be batched
let queue_span = info_span!(parent: &entry.span, "queued");
entry.temp_span = Some(queue_span);
// Push entry in the queue
self.entries.push_back((self.next_id, entry));
self.next_id += 1;
}
// Get the next batch
fn next_batch(
&mut self,
min_size: Option<usize>,
max_size: Option<usize>,
prefill_token_budget: u32,
token_budget: u32,
) -> Option<NextBatch> {
if self.entries.is_empty() {
tracing::debug!("No queue");
return None;
}
// Check if we have enough entries
if let Some(min_size) = min_size {
if self.entries.len() < min_size {
tracing::debug!("Not enough entries");
return None;
}
}
// Pad prefill_token_budget to be a multiple of block size
let prefill_token_budget =
((prefill_token_budget + self.block_size - 1) / self.block_size) * self.block_size;
// Create span for this batch to add context to inference calls
let next_batch_span = info_span!(parent: None, "batch", batch_size = tracing::field::Empty);
next_batch_span.follows_from(&Span::current());
let mut batch_requests = Vec::with_capacity(self.entries.len());
let mut batch_entries =
IntMap::with_capacity_and_hasher(self.entries.len(), BuildNoHashHasher::default());
let mut max_input_length = 0;
let mut prefill_tokens: u32 = 0;
let mut decode_tokens: u32 = 0;
// Pop entries starting from the front of the queue
while let Some((id, mut entry)) = self.entries.pop_front() {
// Filter entries where the response receiver was dropped (== entries where the request
// was dropped by the client)
if entry.response_tx.is_closed() {
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
tracing::debug!("Dropping entry");
continue;
}
if self.requires_padding {
// We pad to max input length in the Python shards
// We need to take these padding tokens into the equation
max_input_length = max_input_length.max(entry.request.input_length);
prefill_tokens = (batch_requests.len() + 1) as u32 * max_input_length
} else {
// pad to block size
prefill_tokens += ((entry.request.input_length + self.block_size - 1)
/ self.block_size)
* self.block_size;
}
if self.requires_padding {
decode_tokens += entry.request.stopping_parameters.max_new_tokens;
} else {
let max_new_tokens = match self.window_size {
None => entry.request.stopping_parameters.max_new_tokens,
Some(window_size) => min(
window_size.saturating_sub(entry.request.input_length),
entry.request.stopping_parameters.max_new_tokens,
),
};
// pad to block size
decode_tokens +=
((max_new_tokens + self.block_size - 1) / self.block_size) * self.block_size;
}
if prefill_tokens > prefill_token_budget
|| (prefill_tokens + decode_tokens + self.speculate) > token_budget
{
// Entry is over budget
// Add it back to the front
tracing::debug!("Over budget: prefill_tokens={prefill_tokens} > {prefill_token_budget} || {prefill_tokens} + {decode_tokens} + {} > {token_budget}", self.speculate);
self.entries.push_front((id, entry));
break;
}
tracing::debug!("Accepting entry");
// Create a new span to link the batch back to this entry
let entry_batch_span = info_span!(parent: &entry.span, "infer");
// Add relationships
next_batch_span.follows_from(&entry_batch_span);
entry_batch_span.follows_from(&next_batch_span);
// Update entry
entry.temp_span = Some(entry_batch_span);
batch_requests.push(Request {
id,
prefill_logprobs: entry.request.decoder_input_details,
inputs: entry.request.inputs.chunks_to_string(),
truncate: entry.request.truncate,
parameters: Some(NextTokenChooserParameters::from(
entry.request.parameters.clone(),
)),
stopping_parameters: Some(StoppingCriteriaParameters::from(
entry.request.stopping_parameters.clone(),
)),
top_n_tokens: entry.request.top_n_tokens,
});
// Set batch_time
entry.batch_time = Some(Instant::now());
// Insert in batch_entries IntMap
batch_entries.insert(id, entry);
// Check if max_size
if Some(batch_requests.len()) == max_size {
break;
}
}
// Empty batch
if batch_requests.is_empty() {
tracing::debug!("Filtered out all entries");
return None;
}
// Check if our batch is big enough
if let Some(min_size) = min_size {
// Batch is too small
if batch_requests.len() < min_size {
// Add back entries to the queue in the correct order
for r in batch_requests.into_iter().rev() {
let id = r.id;
let entry = batch_entries.remove(&id).unwrap();
self.entries.push_front((id, entry));
}
return None;
}
}
// Final batch size
let size = batch_requests.len() as u32;
next_batch_span.record("batch_size", size);
let batch = Batch {
id: self.next_batch_id,
requests: batch_requests,
size,
max_tokens: (prefill_tokens + decode_tokens),
};
// Increment batch id
self.next_batch_id += 1;
metrics::histogram!("tgi_batch_next_size", batch.size as f64);
Some((batch_entries, batch, next_batch_span))
}
}
type NextBatch = (IntMap<u64, Entry>, Batch, Span);
#[derive(Debug)]
enum QueueCommand {
Append(Box<Entry>, Span),
NextBatch {
min_size: Option<usize>,
max_size: Option<usize>,
prefill_token_budget: u32,
token_budget: u32,
response_sender: oneshot::Sender<Option<NextBatch>>,
span: Span,
},
}
impl From<ValidParameters> for NextTokenChooserParameters {
fn from(value: ValidParameters) -> Self {
let (grammar, grammar_type) = match value.grammar {
None => (String::new(), GrammarType::None),
Some(grammar) => match grammar {
ValidGrammar::Json(grammar_string) => (grammar_string, GrammarType::Json),
ValidGrammar::Regex(grammar_string) => (grammar_string, GrammarType::Regex),
},
};
Self {
temperature: value.temperature,
top_k: value.top_k,
top_p: value.top_p,
typical_p: value.typical_p,
do_sample: value.do_sample,
seed: value.seed,
repetition_penalty: value.repetition_penalty,
frequency_penalty: value.frequency_penalty,
watermark: value.watermark,
grammar,
grammar_type: grammar_type.into(),
}
}
}
impl From<ValidStoppingParameters> for StoppingCriteriaParameters {
fn from(value: ValidStoppingParameters) -> Self {
Self {
max_new_tokens: value.max_new_tokens,
stop_sequences: value.stop_sequences,
ignore_eos_token: value.ignore_eos_token,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tracing::info_span;
fn default_entry() -> (
Entry,
mpsc::UnboundedReceiver<Result<InferStreamResponse, InferError>>,
) {
let (response_tx, receiver_tx) = mpsc::unbounded_channel();
let entry = Entry {
request: ValidGenerateRequest {
inputs: vec![],
input_length: 0,
truncate: 0,
decoder_input_details: false,
parameters: ValidParameters {
temperature: 0.0,
top_k: 0,
top_p: 0.0,
typical_p: 0.0,
do_sample: false,
seed: 0,
repetition_penalty: 0.0,
frequency_penalty: 0.0,
watermark: false,
grammar: None,
},
stopping_parameters: ValidStoppingParameters {
ignore_eos_token: false,
max_new_tokens: 1,
stop_sequences: vec![],
},
top_n_tokens: 0,
},
response_tx,
span: info_span!("entry"),
temp_span: None,
queue_time: Instant::now(),
batch_time: None,
};
(entry, receiver_tx)
}
#[test]
fn test_append() {
let mut state = State::new(false, 1, None, 0);
let (entry, _guard) = default_entry();
assert_eq!(state.next_id, 0);
assert_eq!(state.entries.len(), 0);
state.append(entry);
assert_eq!(state.next_id, 1);
assert_eq!(state.entries.len(), 1);
let (id, _) = state.entries.remove(0).unwrap();
assert_eq!(id, 0);
}
#[test]
fn test_next_batch_empty() {
let mut state = State::new(false, 1, None, 0);
assert!(state.next_batch(None, None, 1, 1).is_none());
assert!(state.next_batch(Some(1), None, 1, 1).is_none());
}
#[test]
fn test_next_batch_min_size() {
let mut state = State::new(false, 1, None, 0);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
state.append(entry1);
state.append(entry2);
let (entries, batch, _) = state.next_batch(None, None, 2, 2).unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&0));
assert!(entries.contains_key(&1));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert!(entries.get(&1).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 2);
assert_eq!(state.next_id, 2);
assert_eq!(state.entries.len(), 0);
assert_eq!(state.next_batch_id, 1);
let (entry3, _guard3) = default_entry();
state.append(entry3);
assert!(state.next_batch(Some(2), None, 2, 2).is_none());
assert_eq!(state.next_id, 3);
assert_eq!(state.entries.len(), 1);
let (id, _) = state.entries.remove(0).unwrap();
assert_eq!(id, 2);
}
#[test]
fn test_next_batch_max_size() {
let mut state = State::new(false, 1, None, 0);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
state.append(entry1);
state.append(entry2);
let (entries, batch, _) = state.next_batch(None, Some(1), 2, 2).unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
assert_eq!(state.next_id, 2);
assert_eq!(state.entries.len(), 1);
assert_eq!(state.next_batch_id, 1);
}
#[test]
fn test_next_batch_token_budget() {
let mut state = State::new(false, 1, None, 0);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
state.append(entry1);
state.append(entry2);
let (entries, batch, _) = state.next_batch(None, None, 1, 1).unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
assert_eq!(state.next_id, 2);
assert_eq!(state.entries.len(), 1);
assert_eq!(state.next_batch_id, 1);
let (entry3, _guard3) = default_entry();
state.append(entry3);
let (entries, batch, _) = state.next_batch(None, None, 3, 3).unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&1));
assert!(entries.contains_key(&2));
assert_eq!(batch.id, 1);
assert_eq!(batch.size, 2);
assert_eq!(state.next_id, 3);
assert_eq!(state.entries.len(), 0);
assert_eq!(state.next_batch_id, 2);
}
#[tokio::test]
async fn test_queue_append() {
let queue = Queue::new(false, 1, None, 0);
let (entry, _guard) = default_entry();
queue.append(entry);
}
#[tokio::test]
async fn test_queue_next_batch_empty() {
let queue = Queue::new(false, 1, None, 0);
assert!(queue.next_batch(None, None, 1, 1).await.is_none());
assert!(queue.next_batch(Some(1), None, 1, 1).await.is_none());
}
#[tokio::test]
async fn test_queue_next_batch_min_size() {
let queue = Queue::new(false, 1, None, 0);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
let (entries, batch, _) = queue.next_batch(None, None, 2, 2).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&0));
assert!(entries.contains_key(&1));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert!(entries.get(&1).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 2);
let (entry3, _guard3) = default_entry();
queue.append(entry3);
// Not enough requests pending
assert!(queue.next_batch(Some(2), None, 2, 2).await.is_none());
// Not enough token budget
assert!(queue.next_batch(Some(1), None, 0, 0).await.is_none());
// Ok
let (entries2, batch2, _) = queue.next_batch(Some(1), None, 2, 2).await.unwrap();
assert_eq!(entries2.len(), 1);
assert!(entries2.contains_key(&2));
assert!(entries2.get(&2).unwrap().batch_time.is_some());
assert_eq!(batch2.id, 1);
assert_eq!(batch2.size, 1);
}
#[tokio::test]
async fn test_queue_next_batch_max_size() {
let queue = Queue::new(false, 1, None, 0);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
let (entries, batch, _) = queue.next_batch(None, Some(1), 2, 2).await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
}
#[tokio::test]
async fn test_queue_next_batch_token_budget() {
let queue = Queue::new(false, 1, None, 0);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
let (entries, batch, _) = queue.next_batch(None, None, 1, 1).await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
let (entry3, _guard3) = default_entry();
queue.append(entry3);
let (entries, batch, _) = queue.next_batch(None, None, 3, 3).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&1));
assert!(entries.contains_key(&2));
assert_eq!(batch.id, 1);
assert_eq!(batch.size, 2);
}
#[tokio::test]
async fn test_queue_next_batch_token_speculate() {
let queue = Queue::new(false, 1, None, 2);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
// Budget of 1 is not enough
assert!(queue.next_batch(None, None, 1, 1).await.is_none());
let (entries, batch, _) = queue.next_batch(None, None, 6, 6).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&0));
assert!(entries.contains_key(&1));
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 2);
}
#[tokio::test]
async fn test_queue_next_batch_dropped_receiver() {
let queue = Queue::new(false, 1, None, 0);
let (entry, _) = default_entry();
queue.append(entry);
assert!(queue.next_batch(None, None, 1, 1).await.is_none());
}
}
/// Batching and inference logic /// Batching and inference logic
use crate::validation::{Validation, ValidationError}; use crate::infer::v2::queue::{Entry, Queue};
use crate::{ use crate::infer::{
ChatTemplateInputs, ChatTemplateVersions, Entry, GenerateRequest, GenerateStreamResponse, GenerateStreamResponse, GeneratedText, InferError, InferStreamResponse, Scheduler,
HubProcessorConfig, HubTokenizerConfig, Message, MessageChunk, PrefillToken, Queue, Text,
TextMessage, Token,
}; };
use crate::{FunctionRef, FunctionsMap, GrammarType, Properties, Tool, ToolType, Tools}; use crate::validation::ValidGenerateRequest;
use futures::future::try_join_all; use crate::{FinishReason, PrefillToken, Token};
use minijinja::{Environment, ErrorKind, Template};
use nohash_hasher::IntMap; use nohash_hasher::IntMap;
use serde_json::{json, Map, Value};
use std::collections::HashMap;
use std::sync::{ use std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}; };
use text_generation_client::{ use text_generation_client::v2::{Batch, CachedBatch, Generation, ShardedClient};
Batch, CachedBatch, ClientError, GeneratedText, Generation, ShardedClient, Tokens, use text_generation_client::ClientError;
};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, Notify, Semaphore, TryAcquireError}; use tokio::sync::{mpsc, Notify, OwnedSemaphorePermit};
use tokio::time::Instant; use tokio::time::Instant;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;
use tracing::{info_span, instrument, Instrument, Span}; use tracing::{info_span, instrument, Instrument, Span};
/// Inference struct pub(crate) struct SchedulerV2 {
#[derive(Clone)]
pub struct Infer {
/// Validation
validation: Validation,
/// Request queue /// Request queue
queue: Queue, queue: Queue,
/// Shared state /// Notify batcher on queue appends
shared: Arc<Shared>, batching_task_notifier: Arc<Notify>,
/// Chat template
chat_template: Option<ChatTemplate>,
/// Inference limit
limit_concurrent_requests: Arc<Semaphore>,
}
/// Infer shared state
struct Shared {
/// Batching background Tokio task notifier
batching_task: Notify,
} }
/// Raise a exception (custom function) used in the chat templates impl SchedulerV2 {
fn raise_exception(err_text: String) -> Result<String, minijinja::Error> {
Err(minijinja::Error::new(ErrorKind::SyntaxError, err_text))
}
impl Infer {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(crate) fn new( pub(crate) fn new(
client: ShardedClient, client: ShardedClient,
validation: Validation,
waiting_served_ratio: f32, waiting_served_ratio: f32,
max_batch_prefill_tokens: u32, max_batch_prefill_tokens: u32,
max_batch_total_tokens: u32, max_batch_total_tokens: u32,
max_waiting_tokens: usize, max_waiting_tokens: usize,
max_batch_size: Option<usize>, max_batch_size: Option<usize>,
max_concurrent_requests: usize,
requires_padding: bool, requires_padding: bool,
window_size: Option<u32>, window_size: Option<u32>,
speculate: u32, speculate: u32,
generation_health: Arc<AtomicBool>, generation_health: Arc<AtomicBool>,
tokenizer_config: HubTokenizerConfig,
processor_config: HubProcessorConfig,
) -> Self { ) -> Self {
let queue = Queue::new(requires_padding, 16, window_size, speculate); let queue = Queue::new(requires_padding, 16, window_size, speculate);
let shared = Arc::new(Shared { let batching_task_notifier = Arc::new(Notify::new());
batching_task: Notify::new(),
});
// Spawn batching background task that contains all the inference logic // Spawn batching background task that contains all the inference logic
tokio::spawn(batching_task( tokio::spawn(batching_task(
...@@ -84,72 +51,31 @@ impl Infer { ...@@ -84,72 +51,31 @@ impl Infer {
max_waiting_tokens, max_waiting_tokens,
max_batch_size, max_batch_size,
queue.clone(), queue.clone(),
shared.clone(), batching_task_notifier.clone(),
generation_health, generation_health,
)); ));
let chat_template = tokenizer_config
.chat_template
.or(processor_config.chat_template)
.and_then(|t| match t {
ChatTemplateVersions::Single(template) => Some(template),
ChatTemplateVersions::Multiple(templates) => templates
.into_iter()
.find(|t| t.name == "default")
.map(|t| t.template),
})
.map(|t| {
// .strip() is not supported in minijinja
// .capitalize() is not supported in minijinja but we can use | capitalize
let t = t
.replace(".strip()", " | trim")
.replace(".capitalize()", " | capitalize");
ChatTemplate::new(t, tokenizer_config.bos_token, tokenizer_config.eos_token)
});
// Inference limit with a semaphore
let semaphore = Arc::new(Semaphore::new(max_concurrent_requests));
Self { Self {
validation,
queue, queue,
shared, batching_task_notifier,
chat_template,
limit_concurrent_requests: semaphore,
} }
} }
}
/// Add a new request to the queue and return a stream of InferStreamResponse impl Scheduler for SchedulerV2 {
#[instrument(skip_all)] #[instrument(skip_all)]
pub(crate) async fn generate_stream( fn schedule(
&self, &self,
request: GenerateRequest, request: ValidGenerateRequest,
permit: OwnedSemaphorePermit,
) -> Result<GenerateStreamResponse, InferError> { ) -> Result<GenerateStreamResponse, InferError> {
// Limit concurrent requests by acquiring a permit from the semaphore
let permit = self
.clone()
.limit_concurrent_requests
.try_acquire_owned()
.map_err(|err| {
metrics::increment_counter!("tgi_request_failure", "err" => "overloaded");
tracing::error!("{err}");
err
})?;
// Validate request
let valid_request = self.validation.validate(request).await.map_err(|err| {
metrics::increment_counter!("tgi_request_failure", "err" => "validation");
tracing::error!("{err}");
err
})?;
// MPSC channel to communicate with the background batching task // MPSC channel to communicate with the background batching task
let (response_tx, response_rx) = mpsc::unbounded_channel(); let (response_tx, response_rx) = mpsc::unbounded_channel();
let input_length = valid_request.input_length; let input_length = request.input_length;
// Append the request to the queue // Append the request to the queue
self.queue.append(Entry { self.queue.append(Entry {
request: valid_request, request,
response_tx, response_tx,
span: Span::current(), span: Span::current(),
temp_span: None, temp_span: None,
...@@ -159,7 +85,7 @@ impl Infer { ...@@ -159,7 +85,7 @@ impl Infer {
// Notify the background task that we have a new entry in the queue that needs // Notify the background task that we have a new entry in the queue that needs
// to be batched // to be batched
self.shared.batching_task.notify_one(); self.batching_task_notifier.notify_one();
// Return stream // Return stream
Ok(( Ok((
...@@ -168,343 +94,6 @@ impl Infer { ...@@ -168,343 +94,6 @@ impl Infer {
UnboundedReceiverStream::new(response_rx), UnboundedReceiverStream::new(response_rx),
)) ))
} }
/// Tokenizer the input
#[instrument(skip_all)]
pub(crate) async fn tokenize(
&self,
request: GenerateRequest,
) -> Result<Option<tokenizers::Encoding>, InferError> {
// Tokenize request
let inputs = request.inputs;
let truncate = request.parameters.truncate;
let encoding = self
.validation
.tokenize(inputs, truncate)
.await
.map_err(|err| {
tracing::error!("Tokenization {err}");
err
})?;
// Return Encoding
Ok(encoding.map(|(encoding, _)| encoding))
}
/// Apply the chat template to the chat request
#[instrument(skip_all)]
pub(crate) fn apply_chat_template(
&self,
messages: Vec<Message>,
grammar_with_prompt: Option<(GrammarType, String)>,
) -> Result<String, InferError> {
self.chat_template
.as_ref()
.ok_or_else(|| InferError::TemplateError(ErrorKind::TemplateNotFound.into()))?
.apply(messages, grammar_with_prompt)
.map_err(|e| {
metrics::increment_counter!("tgi_request_failure", "err" => "template");
tracing::error!("{e}");
e
})
}
/// Add a new request to the queue and return a InferResponse
#[instrument(skip_all)]
pub(crate) async fn generate(
&self,
request: GenerateRequest,
) -> Result<InferResponse, InferError> {
let use_top_tokens = request.parameters.top_n_tokens.is_some_and(|x| x > 0);
// Create stream and keep semaphore permit as long as generate lives
let (_permit, _input_length, mut stream) = self.generate_stream(request).await?;
// Return values
let mut result_prefill = Vec::new();
let mut result_tokens = Vec::new();
let mut result_top_tokens = Vec::new();
let mut result_generated_text = None;
let mut result_start = None;
let mut result_queued = None;
// Iterate on stream
while let Some(response) = stream.next().await {
match response? {
// Add prefill tokens
InferStreamResponse::Prefill(tokens) => {
// Create Token objects
// We do that here instead of in the Python code as Rust for loops are faster
result_prefill = tokens
.ids
.into_iter()
.zip(tokens.logprobs.into_iter())
.zip(tokens.texts.into_iter())
.map(|((id, logprob), text)| PrefillToken { id, text, logprob })
.collect();
}
// Push last token
InferStreamResponse::Intermediate { token, top_tokens } => {
result_tokens.push(token);
result_top_tokens.push(top_tokens);
}
// Final message
// Set return values
InferStreamResponse::End {
token,
generated_text,
start,
queued,
top_tokens,
} => {
result_tokens.push(token);
result_top_tokens.push(top_tokens);
result_generated_text = Some(generated_text);
result_start = Some(start);
result_queued = Some(queued)
}
}
}
// Check that we received a `InferStreamResponse::End` message
if let (Some(generated_text), Some(queued), Some(start)) =
(result_generated_text, result_queued, result_start)
{
Ok(InferResponse {
prefill: result_prefill,
_input_length,
tokens: result_tokens,
generated_text,
queued,
start,
top_tokens: if use_top_tokens {
result_top_tokens
} else {
Vec::new()
},
})
} else {
let err = InferError::IncompleteGeneration;
metrics::increment_counter!("tgi_request_failure", "err" => "incomplete");
tracing::error!("{err}");
Err(err)
}
}
/// Add best_of new requests to the queue and return a InferResponse of the sequence with
/// the highest log probability per token
#[instrument(skip(self, request))]
pub(crate) async fn generate_best_of(
&self,
request: GenerateRequest,
best_of: usize,
) -> Result<(InferResponse, Vec<InferResponse>), InferError> {
// validate best_of parameter separately
let best_of = self.validation.validate_best_of(best_of)?;
// create multiple generate requests
let mut infer_responses: Vec<InferResponse> =
try_join_all((0..best_of).map(|_| self.generate(request.clone()))).await?;
// get the sequence with the highest log probability per token
let mut max_index = 0;
let mut max_logprob: f32 = f32::MIN;
for (i, response) in infer_responses.iter().enumerate() {
// mean logprobs of the generated tokens
let sequence_logprob = response
.tokens
.iter()
.map(|token| token.logprob)
.sum::<f32>()
/ response.tokens.len() as f32;
// set best sequence
if sequence_logprob > max_logprob {
max_index = i;
max_logprob = sequence_logprob;
}
}
let best_response = infer_responses.remove(max_index);
Ok((best_response, infer_responses))
}
}
#[derive(Clone)]
struct ChatTemplate {
template: Template<'static, 'static>,
bos_token: Option<String>,
eos_token: Option<String>,
use_default_tool_template: bool,
}
impl ChatTemplate {
fn new(template: String, bos_token: Option<String>, eos_token: Option<String>) -> Self {
let mut env = Box::new(Environment::new());
let template_str = template.into_boxed_str();
env.add_function("raise_exception", raise_exception);
// check if contains the tools variable within the template
let use_default_tool_template =
!template_str.as_ref().replace(' ', "").contains("{{tools}}");
// leaking env and template_str as read-only, static resources for performance.
let template = Box::leak(env)
.template_from_str(Box::leak(template_str))
.unwrap();
Self {
template,
bos_token,
eos_token,
use_default_tool_template,
}
}
fn apply(
&self,
mut messages: Vec<Message>,
grammar_with_prompt: Option<(GrammarType, String)>,
) -> Result<String, InferError> {
if self.use_default_tool_template {
if let Some(last_message) = messages.last_mut() {
if let Some((GrammarType::Json(tools), tool_prompt)) = grammar_with_prompt {
last_message.content.push(MessageChunk::Text(Text {
text: format!("\n---\n{}\n{}", tool_prompt, tools),
}));
}
}
}
let messages: Vec<TextMessage> = messages.into_iter().map(|c| c.into()).collect();
self.template
.render(ChatTemplateInputs {
messages,
bos_token: self.bos_token.as_deref(),
eos_token: self.eos_token.as_deref(),
add_generation_prompt: true,
tools: None,
tools_prompt: None,
})
.map_err(InferError::TemplateError)
}
}
pub struct ToolGrammar {}
impl ToolGrammar {
pub fn apply(
tools: Option<Vec<Tool>>,
tool_choice: Option<ToolType>,
) -> Result<Option<Tools>, InferError> {
if let Some((req_tools, tool_choice)) = tools.zip(tool_choice) {
// let tool_prompt = tool_prompt.unwrap_or_default();
let tools_to_use = match tool_choice {
ToolType::FunctionName(name) => {
vec![req_tools
.iter()
.find(|tool| tool.function.name == *name)
.unwrap_or_else(|| panic!("Tool with name {} not found", name))
.clone()]
}
ToolType::OneOf => req_tools.to_owned(),
};
// adds the error notification function for LLM feedback if required
let mut text_response_properties = Map::new();
text_response_properties.insert(
"error".to_string(),
serde_json::json!({
"type": "string",
"description": "The error or issue to notify"
}),
);
text_response_properties.insert(
"_name".to_string(),
serde_json::json!({
"type": "string",
"const": "notify_error"
}),
);
let functions: HashMap<String, serde_json::Value> = tools_to_use
.iter()
.map(|tool| {
let func = tool.function.clone();
// Clone the existing parameters, which are expected to be a JSON object
let mut params = if let Value::Object(params) = &func.arguments {
params.clone()
} else {
Map::new()
};
// Insert the function's description at the top level, outside of properties
params.insert(
"description".to_string(),
Value::String(func.description.clone().unwrap_or_default()),
);
// Ensure 'properties' exists and is an object
let properties = params
.entry("properties".to_string())
.or_insert_with(|| json!({}))
.as_object_mut()
.unwrap();
// Insert the constant for the function name inside 'properties'
properties.insert(
"_name".to_string(),
json!({
"type": "string",
"const": func.name.clone(),
// "description": "The name of the function"
}),
);
// Check if 'required' exists, and it is an array. If not, create an empty array.
let required = params
.entry("required".to_string())
.or_insert_with(|| json!([]))
.as_array_mut()
.unwrap();
// Add 'name' to the 'required' array if it is not already present
if !required.iter().any(|r| r == "_name") {
required.push(json!("_name"));
}
(func.name, Value::Object(params))
})
.chain([(
"notify_error".to_string(),
serde_json::json!({
"properties": text_response_properties,
"required": ["error", "_name"],
"type": "object"
}),
)])
.collect();
let tools = Tools {
functions_map: FunctionsMap { functions },
properties: Properties {
function: tools_to_use
.iter()
.map(|tool| FunctionRef {
ref_path: format!("#/$functions/{}", tool.function.name.clone()),
})
.chain(std::iter::once(FunctionRef {
ref_path: "#/$functions/notify_error".to_string(),
}))
.collect(),
},
};
return Ok(Some(tools));
}
// Err(InferError::ToolError("No tools provided".to_string()))
Ok(None)
}
} }
/// Batching logic /// Batching logic
...@@ -512,7 +101,7 @@ impl ToolGrammar { ...@@ -512,7 +101,7 @@ impl ToolGrammar {
/// ///
/// Batches requests and sends them to the inference server /// Batches requests and sends them to the inference server
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn batching_task( pub(crate) async fn batching_task(
mut client: ShardedClient, mut client: ShardedClient,
waiting_served_ratio: f32, waiting_served_ratio: f32,
max_batch_prefill_tokens: u32, max_batch_prefill_tokens: u32,
...@@ -520,13 +109,13 @@ async fn batching_task( ...@@ -520,13 +109,13 @@ async fn batching_task(
max_waiting_tokens: usize, max_waiting_tokens: usize,
max_batch_size: Option<usize>, max_batch_size: Option<usize>,
queue: Queue, queue: Queue,
shared: Arc<Shared>, notifier: Arc<Notify>,
generation_health: Arc<AtomicBool>, generation_health: Arc<AtomicBool>,
) { ) {
// Infinite loop // Infinite loop
loop { loop {
// Wait for a notification from the Infer struct // Wait for a notification from the Infer struct
shared.batching_task.notified().await; notifier.notified().await;
// Get the next batch from the queue // Get the next batch from the queue
// This batch might be smaller than the maximum batch size if there are not enough requests // This batch might be smaller than the maximum batch size if there are not enough requests
...@@ -792,6 +381,16 @@ fn send_responses( ...@@ -792,6 +381,16 @@ fn send_responses(
let mut stopped = false; let mut stopped = false;
if let Some(prefill_tokens) = generation.prefill_tokens { if let Some(prefill_tokens) = generation.prefill_tokens {
// Create Token objects
// We do that here instead of in the Python code as Rust for loops are faster
let prefill_tokens = prefill_tokens
.ids
.into_iter()
.zip(prefill_tokens.logprobs)
.zip(prefill_tokens.texts)
.map(|((id, logprob), text)| PrefillToken { id, text, logprob })
.collect();
// Send message // Send message
entry entry
.response_tx .response_tx
...@@ -842,7 +441,7 @@ fn send_responses( ...@@ -842,7 +441,7 @@ fn send_responses(
entry.response_tx.send(Ok(InferStreamResponse::End { entry.response_tx.send(Ok(InferStreamResponse::End {
token, token,
top_tokens, top_tokens,
generated_text: generated_text.clone(), generated_text: GeneratedText::from(generated_text.clone()),
queued: entry.queue_time, queued: entry.queue_time,
start: entry.batch_time.unwrap(), start: entry.batch_time.unwrap(),
}))?; }))?;
...@@ -877,64 +476,21 @@ fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) { ...@@ -877,64 +476,21 @@ fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
}); });
} }
#[derive(Debug)] impl From<text_generation_client::v2::GeneratedText> for GeneratedText {
pub(crate) enum InferStreamResponse { fn from(value: text_generation_client::v2::GeneratedText) -> Self {
// Optional first message let v2_finish_reason =
Prefill(Tokens), text_generation_client::v2::FinishReason::try_from(value.finish_reason).unwrap();
// Intermediate messages let finish_reason = match v2_finish_reason {
Intermediate { text_generation_client::v2::FinishReason::Length => FinishReason::Length,
token: Token, text_generation_client::v2::FinishReason::EosToken => FinishReason::EndOfSequenceToken,
top_tokens: Vec<Token>, text_generation_client::v2::FinishReason::StopSequence => FinishReason::StopSequence,
}, };
// Last message
End {
token: Token,
top_tokens: Vec<Token>,
generated_text: GeneratedText,
start: Instant,
queued: Instant,
},
}
#[derive(Debug)]
pub(crate) struct InferResponse {
/// input_length is the input as perceived by the rust tokenizer in the
/// validation pathway. It is redundant with prefill.len() but prefill
/// has data only if the user asked for it. This will always be filled.
pub(crate) _input_length: u32,
pub(crate) prefill: Vec<PrefillToken>,
pub(crate) tokens: Vec<Token>,
pub(crate) generated_text: GeneratedText,
pub(crate) queued: Instant,
pub(crate) start: Instant,
pub(crate) top_tokens: Vec<Vec<Token>>,
}
#[derive(Debug, Error)]
pub enum InferError {
#[error("Request failed during generation: {0}")]
GenerationError(String),
#[error("Model is overloaded")]
Overloaded(#[from] TryAcquireError),
#[error("Input validation error: {0}")]
ValidationError(#[from] ValidationError),
#[error("Incomplete generation")]
IncompleteGeneration,
#[error("Template error: {0}")]
TemplateError(#[from] minijinja::Error),
#[error("Tool error: {0}")]
ToolError(String),
}
impl InferError { Self {
pub(crate) fn error_type(&self) -> &str { text: value.text,
match self { generated_tokens: value.generated_tokens,
InferError::GenerationError(_) => "generation", finish_reason,
InferError::Overloaded(_) => "overloaded", seed: value.seed,
InferError::ValidationError(_) => "validation",
InferError::IncompleteGeneration => "incomplete_generation",
InferError::TemplateError(_) => "template_error",
InferError::ToolError(_) => "tool_error",
} }
} }
} }
...@@ -1355,11 +911,11 @@ mod tests { ...@@ -1355,11 +911,11 @@ mod tests {
chat_template: "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '<|user|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'system' %}\n{{ '<|system|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'assistant' %}\n{{ '<|assistant|>\\n' + message['content'] + eos_token }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '<|assistant|>' }}\n{% endif %}\n{% endfor %}", chat_template: "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '<|user|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'system' %}\n{{ '<|system|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'assistant' %}\n{{ '<|assistant|>\\n' + message['content'] + eos_token }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '<|assistant|>' }}\n{% endif %}\n{% endfor %}",
input: ChatTemplateInputs { input: ChatTemplateInputs {
messages: vec![ messages: vec![
TextMessage{ TextMessage {
role: "system".to_string(), role: "system".to_string(),
content: "You are a friendly chatbot who always responds in the style of a pirate".to_string(), content: "You are a friendly chatbot who always responds in the style of a pirate".to_string(),
}, },
TextMessage{ TextMessage {
role: "user".to_string(), role: "user".to_string(),
content: "How many helicopters can a human eat in one sitting?".to_string(), content: "How many helicopters can a human eat in one sitting?".to_string(),
}, },
......
mod queue;
mod scheduler;
pub(crate) use scheduler::SchedulerV3;
use crate::infer::InferError; use crate::infer::{InferError, InferStreamResponse};
use crate::infer::InferStreamResponse; use crate::validation::{
use crate::validation::ValidGenerateRequest; ValidGenerateRequest, ValidGrammar, ValidParameters, ValidStoppingParameters,
};
use nohash_hasher::{BuildNoHashHasher, IntMap}; use nohash_hasher::{BuildNoHashHasher, IntMap};
use std::cmp::min; use std::cmp::min;
use std::collections::VecDeque; use std::collections::VecDeque;
use text_generation_client::ChunksToString; use text_generation_client::v3::{
use text_generation_client::Input; Batch, GrammarType, NextTokenChooserParameters, Request, StoppingCriteriaParameters,
use text_generation_client::{Batch, Request}; };
use text_generation_client::{ChunksToString, Input};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant; use tokio::time::Instant;
use tracing::{info_span, instrument, Span}; use tracing::{info_span, instrument, Span};
...@@ -57,7 +59,6 @@ impl Queue { ...@@ -57,7 +59,6 @@ impl Queue {
Self { queue_sender } Self { queue_sender }
} }
/// Append an entry to the queue
#[instrument(skip_all)] #[instrument(skip_all)]
pub(crate) fn append(&self, entry: Entry) { pub(crate) fn append(&self, entry: Entry) {
// Send append command to the background task managing the state // Send append command to the background task managing the state
...@@ -280,13 +281,17 @@ impl State { ...@@ -280,13 +281,17 @@ impl State {
batch_requests.push(Request { batch_requests.push(Request {
id, id,
prefill_logprobs: entry.request.decoder_input_details, prefill_logprobs: entry.request.decoder_input_details,
inputs: entry.request.inputs.chunks_to_string(),
input_chunks: Some(Input { input_chunks: Some(Input {
chunks: entry.request.inputs.clone(), chunks: entry.request.inputs.clone(),
}), }),
inputs: entry.request.inputs.chunks_to_string(),
truncate: entry.request.truncate, truncate: entry.request.truncate,
parameters: Some(entry.request.parameters.clone()), parameters: Some(NextTokenChooserParameters::from(
stopping_parameters: Some(entry.request.stopping_parameters.clone()), entry.request.parameters.clone(),
)),
stopping_parameters: Some(StoppingCriteriaParameters::from(
entry.request.stopping_parameters.clone(),
)),
top_n_tokens: entry.request.top_n_tokens, top_n_tokens: entry.request.top_n_tokens,
}); });
// Set batch_time // Set batch_time
...@@ -355,12 +360,46 @@ enum QueueCommand { ...@@ -355,12 +360,46 @@ enum QueueCommand {
}, },
} }
impl From<ValidParameters> for NextTokenChooserParameters {
fn from(value: ValidParameters) -> Self {
let (grammar, grammar_type) = match value.grammar {
None => (String::new(), GrammarType::None),
Some(grammar) => match grammar {
ValidGrammar::Json(grammar_string) => (grammar_string, GrammarType::Json),
ValidGrammar::Regex(grammar_string) => (grammar_string, GrammarType::Regex),
},
};
Self {
temperature: value.temperature,
top_k: value.top_k,
top_p: value.top_p,
typical_p: value.typical_p,
do_sample: value.do_sample,
seed: value.seed,
repetition_penalty: value.repetition_penalty,
frequency_penalty: value.frequency_penalty,
watermark: value.watermark,
grammar,
grammar_type: grammar_type.into(),
}
}
}
impl From<ValidStoppingParameters> for StoppingCriteriaParameters {
fn from(value: ValidStoppingParameters) -> Self {
Self {
max_new_tokens: value.max_new_tokens,
stop_sequences: value.stop_sequences,
ignore_eos_token: value.ignore_eos_token,
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use text_generation_client::{
GrammarType as ProtoGrammarType, NextTokenChooserParameters, StoppingCriteriaParameters,
};
use tracing::info_span; use tracing::info_span;
fn default_entry() -> ( fn default_entry() -> (
...@@ -375,7 +414,7 @@ mod tests { ...@@ -375,7 +414,7 @@ mod tests {
input_length: 0, input_length: 0,
truncate: 0, truncate: 0,
decoder_input_details: false, decoder_input_details: false,
parameters: NextTokenChooserParameters { parameters: ValidParameters {
temperature: 0.0, temperature: 0.0,
top_k: 0, top_k: 0,
top_p: 0.0, top_p: 0.0,
...@@ -385,10 +424,9 @@ mod tests { ...@@ -385,10 +424,9 @@ mod tests {
repetition_penalty: 0.0, repetition_penalty: 0.0,
frequency_penalty: 0.0, frequency_penalty: 0.0,
watermark: false, watermark: false,
grammar: String::new(), grammar: None,
grammar_type: ProtoGrammarType::None as i32,
}, },
stopping_parameters: StoppingCriteriaParameters { stopping_parameters: ValidStoppingParameters {
ignore_eos_token: false, ignore_eos_token: false,
max_new_tokens: 1, max_new_tokens: 1,
stop_sequences: vec![], stop_sequences: vec![],
......
/// Batching and inference logic
use crate::infer::v3::queue::{Entry, Queue};
use crate::infer::{
GenerateStreamResponse, GeneratedText, InferError, InferStreamResponse, Scheduler,
};
use crate::validation::ValidGenerateRequest;
use crate::{FinishReason, PrefillToken, Token};
use nohash_hasher::IntMap;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use text_generation_client::v3::{Batch, CachedBatch, Generation, ShardedClient};
use text_generation_client::ClientError;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, Notify, OwnedSemaphorePermit};
use tokio::time::Instant;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{info_span, instrument, Instrument, Span};
pub(crate) struct SchedulerV3 {
/// Request queue
queue: Queue,
/// Notify batcher on queue appends
batching_task_notifier: Arc<Notify>,
}
impl SchedulerV3 {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client: ShardedClient,
waiting_served_ratio: f32,
max_batch_prefill_tokens: u32,
max_batch_total_tokens: u32,
max_waiting_tokens: usize,
max_batch_size: Option<usize>,
requires_padding: bool,
window_size: Option<u32>,
speculate: u32,
generation_health: Arc<AtomicBool>,
) -> Self {
let queue = Queue::new(requires_padding, 16, window_size, speculate);
let batching_task_notifier = Arc::new(Notify::new());
// Spawn batching background task that contains all the inference logic
tokio::spawn(batching_task(
client,
waiting_served_ratio,
max_batch_prefill_tokens,
max_batch_total_tokens,
max_waiting_tokens,
max_batch_size,
queue.clone(),
batching_task_notifier.clone(),
generation_health,
));
Self {
queue,
batching_task_notifier,
}
}
}
impl Scheduler for SchedulerV3 {
#[instrument(skip_all)]
fn schedule(
&self,
request: ValidGenerateRequest,
permit: OwnedSemaphorePermit,
) -> Result<GenerateStreamResponse, InferError> {
// MPSC channel to communicate with the background batching task
let (response_tx, response_rx) = mpsc::unbounded_channel();
let input_length = request.input_length;
// Append the request to the queue
self.queue.append(Entry {
request,
response_tx,
span: Span::current(),
temp_span: None,
queue_time: Instant::now(),
batch_time: None,
});
// Notify the background task that we have a new entry in the queue that needs
// to be batched
self.batching_task_notifier.notify_one();
// Return stream
Ok((
permit,
input_length,
UnboundedReceiverStream::new(response_rx),
))
}
}
/// Batching logic
/// Will be launched in a background Tokio task
///
/// Batches requests and sends them to the inference server
#[allow(clippy::too_many_arguments)]
pub(crate) async fn batching_task(
mut client: ShardedClient,
waiting_served_ratio: f32,
max_batch_prefill_tokens: u32,
max_batch_total_tokens: u32,
max_waiting_tokens: usize,
max_batch_size: Option<usize>,
queue: Queue,
notifier: Arc<Notify>,
generation_health: Arc<AtomicBool>,
) {
// Infinite loop
loop {
// Wait for a notification from the Infer struct
notifier.notified().await;
// Get the next batch from the queue
// This batch might be smaller than the maximum batch size if there are not enough requests
// waiting in the queue
while let Some((mut entries, batch, span)) = queue
.next_batch(
None,
max_batch_size,
max_batch_prefill_tokens,
max_batch_total_tokens,
)
.await
{
let mut cached_batch = prefill(&mut client, batch, &mut entries, &generation_health)
.instrument(span)
.await;
let mut waiting_tokens = 1;
// We loop until we do not receive any cached batch from the inference server (== until
// all requests have met their stopping criteria)
while let Some(batch) = cached_batch {
// Get current batch info
let batch_size = batch.size;
let batch_max_tokens = batch.max_tokens;
let mut batches = vec![batch];
metrics::gauge!("tgi_batch_current_size", batch_size as f64);
metrics::gauge!("tgi_batch_current_max_tokens", batch_max_tokens as f64);
let min_size = if waiting_tokens >= max_waiting_tokens {
// If we didn't onboard any new requests since >= max_waiting_tokens, we try
// to add a new batch even though its size might be small
None
} else {
// Minimum batch size
Some((batch_size as f32 * waiting_served_ratio).floor() as usize)
};
let token_budget = max_batch_total_tokens.saturating_sub(batch_max_tokens);
let max_size = max_batch_size.map(|max_size| max_size - batch_size as usize);
// Try to get a new batch
if let Some((mut new_entries, new_batch, span)) = queue
.next_batch(min_size, max_size, max_batch_prefill_tokens, token_budget)
.await
{
// Tracking metrics
if min_size.is_some() {
metrics::increment_counter!("tgi_batch_concat", "reason" => "backpressure");
} else {
metrics::increment_counter!("tgi_batch_concat", "reason" => "wait_exceeded");
}
entries.iter_mut().for_each(|(_, entry)| {
// Create a new span to add the info that this entry is waiting
// because a new batch is being computed
let entry_waiting_span = info_span!(parent: &entry.span, "waiting");
// Add relationships
span.follows_from(&entry_waiting_span);
entry_waiting_span.follows_from(&span);
// Update entry
entry.temp_span = Some(entry_waiting_span);
});
// Generate one token for this new batch to have the attention past in cache
let new_cached_batch =
prefill(&mut client, new_batch, &mut new_entries, &generation_health)
.instrument(span)
.await;
// Reset waiting counter
waiting_tokens = 1;
// Extend current batch with the new batch
if let Some(new_cached_batch) = new_cached_batch {
entries.extend(new_entries);
batches.push(new_cached_batch);
}
}
// Create span for this batch to add context to inference calls
let next_batch_size = entries.len();
let next_batch_span =
info_span!(parent: None, "batch", batch_size = next_batch_size);
entries.iter_mut().for_each(|(_, entry)| {
// Create a new span to link the batch back to this entry
let entry_batch_span = info_span!(parent: &entry.span, "infer");
// Add relationships
next_batch_span.follows_from(&entry_batch_span);
entry_batch_span.follows_from(&next_batch_span);
// Update entry
entry.temp_span = Some(entry_batch_span);
});
cached_batch = decode(&mut client, batches, &mut entries, &generation_health)
.instrument(next_batch_span)
.await;
waiting_tokens += 1;
}
metrics::gauge!("tgi_batch_current_size", 0.0);
metrics::gauge!("tgi_batch_current_max_tokens", 0.0);
}
}
}
#[instrument(skip_all)]
async fn prefill(
client: &mut ShardedClient,
batch: Batch,
entries: &mut IntMap<u64, Entry>,
generation_health: &Arc<AtomicBool>,
) -> Option<CachedBatch> {
let start_time = Instant::now();
let batch_id = batch.id;
metrics::increment_counter!("tgi_batch_inference_count", "method" => "prefill");
match client.prefill(batch).await {
Ok((generations, next_batch, timings)) => {
// Update health
generation_health.store(true, Ordering::SeqCst);
let start_filtering_time = Instant::now();
// Send generated tokens and filter stopped entries
filter_send_generations(generations, entries);
// Filter next batch and remove requests that were stopped
let next_batch = filter_batch(client, next_batch, entries).await;
metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "prefill");
metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "prefill");
metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "prefill");
metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "prefill");
metrics::increment_counter!("tgi_batch_inference_success", "method" => "prefill");
next_batch
}
// If we have an error, we discard the whole batch
Err(err) => {
// Update health
generation_health.store(false, Ordering::SeqCst);
let _ = client.clear_cache(Some(batch_id)).await;
send_errors(err, entries);
metrics::increment_counter!("tgi_batch_inference_failure", "method" => "prefill");
None
}
}
}
#[instrument(skip_all)]
async fn decode(
client: &mut ShardedClient,
batches: Vec<CachedBatch>,
entries: &mut IntMap<u64, Entry>,
generation_health: &Arc<AtomicBool>,
) -> Option<CachedBatch> {
let start_time = Instant::now();
let batch_ids: Vec<u64> = batches.iter().map(|b| b.id).collect();
metrics::increment_counter!("tgi_batch_inference_count", "method" => "decode");
match client.decode(batches).await {
Ok((generations, next_batch, timings)) => {
// Update health
generation_health.store(true, Ordering::SeqCst);
let start_filtering_time = Instant::now();
// Send generated tokens and filter stopped entries
filter_send_generations(generations, entries);
// Filter next batch and remove requests that were stopped
let next_batch = filter_batch(client, next_batch, entries).await;
if let Some(concat_duration) = timings.concat {
metrics::histogram!("tgi_batch_concat_duration", concat_duration.as_secs_f64(), "method" => "decode");
}
metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "decode");
metrics::increment_counter!("tgi_batch_inference_success", "method" => "decode");
next_batch
}
// If we have an error, we discard the whole batch
Err(err) => {
generation_health.store(false, Ordering::SeqCst);
for id in batch_ids {
let _ = client.clear_cache(Some(id)).await;
}
send_errors(err, entries);
metrics::increment_counter!("tgi_batch_inference_failure", "method" => "decode");
None
}
}
}
/// Filter a `batch` and remove all requests not present in `entries`
#[instrument(skip_all)]
async fn filter_batch(
client: &mut ShardedClient,
next_batch: Option<CachedBatch>,
entries: &IntMap<u64, Entry>,
) -> Option<CachedBatch> {
let mut batch = next_batch?;
// No need to filter
if batch.size as usize == entries.len() {
return Some(batch);
}
let id = batch.id;
// Retain only requests that are still in entries
batch.request_ids.retain(|id| entries.contains_key(id));
if batch.request_ids.is_empty() {
// All requests have been filtered out
// Next batch is now empty
// Clear it from the Python shards cache
// We unwrap here as we need to panic since we cannot recover if this method fails
client.clear_cache(Some(id)).await.unwrap();
None
} else {
// Filter Python shard cache
// We unwrap here as we need to panic since we cannot recover if this method fails
client.filter_batch(id, batch.request_ids).await.unwrap()
}
}
/// Send one or multiple `InferStreamResponse` to Infer for all `entries`
/// and filter entries
#[instrument(skip_all)]
fn filter_send_generations(generations: Vec<Generation>, entries: &mut IntMap<u64, Entry>) {
generations.into_iter().for_each(|generation| {
let id = generation.request_id;
// Get entry
// We can `expect` here as the request id should always be in the entries
let entry = entries
.get(&id)
.expect("ID not found in entries. This is a bug.");
// Create and enter a span to link this function back to the entry
let _span = info_span!(parent: entry.temp_span.as_ref().expect("batch_span is None. This is a bug."), "send_generation", generation = ?generation).entered();
// Send generation responses back to the infer task
// If the receive an error from the Flume channel, it means that the client dropped the
// request and we need to stop generating hence why we unwrap_or(true)
let stopped = send_responses(generation, entry).map_err(|err| {
tracing::error!("Entry response channel error.");
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
err
}).unwrap_or(true);
if stopped {
entries.remove(&id).expect("ID not found in entries. This is a bug.");
}
});
}
/// Send responses through the `entry` response channel
fn send_responses(
generation: Generation,
entry: &Entry,
) -> Result<bool, Box<SendError<Result<InferStreamResponse, InferError>>>> {
// Return directly if the channel is disconnected
if entry.response_tx.is_closed() {
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
return Ok(true);
}
let mut stopped = false;
if let Some(prefill_tokens) = generation.prefill_tokens {
// Create Token objects
// We do that here instead of in the Python code as Rust for loops are faster
let prefill_tokens = prefill_tokens
.ids
.into_iter()
.zip(prefill_tokens.logprobs)
.zip(prefill_tokens.texts)
.map(|((id, logprob), text)| PrefillToken { id, text, logprob })
.collect();
// Send message
entry
.response_tx
.send(Ok(InferStreamResponse::Prefill(prefill_tokens)))?;
}
// Create last Token
let tokens_ = generation.tokens.expect("Non empty tokens in generation");
let n = tokens_.ids.len();
metrics::histogram!("tgi_request_skipped_tokens", (n - 1) as f64);
let mut iterator = tokens_
.ids
.into_iter()
.zip(tokens_.logprobs)
.zip(tokens_.texts)
.zip(tokens_.is_special)
.enumerate()
.peekable();
while let Some((i, (((id, logprob), text), special))) = iterator.next() {
let token = Token {
id,
text,
logprob,
special,
};
let top_tokens = if let Some(top_tokens_) = generation.top_tokens.get(i) {
top_tokens_
.ids
.iter()
.zip(top_tokens_.logprobs.iter())
.zip(top_tokens_.texts.iter())
.zip(top_tokens_.is_special.iter())
.map(|(((&id, &logprob), text), &special)| Token {
id,
text: text.to_string(),
logprob,
special,
})
.collect()
} else {
vec![]
};
match (&generation.generated_text, iterator.peek()) {
(Some(generated_text), None) => {
// Generation has ended
stopped = true;
// Send message
entry.response_tx.send(Ok(InferStreamResponse::End {
token,
top_tokens,
generated_text: GeneratedText::from(generated_text.clone()),
queued: entry.queue_time,
start: entry.batch_time.unwrap(),
}))?;
}
_ => {
// Send message
entry
.response_tx
.send(Ok(InferStreamResponse::Intermediate { token, top_tokens }))?;
}
}
}
Ok(stopped)
}
/// Send errors to Infer for all `entries`
#[instrument(skip_all)]
fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
entries.drain().for_each(|(_, entry)| {
// Create and enter a span to link this function back to the entry
let _send_error_span = info_span!(parent: entry.temp_span.as_ref().expect("batch_span is None. This is a bug."), "send_error").entered();
let err = InferError::GenerationError(error.to_string());
metrics::increment_counter!("tgi_request_failure", "err" => "generation");
tracing::error!("{err}");
// unwrap_or is valid here as we don't care if the receiver is gone.
entry
.response_tx
.send(Err(err))
.unwrap_or(());
});
}
impl From<text_generation_client::v3::GeneratedText> for GeneratedText {
fn from(value: text_generation_client::v3::GeneratedText) -> Self {
let v3_finish_reason =
text_generation_client::v3::FinishReason::try_from(value.finish_reason).unwrap();
let finish_reason = match v3_finish_reason {
text_generation_client::v3::FinishReason::Length => FinishReason::Length,
text_generation_client::v3::FinishReason::EosToken => FinishReason::EndOfSequenceToken,
text_generation_client::v3::FinishReason::StopSequence => FinishReason::StopSequence,
};
Self {
text: value.text,
generated_tokens: value.generated_tokens,
finish_reason,
seed: value.seed,
}
}
}
// tests
#[cfg(test)]
mod tests {
use crate::infer::raise_exception;
use crate::{ChatTemplateInputs, TextMessage};
use minijinja::Environment;
#[test]
fn test_chat_template() {
let env = Environment::new();
let source = r#"
{% for message in messages %}
{% if message['role'] == 'system' %}
{% if message['content']%}
{{'### System:\n' + message['content']+'\n\n'}}
{% endif %}
{% elif message['role'] == 'user' %}
{{'### User:\n' + message['content']+'\n\n'}}
{% elif message['role'] == 'assistant' %}
{{'### Assistant:\n' + message['content']}}
{% endif %}
{% if loop.last and add_generation_prompt %}
{{ '### Assistant:\n' }}
{% endif %}
{% endfor %}"#;
// trim all the whitespace
let source = source
.lines()
.map(|line| line.trim())
.collect::<Vec<&str>>()
.join("");
let tmpl = env.template_from_str(&source);
let chat_template_inputs = ChatTemplateInputs {
messages: vec![
TextMessage {
role: "user".to_string(),
content: "Hi!".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "Hello how can I help?".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "What is Deep Learning?".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "magic!".to_string(),
},
],
bos_token: Some("[BOS]"),
eos_token: Some("[EOS]"),
add_generation_prompt: true,
..Default::default()
};
let result = tmpl.unwrap().render(chat_template_inputs).unwrap();
assert_eq!(
result,
"### User:\nHi!\n\n### Assistant:\nHello how can I help?### User:\nWhat is Deep Learning?\n\n### Assistant:\nmagic!### Assistant:\n"
);
}
#[test]
fn test_chat_template_invalid_with_raise() {
let mut env = Environment::new();
env.add_function("raise_exception", raise_exception);
let source = r#"
{{ bos_token }}
{% for message in messages %}
{% if (message['role'] == 'user') != (loop.index0 % 2 == 0) %}
{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}
{% endif %}
{% if message['role'] == 'user' %}
{{ '[INST] ' + message['content'] + ' [/INST]' }}
{% elif message['role'] == 'assistant' %}
{{ message['content'] + eos_token}}
{% else %}
{{ raise_exception('Only user and assistant roles are supported!') }}
{% endif %}
{% endfor %}"#;
// trim all the whitespace
let source = source
.lines()
.map(|line| line.trim())
.collect::<Vec<&str>>()
.join("");
let tmpl = env.template_from_str(&source);
let chat_template_inputs = ChatTemplateInputs {
messages: vec![
TextMessage {
role: "user".to_string(),
content: "Hi!".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "Hi again!".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "Hello how can I help?".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "What is Deep Learning?".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "magic!".to_string(),
},
],
bos_token: Some("[BOS]"),
eos_token: Some("[EOS]"),
add_generation_prompt: true,
..Default::default()
};
let result = tmpl.unwrap().render(chat_template_inputs); //.err().unwrap();
match result {
Ok(_) => panic!("Should have failed"),
Err(e) => {
assert_eq!(
e.detail().unwrap(),
"Conversation roles must alternate user/assistant/user/assistant/..."
);
}
}
}
#[test]
fn test_chat_template_valid_with_raise() {
let mut env = Environment::new();
env.add_function("raise_exception", raise_exception);
let source = r#"
{{ bos_token }}
{% for message in messages %}
{% if (message['role'] == 'user') != (loop.index0 % 2 == 0) %}
{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}
{% endif %}
{% if message['role'] == 'user' %}
{{ '[INST] ' + message['content'] + ' [/INST]' }}
{% elif message['role'] == 'assistant' %}
{{ message['content'] + eos_token}}
{% else %}
{{ raise_exception('Only user and assistant roles are supported!') }}
{% endif %}
{% endfor %}"#;
// trim all the whitespace
let source = source
.lines()
.map(|line| line.trim())
.collect::<Vec<&str>>()
.join("");
let tmpl = env.template_from_str(&source);
let chat_template_inputs = ChatTemplateInputs {
messages: vec![
TextMessage {
role: "user".to_string(),
content: "Hi!".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "Hello how can I help?".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "What is Deep Learning?".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "magic!".to_string(),
},
],
bos_token: Some("[BOS]"),
eos_token: Some("[EOS]"),
add_generation_prompt: true,
..Default::default()
};
let result = tmpl.unwrap().render(chat_template_inputs).unwrap();
assert_eq!(result, "[BOS][INST] Hi! [/INST]Hello how can I help?[EOS][INST] What is Deep Learning? [/INST]magic![EOS]");
}
#[test]
fn test_chat_template_valid_with_add_generation_prompt() {
let mut env = Environment::new();
env.add_function("raise_exception", raise_exception);
let source = r#"
{% for message in messages %}
{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}
{% endfor %}
{% if add_generation_prompt %}
{{ '<|im_start|>assistant\n' }}
{% endif %}"#;
// trim all the whitespace
let source = source
.lines()
.map(|line| line.trim())
.collect::<Vec<&str>>()
.join("");
let tmpl = env.template_from_str(&source);
let chat_template_inputs = ChatTemplateInputs {
messages: vec![
TextMessage {
role: "user".to_string(),
content: "Hi!".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "Hello how can I help?".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "What is Deep Learning?".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "magic!".to_string(),
},
],
bos_token: Some("[BOS]"),
eos_token: Some("[EOS]"),
add_generation_prompt: true,
..Default::default()
};
let result = tmpl.unwrap().render(chat_template_inputs).unwrap();
assert_eq!(result, "<|im_start|>user\nHi!<|im_end|>\n<|im_start|>assistant\nHello how can I help?<|im_end|>\n<|im_start|>user\nWhat is Deep Learning?<|im_end|>\n<|im_start|>assistant\nmagic!<|im_end|>\n<|im_start|>assistant\n");
}
struct ChatTemplateTestItem {
name: &'static str,
chat_template: &'static str,
input: ChatTemplateInputs<'static>,
target: &'static str,
}
#[test]
fn test_many_chat_templates() {
let example_chat = vec![
TextMessage {
role: "user".to_string(),
content: "Hello, how are you?".to_string(),
},
TextMessage {
role: "assistant".to_string(),
content: "I'm doing great. How can I help you today?".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "I'd like to show off how chat templating works!".to_string(),
},
];
let example_chat_with_system = [TextMessage {
role: "system".to_string(),
content: "You are a friendly chatbot who always responds in the style of a pirate"
.to_string(),
}]
.iter()
.chain(&example_chat)
.cloned()
.collect::<Vec<_>>();
let test_default_templates = vec![
ChatTemplateTestItem {
name: "_base",
chat_template: "{% for message in messages %}{{'<|im_start|>' + message['role'] + '\\n' + message['content'] + '<|im_end|>' + '\\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\\n' }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some(""),
..Default::default()
},
target: "<|im_start|>user\nHello, how are you?<|im_end|>\n<|im_start|>assistant\nI'm doing great. How can I help you today?<|im_end|>\n<|im_start|>user\nI'd like to show off how chat templating works!<|im_end|>\n",
},
ChatTemplateTestItem {
name: "blenderbot",
chat_template: "{% for message in messages %}{% if message['role'] == 'user' %}{{ ' ' }}{% endif %}{{ message['content'] }}{% if not loop.last %}{{ ' ' }}{% endif %}{% endfor %}{{ eos_token }}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some("</s>"),
..Default::default()
},
target: " Hello, how are you? I'm doing great. How can I help you today? I'd like to show off how chat templating works!</s>",
},
ChatTemplateTestItem {
name: "blenderbot_small",
chat_template: "{% for message in messages %}{% if message['role'] == 'user' %}{{ ' ' }}{% endif %}{{ message['content'] }}{% if not loop.last %}{{ ' ' }}{% endif %}{% endfor %}{{ eos_token }}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some("</s>"),
..Default::default()
},
target: " Hello, how are you? I'm doing great. How can I help you today? I'd like to show off how chat templating works!</s>",
},
ChatTemplateTestItem {
name: "bloom",
chat_template: "{% for message in messages %}{{ message.content }}{{ eos_token }}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some("</s>"),
..Default::default()
},
target: "Hello, how are you?</s>I'm doing great. How can I help you today?</s>I'd like to show off how chat templating works!</s>",
},
ChatTemplateTestItem {
name: "gpt_neox",
chat_template: "{% for message in messages %}{{ message.content }}{{ eos_token }}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some("<|endoftext|>"),
..Default::default()
},
target: "Hello, how are you?<|endoftext|>I'm doing great. How can I help you today?<|endoftext|>I'd like to show off how chat templating works!<|endoftext|>",
},
ChatTemplateTestItem {
name: "gpt2",
chat_template: "{% for message in messages %}{{ message.content }}{{ eos_token }}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some("<|endoftext|>"),
..Default::default()
},
target: "Hello, how are you?<|endoftext|>I'm doing great. How can I help you today?<|endoftext|>I'd like to show off how chat templating works!<|endoftext|>",
},
ChatTemplateTestItem {
name: "llama",
// NOTE: the `.strip()` has been replaced with `| trim` in the following template
chat_template: "{% if messages[0]['role'] == 'system' %}{% set loop_messages = messages[1:] %}{% set system_message = messages[0]['content'] %}{% elif USE_DEFAULT_PROMPT == true and not '<<SYS>>' in messages[0]['content'] %}{% set loop_messages = messages %}{% set system_message = 'DEFAULT_SYSTEM_MESSAGE' %}{% else %}{% set loop_messages = messages %}{% set system_message = false %}{% endif %}{% for message in loop_messages %}{% if (message['role'] == 'user') != (loop.index0 % 2 == 0) %}{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}{% endif %}{% if loop.index0 == 0 and system_message != false %}{% set content = '<<SYS>>\\n' + system_message + '\\n<</SYS>>\\n\\n' + message['content'] %}{% else %}{% set content = message['content'] %}{% endif %}{% if message['role'] == 'user' %}{{ bos_token +'[INST] ' + content | trim + ' [/INST]' }}{% elif message['role'] == 'system' %}{{ '<<SYS>>\\n' + content | trim + '\\n<</SYS>>\\n\\n' }}{% elif message['role'] == 'assistant' %}{{ ' ' + content | trim + ' ' + eos_token }}{% endif %}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat_with_system.clone(),
add_generation_prompt: true,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<s>[INST] <<SYS>>\nYou are a friendly chatbot who always responds in the style of a pirate\n<</SYS>>\n\nHello, how are you? [/INST] I'm doing great. How can I help you today? </s><s>[INST] I'd like to show off how chat templating works! [/INST]",
},
ChatTemplateTestItem {
name: "whisper",
chat_template: "{% for message in messages %}{{ message.content }}{{ eos_token }}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: true,
bos_token: Some(""),
eos_token: Some("<|endoftext|>"),
..Default::default()
},
target: "Hello, how are you?<|endoftext|>I'm doing great. How can I help you today?<|endoftext|>I'd like to show off how chat templating works!<|endoftext|>",
},
];
#[allow(unused_variables)] // name is unused
for ChatTemplateTestItem {
name,
chat_template,
input,
target,
} in test_default_templates
{
let mut env = Environment::new();
env.add_function("raise_exception", raise_exception);
let tmpl = env.template_from_str(chat_template);
let result = tmpl.unwrap().render(input).unwrap();
assert_eq!(result, target);
}
let test_custom_templates = vec![
ChatTemplateTestItem {
name: "HuggingFaceH4/zephyr-7b-beta (add_generation_prompt=false)",
chat_template: "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '<|user|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'system' %}\n{{ '<|system|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'assistant' %}\n{{ '<|assistant|>\\n' + message['content'] + eos_token }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '<|assistant|>' }}\n{% endif %}\n{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat_with_system.clone(),
add_generation_prompt: false,
bos_token: Some(""),
eos_token: Some("</s>"),
..Default::default()
},
target: "<|system|>\nYou are a friendly chatbot who always responds in the style of a pirate</s><|user|>\nHello, how are you?</s><|assistant|>\nI'm doing great. How can I help you today?</s><|user|>\nI'd like to show off how chat templating works!</s>",
},
ChatTemplateTestItem {
name: "HuggingFaceH4/zephyr-7b-beta (add_generation_prompt=true)",
chat_template: "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '<|user|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'system' %}\n{{ '<|system|>\\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'assistant' %}\n{{ '<|assistant|>\\n' + message['content'] + eos_token }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '<|assistant|>' }}\n{% endif %}\n{% endfor %}",
input: ChatTemplateInputs {
messages: vec![
TextMessage {
role: "system".to_string(),
content: "You are a friendly chatbot who always responds in the style of a pirate".to_string(),
},
TextMessage {
role: "user".to_string(),
content: "How many helicopters can a human eat in one sitting?".to_string(),
},
],
add_generation_prompt: true,
bos_token: Some(""),
eos_token: Some("</s>"),
..Default::default()
},
target: "<|system|>\nYou are a friendly chatbot who always responds in the style of a pirate</s><|user|>\nHow many helicopters can a human eat in one sitting?</s><|assistant|>",
},
ChatTemplateTestItem {
name: "HuggingFaceH4/zephyr-7b-gemma-v0.1",
chat_template: "{% if messages[0]['role'] == 'user' or messages[0]['role'] == 'system' %}{{ bos_token }}{% endif %}{% for message in messages %}{{ '<|im_start|>' + message['role'] + '\\n' + message['content'] + '<|im_end|>' + '\\n' }}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% elif messages[-1]['role'] == 'assistant' %}{{ eos_token }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<bos>"),
eos_token: Some("<eos>"),
..Default::default()
},
target: "<bos><|im_start|>user\nHello, how are you?<|im_end|>\n<|im_start|>assistant\nI'm doing great. How can I help you today?<|im_end|>\n<|im_start|>user\nI'd like to show off how chat templating works!<|im_end|>\n",
},
ChatTemplateTestItem {
name: "mistralai/Mistral-7B-Instruct-v0.1",
chat_template: "{{ bos_token }}{% for message in messages %}{% if (message['role'] == 'user') != (loop.index0 % 2 == 0) %}{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}{% endif %}{% if message['role'] == 'user' %}{{ '[INST] ' + message['content'] + ' [/INST]' }}{% elif message['role'] == 'assistant' %}{{ message['content'] + eos_token + ' ' }}{% else %}{{ raise_exception('Only user and assistant roles are supported!') }}{% endif %}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<s>[INST] Hello, how are you? [/INST]I'm doing great. How can I help you today?</s> [INST] I'd like to show off how chat templating works! [/INST]",
},
ChatTemplateTestItem {
name: "mistralai/Mixtral-8x7B-Instruct-v0.1",
chat_template: "{{ bos_token }}{% for message in messages %}{% if (message['role'] == 'user') != (loop.index0 % 2 == 0) %}{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}{% endif %}{% if message['role'] == 'user' %}{{ '[INST] ' + message['content'] + ' [/INST]' }}{% elif message['role'] == 'assistant' %}{{ message['content'] + eos_token}}{% else %}{{ raise_exception('Only user and assistant roles are supported!') }}{% endif %}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<s>[INST] Hello, how are you? [/INST]I'm doing great. How can I help you today?</s>[INST] I'd like to show off how chat templating works! [/INST]",
},
ChatTemplateTestItem {
name: "cognitivecomputations/dolphin-2.5-mixtral-8x7b",
chat_template: "{% if not add_generation_prompt is defined %}{% set add_generation_prompt = false %}{% endif %}{% for message in messages %}{{'<|im_start|>' + message['role'] + '\\n' + message['content'] + '<|im_end|>' + '\\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\\n' }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<|im_start|>user\nHello, how are you?<|im_end|>\n<|im_start|>assistant\nI'm doing great. How can I help you today?<|im_end|>\n<|im_start|>user\nI'd like to show off how chat templating works!<|im_end|>\n",
},
ChatTemplateTestItem {
name: "openchat/openchat-3.5-0106",
// `.title()` has been replaced with `| upper` in the following template
chat_template: "{{ bos_token }}{% for message in messages %}{{ 'GPT4 Correct ' + (message['role'] | title) + ': ' + message['content'] + '<|end_of_turn|>'}}{% endfor %}{% if add_generation_prompt %}{{ 'GPT4 Correct Assistant:' }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<s>GPT4 Correct User: Hello, how are you?<|end_of_turn|>GPT4 Correct Assistant: I'm doing great. How can I help you today?<|end_of_turn|>GPT4 Correct User: I'd like to show off how chat templating works!<|end_of_turn|>",
},
ChatTemplateTestItem {
name: "upstage/SOLAR-10.7B-Instruct-v1.0",
chat_template: "{% for message in messages %}{{ message.content }}{{ eos_token }}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "Hello, how are you?</s>I'm doing great. How can I help you today?</s>I'd like to show off how chat templating works!</s>",
},
ChatTemplateTestItem {
name: "codellama/CodeLlama-70b-Instruct-hf",
// NOTE: `.strip()` has been replaced with `| trim` in the following template
chat_template: "{% if messages[0]['role'] == 'system' %}{% set user_index = 1 %}{% else %}{% set user_index = 0 %}{% endif %}{% for message in messages %}{% if (message['role'] == 'user') != ((loop.index0 + user_index) % 2 == 0) %}{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}{% endif %}{% if loop.index0 == 0 %}{{ '<s>' }}{% endif %}{% set content = 'Source: ' + message['role'] + '\\n\\n ' + message['content'] | trim %}{{ content + ' <step> ' }}{% endfor %}{{'Source: assistant\\nDestination: user\\n\\n '}}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<s>Source: user\n\n Hello, how are you? <step> Source: assistant\n\n I'm doing great. How can I help you today? <step> Source: user\n\n I'd like to show off how chat templating works! <step> Source: assistant\nDestination: user\n\n ",
},
ChatTemplateTestItem {
name: "Deci/DeciLM-7B-instruct",
chat_template: "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '### User:\\n' + message['content'] }}\n{% elif message['role'] == 'system' %}\n{{ '### System:\\n' + message['content'] }}\n{% elif message['role'] == 'assistant' %}\n{{ '### Assistant:\\n' + message['content'] }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '### Assistant:' }}\n{% endif %}\n{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "### User:\nHello, how are you?### Assistant:\nI'm doing great. How can I help you today?### User:\nI'd like to show off how chat templating works!",
},
ChatTemplateTestItem {
name: "Qwen/Qwen1.5-72B-Chat",
chat_template: "{% for message in messages %}{% if loop.first and messages[0]['role'] != 'system' %}{{ '<|im_start|>system\\nYou are a helpful assistant<|im_end|>\\n' }}{% endif %}{{'<|im_start|>' + message['role'] + '\\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\\n'}}{% endif %}{% endfor %}{% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\\n' }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<|im_start|>system\nYou are a helpful assistant<|im_end|>\n<|im_start|>user\nHello, how are you?<|im_end|>\n<|im_start|>assistant\nI'm doing great. How can I help you today?<|im_end|>\n<|im_start|>user\nI'd like to show off how chat templating works!",
},
ChatTemplateTestItem {
name: "deepseek-ai/deepseek-llm-7b-chat",
chat_template: "{% if not add_generation_prompt is defined %}{% set add_generation_prompt = false %}{% endif %}{{ bos_token }}{% for message in messages %}{% if message['role'] == 'user' %}{{ 'User: ' + message['content'] + '\\n\\n' }}{% elif message['role'] == 'assistant' %}{{ 'Assistant: ' + message['content'] + eos_token }}{% elif message['role'] == 'system' %}{{ message['content'] + '\\n\\n' }}{% endif %}{% endfor %}{% if add_generation_prompt %}{{ 'Assistant:' }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<|begin▁of▁sentence|>"),
eos_token: Some("<|end▁of▁sentence|>"),
..Default::default()
},
target: "<|begin▁of▁sentence|>User: Hello, how are you?\n\nAssistant: I'm doing great. How can I help you today?<|end▁of▁sentence|>User: I'd like to show off how chat templating works!\n\n",
},
ChatTemplateTestItem {
name: "h2oai/h2o-danube-1.8b-chat",
chat_template: "{% for message in messages %}{% if message['role'] == 'user' %}{{ '<|prompt|>' + message['content'] + eos_token }}{% elif message['role'] == 'system' %}{{ '<|system|>' + message['content'] + eos_token }}{% elif message['role'] == 'assistant' %}{{ '<|answer|>' + message['content'] + eos_token }}{% endif %}{% if loop.last and add_generation_prompt %}{{ '<|answer|>' }}{% endif %}{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<|prompt|>Hello, how are you?</s><|answer|>I'm doing great. How can I help you today?</s><|prompt|>I'd like to show off how chat templating works!</s>",
},
ChatTemplateTestItem {
name: "internlm/internlm2-chat-7b",
chat_template: "{% if messages[0]['role'] == 'user' or messages[0]['role'] == 'system' %}{{ bos_token }}{% endif %}{% for message in messages %}{{ '<|im_start|>' + message['role'] + '\\n' + message['content'] + '<|im_end|>' + '\\n' }}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\\n' }}{% elif messages[-1]['role'] == 'assistant' %}{{ eos_token }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "<s><|im_start|>user\nHello, how are you?<|im_end|>\n<|im_start|>assistant\nI'm doing great. How can I help you today?<|im_end|>\n<|im_start|>user\nI'd like to show off how chat templating works!<|im_end|>\n",
},
ChatTemplateTestItem {
name: "TheBloke/deepseek-coder-33B-instruct-AWQ",
chat_template: "{%- set found_item = false -%}\n{%- for message in messages -%}\n {%- if message['role'] == 'system' -%}\n {%- set found_item = true -%}\n {%- endif -%}\n{%- endfor -%}\n{%- if not found_item -%}\n{{'You are an AI programming assistant, utilizing the Deepseek Coder model, developed by Deepseek Company, and you only answer questions related to computer science. For politically sensitive questions, security and privacy issues, and other non-computer science questions, you will refuse to answer.\\n'}}\n{%- endif %}\n{%- for message in messages %}\n {%- if message['role'] == 'system' %}\n{{ message['content'] }}\n {%- else %}\n {%- if message['role'] == 'user' %}\n{{'### Instruction:\\n' + message['content'] + '\\n'}}\n {%- else %}\n{{'### Response:\\n' + message['content'] + '\\n<|EOT|>\\n'}}\n {%- endif %}\n {%- endif %}\n{%- endfor %}\n{{'### Response:\\n'}}\n",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<|begin▁of▁sentence|>"),
eos_token: Some("<|EOT|>"),
..Default::default()
},
target: "You are an AI programming assistant, utilizing the Deepseek Coder model, developed by Deepseek Company, and you only answer questions related to computer science. For politically sensitive questions, security and privacy issues, and other non-computer science questions, you will refuse to answer.\n### Instruction:\nHello, how are you?\n### Response:\nI'm doing great. How can I help you today?\n<|EOT|>\n### Instruction:\nI'd like to show off how chat templating works!\n### Response:\n",
},
ChatTemplateTestItem {
name: "ericzzz/falcon-rw-1b-chat",
// `.strip()` has been replaced with `| trim` in the following template
chat_template: "{% for message in messages %}{% if loop.index > 1 and loop.previtem['role'] != 'assistant' %}{{ ' ' }}{% endif %}{% if message['role'] == 'system' %}{{ '[SYS] ' + message['content'] | trim }}{% elif message['role'] == 'user' %}{{ '[INST] ' + message['content'] | trim }}{% elif message['role'] == 'assistant' %}{{ '[RESP] ' + message['content'] + eos_token }}{% endif %}{% endfor %}{% if add_generation_prompt %}{{ ' [RESP] ' }}{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<|endoftext|>"),
eos_token: Some("<|endoftext|>"),
..Default::default()
},
target: "[INST] Hello, how are you? [RESP] I'm doing great. How can I help you today?<|endoftext|>[INST] I'd like to show off how chat templating works!",
},
ChatTemplateTestItem {
name: "abacusai/Smaug-34B-v0.1",
chat_template: "{%- for idx in range(0, messages|length) -%}\n{%- if messages[idx]['role'] == 'user' -%}\n{%- if idx > 1 -%}\n{{- bos_token + '[INST] ' + messages[idx]['content'] + ' [/INST]' -}}\n{%- else -%}\n{{- messages[idx]['content'] + ' [/INST]' -}}\n{%- endif -%}\n{% elif messages[idx]['role'] == 'system' %}\n{{- '[INST] <<SYS>>\\n' + messages[idx]['content'] + '\\n<</SYS>>\\n\\n' -}}\n{%- elif messages[idx]['role'] == 'assistant' -%}\n{{- ' ' + messages[idx]['content'] + ' ' + eos_token -}}\n{% endif %}\n{% endfor %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "Hello, how are you? [/INST] I'm doing great. How can I help you today? </s><s>[INST] I'd like to show off how chat templating works! [/INST]",
},
ChatTemplateTestItem {
name: "maywell/Synatra-Mixtral-8x7B",
chat_template: "Below is an instruction that describes a task. Write a response that appropriately completes the request.\n\n{% for message in messages %}{% if message['role'] == 'user' %}### Instruction:\n{{ message['content']|trim -}}{% if not loop.last %}{% endif %}\n{% elif message['role'] == 'assistant' %}### Response:\n{{ message['content']|trim -}}{% if not loop.last %}{% endif %}\n{% elif message['role'] == 'system' %}{{ message['content']|trim -}}{% if not loop.last %}{% endif %}\n{% endif %}\n{% endfor %}\n{% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}\n### Response:\n{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "Below is an instruction that describes a task. Write a response that appropriately completes the request.### Instruction:Hello, how are you?### Response:I'm doing great. How can I help you today?### Instruction:I'd like to show off how chat templating works!",
},
ChatTemplateTestItem {
name: "deepseek-ai/deepseek-coder-33b-instruct",
chat_template: "{% if not add_generation_prompt is defined %}\n{% set add_generation_prompt = false %}\n{% endif %}\n{%- set ns = namespace(found=false) -%}\n{%- for message in messages -%}\n {%- if message['role'] == 'system' -%}\n {%- set ns.found = true -%}\n {%- endif -%}\n{%- endfor -%}\n{{bos_token}}{%- if not ns.found -%}\n{{'You are an AI programming assistant, utilizing the Deepseek Coder model, developed by Deepseek Company, and you only answer questions related to computer science. For politically sensitive questions, security and privacy issues, and other non-computer science questions, you will refuse to answer\\n'}}\n{%- endif %}\n{%- for message in messages %}\n {%- if message['role'] == 'system' %}\n{{ message['content'] }}\n {%- else %}\n {%- if message['role'] == 'user' %}\n{{'### Instruction:\\n' + message['content'] + '\\n'}}\n {%- else %}\n{{'### Response:\\n' + message['content'] + '\\n<|EOT|>\\n'}}\n {%- endif %}\n {%- endif %}\n{%- endfor %}\n{% if add_generation_prompt %}\n{{'### Response:'}}\n{% endif %}",
input: ChatTemplateInputs {
messages: example_chat.clone(),
add_generation_prompt: false,
bos_token: Some("<|begin▁of▁sentence|>"),
eos_token: Some("</EOT>"),
..Default::default()
},
target: "<|begin▁of▁sentence|>You are an AI programming assistant, utilizing the Deepseek Coder model, developed by Deepseek Company, and you only answer questions related to computer science. For politically sensitive questions, security and privacy issues, and other non-computer science questions, you will refuse to answer\n### Instruction:\nHello, how are you?\n### Response:\nI'm doing great. How can I help you today?\n<|EOT|>\n### Instruction:\nI'd like to show off how chat templating works!\n",
},
// NOT INCLUDED
// - meetkai/functionary-medium-v3.2
// - fireworks-ai/firefunction-v1
// https://github
ChatTemplateTestItem {
name: "maywell/PiVoT-MoE",
chat_template: "{{ (messages|selectattr('role', 'equalto', 'system')|list|last).content|trim if (messages|selectattr('role', 'equalto', 'system')|list) else '' }}{% for message in messages %}{% if message['role'] == 'system' %}{{ message['content']|trim }}{% elif message['role'] == 'user' %}### Instruction: {{ message['content']|trim }}{% elif message['role'] == 'assistant' %}### Response: {{ message['content']|trim }}{% elif message['role'] == 'user_context' %}### Input: {{ message['content']|trim }}{% endif %}{% if not loop.last %}\n{% endif %}{% endfor %}{% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}### Response:{% endif %}",
input: ChatTemplateInputs {
messages: example_chat_with_system.clone(),
add_generation_prompt: false,
bos_token: Some("<s>"),
eos_token: Some("</s>"),
..Default::default()
},
target: "You are a friendly chatbot who always responds in the style of a pirateYou are a friendly chatbot who always responds in the style of a pirate### Instruction: Hello, how are you?### Response: I'm doing great. How can I help you today?### Instruction: I'd like to show off how chat templating works!",
},
];
#[allow(unused_variables)] // name is unused
for ChatTemplateTestItem {
name,
chat_template,
input,
target,
} in test_custom_templates
{
let mut env = Environment::new();
env.add_function("raise_exception", raise_exception);
// trim all the whitespace
let chat_template = chat_template
.lines()
.map(|line| line.trim())
.collect::<Vec<&str>>()
.join("");
let tmpl = env.template_from_str(&chat_template);
let result = tmpl.unwrap().render(input).unwrap();
assert_eq!(result, target);
}
}
}
pub mod config;
mod health;
/// Text Generation Inference Webserver /// Text Generation Inference Webserver
pub mod config;
mod infer; mod infer;
mod queue;
pub mod server; pub mod server;
mod validation; mod validation;
use infer::{Infer, InferError, InferStreamResponse};
use queue::{Entry, Queue};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::OwnedSemaphorePermit;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::warn; use tracing::warn;
use utoipa::ToSchema; use utoipa::ToSchema;
use validation::Validation; use validation::Validation;
/// Type alias for generation responses
pub(crate) type GenerateStreamResponse = (
OwnedSemaphorePermit,
u32, // input_length
UnboundedReceiverStream<Result<InferStreamResponse, InferError>>,
);
#[derive(Clone, Deserialize, ToSchema)] #[derive(Clone, Deserialize, ToSchema)]
pub(crate) struct VertexInstance { pub(crate) struct VertexInstance {
#[schema(example = "What is Deep Learning?")] #[schema(example = "What is Deep Learning?")]
...@@ -158,7 +145,7 @@ pub struct Info { ...@@ -158,7 +145,7 @@ pub struct Info {
#[schema(example = "4")] #[schema(example = "4")]
pub max_stop_sequences: usize, pub max_stop_sequences: usize,
#[schema(example = "1024")] #[schema(example = "1024")]
pub max_input_length: usize, pub max_input_tokens: usize,
#[schema(example = "2048")] #[schema(example = "2048")]
pub max_total_tokens: usize, pub max_total_tokens: usize,
#[schema(example = "1.2")] #[schema(example = "1.2")]
...@@ -1087,7 +1074,7 @@ pub struct SimpleToken { ...@@ -1087,7 +1074,7 @@ pub struct SimpleToken {
stop: usize, stop: usize,
} }
#[derive(Serialize, ToSchema)] #[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all(serialize = "snake_case"))] #[serde(rename_all(serialize = "snake_case"))]
#[schema(example = "Length")] #[schema(example = "Length")]
pub(crate) enum FinishReason { pub(crate) enum FinishReason {
......
...@@ -12,7 +12,6 @@ use std::fs::File; ...@@ -12,7 +12,6 @@ use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use text_generation_client::{ClientError, ShardedClient};
use text_generation_router::config::Config; use text_generation_router::config::Config;
use text_generation_router::{server, HubModelInfo, HubProcessorConfig, HubTokenizerConfig}; use text_generation_router::{server, HubModelInfo, HubProcessorConfig, HubTokenizerConfig};
use thiserror::Error; use thiserror::Error;
...@@ -315,59 +314,6 @@ async fn main() -> Result<(), RouterError> { ...@@ -315,59 +314,6 @@ async fn main() -> Result<(), RouterError> {
Some(pipeline_tag) => pipeline_tag.as_str() == "text-generation", Some(pipeline_tag) => pipeline_tag.as_str() == "text-generation",
}; };
// Instantiate sharded client from the master unix socket
let mut sharded_client = ShardedClient::connect_uds(master_shard_uds_path)
.await
.map_err(RouterError::Connection)?;
// Clear the cache; useful if the webserver rebooted
sharded_client
.clear_cache(None)
.await
.map_err(RouterError::Cache)?;
// Get info from the shard
let shard_info = sharded_client.info().await.map_err(RouterError::Info)?;
// Warmup model
tracing::info!("Warming up model");
let max_supported_batch_total_tokens = match sharded_client
.warmup(
max_input_tokens as u32,
max_batch_prefill_tokens,
max_total_tokens as u32,
max_batch_size,
)
.await
.map_err(RouterError::Warmup)?
{
// Older models do not support automatic max-batch-total-tokens
None => {
let max_batch_total_tokens = max_batch_total_tokens
.unwrap_or(16000.max((max_total_tokens as u32).max(max_batch_prefill_tokens)));
tracing::warn!("Model does not support automatic max batch total tokens");
max_batch_total_tokens
}
// Flash attention models return their max supported total tokens
Some(max_supported_batch_total_tokens) => {
// Warn if user added his own max-batch-total-tokens as we will ignore it
if max_batch_total_tokens.is_some() {
tracing::warn!(
"`--max-batch-total-tokens` is deprecated for Flash \
Attention models."
);
tracing::warn!(
"Inferred max batch total tokens: {max_supported_batch_total_tokens}"
);
}
if max_total_tokens as u32 > max_supported_batch_total_tokens {
return Err(RouterError::ArgumentValidation(format!("`max_total_tokens` must be <= `max_batch_total_tokens`. Given: {max_total_tokens} and {max_supported_batch_total_tokens}")));
}
max_supported_batch_total_tokens
}
};
tracing::info!("Setting max batch total tokens to {max_supported_batch_total_tokens}");
tracing::info!("Connected");
// Determine the server port based on the feature and environment variable. // Determine the server port based on the feature and environment variable.
let port = if cfg!(feature = "google") { let port = if cfg!(feature = "google") {
std::env::var("AIP_HTTP_PORT") std::env::var("AIP_HTTP_PORT")
...@@ -387,8 +333,8 @@ async fn main() -> Result<(), RouterError> { ...@@ -387,8 +333,8 @@ async fn main() -> Result<(), RouterError> {
// Run server // Run server
server::run( server::run(
master_shard_uds_path,
model_info, model_info,
shard_info,
compat_return_full_text, compat_return_full_text,
max_concurrent_requests, max_concurrent_requests,
max_best_of, max_best_of,
...@@ -398,10 +344,9 @@ async fn main() -> Result<(), RouterError> { ...@@ -398,10 +344,9 @@ async fn main() -> Result<(), RouterError> {
max_total_tokens, max_total_tokens,
waiting_served_ratio, waiting_served_ratio,
max_batch_prefill_tokens, max_batch_prefill_tokens,
max_supported_batch_total_tokens, max_batch_total_tokens,
max_waiting_tokens, max_waiting_tokens,
max_batch_size, max_batch_size,
sharded_client,
tokenizer, tokenizer,
config, config,
validation_workers, validation_workers,
...@@ -557,16 +502,8 @@ pub async fn get_tokenizer_config(api_repo: &ApiRepo) -> Option<HubTokenizerConf ...@@ -557,16 +502,8 @@ pub async fn get_tokenizer_config(api_repo: &ApiRepo) -> Option<HubTokenizerConf
enum RouterError { enum RouterError {
#[error("Argument validation error: {0}")] #[error("Argument validation error: {0}")]
ArgumentValidation(String), ArgumentValidation(String),
#[error("Unable to connect to the Python model shards: {0}")] #[error("WebServer error: {0}")]
Connection(ClientError), WebServer(#[from] server::WebServerError),
#[error("Unable to clear the Python model shards cache: {0}")]
Cache(ClientError),
#[error("Unable to get the Python model shards info: {0}")]
Info(ClientError),
#[error("Unable to warmup the Python model shards: {0}")]
Warmup(ClientError),
#[error("Tokio runtime failed to start: {0}")] #[error("Tokio runtime failed to start: {0}")]
Tokio(#[from] std::io::Error), Tokio(#[from] std::io::Error),
#[error("Axum webserver failed: {0}")]
Axum(#[from] axum::BoxError),
} }
use crate::config::Config;
/// HTTP Server logic /// HTTP Server logic
use crate::health::Health; use crate::config::Config;
use crate::infer::{InferError, InferResponse, InferStreamResponse, ToolGrammar}; use crate::infer::v2::SchedulerV2;
use crate::infer::v3::SchedulerV3;
use crate::infer::{HealthCheck, Scheduler};
use crate::infer::{Infer, InferError, InferResponse, InferStreamResponse, ToolGrammar};
use crate::validation::ValidationError; use crate::validation::ValidationError;
use crate::{ use crate::{
BestOfSequence, Details, ErrorResponse, FinishReason, GenerateParameters, GenerateRequest, BestOfSequence, Details, ErrorResponse, FinishReason, GenerateParameters, GenerateRequest,
GenerateResponse, GrammarType, HubModelInfo, HubProcessorConfig, HubTokenizerConfig, Infer, GenerateResponse, GrammarType, HubModelInfo, HubProcessorConfig, HubTokenizerConfig, Info,
Info, Message, PrefillToken, SimpleToken, StreamDetails, StreamResponse, Token, Message, PrefillToken, SimpleToken, StreamDetails, StreamResponse, Token, TokenizeResponse,
TokenizeResponse, Usage, Validation, Usage, Validation,
}; };
use crate::{ use crate::{
ChatCompletion, ChatCompletionChoice, ChatCompletionChunk, ChatCompletionComplete, ChatCompletion, ChatCompletionChoice, ChatCompletionChunk, ChatCompletionComplete,
...@@ -34,7 +36,8 @@ use std::convert::Infallible; ...@@ -34,7 +36,8 @@ use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use text_generation_client::{ShardInfo, ShardedClient}; use text_generation_client::{v2, v3, ClientError, ShardInfo};
use thiserror::Error;
use tokenizers::Tokenizer; use tokenizers::Tokenizer;
use tokio::select; use tokio::select;
use tokio::signal; use tokio::signal;
...@@ -115,7 +118,9 @@ example = json ! ({"error": "unhealthy", "error_type": "healthcheck"})), ...@@ -115,7 +118,9 @@ example = json ! ({"error": "unhealthy", "error_type": "healthcheck"})),
)] )]
#[instrument(skip(health))] #[instrument(skip(health))]
/// Health check method /// Health check method
async fn health(mut health: Extension<Health>) -> Result<(), (StatusCode, Json<ErrorResponse>)> { async fn health(
mut health: Extension<HealthCheck>,
) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
match health.check().await { match health.check().await {
true => Ok(()), true => Ok(()),
false => Err(( false => Err((
...@@ -213,9 +218,7 @@ async fn generate_internal( ...@@ -213,9 +218,7 @@ async fn generate_internal(
BestOfSequence { BestOfSequence {
generated_text: output_text, generated_text: output_text,
finish_reason: FinishReason::from( finish_reason: response.generated_text.finish_reason,
response.generated_text.finish_reason,
),
generated_tokens: response.generated_text.generated_tokens, generated_tokens: response.generated_text.generated_tokens,
prefill: response.prefill, prefill: response.prefill,
tokens: response.tokens, tokens: response.tokens,
...@@ -227,7 +230,7 @@ async fn generate_internal( ...@@ -227,7 +230,7 @@ async fn generate_internal(
}); });
Some(Details { Some(Details {
finish_reason: FinishReason::from(response.generated_text.finish_reason), finish_reason: response.generated_text.finish_reason,
generated_tokens: response.generated_text.generated_tokens, generated_tokens: response.generated_text.generated_tokens,
prefill: response.prefill, prefill: response.prefill,
tokens: response.tokens, tokens: response.tokens,
...@@ -468,7 +471,7 @@ async fn generate_stream_internal( ...@@ -468,7 +471,7 @@ async fn generate_stream_internal(
// Token details // Token details
let details = match details { let details = match details {
true => Some(StreamDetails { true => Some(StreamDetails {
finish_reason: FinishReason::from(generated_text.finish_reason), finish_reason: generated_text.finish_reason,
generated_tokens: generated_text.generated_tokens, generated_tokens: generated_text.generated_tokens,
seed: generated_text.seed, seed: generated_text.seed,
}), }),
...@@ -556,38 +559,38 @@ async fn generate_stream_internal( ...@@ -556,38 +559,38 @@ async fn generate_stream_internal(
/// Generate tokens /// Generate tokens
#[utoipa::path( #[utoipa::path(
post, post,
tag = "Text Generation Inference", tag = "Text Generation Inference",
path = "/v1/completions", path = "/v1/completions",
request_body = CompletionRequest, request_body = CompletionRequest,
responses( responses(
(status = 200, description = "Generated Chat Completion", (status = 200, description = "Generated Chat Completion",
content( content(
("application/json" = Completion), ("application/json" = Completion),
("text/event-stream" = CompletionCompleteChunk), ("text/event-stream" = CompletionCompleteChunk),
)), )),
(status = 424, description = "Generation Error", body = ErrorResponse, (status = 424, description = "Generation Error", body = ErrorResponse,
example = json ! ({"error": "Request failed during generation"})), example = json ! ({"error": "Request failed during generation"})),
(status = 429, description = "Model is overloaded", body = ErrorResponse, (status = 429, description = "Model is overloaded", body = ErrorResponse,
example = json ! ({"error": "Model is overloaded"})), example = json ! ({"error": "Model is overloaded"})),
(status = 422, description = "Input validation error", body = ErrorResponse, (status = 422, description = "Input validation error", body = ErrorResponse,
example = json ! ({"error": "Input validation error"})), example = json ! ({"error": "Input validation error"})),
(status = 500, description = "Incomplete generation", body = ErrorResponse, (status = 500, description = "Incomplete generation", body = ErrorResponse,
example = json ! ({"error": "Incomplete generation"})), example = json ! ({"error": "Incomplete generation"})),
) )
)] )]
#[instrument( #[instrument(
skip_all, skip_all,
fields( fields(
// parameters = ? req.parameters, // parameters = ? req.parameters,
total_time, total_time,
validation_time, validation_time,
queue_time, queue_time,
inference_time, inference_time,
time_per_token, time_per_token,
seed, seed,
) )
)] )]
async fn completions( async fn completions(
Extension(infer): Extension<Infer>, Extension(infer): Extension<Infer>,
Extension(compute_type): Extension<ComputeType>, Extension(compute_type): Extension<ComputeType>,
...@@ -961,38 +964,38 @@ async fn completions( ...@@ -961,38 +964,38 @@ async fn completions(
/// Generate tokens /// Generate tokens
#[utoipa::path( #[utoipa::path(
post, post,
tag = "Text Generation Inference", tag = "Text Generation Inference",
path = "/v1/chat/completions", path = "/v1/chat/completions",
request_body = ChatRequest, request_body = ChatRequest,
responses( responses(
(status = 200, description = "Generated Chat Completion", (status = 200, description = "Generated Chat Completion",
content( content(
("application/json" = ChatCompletion), ("application/json" = ChatCompletion),
("text/event-stream" = ChatCompletionChunk), ("text/event-stream" = ChatCompletionChunk),
)), )),
(status = 424, description = "Generation Error", body = ErrorResponse, (status = 424, description = "Generation Error", body = ErrorResponse,
example = json ! ({"error": "Request failed during generation"})), example = json ! ({"error": "Request failed during generation"})),
(status = 429, description = "Model is overloaded", body = ErrorResponse, (status = 429, description = "Model is overloaded", body = ErrorResponse,
example = json ! ({"error": "Model is overloaded"})), example = json ! ({"error": "Model is overloaded"})),
(status = 422, description = "Input validation error", body = ErrorResponse, (status = 422, description = "Input validation error", body = ErrorResponse,
example = json ! ({"error": "Input validation error"})), example = json ! ({"error": "Input validation error"})),
(status = 500, description = "Incomplete generation", body = ErrorResponse, (status = 500, description = "Incomplete generation", body = ErrorResponse,
example = json ! ({"error": "Incomplete generation"})), example = json ! ({"error": "Incomplete generation"})),
) )
)] )]
#[instrument( #[instrument(
skip_all, skip_all,
fields( fields(
// parameters = ? req.parameters, // parameters = ? req.parameters,
total_time, total_time,
validation_time, validation_time,
queue_time, queue_time,
inference_time, inference_time,
time_per_token, time_per_token,
seed, seed,
) )
)] )]
async fn chat_completions( async fn chat_completions(
Extension(infer): Extension<Infer>, Extension(infer): Extension<Infer>,
Extension(compute_type): Extension<ComputeType>, Extension(compute_type): Extension<ComputeType>,
...@@ -1217,22 +1220,22 @@ async fn chat_completions( ...@@ -1217,22 +1220,22 @@ async fn chat_completions(
/// Generate tokens from Vertex request /// Generate tokens from Vertex request
#[utoipa::path( #[utoipa::path(
post, post,
tag = "Text Generation Inference", tag = "Text Generation Inference",
path = "/vertex", path = "/vertex",
request_body = VertexRequest, request_body = VertexRequest,
responses( responses(
(status = 200, description = "Generated Text", body = VertexResponse), (status = 200, description = "Generated Text", body = VertexResponse),
(status = 424, description = "Generation Error", body = ErrorResponse, (status = 424, description = "Generation Error", body = ErrorResponse,
example = json ! ({"error": "Request failed during generation"})), example = json ! ({"error": "Request failed during generation"})),
(status = 429, description = "Model is overloaded", body = ErrorResponse, (status = 429, description = "Model is overloaded", body = ErrorResponse,
example = json ! ({"error": "Model is overloaded"})), example = json ! ({"error": "Model is overloaded"})),
(status = 422, description = "Input validation error", body = ErrorResponse, (status = 422, description = "Input validation error", body = ErrorResponse,
example = json ! ({"error": "Input validation error"})), example = json ! ({"error": "Input validation error"})),
(status = 500, description = "Incomplete generation", body = ErrorResponse, (status = 500, description = "Incomplete generation", body = ErrorResponse,
example = json ! ({"error": "Incomplete generation"})), example = json ! ({"error": "Incomplete generation"})),
) )
)] )]
#[instrument( #[instrument(
skip_all, skip_all,
fields( fields(
...@@ -1310,16 +1313,16 @@ async fn vertex_compatibility( ...@@ -1310,16 +1313,16 @@ async fn vertex_compatibility(
/// Tokenize inputs /// Tokenize inputs
#[utoipa::path( #[utoipa::path(
post, post,
tag = "Text Generation Inference", tag = "Text Generation Inference",
path = "/tokenize", path = "/tokenize",
request_body = GenerateRequest, request_body = GenerateRequest,
responses( responses(
(status = 200, description = "Tokenized ids", body = TokenizeResponse), (status = 200, description = "Tokenized ids", body = TokenizeResponse),
(status = 404, description = "No tokenizer found", body = ErrorResponse, (status = 404, description = "No tokenizer found", body = ErrorResponse,
example = json ! ({"error": "No fast tokenizer available"})), example = json ! ({"error": "No fast tokenizer available"})),
) )
)] )]
#[instrument(skip_all)] #[instrument(skip_all)]
async fn tokenize( async fn tokenize(
Extension(infer): Extension<Infer>, Extension(infer): Extension<Infer>,
...@@ -1372,21 +1375,20 @@ pub(crate) struct ComputeType(String); ...@@ -1372,21 +1375,20 @@ pub(crate) struct ComputeType(String);
/// Serving method /// Serving method
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn run( pub async fn run(
master_shard_uds_path: String,
model_info: HubModelInfo, model_info: HubModelInfo,
shard_info: ShardInfo,
compat_return_full_text: bool, compat_return_full_text: bool,
max_concurrent_requests: usize, max_concurrent_requests: usize,
max_best_of: usize, max_best_of: usize,
max_stop_sequences: usize, max_stop_sequences: usize,
max_top_n_tokens: u32, max_top_n_tokens: u32,
max_input_length: usize, max_input_tokens: usize,
max_total_tokens: usize, max_total_tokens: usize,
waiting_served_ratio: f32, waiting_served_ratio: f32,
max_batch_prefill_tokens: u32, max_batch_prefill_tokens: u32,
max_batch_total_tokens: u32, max_batch_total_tokens: Option<u32>,
max_waiting_tokens: usize, max_waiting_tokens: usize,
max_batch_size: Option<usize>, max_batch_size: Option<usize>,
client: ShardedClient,
tokenizer: Option<Tokenizer>, tokenizer: Option<Tokenizer>,
config: Option<Config>, config: Option<Config>,
validation_workers: usize, validation_workers: usize,
...@@ -1400,7 +1402,7 @@ pub async fn run( ...@@ -1400,7 +1402,7 @@ pub async fn run(
messages_api_enabled: bool, messages_api_enabled: bool,
grammar_support: bool, grammar_support: bool,
max_client_batch_size: usize, max_client_batch_size: usize,
) -> Result<(), axum::BoxError> { ) -> Result<(), WebServerError> {
// OpenAPI documentation // OpenAPI documentation
#[derive(OpenApi)] #[derive(OpenApi)]
#[openapi( #[openapi(
...@@ -1470,6 +1472,141 @@ pub async fn run( ...@@ -1470,6 +1472,141 @@ pub async fn run(
struct ApiDoc; struct ApiDoc;
// Create state // Create state
// Open connection, get model info and warmup
let (scheduler, health_ext, shard_info, max_batch_total_tokens): (
Arc<dyn Scheduler + Send + Sync>,
HealthCheck,
ShardInfo,
u32,
) = {
// Helper function to check both v2 and v3
let check_max_batch_total_tokens = |max_supported_batch_total_tokens: Option<u32>| {
match max_supported_batch_total_tokens {
// Older models do not support automatic max-batch-total-tokens
None => {
let max_batch_total_tokens = max_batch_total_tokens.unwrap_or(
16000.max((max_total_tokens as u32).max(max_batch_prefill_tokens)),
);
tracing::warn!("Model does not support automatic max batch total tokens");
Ok(max_batch_total_tokens)
}
// Flash attention models return their max supported total tokens
Some(max_supported_batch_total_tokens) => {
// Warn if user added his own max-batch-total-tokens as we will ignore it
if max_batch_total_tokens.is_some() {
tracing::warn!(
"`--max-batch-total-tokens` is deprecated for Flash \
Attention models."
);
tracing::warn!(
"Inferred max batch total tokens: {max_supported_batch_total_tokens}"
);
}
if max_total_tokens as u32 > max_supported_batch_total_tokens {
return Err(WebServerError::NotEnoughMemory(max_total_tokens));
}
Ok(max_supported_batch_total_tokens)
}
}
};
let generation_health = Arc::new(AtomicBool::new(false));
match v3::ShardedClient::connect_uds(master_shard_uds_path.clone()).await {
Ok(mut sharded_client) => {
// server is running on v3
// Clear the cache; useful if the webserver rebooted
sharded_client
.clear_cache(None)
.await
.map_err(WebServerError::Cache)?;
// Get info from the shard
let shard_info = sharded_client.info().await.map_err(WebServerError::Info)?;
// Warmup model
tracing::info!("Warming up model");
let max_batch_total_tokens = check_max_batch_total_tokens(
sharded_client
.warmup(
max_input_tokens as u32,
max_batch_prefill_tokens,
max_total_tokens as u32,
max_batch_size,
)
.await
.map_err(WebServerError::Warmup)?,
)?;
let health_ext =
HealthCheck::new(Arc::new(sharded_client.clone()), generation_health.clone());
let scheduler = Arc::new(SchedulerV3::new(
sharded_client,
waiting_served_ratio,
max_batch_prefill_tokens,
max_batch_total_tokens,
max_waiting_tokens,
max_batch_size,
shard_info.requires_padding,
shard_info.window_size,
shard_info.speculate,
generation_health,
));
tracing::info!("Using scheduler V3");
(scheduler, health_ext, shard_info, max_batch_total_tokens)
}
Err(_) => {
let mut sharded_client = v2::ShardedClient::connect_uds(master_shard_uds_path)
.await
.map_err(WebServerError::Connection)?;
// server is running on v2
// Clear the cache; useful if the webserver rebooted
sharded_client
.clear_cache(None)
.await
.map_err(WebServerError::Cache)?;
// Get info from the shard
let shard_info = sharded_client.info().await.map_err(WebServerError::Info)?;
// Warmup model
tracing::info!("Warming up model");
let max_batch_total_tokens = check_max_batch_total_tokens(
sharded_client
.warmup(
max_input_tokens as u32,
max_batch_prefill_tokens,
max_total_tokens as u32,
max_batch_size,
)
.await
.map_err(WebServerError::Warmup)?,
)?;
let health_ext =
HealthCheck::new(Arc::new(sharded_client.clone()), generation_health.clone());
let scheduler = Arc::new(SchedulerV2::new(
sharded_client,
waiting_served_ratio,
max_batch_prefill_tokens,
max_batch_total_tokens,
max_waiting_tokens,
max_batch_size,
shard_info.requires_padding,
shard_info.window_size,
shard_info.speculate,
generation_health,
));
tracing::info!("Using scheduler V2");
(scheduler, health_ext, shard_info, max_batch_total_tokens)
}
}
};
tracing::info!("Setting max batch total tokens to {max_batch_total_tokens}");
let validation = Validation::new( let validation = Validation::new(
validation_workers, validation_workers,
tokenizer, tokenizer,
...@@ -1477,25 +1614,15 @@ pub async fn run( ...@@ -1477,25 +1614,15 @@ pub async fn run(
max_best_of, max_best_of,
max_stop_sequences, max_stop_sequences,
max_top_n_tokens, max_top_n_tokens,
max_input_length, max_input_tokens,
max_total_tokens, max_total_tokens,
grammar_support, grammar_support,
); );
let generation_health = Arc::new(AtomicBool::new(false));
let health_ext = Health::new(client.clone(), generation_health.clone());
let infer = Infer::new( let infer = Infer::new(
client, scheduler,
validation, validation,
waiting_served_ratio,
max_batch_prefill_tokens,
max_batch_total_tokens,
max_waiting_tokens,
max_batch_size,
max_concurrent_requests, max_concurrent_requests,
shard_info.requires_padding,
shard_info.window_size,
shard_info.speculate,
generation_health,
tokenizer_config, tokenizer_config,
processor_config, processor_config,
); );
...@@ -1514,7 +1641,7 @@ pub async fn run( ...@@ -1514,7 +1641,7 @@ pub async fn run(
// Input Length buckets // Input Length buckets
let input_length_matcher = Matcher::Full(String::from("tgi_request_input_length")); let input_length_matcher = Matcher::Full(String::from("tgi_request_input_length"));
let input_length_buckets: Vec<f64> = (0..100) let input_length_buckets: Vec<f64> = (0..100)
.map(|x| (max_input_length as f64 / 100.0) * (x + 1) as f64) .map(|x| (max_input_tokens as f64 / 100.0) * (x + 1) as f64)
.collect(); .collect();
// Generated tokens buckets // Generated tokens buckets
let generated_tokens_matcher = Matcher::Full(String::from("tgi_request_generated_tokens")); let generated_tokens_matcher = Matcher::Full(String::from("tgi_request_generated_tokens"));
...@@ -1568,7 +1695,7 @@ pub async fn run( ...@@ -1568,7 +1695,7 @@ pub async fn run(
max_concurrent_requests, max_concurrent_requests,
max_best_of, max_best_of,
max_stop_sequences, max_stop_sequences,
max_input_length, max_input_tokens,
max_total_tokens, max_total_tokens,
waiting_served_ratio, waiting_served_ratio,
max_batch_total_tokens, max_batch_total_tokens,
...@@ -1664,6 +1791,8 @@ pub async fn run( ...@@ -1664,6 +1791,8 @@ pub async fn run(
.layer(OtelAxumLayer::default()) .layer(OtelAxumLayer::default())
.layer(cors_layer); .layer(cors_layer);
tracing::info!("Connected");
if ngrok { if ngrok {
#[cfg(feature = "ngrok")] #[cfg(feature = "ngrok")]
{ {
...@@ -1686,7 +1815,8 @@ pub async fn run( ...@@ -1686,7 +1815,8 @@ pub async fn run(
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app) axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal()) .with_graceful_shutdown(shutdown_signal())
.await?; .await
.map_err(|err| WebServerError::Axum(Box::new(err)))?;
} }
Ok(()) Ok(())
} }
...@@ -1719,17 +1849,6 @@ async fn shutdown_signal() { ...@@ -1719,17 +1849,6 @@ async fn shutdown_signal() {
opentelemetry::global::shutdown_tracer_provider(); opentelemetry::global::shutdown_tracer_provider();
} }
impl From<i32> for FinishReason {
fn from(finish_reason: i32) -> Self {
let finish_reason = text_generation_client::FinishReason::try_from(finish_reason).unwrap();
match finish_reason {
text_generation_client::FinishReason::Length => FinishReason::Length,
text_generation_client::FinishReason::EosToken => FinishReason::EndOfSequenceToken,
text_generation_client::FinishReason::StopSequence => FinishReason::StopSequence,
}
}
}
/// Convert to Axum supported formats /// Convert to Axum supported formats
impl From<InferError> for (StatusCode, Json<ErrorResponse>) { impl From<InferError> for (StatusCode, Json<ErrorResponse>) {
fn from(err: InferError) -> Self { fn from(err: InferError) -> Self {
...@@ -1762,3 +1881,19 @@ impl From<InferError> for Event { ...@@ -1762,3 +1881,19 @@ impl From<InferError> for Event {
.unwrap() .unwrap()
} }
} }
#[derive(Debug, Error)]
pub enum WebServerError {
#[error("Unable to connect to the Python model shards: {0}")]
Connection(ClientError),
#[error("Unable to clear the Python model shards cache: {0}")]
Cache(ClientError),
#[error("Unable to get the Python model shards info: {0}")]
Info(ClientError),
#[error("Unable to warmup the Python model shards: {0}")]
Warmup(ClientError),
#[error("Not enough memory to handle `max_total_tokens={0}`")]
NotEnoughMemory(usize),
#[error("Axum error: {0}")]
Axum(#[from] axum::BoxError),
}
use crate::config::Config;
/// Payload validation logic /// Payload validation logic
use crate::config::Config;
use crate::validation::ValidationError::{BestOfSampling, BestOfSeed, EmptyInput}; use crate::validation::ValidationError::{BestOfSampling, BestOfSeed, EmptyInput};
use crate::{GenerateParameters, GenerateRequest, GrammarType}; use crate::{GenerateParameters, GenerateRequest, GrammarType};
use base64::{engine::general_purpose::STANDARD, Engine};
use image::{io::Reader as ImageReader, ImageFormat};
use jsonschema::{Draft, JSONSchema}; use jsonschema::{Draft, JSONSchema};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use serde_json::Value; use serde_json::Value;
use std::io::Cursor; use std::io::Cursor;
use text_generation_client::{ use text_generation_client::{Chunk, Image, InputChunk};
Chunk, GrammarType as ProtoGrammarType, Image, InputChunk, NextTokenChooserParameters,
StoppingCriteriaParameters,
};
use thiserror::Error; use thiserror::Error;
use tokenizers::tokenizer::Tokenizer; use tokenizers::tokenizer::Tokenizer;
// use tokenizers::TruncationDirection;
use base64::{engine::general_purpose::STANDARD, Engine};
use image::{io::Reader as ImageReader, ImageFormat};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::{instrument, Span}; use tracing::{instrument, Span};
...@@ -173,10 +169,6 @@ impl Validation { ...@@ -173,10 +169,6 @@ impl Validation {
// Validate MaxNewTokens // Validate MaxNewTokens
if (input_length as u32 + max_new_tokens) > self.max_total_tokens as u32 { if (input_length as u32 + max_new_tokens) > self.max_total_tokens as u32 {
input_length = input_length.saturating_sub(max_new_tokens as usize); input_length = input_length.saturating_sub(max_new_tokens as usize);
// return Err(ValidationError::MaxNewTokens(
// self.max_total_tokens - self.max_input_length,
// max_new_tokens,
// ));
} }
Ok(( Ok((
...@@ -327,13 +319,13 @@ impl Validation { ...@@ -327,13 +319,13 @@ impl Validation {
// compiler and use that to build the FSM here. // compiler and use that to build the FSM here.
// Validate grammar and unpack the grammar and type for the proto message // Validate grammar and unpack the grammar and type for the proto message
let (grammar, grammar_type) = match grammar { let grammar = match grammar {
Some(grammar) => { Some(grammar) => {
// Ensure that grammar is not set if it's not supported // Ensure that grammar is not set if it's not supported
if self.disable_grammar_support { if self.disable_grammar_support {
return Err(ValidationError::Grammar); return Err(ValidationError::Grammar);
} }
match grammar { let valid_grammar = match grammar {
GrammarType::Json(json) => { GrammarType::Json(json) => {
let json = match json { let json = match json {
// if value is a string, we need to parse it again to make sure its // if value is a string, we need to parse it again to make sure its
...@@ -350,20 +342,20 @@ impl Validation { ...@@ -350,20 +342,20 @@ impl Validation {
.compile(&json) .compile(&json)
.map_err(|e| ValidationError::InvalidGrammar(e.to_string()))?; .map_err(|e| ValidationError::InvalidGrammar(e.to_string()))?;
( // Serialize json to string
// Serialize json to string ValidGrammar::Json(
serde_json::to_string(&json) serde_json::to_string(&json)
.map_err(|e| ValidationError::InvalidGrammar(e.to_string()))?, .map_err(|e| ValidationError::InvalidGrammar(e.to_string()))?,
ProtoGrammarType::Json.into(),
) )
} }
GrammarType::Regex(regex) => (regex, ProtoGrammarType::Regex.into()), GrammarType::Regex(regex) => ValidGrammar::Regex(regex),
} };
Some(valid_grammar)
} }
None => (String::new(), ProtoGrammarType::None.into()), None => None,
}; };
let parameters = NextTokenChooserParameters { let parameters = ValidParameters {
temperature, temperature,
repetition_penalty, repetition_penalty,
frequency_penalty, frequency_penalty,
...@@ -374,9 +366,8 @@ impl Validation { ...@@ -374,9 +366,8 @@ impl Validation {
seed, seed,
watermark, watermark,
grammar, grammar,
grammar_type,
}; };
let stopping_parameters = StoppingCriteriaParameters { let stopping_parameters = ValidStoppingParameters {
max_new_tokens, max_new_tokens,
stop_sequences, stop_sequences,
ignore_eos_token: false, ignore_eos_token: false,
...@@ -458,6 +449,7 @@ fn format_from_mimetype(mimetype: &str) -> Option<ImageFormat> { ...@@ -458,6 +449,7 @@ fn format_from_mimetype(mimetype: &str) -> Option<ImageFormat> {
_ => None, _ => None,
} }
} }
fn format_to_mimetype(format: ImageFormat) -> String { fn format_to_mimetype(format: ImageFormat) -> String {
match format { match format {
ImageFormat::Png => "image/png", ImageFormat::Png => "image/png",
...@@ -636,14 +628,55 @@ type TokenizerRequest = ( ...@@ -636,14 +628,55 @@ type TokenizerRequest = (
Span, Span,
); );
#[derive(Debug, Clone)]
pub(crate) enum ValidGrammar {
Json(String),
Regex(String),
}
#[derive(Debug, Clone)]
pub(crate) struct ValidParameters {
/// / exponential scaling output probability distribution
pub temperature: f32,
/// / restricting to the k highest probability elements
pub top_k: u32,
/// / restricting to top tokens summing to prob_cut_off <= prob_cut_off
pub top_p: f32,
/// / restricting to top tokens summing to prob_cut_off <= prob_cut_off
pub typical_p: f32,
/// / apply sampling on the logits
pub do_sample: bool,
/// / random seed for sampling
pub seed: u64,
/// / repetition penalty
pub repetition_penalty: f32,
/// / frequency penalty
pub frequency_penalty: f32,
/// / token watermarking using "A Watermark for Large Language Models"
pub watermark: bool,
/// / grammar (applied if not empty)
pub grammar: Option<ValidGrammar>,
}
#[derive(Debug, Clone)]
pub(crate) struct ValidStoppingParameters {
/// / Maximum number of generated tokens
pub max_new_tokens: u32,
/// / Optional stopping sequences
pub stop_sequences: Vec<String>,
/// / Ignore end of sequence token
/// / used for benchmarking
pub ignore_eos_token: bool,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct ValidGenerateRequest { pub(crate) struct ValidGenerateRequest {
pub inputs: Vec<InputChunk>, pub inputs: Vec<InputChunk>,
pub input_length: u32, pub input_length: u32,
pub truncate: u32, pub truncate: u32,
pub decoder_input_details: bool, pub decoder_input_details: bool,
pub parameters: NextTokenChooserParameters, pub parameters: ValidParameters,
pub stopping_parameters: StoppingCriteriaParameters, pub stopping_parameters: ValidStoppingParameters,
pub top_n_tokens: u32, pub top_n_tokens: u32,
} }
......
...@@ -12,8 +12,8 @@ gen-server: ...@@ -12,8 +12,8 @@ gen-server:
# Compile protos # Compile protos
pip install grpcio-tools==1.51.1 mypy-protobuf==3.4.0 'types-protobuf>=3.20.4' --no-cache-dir pip install grpcio-tools==1.51.1 mypy-protobuf==3.4.0 'types-protobuf>=3.20.4' --no-cache-dir
mkdir text_generation_server/pb || true mkdir text_generation_server/pb || true
python -m grpc_tools.protoc -I../proto --python_out=text_generation_server/pb \ python -m grpc_tools.protoc -I../proto/v3 --python_out=text_generation_server/pb \
--grpc_python_out=text_generation_server/pb --mypy_out=text_generation_server/pb ../proto/generate.proto --grpc_python_out=text_generation_server/pb --mypy_out=text_generation_server/pb ../proto/v3/generate.proto
find text_generation_server/pb/ -type f -name "*.py" -print0 -exec sed -i -e 's/^\(import.*pb2\)/from . \1/g' {} \; find text_generation_server/pb/ -type f -name "*.py" -print0 -exec sed -i -e 's/^\(import.*pb2\)/from . \1/g' {} \;
touch text_generation_server/pb/__init__.py touch text_generation_server/pb/__init__.py
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment