Unverified Commit bf9e6d04 authored by milesial's avatar milesial Committed by GitHub
Browse files

chore: remove media-nixl feature (#5940)


Signed-off-by: default avatarAlexandre Milesi <milesial@users.noreply.github.com>
parent 9b1e461e
...@@ -91,15 +91,15 @@ jobs: ...@@ -91,15 +91,15 @@ jobs:
working-directory: ./deploy working-directory: ./deploy
run: | run: |
docker compose up -d nats-server etcd-server docker compose up -d nats-server etcd-server
- name: Run Rust checks (block-manager + media-nixl + media-ffmpeg + integration tests) - name: Run Rust checks (block-manager + media-ffmpeg + integration tests)
run: | run: |
docker run --rm -w /workspace/lib/llm \ docker run --rm -w /workspace/lib/llm \
--name ${{ env.CONTAINER_ID }}_rust_checks \ --name ${{ env.CONTAINER_ID }}_rust_checks \
${{ steps.define_image_tag.outputs.image_tag }} \ ${{ steps.define_image_tag.outputs.image_tag }} \
bash -ec 'rustup component add rustfmt clippy && \ bash -ec 'rustup component add rustfmt clippy && \
cargo fmt -- --check && \ cargo fmt -- --check && \
cargo clippy --features block-manager,media-nixl,media-ffmpeg --no-deps --all-targets -- -D warnings && \ cargo clippy --features block-manager,media-ffmpeg --no-deps --all-targets -- -D warnings && \
cargo test --locked --all-targets --features=block-manager,media-nixl,media-ffmpeg,testing-nixl && \ cargo test --locked --all-targets --features=block-manager,media-ffmpeg,testing-nixl && \
cargo test --locked --features integration -- --nocapture' cargo test --locked --features integration -- --nocapture'
- name: Cleanup services - name: Cleanup services
if: always() if: always()
......
...@@ -272,8 +272,6 @@ The build process automatically: ...@@ -272,8 +272,6 @@ The build process automatically:
For more details, see [`deploy/inference-gateway/README.md`](../deploy/inference-gateway/README.md). For more details, see [`deploy/inference-gateway/README.md`](../deploy/inference-gateway/README.md).
**Note:** `--framework none` defaults `ENABLE_MEDIA_NIXL=false`.
#### Frontend Image Contents #### Frontend Image Contents
The frontend image includes: The frontend image includes:
......
...@@ -25,7 +25,6 @@ dynamo: ...@@ -25,7 +25,6 @@ dynamo:
nixl_ucx_efa_ref: 9d2b88a1f67faf9876f267658bd077b379b8bb76 nixl_ucx_efa_ref: 9d2b88a1f67faf9876f267658bd077b379b8bb76
nixl_libfabric_ref: v2.3.0 nixl_libfabric_ref: v2.3.0
enable_kvbm: "false" enable_kvbm: "false"
enable_media_nixl: "false"
enable_media_ffmpeg: "false" enable_media_ffmpeg: "false"
enable_gpu_memory_service: "false" enable_gpu_memory_service: "false"
ffmpeg_version: "7.1" ffmpeg_version: "7.1"
...@@ -39,7 +38,6 @@ vllm: ...@@ -39,7 +38,6 @@ vllm:
flashinf_ref: v0.5.3 flashinf_ref: v0.5.3
lmcache_ref: 0.3.12 lmcache_ref: 0.3.12
max_jobs: "10" max_jobs: "10"
enable_media_nixl: "true"
enable_media_ffmpeg: "true" enable_media_ffmpeg: "true"
enable_gpu_memory_service: "true" enable_gpu_memory_service: "true"
enable_kvbm: "true" enable_kvbm: "true"
...@@ -57,7 +55,6 @@ sglang: ...@@ -57,7 +55,6 @@ sglang:
cuda13.0: cuda13.0:
base_image_tag: 25.11-cuda13.0-devel-ubuntu24.04 base_image_tag: 25.11-cuda13.0-devel-ubuntu24.04
runtime_image_tag: v0.5.8-cu130-runtime runtime_image_tag: v0.5.8-cu130-runtime
enable_media_nixl: "true"
enable_media_ffmpeg: "true" enable_media_ffmpeg: "true"
enable_gpu_memory_service: "true" enable_gpu_memory_service: "true"
enable_kvbm: "false" enable_kvbm: "false"
...@@ -67,7 +64,6 @@ trtllm: ...@@ -67,7 +64,6 @@ trtllm:
base_image_tag: 25.12-py3 base_image_tag: 25.12-py3
runtime_image: nvcr.io/nvidia/cuda-dl-base runtime_image: nvcr.io/nvidia/cuda-dl-base
runtime_image_tag: 25.10-cuda13.0-runtime-ubuntu24.04 runtime_image_tag: 25.10-cuda13.0-runtime-ubuntu24.04
enable_media_nixl: "true"
enable_media_ffmpeg: "true" enable_media_ffmpeg: "true"
enable_gpu_memory_service: "false" enable_gpu_memory_service: "false"
enable_kvbm: "true" enable_kvbm: "true"
......
...@@ -50,7 +50,6 @@ ARG CARGO_BUILD_JOBS ...@@ -50,7 +50,6 @@ ARG CARGO_BUILD_JOBS
ARG NATS_VERSION={{ context.dynamo.nats_version }} ARG NATS_VERSION={{ context.dynamo.nats_version }}
ARG ETCD_VERSION={{ context.dynamo.etcd_version }} ARG ETCD_VERSION={{ context.dynamo.etcd_version }}
ARG ENABLE_MEDIA_NIXL={{ context[framework].enable_media_nixl }}
ARG ENABLE_MEDIA_FFMPEG={{ context[framework].enable_media_ffmpeg }} ARG ENABLE_MEDIA_FFMPEG={{ context[framework].enable_media_ffmpeg }}
ARG FFMPEG_VERSION={{ context.dynamo.ffmpeg_version }} ARG FFMPEG_VERSION={{ context.dynamo.ffmpeg_version }}
ARG ENABLE_GPU_MEMORY_SERVICE={{ context[framework].enable_gpu_memory_service }} ARG ENABLE_GPU_MEMORY_SERVICE={{ context[framework].enable_gpu_memory_service }}
......
...@@ -347,15 +347,8 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ ...@@ -347,15 +347,8 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
cd /opt/dynamo && \ cd /opt/dynamo && \
uv build --wheel --out-dir /opt/dynamo/dist && \ uv build --wheel --out-dir /opt/dynamo/dist && \
cd /opt/dynamo/lib/bindings/python && \ cd /opt/dynamo/lib/bindings/python && \
FEATURES=""; \
if [ "$ENABLE_MEDIA_NIXL" = "true" ]; then \
FEATURES="$FEATURES dynamo-llm/media-nixl"; \
fi; \
if [ "$ENABLE_MEDIA_FFMPEG" = "true" ]; then \ if [ "$ENABLE_MEDIA_FFMPEG" = "true" ]; then \
FEATURES="$FEATURES media-ffmpeg"; \ maturin build --release --features "media-ffmpeg" --out /opt/dynamo/dist; \
fi; \
if [ -n "$FEATURES" ]; then \
maturin build --release --features "$FEATURES" --out /opt/dynamo/dist; \
else \ else \
maturin build --release --out /opt/dynamo/dist; \ maturin build --release --out /opt/dynamo/dist; \
fi && \ fi && \
......
...@@ -13,7 +13,7 @@ readme.workspace = true ...@@ -13,7 +13,7 @@ readme.workspace = true
description = "Dynamo LLM Library" description = "Dynamo LLM Library"
[features] [features]
default = ["media-nixl", "block-manager"] default = ["block-manager"]
# todo(ops): get this working in CI as a default. # todo(ops): get this working in CI as a default.
# default = ["block-manager", "testing-full"] # default = ["block-manager", "testing-full"]
...@@ -25,8 +25,7 @@ block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"] ...@@ -25,8 +25,7 @@ block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"]
block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"] block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"]
cuda = ["dep:cudarc"] cuda = ["dep:cudarc"]
integration = ["dynamo-runtime/integration"] integration = ["dynamo-runtime/integration"]
media-nixl = ["dep:nixl-sys", "dep:flate2"] media-ffmpeg = ["dep:video-rs", "dep:ffmpeg-next", "dep:memfile"]
media-ffmpeg = ["dep:video-rs", "dep:ffmpeg-next", "dep:memfile", "media-nixl"]
bench = ["dynamo-kv-router/bench"] bench = ["dynamo-kv-router/bench"]
kv-router-stress = ["dep:clap", "dep:indicatif", "bench"] kv-router-stress = ["dep:clap", "dep:indicatif", "bench"]
...@@ -117,8 +116,8 @@ nixl-sys = { version = "=0.9.0", optional = true } ...@@ -117,8 +116,8 @@ nixl-sys = { version = "=0.9.0", optional = true }
cudarc = { workspace = true, optional = true } cudarc = { workspace = true, optional = true }
nix = { version = "0.26", optional = true } nix = { version = "0.26", optional = true }
# media-nixl (zlib compression for NIXL metadata) # media (zlib compression for NIXL metadata)
flate2 = { version = "1", optional = true } flate2 = { version = "1" }
# block_manager_bench # block_manager_bench
clap = { version = "4.5.49", features = ["derive"], optional = true } clap = { version = "4.5.49", features = ["derive"], optional = true }
......
...@@ -28,7 +28,6 @@ use std::{collections::HashMap, pin::Pin, sync::Arc}; ...@@ -28,7 +28,6 @@ use std::{collections::HashMap, pin::Pin, sync::Arc};
use tracing; use tracing;
use crate::model_card::{ModelDeploymentCard, ModelInfo}; use crate::model_card::{ModelDeploymentCard, ModelInfo};
#[cfg(feature = "media-nixl")]
use crate::preprocessor::media::MediaLoader; use crate::preprocessor::media::MediaLoader;
use crate::preprocessor::prompt::OAIChatLikeRequest; use crate::preprocessor::prompt::OAIChatLikeRequest;
use crate::protocols::common::preprocessor::{ use crate::protocols::common::preprocessor::{
...@@ -141,7 +140,6 @@ pub struct OpenAIPreprocessor { ...@@ -141,7 +140,6 @@ pub struct OpenAIPreprocessor {
/// Per-model runtime configuration propagated to response generator (e.g., reasoning/tool parser) /// Per-model runtime configuration propagated to response generator (e.g., reasoning/tool parser)
runtime_config: crate::local_model::runtime_config::ModelRuntimeConfig, runtime_config: crate::local_model::runtime_config::ModelRuntimeConfig,
tool_call_parser: Option<String>, tool_call_parser: Option<String>,
#[cfg(feature = "media-nixl")]
media_loader: Option<MediaLoader>, media_loader: Option<MediaLoader>,
} }
...@@ -177,7 +175,6 @@ impl OpenAIPreprocessor { ...@@ -177,7 +175,6 @@ impl OpenAIPreprocessor {
// // Initialize runtime config from the ModelDeploymentCard // // Initialize runtime config from the ModelDeploymentCard
let runtime_config = mdc.runtime_config.clone(); let runtime_config = mdc.runtime_config.clone();
#[cfg(feature = "media-nixl")]
let media_loader = match mdc.media_decoder { let media_loader = match mdc.media_decoder {
Some(media_decoder) => Some(MediaLoader::new(media_decoder, mdc.media_fetcher)?), Some(media_decoder) => Some(MediaLoader::new(media_decoder, mdc.media_fetcher)?),
None => None, None => None,
...@@ -191,7 +188,6 @@ impl OpenAIPreprocessor { ...@@ -191,7 +188,6 @@ impl OpenAIPreprocessor {
lora_name, lora_name,
runtime_config, runtime_config,
tool_call_parser, tool_call_parser,
#[cfg(feature = "media-nixl")]
media_loader, media_loader,
})) }))
} }
...@@ -336,7 +332,6 @@ impl OpenAIPreprocessor { ...@@ -336,7 +332,6 @@ impl OpenAIPreprocessor {
builder: &mut PreprocessedRequestBuilder, builder: &mut PreprocessedRequestBuilder,
) -> Result<()> { ) -> Result<()> {
let mut media_map: MultimodalDataMap = HashMap::new(); let mut media_map: MultimodalDataMap = HashMap::new();
#[cfg(feature = "media-nixl")]
let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> = let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> =
Vec::new(); Vec::new();
...@@ -366,7 +361,6 @@ impl OpenAIPreprocessor { ...@@ -366,7 +361,6 @@ impl OpenAIPreprocessor {
_ => continue, _ => continue,
}; };
#[cfg(feature = "media-nixl")]
if self.media_loader.is_some() { if self.media_loader.is_some() {
fetch_tasks.push((type_str, content_part.clone())); fetch_tasks.push((type_str, content_part.clone()));
continue; continue;
...@@ -381,7 +375,6 @@ impl OpenAIPreprocessor { ...@@ -381,7 +375,6 @@ impl OpenAIPreprocessor {
} }
// Execute all fetch tasks // Execute all fetch tasks
#[cfg(feature = "media-nixl")]
if !fetch_tasks.is_empty() { if !fetch_tasks.is_empty() {
let loader = self.media_loader.as_ref().unwrap(); let loader = self.media_loader.as_ref().unwrap();
let media_io_kwargs = request.media_io_kwargs(); let media_io_kwargs = request.media_io_kwargs();
......
...@@ -10,6 +10,4 @@ pub use common::EncodedMediaData; ...@@ -10,6 +10,4 @@ pub use common::EncodedMediaData;
pub use decoders::{Decoder, ImageDecoder, MediaDecoder}; pub use decoders::{Decoder, ImageDecoder, MediaDecoder};
pub use loader::{MediaFetcher, MediaLoader}; pub use loader::{MediaFetcher, MediaLoader};
pub use rdma::{DecodedMediaData, RdmaMediaDataDescriptor}; pub use rdma::{DecodedMediaData, RdmaMediaDataDescriptor, get_nixl_agent, get_nixl_metadata};
#[cfg(feature = "media-nixl")]
pub use rdma::{get_nixl_agent, get_nixl_metadata};
...@@ -44,7 +44,7 @@ register_llm( ...@@ -44,7 +44,7 @@ register_llm(
## Known Limitations ## Known Limitations
> [!WARNING] > [!WARNING]
> **Incompatible with `Dockerfile.frontend`**: Frontend media decoding (enabled with `--features media-nixl`) is not supported when using `Dockerfile.frontend`. The frontend image built from `Dockerfile.frontend` does not enable the feature + include the required NIXL/UCX dependencies. > **Incompatible with `Dockerfile.frontend`**: Frontend media decoding is not supported when using `Dockerfile.frontend`. The frontend image built from `Dockerfile.frontend` does not include the required NIXL/UCX dependencies.
> [!WARNING] > [!WARNING]
> **Requires GPU node**: The frontend must run on a node with GPU access. During media processing, decoded tensors are written to GPU memory via NIXL, which requires `libcuda.so.1` to be available. Running the frontend on a CPU-only node will fail with something like: `Failed to initialize required backends: [UCX: No UCX plugin found]`. > **Requires GPU node**: The frontend must run on a node with GPU access. During media processing, decoded tensors are written to GPU memory via NIXL, which requires `libcuda.so.1` to be available. Running the frontend on a CPU-only node will fail with something like: `Failed to initialize required backends: [UCX: No UCX plugin found]`.
......
...@@ -7,15 +7,11 @@ use std::time::Duration; ...@@ -7,15 +7,11 @@ use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use dynamo_async_openai::types::ChatCompletionRequestUserMessageContentPart; use dynamo_async_openai::types::ChatCompletionRequestUserMessageContentPart;
use dynamo_memory::nixl::NixlAgent;
use super::decoders::MediaDecoder; use super::common::EncodedMediaData;
use super::rdma::RdmaMediaDataDescriptor; use super::decoders::{Decoder, MediaDecoder};
use super::rdma::{RdmaMediaDataDescriptor, get_nixl_agent};
#[cfg(feature = "media-nixl")]
use {
super::common::EncodedMediaData, super::decoders::Decoder, super::rdma::get_nixl_agent,
dynamo_memory::nixl::NixlAgent,
};
const DEFAULT_HTTP_USER_AGENT: &str = "dynamo-ai/dynamo"; const DEFAULT_HTTP_USER_AGENT: &str = "dynamo-ai/dynamo";
const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(30);
...@@ -75,7 +71,6 @@ pub struct MediaLoader { ...@@ -75,7 +71,6 @@ pub struct MediaLoader {
http_client: reqwest::Client, http_client: reqwest::Client,
#[allow(dead_code)] #[allow(dead_code)]
media_fetcher: MediaFetcher, media_fetcher: MediaFetcher,
#[cfg(feature = "media-nixl")]
nixl_agent: NixlAgent, nixl_agent: NixlAgent,
} }
...@@ -91,14 +86,12 @@ impl MediaLoader { ...@@ -91,14 +86,12 @@ impl MediaLoader {
let http_client = http_client_builder.build()?; let http_client = http_client_builder.build()?;
#[cfg(feature = "media-nixl")]
let nixl_agent = get_nixl_agent()?; let nixl_agent = get_nixl_agent()?;
Ok(Self { Ok(Self {
media_decoder, media_decoder,
http_client, http_client,
media_fetcher, media_fetcher,
#[cfg(feature = "media-nixl")]
nixl_agent, nixl_agent,
}) })
} }
...@@ -108,66 +101,58 @@ impl MediaLoader { ...@@ -108,66 +101,58 @@ impl MediaLoader {
oai_content_part: &ChatCompletionRequestUserMessageContentPart, oai_content_part: &ChatCompletionRequestUserMessageContentPart,
media_io_kwargs: Option<&MediaDecoder>, media_io_kwargs: Option<&MediaDecoder>,
) -> Result<RdmaMediaDataDescriptor> { ) -> Result<RdmaMediaDataDescriptor> {
#[cfg(not(feature = "media-nixl"))] // fetch the media, decode and NIXL-register
anyhow::bail!( let decoded = match oai_content_part {
"NIXL is not supported, cannot decode and register media data {oai_content_part:?} with media_io_kwargs {media_io_kwargs:?}" ChatCompletionRequestUserMessageContentPart::ImageUrl(image_part) => {
); let mdc_decoder = self
.media_decoder
.image
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Model does not support image inputs"))?;
let url = &image_part.image_url.url;
self.media_fetcher.check_if_url_allowed(url)?;
let data = EncodedMediaData::from_url(url, &self.http_client).await?;
// Use runtime decoder if provided, with MDC limits enforced
let decoder =
mdc_decoder.with_runtime(media_io_kwargs.and_then(|k| k.image.as_ref()));
decoder.decode_async(data).await?
}
#[allow(unused_variables)]
ChatCompletionRequestUserMessageContentPart::VideoUrl(video_part) => {
#[cfg(not(feature = "media-ffmpeg"))]
anyhow::bail!("Video decoding requires the 'media-ffmpeg' feature to be enabled");
#[cfg(feature = "media-nixl")] #[cfg(feature = "media-ffmpeg")]
{ {
// fetch the media, decode and NIXL-register
let decoded = match oai_content_part {
ChatCompletionRequestUserMessageContentPart::ImageUrl(image_part) => {
let mdc_decoder = let mdc_decoder =
self.media_decoder.image.as_ref().ok_or_else(|| { self.media_decoder.video.as_ref().ok_or_else(|| {
anyhow::anyhow!("Model does not support image inputs") anyhow::anyhow!("Model does not support video inputs")
})?; })?;
let url = &image_part.image_url.url; let url = &video_part.video_url.url;
self.media_fetcher.check_if_url_allowed(url)?; self.media_fetcher.check_if_url_allowed(url)?;
let data = EncodedMediaData::from_url(url, &self.http_client).await?; let data = EncodedMediaData::from_url(url, &self.http_client).await?;
// Use runtime decoder if provided, with MDC limits enforced // Use runtime decoder if provided, with MDC limits enforced
let decoder = let decoder =
mdc_decoder.with_runtime(media_io_kwargs.and_then(|k| k.image.as_ref())); mdc_decoder.with_runtime(media_io_kwargs.and_then(|k| k.video.as_ref()));
decoder.decode_async(data).await? decoder.decode_async(data).await?
} }
#[allow(unused_variables)] }
ChatCompletionRequestUserMessageContentPart::VideoUrl(video_part) => { ChatCompletionRequestUserMessageContentPart::AudioUrl(_) => {
#[cfg(not(feature = "media-ffmpeg"))] anyhow::bail!("Audio decoding is not supported yet");
anyhow::bail!( }
"Video decoding requires the 'media-ffmpeg' feature to be enabled" _ => anyhow::bail!("Unsupported media type"),
); };
#[cfg(feature = "media-ffmpeg")]
{
let mdc_decoder = self.media_decoder.video.as_ref().ok_or_else(|| {
anyhow::anyhow!("Model does not support video inputs")
})?;
let url = &video_part.video_url.url;
self.media_fetcher.check_if_url_allowed(url)?;
let data = EncodedMediaData::from_url(url, &self.http_client).await?;
// Use runtime decoder if provided, with MDC limits enforced
let decoder = mdc_decoder
.with_runtime(media_io_kwargs.and_then(|k| k.video.as_ref()));
decoder.decode_async(data).await?
}
}
ChatCompletionRequestUserMessageContentPart::AudioUrl(_) => {
anyhow::bail!("Audio decoding is not supported yet");
}
_ => anyhow::bail!("Unsupported media type"),
};
let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?; let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?;
Ok(rdma_descriptor) Ok(rdma_descriptor)
}
} }
} }
#[cfg(all(test, feature = "media-nixl", feature = "testing-nixl"))] #[cfg(all(test, feature = "testing-nixl"))]
mod tests { mod tests {
use super::super::decoders::ImageDecoder; use super::super::decoders::ImageDecoder;
use super::super::rdma::DataType; use super::super::rdma::DataType;
......
...@@ -2,18 +2,14 @@ ...@@ -2,18 +2,14 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use anyhow::Result; use anyhow::Result;
use base64::{Engine as _, engine::general_purpose};
use dynamo_memory::SystemStorage;
use dynamo_memory::nixl::{self, NixlAgent, NixlDescriptor, RegisteredView};
use flate2::{Compression, write::ZlibEncoder};
use ndarray::{ArrayBase, Dimension, OwnedRepr}; use ndarray::{ArrayBase, Dimension, OwnedRepr};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::io::Write;
#[cfg(feature = "media-nixl")] use std::sync::Arc;
use {
base64::{Engine as _, engine::general_purpose},
dynamo_memory::SystemStorage,
dynamo_memory::nixl::{self, NixlAgent, NixlDescriptor, RegisteredView},
flate2::{Compression, write::ZlibEncoder},
std::io::Write,
std::sync::Arc,
};
use super::decoders::DecodedMediaMetadata; use super::decoders::DecodedMediaMetadata;
...@@ -33,7 +29,6 @@ pub struct MediaTensorInfo { ...@@ -33,7 +29,6 @@ pub struct MediaTensorInfo {
// Decoded media data (image RGB, video frames pixels, ...) // Decoded media data (image RGB, video frames pixels, ...)
#[derive(Debug)] #[derive(Debug)]
pub struct DecodedMediaData { pub struct DecodedMediaData {
#[cfg(feature = "media-nixl")]
pub(crate) data: SystemStorage, pub(crate) data: SystemStorage,
pub(crate) tensor_info: MediaTensorInfo, pub(crate) tensor_info: MediaTensorInfo,
} }
...@@ -43,10 +38,8 @@ pub struct DecodedMediaData { ...@@ -43,10 +38,8 @@ pub struct DecodedMediaData {
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RdmaMediaDataDescriptor { pub struct RdmaMediaDataDescriptor {
// b64 agent metadata // b64 agent metadata
#[cfg(feature = "media-nixl")]
pub(crate) nixl_metadata: String, pub(crate) nixl_metadata: String,
// tensor descriptor // tensor descriptor
#[cfg(feature = "media-nixl")]
pub(crate) nixl_descriptor: NixlDescriptor, pub(crate) nixl_descriptor: NixlDescriptor,
#[serde(flatten)] #[serde(flatten)]
...@@ -55,12 +48,10 @@ pub struct RdmaMediaDataDescriptor { ...@@ -55,12 +48,10 @@ pub struct RdmaMediaDataDescriptor {
// reference to the actual data, kept alive while the rdma descriptor is alive // reference to the actual data, kept alive while the rdma descriptor is alive
#[serde(skip, default)] #[serde(skip, default)]
#[allow(dead_code)] #[allow(dead_code)]
#[cfg(feature = "media-nixl")]
pub(crate) source_storage: Option<Arc<nixl::NixlRegistered<SystemStorage>>>, pub(crate) source_storage: Option<Arc<nixl::NixlRegistered<SystemStorage>>>,
} }
impl DecodedMediaData { impl DecodedMediaData {
#[cfg(feature = "media-nixl")]
pub fn into_rdma_descriptor(self, nixl_agent: &NixlAgent) -> Result<RdmaMediaDataDescriptor> { pub fn into_rdma_descriptor(self, nixl_agent: &NixlAgent) -> Result<RdmaMediaDataDescriptor> {
let source_storage = self.data; let source_storage = self.data;
let registered = nixl::register_with_nixl(source_storage, nixl_agent, None) let registered = nixl::register_with_nixl(source_storage, nixl_agent, None)
...@@ -88,17 +79,13 @@ impl<D: Dimension> TryFrom<ArrayBase<OwnedRepr<u8>, D>> for DecodedMediaData { ...@@ -88,17 +79,13 @@ impl<D: Dimension> TryFrom<ArrayBase<OwnedRepr<u8>, D>> for DecodedMediaData {
fn try_from(array: ArrayBase<OwnedRepr<u8>, D>) -> Result<Self, Self::Error> { fn try_from(array: ArrayBase<OwnedRepr<u8>, D>) -> Result<Self, Self::Error> {
let shape = array.shape().to_vec(); let shape = array.shape().to_vec();
#[cfg(feature = "media-nixl")]
let (data_vec, _) = array.into_raw_vec_and_offset(); let (data_vec, _) = array.into_raw_vec_and_offset();
#[cfg(feature = "media-nixl")]
let mut storage = SystemStorage::new(data_vec.len())?; let mut storage = SystemStorage::new(data_vec.len())?;
#[cfg(feature = "media-nixl")]
unsafe { unsafe {
std::ptr::copy_nonoverlapping(data_vec.as_ptr(), storage.as_mut_ptr(), data_vec.len()); std::ptr::copy_nonoverlapping(data_vec.as_ptr(), storage.as_mut_ptr(), data_vec.len());
} }
Ok(Self { Ok(Self {
#[cfg(feature = "media-nixl")]
data: storage, data: storage,
tensor_info: MediaTensorInfo { tensor_info: MediaTensorInfo {
shape, shape,
...@@ -113,7 +100,6 @@ impl<D: Dimension> TryFrom<ArrayBase<OwnedRepr<u8>, D>> for DecodedMediaData { ...@@ -113,7 +100,6 @@ impl<D: Dimension> TryFrom<ArrayBase<OwnedRepr<u8>, D>> for DecodedMediaData {
// Returns zlib-compressed, base64-encoded metadata in format: "b64:<compressed_base64>" // Returns zlib-compressed, base64-encoded metadata in format: "b64:<compressed_base64>"
// This format matches what Python nixl_connect expects for RdmaMetadata.nixl_metadata // This format matches what Python nixl_connect expects for RdmaMetadata.nixl_metadata
// TODO: pre-allocate a fixed NIXL-registered RAM pool so metadata can be cached on the target? // TODO: pre-allocate a fixed NIXL-registered RAM pool so metadata can be cached on the target?
#[cfg(feature = "media-nixl")]
pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result<String> { pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result<String> {
// WAR: Until https://github.com/ai-dynamo/nixl/pull/970 is merged, can't use get_local_partial_md // WAR: Until https://github.com/ai-dynamo/nixl/pull/970 is merged, can't use get_local_partial_md
let nixl_md = agent.raw_agent().get_local_md()?; let nixl_md = agent.raw_agent().get_local_md()?;
...@@ -130,7 +116,6 @@ pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result< ...@@ -130,7 +116,6 @@ pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result<
Ok(format!("b64:{}", b64_encoded)) Ok(format!("b64:{}", b64_encoded))
} }
#[cfg(feature = "media-nixl")]
pub fn get_nixl_agent() -> Result<NixlAgent> { pub fn get_nixl_agent() -> Result<NixlAgent> {
let name = format!("media-loader-{}", uuid::Uuid::new_v4()); let name = format!("media-loader-{}", uuid::Uuid::new_v4());
let nixl_agent = NixlAgent::with_backends(&name, &["UCX"])?; let nixl_agent = NixlAgent::with_backends(&name, &["UCX"])?;
......
...@@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize}; ...@@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize};
use super::timing::RequestTracker; use super::timing::RequestTracker;
use super::{OutputOptions, SamplingOptions, StopConditions}; use super::{OutputOptions, SamplingOptions, StopConditions};
use crate::kv_router::RouterConfigOverride; use crate::kv_router::RouterConfigOverride;
#[cfg(feature = "media-nixl")]
use crate::preprocessor::media::RdmaMediaDataDescriptor; use crate::preprocessor::media::RdmaMediaDataDescriptor;
use crate::protocols::TokenIdType; use crate::protocols::TokenIdType;
...@@ -78,7 +77,6 @@ pub struct PrefillResult { ...@@ -78,7 +77,6 @@ pub struct PrefillResult {
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum MultimodalData { pub enum MultimodalData {
Url(url::Url), Url(url::Url),
#[cfg(feature = "media-nixl")]
Decoded(RdmaMediaDataDescriptor), Decoded(RdmaMediaDataDescriptor),
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment