Unverified Commit 7a5a0bd6 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Upgrade Rust to 1.90 (#3147)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 406c4d4e
...@@ -91,13 +91,13 @@ rust-base: ...@@ -91,13 +91,13 @@ rust-base:
ENV RUSTUP_HOME=/usr/local/rustup ENV RUSTUP_HOME=/usr/local/rustup
ENV CARGO_HOME=/usr/local/cargo ENV CARGO_HOME=/usr/local/cargo
ENV PATH=/usr/local/cargo/bin:$PATH ENV PATH=/usr/local/cargo/bin:$PATH
ENV RUST_VERSION=1.89.0 ENV RUST_VERSION=1.90.0
ENV RUSTARCH=x86_64-unknown-linux-gnu ENV RUSTARCH=x86_64-unknown-linux-gnu
RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/x86_64-unknown-linux-gnu/rustup-init" && \ RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/x86_64-unknown-linux-gnu/rustup-init" && \
echo "a3339fb004c3d0bb9862ba0bce001861fe5cbde9c10d16591eb3f39ee6cd3e7f *rustup-init" | sha256sum -c - && \ echo "a3339fb004c3d0bb9862ba0bce001861fe5cbde9c10d16591eb3f39ee6cd3e7f *rustup-init" | sha256sum -c - && \
chmod +x rustup-init && \ chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain 1.89.0 --default-host x86_64-unknown-linux-gnu && \ ./rustup-init -y --no-modify-path --profile minimal --default-toolchain 1.90.0 --default-host x86_64-unknown-linux-gnu && \
rm rustup-init && \ rm rustup-init && \
chmod -R a+w $RUSTUP_HOME $CARGO_HOME chmod -R a+w $RUSTUP_HOME $CARGO_HOME
......
...@@ -93,7 +93,7 @@ ENV SCCACHE_BUCKET=${USE_SCCACHE:+${SCCACHE_BUCKET}} \ ...@@ -93,7 +93,7 @@ ENV SCCACHE_BUCKET=${USE_SCCACHE:+${SCCACHE_BUCKET}} \
ENV RUSTUP_HOME=/usr/local/rustup \ ENV RUSTUP_HOME=/usr/local/rustup \
CARGO_HOME=/usr/local/cargo \ CARGO_HOME=/usr/local/cargo \
PATH=/usr/local/cargo/bin:$PATH \ PATH=/usr/local/cargo/bin:$PATH \
RUST_VERSION=1.89.0 RUST_VERSION=1.90.0
# Define Rust target based on ARCH_ALT ARG # Define Rust target based on ARCH_ALT ARG
ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu
...@@ -367,4 +367,4 @@ RUN --mount=type=bind,source=./container/launch_message.txt,target=/opt/dynamo/l ...@@ -367,4 +367,4 @@ RUN --mount=type=bind,source=./container/launch_message.txt,target=/opt/dynamo/l
echo "cat ~/.launch_screen" >> ~/.bashrc echo "cat ~/.launch_screen" >> ~/.bashrc
ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"] ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
CMD [] CMD []
\ No newline at end of file
...@@ -11,7 +11,7 @@ ARG ARCH_ALT="x86_64" ...@@ -11,7 +11,7 @@ ARG ARCH_ALT="x86_64"
ARG NIXL_UCX_REF="v1.19.0" ARG NIXL_UCX_REF="v1.19.0"
ARG NIXL_TAG="0.5.0" ARG NIXL_TAG="0.5.0"
ARG CMAKE_VERSION="3.31.8" ARG CMAKE_VERSION="3.31.8"
ARG RUST_VERSION="1.89.0" ARG RUST_VERSION="1.90.0"
ARG CARGO_BUILD_JOBS="16" ARG CARGO_BUILD_JOBS="16"
RUN apt-get update -y && \ RUN apt-get update -y && \
......
...@@ -41,7 +41,7 @@ ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu ...@@ -41,7 +41,7 @@ ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu
ENV RUSTUP_HOME=/usr/local/rustup \ ENV RUSTUP_HOME=/usr/local/rustup \
CARGO_HOME=/usr/local/cargo \ CARGO_HOME=/usr/local/cargo \
PATH=/usr/local/cargo/bin:$PATH \ PATH=/usr/local/cargo/bin:$PATH \
RUST_VERSION=1.89.0 RUST_VERSION=1.90.0
# Install Rust using RUSTARCH derived from ARCH_ALT # Install Rust using RUSTARCH derived from ARCH_ALT
RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/${RUSTARCH}/rustup-init" && \ RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/${RUSTARCH}/rustup-init" && \
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
[package] [package]
name = "dynamo-py3" name = "dynamo-py3"
version = "0.5.0" version = "0.5.0"
edition = "2021" edition = "2024"
authors = ["NVIDIA"] authors = ["NVIDIA"]
license = "Apache-2.0" license = "Apache-2.0"
homepage = "https://github.com/ai-dynamo/dynamo" homepage = "https://github.com/ai-dynamo/dynamo"
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
// Context is a wrapper around the AsyncEngineContext to allow for Python bindings. // Context is a wrapper around the AsyncEngineContext to allow for Python bindings.
use dynamo_runtime::pipeline::context::Controller;
pub use dynamo_runtime::pipeline::AsyncEngineContext; pub use dynamo_runtime::pipeline::AsyncEngineContext;
use dynamo_runtime::pipeline::context::Controller;
use pyo3::prelude::*; use pyo3::prelude::*;
use std::sync::Arc; use std::sync::Arc;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use super::context::{callable_accepts_kwarg, Context}; use super::context::{Context, callable_accepts_kwarg};
use pyo3::prelude::*; use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule}; use pyo3::types::{PyDict, PyModule};
use pyo3::{PyAny, PyErr}; use pyo3::{PyAny, PyErr};
...@@ -9,15 +9,15 @@ use pyo3_async_runtimes::TaskLocals; ...@@ -9,15 +9,15 @@ use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize}; use pythonize::{depythonize, pythonize};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_stream::{StreamExt, wrappers::ReceiverStream};
pub use dynamo_runtime::{ pub use dynamo_runtime::{
CancellationToken, Error, Result,
pipeline::{ pipeline::{
async_trait, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn,
SingleIn, async_trait,
}, },
protocols::annotated::Annotated, protocols::annotated::Annotated,
CancellationToken, Error, Result,
}; };
pub use serde::{Deserialize, Serialize}; pub use serde::{Deserialize, Serialize};
...@@ -180,7 +180,7 @@ where ...@@ -180,7 +180,7 @@ where
let py_request = pythonize(py, &request)?; let py_request = pythonize(py, &request)?;
let py_ctx = Py::new(py, Context::new(ctx_python.clone()))?; let py_ctx = Py::new(py, Context::new(ctx_python.clone()))?;
let gen = if has_context { let gen_result = if has_context {
// Pass context as a kwarg // Pass context as a kwarg
let kwarg = PyDict::new(py); let kwarg = PyDict::new(py);
kwarg.set_item("context", &py_ctx)?; kwarg.set_item("context", &py_ctx)?;
...@@ -191,7 +191,10 @@ where ...@@ -191,7 +191,10 @@ where
}?; }?;
let locals = TaskLocals::new(event_loop.bind(py).clone()); let locals = TaskLocals::new(event_loop.bind(py).clone());
pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py)) pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
locals,
gen_result.into_bound(py),
)
}) })
}) })
.await??; .await??;
...@@ -234,18 +237,27 @@ where ...@@ -234,18 +237,27 @@ where
// right now, this is impossible as we are not passing the context to the python async generator // right now, this is impossible as we are not passing the context to the python async generator
// todo: add task-local context to the python async generator // todo: add task-local context to the python async generator
ctx.stop_generating(); ctx.stop_generating();
let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e); let msg = format!(
"critical error: invalid response object from python async generator; application-logic-mismatch: {}",
e
);
msg msg
} }
ResponseProcessingError::PyGeneratorExit(_) => { ResponseProcessingError::PyGeneratorExit(_) => {
"Stream ended before generation completed".to_string() "Stream ended before generation completed".to_string()
} }
ResponseProcessingError::PythonException(e) => { ResponseProcessingError::PythonException(e) => {
let msg = format!("a python exception was caught while processing the async generator: {}", e); let msg = format!(
"a python exception was caught while processing the async generator: {}",
e
);
msg msg
} }
ResponseProcessingError::OffloadError(e) => { ResponseProcessingError::OffloadError(e) => {
let msg = format!("critical error: failed to offload the python async generator to a new thread: {}", e); let msg = format!(
"critical error: failed to offload the python async generator to a new thread: {}",
e
);
msg msg
} }
}; };
......
...@@ -5,15 +5,14 @@ use std::sync::Arc; ...@@ -5,15 +5,14 @@ use std::sync::Arc;
use pyo3::{exceptions::PyException, prelude::*}; use pyo3::{exceptions::PyException, prelude::*};
use crate::{engine::*, to_pyerr, CancellationToken}; use crate::{CancellationToken, engine::*, to_pyerr};
pub use dynamo_llm::endpoint_type::EndpointType; pub use dynamo_llm::endpoint_type::EndpointType;
pub use dynamo_llm::http::service::{error as http_error, service_v2}; pub use dynamo_llm::http::service::{error as http_error, service_v2};
pub use dynamo_runtime::{ pub use dynamo_runtime::{
error, Error, Result, error,
pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn}, pipeline::{AsyncEngine, Data, ManyOut, SingleIn, async_trait},
protocols::annotated::Annotated, protocols::annotated::Annotated,
Error, Result,
}; };
#[pyclass] #[pyclass]
......
...@@ -3,10 +3,10 @@ ...@@ -3,10 +3,10 @@
use futures::StreamExt; use futures::StreamExt;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use pyo3::IntoPyObjectExt;
use pyo3::exceptions::PyStopAsyncIteration; use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::types::PyBytes; use pyo3::types::PyBytes;
use pyo3::types::{PyDict, PyList, PyString}; use pyo3::types::{PyDict, PyList, PyString};
use pyo3::IntoPyObjectExt;
use pyo3::{exceptions::PyException, prelude::*}; use pyo3::{exceptions::PyException, prelude::*};
use rand::seq::IteratorRandom as _; use rand::seq::IteratorRandom as _;
use rs::pipeline::network::Ingress; use rs::pipeline::network::Ingress;
...@@ -19,8 +19,8 @@ use tokio::sync::Mutex; ...@@ -19,8 +19,8 @@ use tokio::sync::Mutex;
use dynamo_runtime::{ use dynamo_runtime::{
self as rs, logging, self as rs, logging,
pipeline::{ pipeline::{
context::Context as RsContext, network::egress::push_router::RouterMode as RsRouterMode, EngineStream, ManyOut, SingleIn, context::Context as RsContext,
EngineStream, ManyOut, SingleIn, network::egress::push_router::RouterMode as RsRouterMode,
}, },
protocols::annotated::Annotated as RsAnnotated, protocols::annotated::Annotated as RsAnnotated,
traits::DistributedRuntimeProvider, traits::DistributedRuntimeProvider,
...@@ -446,9 +446,9 @@ impl DistributedRuntime { ...@@ -446,9 +446,9 @@ impl DistributedRuntime {
Ok(sock) => sockets.push(sock), Ok(sock) => sockets.push(sock),
Err(e) => { Err(e) => {
tracing::error!( tracing::error!(
"Failed to bind to port block starting at {start_port} (attempt {}): {e}", "Failed to bind to port block starting at {start_port} (attempt {}): {e}",
attempt_idx + 1, attempt_idx + 1,
); );
bind_failed = true; bind_failed = true;
break; break;
} }
...@@ -504,7 +504,8 @@ impl DistributedRuntime { ...@@ -504,7 +504,8 @@ impl DistributedRuntime {
} }
Err(PyErr::new::<PyException, _>(format!( Err(PyErr::new::<PyException, _>(format!(
"Failed to allocate and reserve a port block of size {block_size} from range {min}-{max} after {candidate_count} attempts"))) "Failed to allocate and reserve a port block of size {block_size} from range {min}-{max} after {candidate_count} attempts"
)))
}) })
} }
...@@ -735,12 +736,12 @@ impl Endpoint { ...@@ -735,12 +736,12 @@ impl Endpoint {
})?; })?;
// Require an object/dict // Require an object/dict
if let Some(ref payload) = health_payload_json { if let Some(ref payload) = health_payload_json
if !payload.is_object() { && !payload.is_object()
return Err(pyo3::exceptions::PyTypeError::new_err( {
"health_check_payload must be a JSON object (dict)", return Err(pyo3::exceptions::PyTypeError::new_err(
)); "health_check_payload must be a JSON object (dict)",
} ));
} }
let mut builder = self let mut builder = self
...@@ -1087,11 +1088,10 @@ async fn process_stream( ...@@ -1087,11 +1088,10 @@ async fn process_stream(
// Convert the response to a PyObject using Python's GIL // Convert the response to a PyObject using Python's GIL
let annotated: RsAnnotated<serde_json::Value> = response; let annotated: RsAnnotated<serde_json::Value> = response;
let annotated: RsAnnotated<PyObject> = annotated.map_data(|data| { let annotated: RsAnnotated<PyObject> = annotated.map_data(|data| {
let result = Python::with_gil(|py| match pythonize::pythonize(py, &data) { Python::with_gil(|py| match pythonize::pythonize(py, &data) {
Ok(pyobj) => Ok(pyobj.into()), Ok(pyobj) => Ok(pyobj.into()),
Err(e) => Err(e.to_string()), Err(e) => Err(e.to_string()),
}); })
result
}); });
let is_error = annotated.is_error(); let is_error = annotated.is_error();
......
...@@ -11,13 +11,13 @@ use pyo3::{prelude::*, wrap_pymodule}; ...@@ -11,13 +11,13 @@ use pyo3::{prelude::*, wrap_pymodule};
use dynamo_llm::{ use dynamo_llm::{
block_manager::{ block_manager::{
BasicMetadata, DeviceStorage, Storage,
block::{ block::{
BlockId, ImmutableBlock, MutableBlock,
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, data::logical::distributed_leader_worker::DistributedLeaderWorkerResources,
locality::{LocalityProvider, Logical}, locality::{LocalityProvider, Logical},
BlockId, ImmutableBlock, MutableBlock,
}, },
pool::{BlockPool, BlockPoolError}, pool::{BlockPool, BlockPoolError},
BasicMetadata, DeviceStorage, Storage,
}, },
tokens::{SaltHash, SequenceHash, TokenBlockSequence, Tokens}, tokens::{SaltHash, SequenceHash, TokenBlockSequence, Tokens},
}; };
...@@ -314,7 +314,19 @@ impl std::fmt::Debug for GenericSlotUpdate<String> { ...@@ -314,7 +314,19 @@ impl std::fmt::Debug for GenericSlotUpdate<String> {
format!("{:?}", self.tokens_to_append) format!("{:?}", self.tokens_to_append)
}; };
write!(f, "GenericSlotUpdate(request_id: {}, request_num_tokens: {}, request_num_computed_tokens: {}, tokens_to_append: {}, num_new_tokens: {}, num_new_computed_tokens: {:?}, new_computed_blocks: {:?}, num_lookahead_blocks: {:?}, delay_cache_blocks: {:?})", self.request_id, self.request_num_tokens, self.request_num_computed_tokens, tokens_display, self.num_new_tokens, self.num_new_computed_tokens, self.new_computed_blocks, self.num_lookahead_blocks, self.delay_cache_blocks) write!(
f,
"GenericSlotUpdate(request_id: {}, request_num_tokens: {}, request_num_computed_tokens: {}, tokens_to_append: {}, num_new_tokens: {}, num_new_computed_tokens: {:?}, new_computed_blocks: {:?}, num_lookahead_blocks: {:?}, delay_cache_blocks: {:?})",
self.request_id,
self.request_num_tokens,
self.request_num_computed_tokens,
tokens_display,
self.num_new_tokens,
self.num_new_computed_tokens,
self.new_computed_blocks,
self.num_lookahead_blocks,
self.delay_cache_blocks
)
} }
} }
...@@ -558,7 +570,8 @@ impl<R: RequestKey> SlotManager<R> { ...@@ -558,7 +570,8 @@ impl<R: RequestKey> SlotManager<R> {
let isl_host = slot.num_blocks_cached_from_host() * self.block_size; let isl_host = slot.num_blocks_cached_from_host() * self.block_size;
let isl_disk = slot.num_blocks_cached_from_disk() * self.block_size; let isl_disk = slot.num_blocks_cached_from_disk() * self.block_size;
tracing::info!( tracing::info!(
request_id, "request complete isl: {} - cache hits: device: {}, host: {}, disk: {} - prefilled: {}", request_id,
"request complete isl: {} - cache hits: device: {}, host: {}, disk: {} - prefilled: {}",
isl, isl,
isl_device, isl_device,
isl_host, isl_host,
......
...@@ -9,21 +9,21 @@ use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; ...@@ -9,21 +9,21 @@ use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_runtime::DistributedRuntime; use dynamo_runtime::DistributedRuntime;
use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState}; use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState};
use crate::DistributedRuntime as PyDistributedRuntime;
use crate::llm::block_manager::BlockManagerBuilder; use crate::llm::block_manager::BlockManagerBuilder;
use crate::llm::block_manager::{ use crate::llm::block_manager::{
distributed::KvbmLeader as PyKvbmLeader, vllm::connector::leader::slot::VllmConnectorSlot, VllmBlockManager, distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest,
vllm::KvbmRequest, VllmBlockManager, vllm::connector::leader::slot::VllmConnectorSlot,
}; };
use crate::DistributedRuntime as PyDistributedRuntime;
use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use dynamo_llm::block_manager::{ use dynamo_llm::block_manager::{
BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage,
block::{ block::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, data::logical::distributed_leader_worker::DistributedLeaderWorkerResources,
locality::Logical, locality::Logical,
}, },
connector::*, connector::*,
BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage,
}; };
use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens}; use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
use std::sync::{Arc, OnceLock}; use std::sync::{Arc, OnceLock};
...@@ -218,7 +218,9 @@ impl Leader for KvConnectorLeader { ...@@ -218,7 +218,9 @@ impl Leader for KvConnectorLeader {
); );
if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode { if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode {
tracing::debug!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early"); tracing::debug!(
"slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early"
);
match slot.state() { match slot.state() {
SlotState::SkippedPrefill => { SlotState::SkippedPrefill => {
slot.mark_as_prefilling(self.iteration_counter)?; slot.mark_as_prefilling(self.iteration_counter)?;
......
...@@ -5,10 +5,10 @@ use std::{any::Any, sync::Arc}; ...@@ -5,10 +5,10 @@ use std::{any::Any, sync::Arc};
use dynamo_llm::{ use dynamo_llm::{
block_manager::{ block_manager::{
block::{locality::LocalityProvider, BlockMetadata}, Storage,
block::{BlockMetadata, locality::LocalityProvider},
connector::protocol::{LeaderTransferRequest, RequestType, TransferType}, connector::protocol::{LeaderTransferRequest, RequestType, TransferType},
distributed::{BlockTransferPool, BlockTransferRequest, KvbmLeader}, distributed::{BlockTransferPool, BlockTransferRequest, KvbmLeader},
Storage,
}, },
tokens::TokenBlock, tokens::TokenBlock,
}; };
...@@ -398,7 +398,11 @@ impl VllmConnectorSlot { ...@@ -398,7 +398,11 @@ impl VllmConnectorSlot {
SlotState::SkippedPrefill => Ok(()), // already skipped SlotState::SkippedPrefill => Ok(()), // already skipped
SlotState::SkippedDecode => Ok(()), // already skipped SlotState::SkippedDecode => Ok(()), // already skipped
_ => { _ => {
tracing::debug!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id); tracing::debug!(
"slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}",
self.state,
self.request_id
);
Ok(()) Ok(())
} }
} }
...@@ -594,7 +598,8 @@ impl Slot for VllmConnectorSlot { ...@@ -594,7 +598,8 @@ impl Slot for VllmConnectorSlot {
if computed_position < self.current_position { if computed_position < self.current_position {
tracing::debug!( tracing::debug!(
"computed_position={} < current_position={}, so we are onboarding during prefilling phase", "computed_position={} < current_position={}, so we are onboarding during prefilling phase",
computed_position, self.current_position computed_position,
self.current_position
); );
return Ok(()); return Ok(());
} }
...@@ -916,7 +921,8 @@ impl ExternallyManagedDeviceSlot for VllmConnectorSlot { ...@@ -916,7 +921,8 @@ impl ExternallyManagedDeviceSlot for VllmConnectorSlot {
if self.current_position + num_tokens > self.sequence().total_tokens() { if self.current_position + num_tokens > self.sequence().total_tokens() {
return Err(SlotError::InvalidOperation(format!( return Err(SlotError::InvalidOperation(format!(
"cannot advance computed position from {} by {num_tokens} tokens, total tokens is {}", "cannot advance computed position from {} by {num_tokens} tokens, total tokens is {}",
self.current_position, self.sequence().total_tokens() self.current_position,
self.sequence().total_tokens()
))); )));
} }
......
...@@ -3,12 +3,12 @@ ...@@ -3,12 +3,12 @@
use super::*; use super::*;
use crate::DistributedRuntime as PyDistributedRuntime;
use crate::llm::block_manager::BlockManagerBuilder;
use crate::llm::block_manager::vllm::connector::leader::slot::{ use crate::llm::block_manager::vllm::connector::leader::slot::{
ConnectorSlotManager, SlotManager, SlotState, ConnectorSlotManager, SlotManager, SlotState,
}; };
use crate::llm::block_manager::BlockManagerBuilder;
use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest}; use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest};
use crate::DistributedRuntime as PyDistributedRuntime;
use anyhow; use anyhow;
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
......
...@@ -13,15 +13,15 @@ use super::*; ...@@ -13,15 +13,15 @@ use super::*;
use crate::llm::block_manager::distributed::get_barrier_id_prefix; use crate::llm::block_manager::distributed::get_barrier_id_prefix;
use crate::llm::block_manager::vllm::connector::worker::event_sync_blocking; use crate::llm::block_manager::vllm::connector::worker::event_sync_blocking;
use crate::{ use crate::{
llm::block_manager::distributed::VllmTensor, to_pyerr, DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
DistributedRuntime as PyDistributedRuntime, to_pyerr,
}; };
use anyhow; use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use dynamo_llm::block_manager::storage::torch::TorchTensor; use dynamo_llm::block_manager::storage::torch::TorchTensor;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
use dynamo_runtime::DistributedRuntime; use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait Worker: Send + Sync { pub trait Worker: Send + Sync {
fn register_kv_caches( fn register_kv_caches(
...@@ -274,7 +274,10 @@ impl Worker for KvConnectorWorker { ...@@ -274,7 +274,10 @@ impl Worker for KvConnectorWorker {
.maybe_finished_offloading .maybe_finished_offloading
.contains(&request_id.to_string()) .contains(&request_id.to_string())
{ {
tracing::warn!(request_id, "possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set"); tracing::warn!(
request_id,
"possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set"
);
} else { } else {
tracing::debug!( tracing::debug!(
request_id, request_id,
...@@ -300,7 +303,10 @@ impl Worker for KvConnectorWorker { ...@@ -300,7 +303,10 @@ impl Worker for KvConnectorWorker {
.maybe_finished_onboarding .maybe_finished_onboarding
.contains(&request_id.to_string()) .contains(&request_id.to_string())
{ {
tracing::warn!(request_id, "possibly got a duplicate finished request; request_id already in the maybe_finished_onboarding set"); tracing::warn!(
request_id,
"possibly got a duplicate finished request; request_id already in the maybe_finished_onboarding set"
);
} }
} }
...@@ -316,7 +322,9 @@ impl Worker for KvConnectorWorker { ...@@ -316,7 +322,9 @@ impl Worker for KvConnectorWorker {
} else { } else {
// made this condition more strict slot existence checks were added as a prerequesite // made this condition more strict slot existence checks were added as a prerequesite
// to be added to the maybe_finished_offloading set. // to be added to the maybe_finished_offloading set.
panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set"); panic!(
"request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set"
);
} }
} }
...@@ -329,7 +337,10 @@ impl Worker for KvConnectorWorker { ...@@ -329,7 +337,10 @@ impl Worker for KvConnectorWorker {
if self.connector.has_slot(request_id) { if self.connector.has_slot(request_id) {
self.connector.remove_slot(request_id); self.connector.remove_slot(request_id);
} else { } else {
tracing::debug!(request_id, "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set"); tracing::debug!(
request_id,
"is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set"
);
} }
} }
...@@ -343,7 +354,9 @@ impl Worker for KvConnectorWorker { ...@@ -343,7 +354,9 @@ impl Worker for KvConnectorWorker {
tracing::debug!(request_id, "request slot is not finished onboarding"); tracing::debug!(request_id, "request slot is not finished onboarding");
} }
} else { } else {
panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set"); panic!(
"request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set"
);
} }
} }
......
...@@ -13,16 +13,16 @@ use std::sync::{Arc, OnceLock}; ...@@ -13,16 +13,16 @@ use std::sync::{Arc, OnceLock};
use super::*; use super::*;
use crate::llm::block_manager::distributed::get_barrier_id_prefix; use crate::llm::block_manager::distributed::get_barrier_id_prefix;
use crate::{ use crate::{
llm::block_manager::distributed::VllmTensor, to_pyerr, DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
DistributedRuntime as PyDistributedRuntime, to_pyerr,
}; };
use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use anyhow; use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use dynamo_llm::block_manager::storage::torch::TorchTensor; use dynamo_llm::block_manager::storage::torch::TorchTensor;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
use dynamo_runtime::DistributedRuntime; use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait Worker: Send + Sync { pub trait Worker: Send + Sync {
fn register_kv_caches( fn register_kv_caches(
...@@ -326,7 +326,10 @@ impl Worker for KvConnectorWorker { ...@@ -326,7 +326,10 @@ impl Worker for KvConnectorWorker {
"got a finished warning for a request that is onboarding" "got a finished warning for a request that is onboarding"
); );
} else if self.maybe_finished_offloading.contains(&request_id) { } else if self.maybe_finished_offloading.contains(&request_id) {
tracing::warn!(request_id, "possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set"); tracing::warn!(
request_id,
"possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set"
);
} else { } else {
tracing::debug!( tracing::debug!(
request_id, request_id,
...@@ -348,7 +351,9 @@ impl Worker for KvConnectorWorker { ...@@ -348,7 +351,9 @@ impl Worker for KvConnectorWorker {
} else { } else {
// made this condition more strict slot existence checks were added as a prerequesite // made this condition more strict slot existence checks were added as a prerequesite
// to be added to the maybe_finished_offloading set. // to be added to the maybe_finished_offloading set.
panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set"); panic!(
"request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set"
);
} }
} }
...@@ -361,7 +366,10 @@ impl Worker for KvConnectorWorker { ...@@ -361,7 +366,10 @@ impl Worker for KvConnectorWorker {
if self.connector.has_slot(request_id) { if self.connector.has_slot(request_id) {
self.connector.remove_slot(request_id); self.connector.remove_slot(request_id);
} else { } else {
tracing::debug!(request_id, "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set"); tracing::debug!(
request_id,
"is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set"
);
} }
} }
...@@ -375,7 +383,9 @@ impl Worker for KvConnectorWorker { ...@@ -375,7 +383,9 @@ impl Worker for KvConnectorWorker {
tracing::debug!(request_id, "request slot is not finished"); tracing::debug!(request_id, "request slot is not finished");
} }
} else { } else {
panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set"); panic!(
"request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set"
);
} }
} }
...@@ -460,7 +470,7 @@ impl PyKvConnectorWorker { ...@@ -460,7 +470,7 @@ impl PyKvConnectorWorker {
} }
use cudarc::driver::sys::{ use cudarc::driver::sys::{
cuCtxGetCurrent, cuEventSynchronize, cudaError_enum, CUcontext, CUevent, CUcontext, CUevent, cuCtxGetCurrent, cuEventSynchronize, cudaError_enum,
}; };
use std::ptr; use std::ptr;
......
...@@ -67,7 +67,11 @@ impl<S: Storage, L: LocalityProvider> std::fmt::Debug for Slot<S, L> { ...@@ -67,7 +67,11 @@ impl<S: Storage, L: LocalityProvider> std::fmt::Debug for Slot<S, L> {
.iter() .iter()
.map(|b| b.block_id()) .map(|b| b.block_id())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
write!(f, "Slot(computed_position: {}, prefill_position: {}, immutable_block_ids: {:?}, mutable_block_ids: {:?})", self.computed_position, self.prefill_position, immutable_block_ids, mutable_block_ids) write!(
f,
"Slot(computed_position: {}, prefill_position: {}, immutable_block_ids: {:?}, mutable_block_ids: {:?})",
self.computed_position, self.prefill_position, immutable_block_ids, mutable_block_ids
)
} }
} }
......
...@@ -6,9 +6,9 @@ use std::path::PathBuf; ...@@ -6,9 +6,9 @@ use std::path::PathBuf;
use pyo3::{exceptions::PyException, prelude::*}; use pyo3::{exceptions::PyException, prelude::*};
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig; use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig;
use dynamo_llm::entrypoint::RouterConfig as RsRouterConfig; use dynamo_llm::entrypoint::RouterConfig as RsRouterConfig;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig; use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig;
use dynamo_llm::local_model::DEFAULT_HTTP_PORT; use dynamo_llm::local_model::DEFAULT_HTTP_PORT;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder}; use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
......
...@@ -8,8 +8,8 @@ use tokio_stream::StreamExt; ...@@ -8,8 +8,8 @@ use tokio_stream::StreamExt;
use super::*; use super::*;
use crate::Component; use crate::Component;
use llm_rs::kv_router::indexer::compute_block_hash_for_seq;
use llm_rs::kv_router::indexer::KvIndexerInterface; use llm_rs::kv_router::indexer::KvIndexerInterface;
use llm_rs::kv_router::indexer::compute_block_hash_for_seq;
use llm_rs::kv_router::protocols::ForwardPassMetrics as RsForwardPassMetrics; use llm_rs::kv_router::protocols::ForwardPassMetrics as RsForwardPassMetrics;
use llm_rs::kv_router::protocols::KvStats as RsKvStats; use llm_rs::kv_router::protocols::KvStats as RsKvStats;
use llm_rs::kv_router::protocols::SpecDecodeStats as RsSpecDecodeStats; use llm_rs::kv_router::protocols::SpecDecodeStats as RsSpecDecodeStats;
...@@ -18,7 +18,7 @@ use rs::traits::events::EventSubscriber; ...@@ -18,7 +18,7 @@ use rs::traits::events::EventSubscriber;
use tracing; use tracing;
use llm_rs::kv_router::protocols::*; use llm_rs::kv_router::protocols::*;
use llm_rs::kv_router::publisher::{create_stored_blocks, KvEventSourceConfig}; use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks};
use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions}; use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions};
#[pyfunction] #[pyfunction]
......
...@@ -8,10 +8,10 @@ use llm_rs::{ ...@@ -8,10 +8,10 @@ use llm_rs::{
preprocessor::OpenAIPreprocessor, preprocessor::OpenAIPreprocessor,
protocols::common::llm_backend::{BackendOutput, PreprocessedRequest}, protocols::common::llm_backend::{BackendOutput, PreprocessedRequest},
types::{ types::{
Annotated,
openai::chat_completions::{ openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
}, },
Annotated,
}, },
}; };
......
...@@ -28,7 +28,7 @@ fn create_unique_blocks_from_sequence( ...@@ -28,7 +28,7 @@ fn create_unique_blocks_from_sequence(
.collect(); .collect();
// Only push the partial block if tokens count isn't a multiple of block_size // Only push the partial block if tokens count isn't a multiple of block_size
if tokens.total_tokens() % block_size != 0 { if !tokens.total_tokens().is_multiple_of(block_size) {
unique_blocks.push(match uuid { unique_blocks.push(match uuid {
Some(uuid) => UniqueBlock::PartialBlock(uuid), Some(uuid) => UniqueBlock::PartialBlock(uuid),
None => UniqueBlock::default(), None => UniqueBlock::default(),
...@@ -258,7 +258,7 @@ impl ActiveSequence { ...@@ -258,7 +258,7 @@ impl ActiveSequence {
self.generated_tokens = self.generated_tokens.saturating_sub(1); self.generated_tokens = self.generated_tokens.saturating_sub(1);
// Reverts to the last full block // Reverts to the last full block
if self.tokens.total_tokens() % self.block_size == 0 { if self.tokens.total_tokens().is_multiple_of(self.block_size) {
self.unique_blocks.pop(); self.unique_blocks.pop();
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment