Unverified Commit 2be83be2 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: add video generation support (T2V) (#5793)

parent 14eceb43
......@@ -51,6 +51,7 @@ use crate::protocols::openai::{
embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
images::{NvCreateImageRequest, NvImagesResponse},
responses::{NvCreateResponse, NvResponse, ResponseParams, chat_completion_to_response},
videos::{NvCreateVideoRequest, NvVideosResponse},
};
use crate::request_template::RequestTemplate;
use crate::types::Annotated;
......@@ -1712,6 +1713,87 @@ pub fn images_router(
(vec![doc], router)
}
/// Handles a video generation request on the OpenAI-compatible Videos endpoint.
///
/// The request is dispatched to the video engine registered for `request.model`,
/// the annotated stream the engine returns is folded into one [`NvVideosResponse`],
/// and that response is sent back as JSON.
///
/// # Errors
/// * whatever `check_ready` returns while the service is not ready (a 503),
/// * `ErrorMessage::model_not_found()` when no videos engine is registered for the model,
/// * an error built from the engine failure when `generate` fails,
/// * an internal server error when the response stream cannot be folded.
async fn videos(
    State(state): State<Arc<service_v2::State>>,
    headers: HeaderMap,
    Json(request): Json<NvCreateVideoRequest>,
) -> Result<Response, ErrorResponse> {
    // Refuse the request up front (503) while the service is not ready.
    check_ready(&state)?;

    // The id must be resolved before `request` is moved into the context.
    let resolved_id = get_or_create_request_id(request.user.as_deref(), &headers);
    let request = Context::with_id(request, resolved_id);
    let request_id = request.id().to_string();

    // Video responses are folded into a single payload, so metrics are recorded as non-streaming.
    let is_streaming = false;

    // Name of the video generation model the caller asked for.
    let model = request.model.clone();

    // Created early so it tracks the time the request spends waiting to be processed.
    let queue_guard = state.metrics_clone().create_http_queue_guard(&model);

    // Look up the video generation engine registered for this model.
    let engine = state
        .manager()
        .get_videos_engine(&model)
        .map_err(|_| ErrorMessage::model_not_found())?;

    // Increments the inflight gauge for the model.
    let mut inflight_guard =
        state
            .metrics_clone()
            .create_inflight_guard(&model, Endpoint::Videos, is_streaming);

    let mut collector = state.metrics_clone().create_response_collector(&model);

    // Issue the generate call on the engine.
    let engine_stream = engine
        .generate(request)
        .await
        .map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate videos"))?;

    // Observe every item for metrics; the queue guard is handed over wrapped in an
    // `Option` so the observer can drop it once the first item arrives.
    let mut queue_guard = Some(queue_guard);
    let observed_stream = engine_stream.inspect(move |item| {
        process_response_and_observe_metrics(item, &mut collector, &mut queue_guard);
    });

    // Collapse the (possibly multi-item) stream into the single response returned to the caller.
    let response = NvVideosResponse::from_annotated_stream(observed_stream)
        .await
        .map_err(|e| {
            tracing::error!("Failed to fold videos stream for {}: {:?}", request_id, e);
            ErrorMessage::internal_server_error("Failed to fold videos stream")
        })?;

    // Only a fully folded response counts as a success for the inflight metrics.
    inflight_guard.mark_ok();

    Ok(Json(response).into_response())
}
/// Builds the Axum [`Router`] that serves the OpenAI API Videos endpoint.
///
/// `path` overrides the route; when it is `None` the route is `/v1/videos`.
/// Returns the documentation entry for the single `POST` route together with the router.
pub fn videos_router(
    state: Arc<service_v2::State>,
    path: Option<String>,
) -> (Vec<RouteDoc>, Router) {
    let route_path = path.unwrap_or_else(|| "/v1/videos".to_string());
    let route_doc = RouteDoc::new(axum::http::Method::POST, &route_path);

    let router = Router::new()
        .route(&route_path, post(videos))
        .layer(middleware::from_fn(smart_json_error_middleware))
        .layer(axum::extract::DefaultBodyLimit::max(get_body_limit()))
        .with_state(state);

    (vec![route_doc], router)
}
#[cfg(test)]
mod tests {
......
......@@ -48,6 +48,7 @@ struct StateFlags {
cmpl_endpoints_enabled: AtomicBool,
embeddings_endpoints_enabled: AtomicBool,
images_endpoints_enabled: AtomicBool,
videos_endpoints_enabled: AtomicBool,
responses_endpoints_enabled: AtomicBool,
}
......@@ -58,6 +59,7 @@ impl StateFlags {
EndpointType::Completion => self.cmpl_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::Embedding => self.embeddings_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::Images => self.images_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::Videos => self.videos_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::Responses => self.responses_endpoints_enabled.load(Ordering::Relaxed),
}
}
......@@ -76,6 +78,9 @@ impl StateFlags {
EndpointType::Images => self
.images_endpoints_enabled
.store(enabled, Ordering::Relaxed),
EndpointType::Videos => self
.videos_endpoints_enabled
.store(enabled, Ordering::Relaxed),
EndpointType::Responses => self
.responses_endpoints_enabled
.store(enabled, Ordering::Relaxed),
......@@ -107,6 +112,7 @@ impl State {
cmpl_endpoints_enabled: AtomicBool::new(false),
embeddings_endpoints_enabled: AtomicBool::new(false),
images_endpoints_enabled: AtomicBool::new(false),
videos_endpoints_enabled: AtomicBool::new(false),
responses_endpoints_enabled: AtomicBool::new(false),
},
cancel_token,
......@@ -491,6 +497,7 @@ impl HttpServiceConfigBuilder {
let (embed_docs, embed_route) =
super::openai::embeddings_router(state.clone(), var(HTTP_SVC_EMB_PATH_ENV).ok());
let (images_docs, images_route) = super::openai::images_router(state.clone(), None);
let (videos_docs, videos_route) = super::openai::videos_router(state.clone(), None);
let (responses_docs, responses_route) = super::openai::responses_router(
state.clone(),
request_template.clone(),
......@@ -502,6 +509,7 @@ impl HttpServiceConfigBuilder {
endpoint_routes.insert(EndpointType::Completion, (cmpl_docs, cmpl_route));
endpoint_routes.insert(EndpointType::Embedding, (embed_docs, embed_route));
endpoint_routes.insert(EndpointType::Images, (images_docs, images_route));
endpoint_routes.insert(EndpointType::Videos, (videos_docs, videos_route));
endpoint_routes.insert(EndpointType::Responses, (responses_docs, responses_route));
for endpoint_type in EndpointType::all() {
......
......@@ -38,6 +38,7 @@ bitflags! {
const TensorBased = 1 << 3;
const Prefill = 1 << 4;
const Images = 1 << 5;
const Videos = 1 << 6;
}
}
......@@ -64,6 +65,9 @@ impl ModelType {
pub fn supports_images(&self) -> bool {
self.contains(ModelType::Images)
}
pub fn supports_videos(&self) -> bool {
self.contains(ModelType::Videos)
}
pub fn as_vec(&self) -> Vec<&'static str> {
let mut result = Vec::new();
......@@ -85,6 +89,9 @@ impl ModelType {
if self.supports_images() {
result.push("images");
}
if self.supports_videos() {
result.push("videos");
}
result
}
......@@ -110,6 +117,9 @@ impl ModelType {
if self.supports_images() {
result.push(ModelType::Images);
}
if self.supports_videos() {
result.push(ModelType::Videos);
}
result
}
......@@ -131,6 +141,9 @@ impl ModelType {
endpoint_types.push(crate::endpoint_type::EndpointType::Images);
endpoint_types.push(crate::endpoint_type::EndpointType::Chat);
}
if self.contains(Self::Videos) {
endpoint_types.push(crate::endpoint_type::EndpointType::Videos);
}
// [gluo NOTE] ModelType::Tensor doesn't map to any endpoint type,
// current use of endpoint type is LLM specific and so does the HTTP
// server that uses it.
......
......@@ -20,6 +20,7 @@ pub mod nvext;
pub mod responses;
pub mod tools;
pub mod validate;
pub mod videos;
use validate::{
BEST_OF_RANGE, FREQUENCY_PENALTY_RANGE, MIN_P_RANGE, N_RANGE, PRESENCE_PENALTY_RANGE,
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_runtime::protocols::annotated::AnnotationsProvider;
use serde::{Deserialize, Serialize};
use validator::Validate;
mod aggregator;
mod nvext;
pub use aggregator::DeltaAggregator;
pub use nvext::{NvExt, NvExtProvider};
/// Request for video generation (/v1/videos endpoint)
///
/// Every `Option` field is left out of the serialized form when it is `None`
/// (`skip_serializing_if`). `Validate` is derived, but no field-level
/// `#[validate(...)]` rule is declared on this struct.
#[derive(Serialize, Deserialize, Validate, Debug, Clone)]
pub struct NvCreateVideoRequest {
/// The text prompt for video generation
pub prompt: String,
/// The model to use for video generation
pub model: String,
/// Optional image reference that guides generation (for I2V)
// NOTE(review): the expected encoding of the reference (URL, path, base64, ...)
// is not visible here — confirm against the engine that consumes it.
#[serde(skip_serializing_if = "Option::is_none")]
pub input_reference: Option<String>,
/// Clip duration in seconds
// NOTE(review): the signed type admits zero and negative values and no range
// rule is declared here — confirm where the duration is validated.
#[serde(skip_serializing_if = "Option::is_none")]
pub seconds: Option<i32>,
/// Video size in WxH format (default: "832x480")
// The documented default is not applied by this struct; presumably the
// engine fills it in — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub size: Option<String>,
/// Optional user identifier
#[serde(skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
/// Response format: "url" or "b64_json" (default: "url")
// Held as a free-form string: neither the allowed values nor the default
// are enforced by this struct.
#[serde(skip_serializing_if = "Option::is_none")]
pub response_format: Option<String>,
/// NVIDIA extensions
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<NvExt>,
}
/// Video data in response
///
/// Both fields are optional and each is omitted from the serialized form when
/// it is `None`; nothing in this type forces exactly one of them to be set.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VideoData {
/// URL of the generated video (if response_format is "url")
#[serde(skip_serializing_if = "Option::is_none")]
pub url: Option<String>,
/// Base64-encoded video (if response_format is "b64_json")
#[serde(skip_serializing_if = "Option::is_none")]
pub b64_json: Option<String>,
}
/// Response structure for video generation
///
/// When deserializing, `object`, `status`, `progress` and `data` fall back to
/// defaults if they are absent from the input; `id`, `model` and `created` are
/// required. `Validate` is derived, but no field-level rule is declared here.
#[derive(Serialize, Deserialize, Validate, Debug, Clone)]
pub struct NvVideosResponse {
/// Unique identifier for the response
pub id: String,
/// Object type (always "video")
// Falls back to "video" when the field is missing from the input.
#[serde(default = "default_object_type")]
pub object: String,
/// Model used for generation
pub model: String,
/// Status of the generation ("completed", "failed", etc.)
// Falls back to "completed" when the field is missing from the input.
#[serde(default = "default_status")]
pub status: String,
/// Progress percentage (0-100)
// Falls back to 100 when the field is missing; the 0-100 range is not
// enforced by this struct.
#[serde(default = "default_progress")]
pub progress: i32,
/// Unix timestamp of creation
// NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
pub created: i64,
/// Generated video data
// Falls back to an empty list when the field is missing from the input.
#[serde(default)]
pub data: Vec<VideoData>,
/// Error message if generation failed
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Inference time in seconds
#[serde(skip_serializing_if = "Option::is_none")]
pub inference_time_s: Option<f64>,
}
/// Serde fallback for `NvVideosResponse::object`: the literal `"video"`.
fn default_object_type() -> String {
    String::from("video")
}
/// Serde fallback for `NvVideosResponse::status`: the literal `"completed"`.
fn default_status() -> String {
    String::from("completed")
}
/// Serde fallback for `NvVideosResponse::progress`: `100`.
fn default_progress() -> i32 {
    const FULL_PROGRESS: i32 = 100;
    FULL_PROGRESS
}
impl NvVideosResponse {
pub fn empty() -> Self {
Self {
id: String::new(),
object: "video".to_string(),
model: String::new(),
status: "completed".to_string(),
progress: 100,
created: 0,
data: vec![],
error: None,
inference_time_s: None,
}
}
}
/// Implements `NvExtProvider` for `NvCreateVideoRequest`,
/// providing access to NVIDIA-specific extensions.
impl NvExtProvider for NvCreateVideoRequest {
/// Returns a reference to the optional `NvExt` extension, if available.
///
/// Borrows the request's `nvext` field; yields `None` when the request
/// carried no extension block.
fn nvext(&self) -> Option<&NvExt> {
self.nvext.as_ref()
}
}
/// Implements `AnnotationsProvider` for `NvCreateVideoRequest`,
/// exposing the annotations carried in the request's `nvext` block.
impl AnnotationsProvider for NvCreateVideoRequest {
    /// Returns a copy of the annotation list from `NvExt`.
    ///
    /// Yields `None` when the request has no `nvext` block or the block has no
    /// annotation list.
    fn annotations(&self) -> Option<Vec<String>> {
        self.nvext.as_ref()?.annotations.clone()
    }

    /// Checks whether a specific annotation exists in the request.
    ///
    /// # Arguments
    /// * `annotation` - A string slice representing the annotation to check.
    ///
    /// # Returns
    /// `true` if the annotation exists, `false` otherwise (including when the
    /// request has no `nvext` block or no annotation list).
    fn has_annotation(&self, annotation: &str) -> bool {
        self.nvext
            .as_ref()
            .and_then(|nvext| nvext.annotations.as_deref())
            // Compare by borrow: no temporary `String` is allocated for the lookup.
            .is_some_and(|annotations| annotations.iter().any(|entry| entry == annotation))
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use futures::{Stream, StreamExt};
use crate::types::Annotated;
use super::NvVideosResponse;
/// Aggregator for combining video response deltas into a final response.
#[derive(Debug)]
pub struct DeltaAggregator {
// First response seen on the stream; the `data` of later responses is
// appended to it. `None` until an item carrying data arrives.
response: Option<NvVideosResponse>,
// Error message of the most recent failed item, if any. Once set, further
// data items are ignored and the aggregation ends in `Err`.
error: Option<String>,
}
impl Default for DeltaAggregator {
/// Provides a default implementation for `DeltaAggregator` by calling [`DeltaAggregator::new`].
fn default() -> Self {
Self::new()
}
}
impl DeltaAggregator {
    /// Creates an aggregator that has seen neither a response nor an error.
    pub fn new() -> Self {
        Self {
            response: None,
            error: None,
        }
    }

    /// Aggregates a stream of annotated video responses into a final response.
    ///
    /// The first item that carries data becomes the base response; the `data` of
    /// every later item is appended to it. The stream is always consumed to the end.
    ///
    /// # Returns
    /// * `Ok(response)` with the merged response, or [`NvVideosResponse::empty`]
    ///   when no item carried data.
    /// * `Err(message)` when at least one item was an error; the message of the
    ///   last such item is returned and data arriving after an error is ignored.
    pub async fn apply(
        stream: impl Stream<Item = Annotated<NvVideosResponse>>,
    ) -> Result<NvVideosResponse, String> {
        let mut state = DeltaAggregator::new();
        // Pin on the stack so the stream can be polled item by item.
        let mut stream = std::pin::pin!(stream);

        while let Some(item) = stream.next().await {
            match item.ok() {
                // Record the failure; a later failure replaces an earlier one.
                Err(message) => state.error = Some(message),
                Ok(annotated) => {
                    // Once an error has been recorded, data items are skipped.
                    if state.error.is_some() {
                        continue;
                    }
                    let Some(chunk) = annotated.data else {
                        continue;
                    };
                    match &mut state.response {
                        // Later responses contribute only their video data.
                        Some(merged) => merged.data.extend(chunk.data),
                        // The first response is kept whole as the base.
                        None => state.response = Some(chunk),
                    }
                }
            }
        }

        match state.error {
            Some(message) => Err(message),
            None => Ok(state.response.unwrap_or_else(NvVideosResponse::empty)),
        }
    }
}
impl NvVideosResponse {
/// Aggregates an annotated stream of video responses into a final response.
///
/// Thin convenience wrapper: the work is delegated to
/// [`DeltaAggregator::apply`] and its result is returned unchanged.
///
/// # Arguments
/// * `stream` - A stream of annotated video responses.
///
/// # Returns
/// * `Ok(NvVideosResponse)` if aggregation succeeds.
/// * `Err(String)` if an error occurs.
pub async fn from_annotated_stream(
stream: impl Stream<Item = Annotated<NvVideosResponse>>,
) -> Result<NvVideosResponse, String> {
DeltaAggregator::apply(stream).await
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use validator::{Validate, ValidationError};
/// Implemented by request types that can carry the NVIDIA video extensions ([`NvExt`]).
pub trait NvExtProvider {
/// Returns the request's `NvExt` block, or `None` when it has none.
fn nvext(&self) -> Option<&NvExt>;
}
/// NVIDIA extensions to the OpenAI Videos API
///
/// Every field is optional, is omitted from the serialized form when `None`,
/// and has a builder default, so an `NvExt` can be built without setting anything.
#[derive(ToSchema, Serialize, Deserialize, Builder, Validate, Debug, Clone)]
#[validate(schema(function = "validate_nv_ext"))]
pub struct NvExt {
/// Annotations
/// Triggers supplied by the caller that ask the service to send back
/// out-of-band information in the SSE stream using the `event:` field.
// NOTE(review): the videos HTTP handler folds the stream into one JSON
// response — confirm how annotation events surface for this endpoint.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub annotations: Option<Vec<String>>,
/// Frames per second (default: 24)
// The documented default is not applied by this struct — TODO confirm where it is.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub fps: Option<i32>,
/// Number of frames to generate (overrides fps * seconds if set)
// The override itself is not implemented in this struct; presumably the
// engine applies it — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub num_frames: Option<i32>,
/// A text description of the undesired video content.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub negative_prompt: Option<String>,
/// The number of denoising steps. More steps usually lead to higher quality at the expense of slower inference.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub num_inference_steps: Option<i32>,
/// The CFG scale. Higher values usually lead to more coherent output.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub guidance_scale: Option<f32>,
/// The seed for the random number generator.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub seed: Option<i64>,
}
impl Default for NvExt {
    /// Returns an `NvExt` with every field unset, produced by an untouched builder.
    ///
    /// # Panics
    /// Only if the builder reports a missing field, which cannot happen while
    /// every field keeps its `#[builder(default)]`.
    fn default() -> Self {
        NvExt::builder()
            .build()
            .expect("NvExtBuilder cannot fail: every field has a builder default")
    }
}
impl NvExt {
pub fn builder() -> NvExtBuilder {
NvExtBuilder::default()
}
}
/// Schema-level validation hook referenced by `#[validate(schema(...))]` on `NvExt`.
///
/// Currently a placeholder: it performs no checks and accepts every value.
fn validate_nv_ext(_nv_ext: &NvExt) -> Result<(), ValidationError> {
Ok(())
}
impl NvExtBuilder {
    /// Appends one annotation to the builder's annotation list and returns the
    /// builder for chaining.
    ///
    /// The list is created on first use. A builder whose annotations were
    /// explicitly set to "no list" is handled the same way: an empty list is
    /// inserted and the annotation is pushed, so this method never panics.
    pub fn add_annotation(&mut self, annotation: impl Into<String>) -> &mut Self {
        self.annotations
            // Outer `Option`: has the builder field been set at all?
            .get_or_insert_with(|| Some(Vec::new()))
            // Inner `Option`: the field's own value; fill it in if it is `None`.
            .get_or_insert_with(Vec::new)
            .push(annotation.into());
        self
    }
}
......@@ -83,6 +83,19 @@ pub mod openai {
pub type OpenAIImagesStreamingEngine =
ServerStreamingEngine<NvCreateImageRequest, Annotated<NvImagesResponse>>;
}
/// Engine type aliases for the OpenAI Videos API.
pub mod videos {
use super::*;
pub use protocols::openai::videos::{NvCreateVideoRequest, NvVideosResponse};
/// A [`UnaryEngine`] implementation for the OpenAI Videos API
// Type alias: takes an `NvCreateVideoRequest` and yields one `NvVideosResponse`.
pub type OpenAIVideosUnaryEngine = UnaryEngine<NvCreateVideoRequest, NvVideosResponse>;
/// A [`ServerStreamingEngine`] implementation for the OpenAI Videos API
// Type alias: takes an `NvCreateVideoRequest` and yields `Annotated<NvVideosResponse>` items.
pub type OpenAIVideosStreamingEngine =
ServerStreamingEngine<NvCreateVideoRequest, Annotated<NvVideosResponse>>;
}
}
pub mod generic {
......
......@@ -218,6 +218,7 @@ fn compute_index(endpoint: &Endpoint, request_type: &RequestType, status: &Statu
Endpoint::Responses => todo!(),
Endpoint::Tensor => todo!(),
Endpoint::Images => todo!(),
Endpoint::Videos => todo!(),
};
let request_type = match request_type {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment