Unverified Commit a485ab78 authored by Chi McIsaac's avatar Chi McIsaac Committed by GitHub
Browse files

feat: align OpenAI response IDs with distributed trace IDs (#2496)

parent 5d177b61
......@@ -212,9 +212,9 @@ impl MistralRsEngine {
// Perform warmup request
let (tx, mut rx) = channel(1);
let request_id = engine.mistralrs.next_request_id();
let mistralrs_request_id = engine.mistralrs.next_request_id();
let warmup_request = Request::Normal(Box::new(NormalRequest {
id: request_id,
id: mistralrs_request_id,
model_id: Some(display_name.to_string()),
messages: RequestMessage::Chat {
messages: vec![IndexMap::from([
......@@ -246,10 +246,10 @@ impl MistralRsEngine {
{
match response.as_result() {
Ok(r) => {
tracing::debug!(request_id, "Warmup response: {r:?}");
tracing::debug!(mistralrs_request_id, "Warmup response: {r:?}");
}
Err(err) => {
tracing::error!(request_id, %err, "Failed converting response to result.");
tracing::error!(mistralrs_request_id, %err, "Failed converting response to result.");
}
}
}
......@@ -272,6 +272,7 @@ impl
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
let (request, context) = request.transfer(());
let ctx = context.context();
let request_id = ctx.id().to_string();
let (tx, mut rx) = channel(10_000);
let mut messages = vec![];
......@@ -338,9 +339,9 @@ impl
n_choices: 1,
dry_params: det.dry_params,
};
let request_id = self.mistralrs.next_request_id();
let mistralrs_request_id = self.mistralrs.next_request_id();
let mistralrs_request = Request::Normal(Box::new(NormalRequest {
id: request_id,
id: mistralrs_request_id,
model_id: Some(self.display_name.clone()),
messages: RequestMessage::Chat {
messages,
......@@ -369,14 +370,14 @@ impl
let response = match response.as_result() {
Ok(r) => r,
Err(err) => {
tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
tracing::error!(mistralrs_request_id, %err, "Failed converting mistralrs channel response to result.");
break;
}
};
match response {
ResponseOk::Chunk(c) => {
let Some(from_assistant) = c.choices[0].delta.content.clone() else {
tracing::warn!(request_id, "No content from mistralrs. Abandoning request.");
tracing::warn!(mistralrs_request_id, "No content from mistralrs. Abandoning request.");
break;
};
let finish_reason = match &c.choices[0].finish_reason.as_deref() {
......@@ -387,7 +388,7 @@ impl
Some(FinishReason::Length)
}
Some(s) => {
tracing::warn!(request_id, stop_reason = s, "Unknow stop reason");
tracing::warn!(mistralrs_request_id, stop_reason = s, "Unknow stop reason");
Some(FinishReason::Stop)
}
None => None,
......@@ -396,7 +397,7 @@ impl
#[allow(deprecated)]
let delta = NvCreateChatCompletionStreamResponse {
id: c.id,
id: format!("chatcmpl-{request_id}"),
choices: vec![dynamo_async_openai::types::ChatChoiceStream{
index: 0,
delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta{
......@@ -427,11 +428,11 @@ impl
yield ann;
if finish_reason.is_some() {
//tracing::trace!(request_id, "Finish reason: {finish_reason:?}");
//tracing::trace!(mistralrs_request_id, "Finish reason: {finish_reason:?}");
break;
}
},
x => tracing::error!(request_id, "Unhandled. {x:?}"),
x => tracing::error!(mistralrs_request_id, "Unhandled. {x:?}"),
}
}
};
......@@ -485,7 +486,7 @@ impl
let (request, context) = request.transfer(());
let ctx = context.context();
let (tx, mut rx) = channel(10_000);
let response_generator = request.response_generator();
let response_generator = request.response_generator(ctx.id().to_string());
let messages = RequestMessage::Completion {
text: prompt_to_string(&request.inner.prompt),
......@@ -539,9 +540,9 @@ impl
dry_params: det.dry_params,
};
let request_id = self.mistralrs.next_request_id();
let mistralrs_request_id = self.mistralrs.next_request_id();
let mistralrs_request = Request::Normal(Box::new(NormalRequest {
id: request_id,
id: mistralrs_request_id,
model_id: Some(self.display_name.clone()),
messages,
sampling_params,
......@@ -567,7 +568,7 @@ impl
let response = match response.as_result() {
Ok(r) => r,
Err(err) => {
tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
tracing::error!(mistralrs_request_id, %err, "Failed converting mistralrs channel response to result.");
break;
}
};
......@@ -583,7 +584,7 @@ impl
Some(FinishReason::Length)
}
Some(s) => {
tracing::warn!(request_id, stop_reason = s, "Unknow stop reason");
tracing::warn!(mistralrs_request_id, stop_reason = s, "Unknow stop reason");
Some(FinishReason::Stop)
}
None => None,
......@@ -602,7 +603,7 @@ impl
break;
}
},
x => tracing::error!(request_id, "Unhandled. {x:?}"),
x => tracing::error!(mistralrs_request_id, "Unhandled. {x:?}"),
}
}
};
......
......@@ -14,7 +14,6 @@ use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated;
use crate::backend::ExecutionContext;
use crate::local_model::runtime_config;
use crate::preprocessor::PreprocessedRequest;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::{
......@@ -184,8 +183,8 @@ impl
incoming_request: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let mut deltas = request.response_generator(runtime_config::ModelRuntimeConfig::default());
let ctx = context.context();
let mut deltas = request.response_generator(ctx.id().to_string());
let req = request.inner.messages.into_iter().next_back().unwrap();
let prompt = match req {
......@@ -231,8 +230,8 @@ impl
incoming_request: SingleIn<NvCreateCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateCompletionResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let deltas = request.response_generator();
let ctx = context.context();
let deltas = request.response_generator(ctx.id().to_string());
let chars_string = prompt_to_string(&request.inner.prompt);
let output = stream! {
let mut id = 1;
......
......@@ -253,8 +253,7 @@ async fn completions(
// return a 503 if the service is not ready
check_ready(&state)?;
// todo - extract distributed tracing id and context id from headers
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = request.id().to_string();
// todo - decide on default
let streaming = request.inner.stream.unwrap_or(false);
......@@ -354,13 +353,15 @@ async fn completions(
#[tracing::instrument(skip_all)]
async fn embeddings(
State(state): State<Arc<service_v2::State>>,
headers: HeaderMap,
Json(request): Json<NvCreateEmbeddingRequest>,
) -> Result<Response, ErrorResponse> {
// return a 503 if the service is not ready
check_ready(&state)?;
// todo - extract distributed tracing id and context id from headers
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
let request = Context::with_id(request, request_id);
let request_id = request.id().to_string();
// Embeddings are typically not streamed, so we default to non-streaming
let streaming = false;
......@@ -381,10 +382,6 @@ async fn embeddings(
.metrics_clone()
.create_inflight_guard(model, Endpoint::Embeddings, streaming);
// setup context
// todo - inherit request_id from distributed trace details
let request = Context::with_id(request, request_id.clone());
// issue the generate call on the engine
let stream = engine
.generate(request)
......
......@@ -22,7 +22,6 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{collections::HashMap, sync::Arc};
use tracing;
use crate::local_model::runtime_config::ModelRuntimeConfig;
use crate::model_card::{ModelDeploymentCard, ModelInfo, TokenizerKind};
use crate::preprocessor::prompt::OAIChatLikeRequest;
use crate::tokenizers::Encoding;
......@@ -95,7 +94,6 @@ pub struct OpenAIPreprocessor {
formatter: Arc<dyn OAIPromptFormatter>,
tokenizer: Arc<dyn Tokenizer>,
model_info: Arc<dyn ModelInfo>,
runtime_config: ModelRuntimeConfig,
}
impl OpenAIPreprocessor {
......@@ -123,14 +121,11 @@ impl OpenAIPreprocessor {
};
let model_info = model_info.get_model_info().await?;
let runtime_config = mdc.runtime_config.clone();
Ok(Arc::new(Self {
formatter,
tokenizer,
model_info,
mdcsum,
runtime_config,
}))
}
......@@ -499,7 +494,7 @@ impl
let (request, context) = request.into_parts();
// create a response generator
let response_generator = request.response_generator(self.runtime_config.clone());
let response_generator = request.response_generator(context.id().to_string());
let mut response_generator = Box::new(response_generator);
// convert the chat completion request to a common completion request
......@@ -553,7 +548,7 @@ impl
let (request, context) = request.into_parts();
// create a response generator
let response_generator = request.response_generator();
let response_generator = request.response_generator(context.id().to_string());
let mut response_generator = Box::new(response_generator);
// convert the chat completion request to a common completion request
let (common_request, annotations) = self.preprocess_request(&request)?;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};
use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
use crate::{
local_model::runtime_config,
local_model::runtime_config::ModelRuntimeConfig,
protocols::common::{self},
types::TokenIdType,
};
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};
/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
impl NvCreateChatCompletionRequest {
/// Creates a [`DeltaGenerator`] instance based on the chat completion request.
///
/// # Arguments
/// * `request_id` - The request ID to use for the chat completion response ID.
///
/// # Returns
/// * [`DeltaGenerator`] configured with model name and response options.
pub fn response_generator(
&self,
runtime_config: runtime_config::ModelRuntimeConfig,
) -> DeltaGenerator {
pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
let options = DeltaGeneratorOptions {
enable_usage: true,
enable_logprobs: self.inner.logprobs.unwrap_or(false)
|| self.inner.top_logprobs.unwrap_or(0) > 0,
runtime_config,
runtime_config: ModelRuntimeConfig::default(),
};
DeltaGenerator::new(self.inner.model.clone(), options)
DeltaGenerator::new(self.inner.model.clone(), options, request_id)
}
}
......@@ -39,7 +38,7 @@ pub struct DeltaGeneratorOptions {
/// Determines whether log probabilities should be included in the response.
pub enable_logprobs: bool,
pub runtime_config: runtime_config::ModelRuntimeConfig,
pub runtime_config: ModelRuntimeConfig,
}
/// Generates incremental chat completion responses in a streaming fashion.
......@@ -74,10 +73,11 @@ impl DeltaGenerator {
/// # Arguments
/// * `model` - The model name used for response generation.
/// * `options` - Configuration options for enabling usage and log probabilities.
/// * `request_id` - The request ID to use for the chat completion response.
///
/// # Returns
/// * A new instance of [`DeltaGenerator`].
pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
......@@ -108,8 +108,10 @@ impl DeltaGenerator {
.unwrap_or("basic"),
);
let chatcmpl_id = format!("chatcmpl-{request_id}");
Self {
id: format!("chatcmpl-{}", uuid::Uuid::new_v4()),
id: chatcmpl_id,
object: "chat.completion.chunk".to_string(),
created: now,
model,
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{NvCreateCompletionRequest, NvCreateCompletionResponse};
use crate::{protocols::common, types::TokenIdType};
......@@ -19,13 +7,13 @@ use crate::{protocols::common, types::TokenIdType};
impl NvCreateCompletionRequest {
// put this method on the request
// inspect the request to extract options
pub fn response_generator(&self) -> DeltaGenerator {
pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
let options = DeltaGeneratorOptions {
enable_usage: true,
enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0,
};
DeltaGenerator::new(self.inner.model.clone(), options)
DeltaGenerator::new(self.inner.model.clone(), options, request_id)
}
}
......@@ -47,7 +35,7 @@ pub struct DeltaGenerator {
}
impl DeltaGenerator {
pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
......@@ -67,8 +55,10 @@ impl DeltaGenerator {
prompt_tokens_details: None,
};
let completion_id = format!("cmpl-{request_id}");
Self {
id: format!("cmpl-{}", uuid::Uuid::new_v4()),
id: completion_id,
object: "text_completion".to_string(),
created: now,
model,
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Error;
use async_stream::stream;
use dynamo_async_openai::config::OpenAIConfig;
use dynamo_llm::http::{
client::{
GenericBYOTClient, HttpClientConfig, HttpRequestContext, NvCustomClient, PureOpenAIClient,
},
service::{
Metrics,
error::HttpError,
metrics::{Endpoint, FRONTEND_METRIC_PREFIX, RequestType, Status},
service_v2::HttpService,
},
};
use dynamo_llm::protocols::{
Annotated,
codec::SseLineCodec,
......@@ -25,21 +24,6 @@ use dynamo_llm::protocols::{
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
},
};
use dynamo_llm::{
http::{
client::{
GenericBYOTClient, HttpClientConfig, HttpRequestContext, NvCustomClient,
PureOpenAIClient,
},
service::{
Metrics,
error::HttpError,
metrics::{Endpoint, FRONTEND_METRIC_PREFIX, RequestType, Status},
service_v2::HttpService,
},
},
local_model::runtime_config,
};
use dynamo_runtime::{
CancellationToken,
engine::AsyncEngineContext,
......@@ -99,8 +83,7 @@ impl
let max_tokens = request.inner.max_tokens.unwrap_or(0) as u64;
// let generator = NvCreateChatCompletionStreamResponse::generator(request.model.clone());
let mut generator =
request.response_generator(runtime_config::ModelRuntimeConfig::default());
let mut generator = request.response_generator(ctx.id().to_string());
let stream = stream! {
tokio::time::sleep(std::time::Duration::from_millis(max_tokens)).await;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment