Commit 44d43d0c (unverified), authored by John Pohl, committed by GitHub
Browse files

feat: MJPEG video streaming via /v1/videos/stream (experimental) (#6487)


Signed-off-by: Ryan McCormick <mccormick.codes@gmail.com>
Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>
Co-authored-by: ishandhanani <82981111+ishandhanani@users.noreply.github.com>
parent 65dc451d
......@@ -21,6 +21,8 @@ use axum::{
},
routing::{get, post},
};
use base64::Engine as _;
use bytes::Bytes;
use dynamo_runtime::config::environment_names::llm as env_llm;
use dynamo_runtime::{
pipeline::{AsyncEngineContextProvider, Context},
......@@ -1887,20 +1889,155 @@ async fn videos(
Ok(Json(response).into_response())
}
/// [EXPERIMENTAL] MJPEG streaming handler for `/v1/videos/stream`.
///
/// The backend is expected to yield one [`NvVideosResponse`] per frame, carrying a
/// JPEG-encoded frame as `data[0].b64_json`. This handler decodes each frame and
/// writes it as an MJPEG multipart boundary so the client receives a live
/// `multipart/x-mixed-replace` stream viewable directly in a browser `<img>` tag
/// or via `ffplay http://.../v1/videos/stream`.
async fn video_stream(
State(state): State<Arc<service_v2::State>>,
headers: HeaderMap,
Json(request): Json<NvCreateVideoRequest>,
) -> Result<Response, ErrorResponse> {
check_ready(&state)?;
let request_id = get_or_create_request_id(request.user.as_deref(), &headers);
let request = Context::with_id(request, request_id);
let model = request.model.clone();
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
let engine = state
.manager()
.get_videos_engine(&model)
.map_err(|_| ErrorMessage::model_not_found())?;
let mut inflight = state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Videos, true);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
let stream = engine
.generate(request)
.await
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to start video stream"))?;
// Capture the context to cancel the stream if the client disconnects.
let ctx = stream.context();
// Create connection monitor. The connection_handle is disarmed immediately because
// video_stream returns the streaming body directly (graceful handler exit).
// The stream_handle is armed below and lives inside the monitored stream so that
// a client disconnect (body drop) signals the engine context to cancel.
let (mut connection_handle, mut stream_handle) =
create_connection_monitor(ctx.clone(), Some(state.metrics_clone())).await;
connection_handle.disarm();
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
// Map each annotated NvVideosResponse to an MJPEG boundary chunk.
// The backend yields one response per frame with the JPEG in data[0].b64_json.
let mjpeg_stream = stream.filter_map(|annotated| async move {
let ann = match annotated.ok() {
Ok(a) => a,
Err(e) => {
tracing::error!("Video stream error: {e}");
return None;
}
};
let response = ann.data?;
let frame = response.data.into_iter().next()?;
let b64 = frame.b64_json?;
let jpeg_bytes = match base64::prelude::BASE64_STANDARD.decode(&b64) {
Ok(b) => b,
Err(e) => {
tracing::warn!("Failed to decode frame base64: {e}");
return None;
}
};
let header = format!(
"--frame\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\n\r\n",
jpeg_bytes.len()
);
let mut chunk = Vec::with_capacity(header.len() + jpeg_bytes.len() + 2);
chunk.extend_from_slice(header.as_bytes());
chunk.extend_from_slice(&jpeg_bytes);
chunk.extend_from_slice(b"\r\n");
Some(Ok::<Bytes, std::convert::Infallible>(Bytes::from(chunk)))
});
// Arm the stream handle and monitor for client disconnects or context cancellation.
// inflight.mark_ok() is deferred until the stream ends naturally. If the stream is
// dropped early (client disconnect), the armed stream_handle signals the connection
// monitor, which cancels the engine context.
stream_handle.arm();
let monitored_stream = async_stream::stream! {
tokio::pin!(mjpeg_stream);
loop {
tokio::select! {
frame = mjpeg_stream.next() => {
match frame {
Some(item) => yield item,
None => {
// Stream ended naturally: mark inflight OK and disarm the handle.
inflight.mark_ok();
stream_handle.disarm();
break;
}
}
}
_ = ctx.stopped() => {
tracing::trace!("Context stopped; breaking MJPEG stream");
break;
}
}
}
};
axum::http::Response::builder()
.status(axum::http::StatusCode::OK)
.header(
axum::http::header::CONTENT_TYPE,
"multipart/x-mixed-replace; boundary=frame",
)
.body(Body::from_stream(monitored_stream))
.map(|r| r.into_response())
.map_err(|e| {
ErrorMessage::internal_server_error(&format!("Failed to build MJPEG response: {e}"))
})
}
/// Create an Axum [`Router`] for the OpenAI API Videos endpoint.
///
/// If no path is provided, the default path is `/v1/videos`. The streaming
/// route is always mounted at `<path>/stream`.
///
/// Two routes are registered:
/// - `POST <path>` — non-streaming, returns a single JSON response
/// - `POST <path>/stream` — MJPEG streaming via `multipart/x-mixed-replace`
///
/// Both routes share the JSON-error middleware and the request body limit
/// returned by `get_body_limit()`.
///
/// Returns the [`RouteDoc`] entries for both routes together with the router.
pub fn videos_router(
    state: Arc<service_v2::State>,
    path: Option<String>,
) -> (Vec<RouteDoc>, Router) {
    // Only allocate the default when the caller did not supply a path.
    let path = path.unwrap_or_else(|| "/v1/videos".to_string());
    let stream_path = format!("{path}/stream");

    let doc = RouteDoc::new(axum::http::Method::POST, &path);
    let stream_doc = RouteDoc::new(axum::http::Method::POST, &stream_path);

    let router = Router::new()
        .route(&path, post(videos))
        .route(&stream_path, post(video_stream))
        .layer(middleware::from_fn(smart_json_error_middleware))
        .layer(axum::extract::DefaultBodyLimit::max(get_body_limit()))
        .with_state(state);

    // Both routes must be documented; a leftover `(vec![doc], router)` line from
    // the pre-change version has been removed so this is the sole tail expression.
    (vec![doc, stream_doc], router)
}
#[cfg(test)]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment