Unverified commit fbad2860, authored by Graham King and committed by GitHub
Browse files

chore: Remove the python bindings for port allocation (#4237)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent e1af3af6
...@@ -1645,18 +1645,15 @@ dependencies = [ ...@@ -1645,18 +1645,15 @@ dependencies = [
"dynamo-runtime", "dynamo-runtime",
"either", "either",
"futures", "futures",
"local-ip-address",
"once_cell", "once_cell",
"parking_lot", "parking_lot",
"prometheus", "prometheus",
"pyo3", "pyo3",
"pyo3-async-runtimes", "pyo3-async-runtimes",
"pythonize", "pythonize",
"rand 0.9.2",
"rstest", "rstest",
"serde", "serde",
"serde_json", "serde_json",
"socket2 0.6.0",
"thiserror 2.0.16", "thiserror 2.0.16",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
......
...@@ -36,11 +36,8 @@ async-trait = { version = "0.1" } ...@@ -36,11 +36,8 @@ async-trait = { version = "0.1" }
derive-getters = "0.5" derive-getters = "0.5"
either = { version = "1.13", features = ["serde"] } either = { version = "1.13", features = ["serde"] }
futures = { version = "0.3" } futures = { version = "0.3" }
local-ip-address = { version = "0.6" }
once_cell = { version = "1.20.3" } once_cell = { version = "1.20.3" }
parking_lot = { version = "0.12.4" } parking_lot = { version = "0.12.4" }
rand = { version = "0.9" }
socket2 = { version = "0.6" }
serde = { version = "1" } serde = { version = "1" }
serde_json = { version = "1.0.138" } serde_json = { version = "1.0.138" }
thiserror = { version = "2.0" } thiserror = { version = "2.0" }
......
...@@ -11,13 +11,10 @@ use pyo3::exceptions::PyStopAsyncIteration; ...@@ -11,13 +11,10 @@ use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::types::PyCapsule; use pyo3::types::PyCapsule;
use pyo3::types::{PyDict, PyString}; use pyo3::types::{PyDict, PyString};
use pyo3::{exceptions::PyException, prelude::*}; use pyo3::{exceptions::PyException, prelude::*};
use rand::seq::IteratorRandom as _;
use rs::pipeline::network::Ingress; use rs::pipeline::network::Ingress;
use std::ffi::CString; use std::ffi::CString;
use std::fs; use std::fs;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration;
use std::{ use std::{
fmt::Display, fmt::Display,
sync::{Arc, Weak}, sync::{Arc, Weak},
...@@ -312,7 +309,7 @@ fn register_llm<'p>( ...@@ -312,7 +309,7 @@ fn register_llm<'p>(
.media_fetcher(media_fetcher.map(|m| m.inner)); .media_fetcher(media_fetcher.map(|m| m.inner));
// Load the ModelDeploymentCard // Load the ModelDeploymentCard
let mut local_model = builder.build().await.map_err(to_pyerr)?; let mut local_model = builder.build().await.map_err(to_pyerr)?;
// Advertise ourself on etcd so ingress can find us // Advertise ourself so ingress can find us
local_model local_model
.attach(&endpoint.inner, model_type_obj, model_input) .attach(&endpoint.inner, model_type_obj, model_input)
.await .await
...@@ -497,138 +494,6 @@ impl DistributedRuntime { ...@@ -497,138 +494,6 @@ impl DistributedRuntime {
}) })
} }
/// Allocate a contiguous block of ports from the specified range and atomically reserve them.
/// Returns a list of all allocated ports in order.
#[pyo3(signature = (namespace, port_min, port_max, block_size, context=None))]
fn allocate_port_block<'p>(
&self,
py: Python<'p>,
namespace: &str,
port_min: u16,
port_max: u16,
block_size: u16,
context: Option<String>, // Optional info to store alongside the reservation
) -> PyResult<Bound<'p, PyAny>> {
const MAX_ALLOCATE_ATTEMPTS: usize = 100;
if block_size == 0 {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Block size must be at least 1",
));
}
let Some(etcd_client) = self.inner.etcd_client() else {
return Err(PyErr::new::<PyException, _>(
"Static workers should not need to reserve ports",
));
};
let min = port_min;
let max = port_max;
// Compute maximum valid starting port (inclusive)
let max_start_port = max.saturating_sub(block_size.saturating_sub(1));
if max_start_port < min {
return Err(PyErr::new::<PyException, _>(format!(
"Port range {min}-{max} is too small for block size {block_size}",
)));
}
// Randomize candidate starting ports to reduce contention/races
let candidate_count =
(max_start_port - port_min + 1).min(MAX_ALLOCATE_ATTEMPTS as u16) as usize;
let mut rng = rand::rng();
let candidate_ports: Vec<u16> =
(port_min..=max_start_port).choose_multiple(&mut rng, candidate_count);
let local_ip = match local_ip() {
Ok(ip) => ip,
Err(err) => {
return Err(PyErr::new::<PyException, _>(format!(
"Failed fetching local IP address: {err}"
)));
}
};
let context_bytes = context.map(|s| s.as_bytes().to_vec()).unwrap_or_default();
let namespace = namespace.to_owned();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
for (attempt_idx, start_port) in candidate_ports.into_iter().enumerate() {
let end_port_exclusive = start_port + block_size;
let ports_to_reserve: Vec<u16> = (start_port..end_port_exclusive).collect();
// Hold/bind all ports in the block
let mut sockets = Vec::with_capacity(ports_to_reserve.len());
let mut bind_failed = false;
for &port in &ports_to_reserve {
match bind_tcp_port(port) {
Ok(sock) => sockets.push(sock),
Err(e) => {
tracing::error!(
"Failed to bind to port block starting at {start_port} (attempt {}): {e}",
attempt_idx + 1,
);
bind_failed = true;
break;
}
}
}
if bind_failed {
// Let previously bound sockets drop here
if attempt_idx < candidate_count - 1 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
continue;
}
// With sockets held, reserve in ETCD
let mut reserved_keys = Vec::with_capacity(ports_to_reserve.len());
let mut reservation_failed = false;
for port in &ports_to_reserve {
let key = make_port_key(&namespace, local_ip, *port).map_err(to_pyerr)?;
if let Err(e) = etcd_client
.kv_create(&key, context_bytes.clone(), None)
.await
{
tracing::error!(
"Failed to reserve port block starting at {start_port} (attempt {}): {e}",
attempt_idx + 1,
);
reservation_failed = true;
break;
}
reserved_keys.push(key);
}
if reservation_failed {
// Cleanup partial reservations
for key in reserved_keys {
if let Err(e) = etcd_client.kv_delete(key.as_str(), None).await {
tracing::warn!("Failed to cleanup reserved port {key}: {e}");
}
}
// Sockets automatically released via RAII
if attempt_idx < candidate_count - 1 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
continue;
}
// Success - sockets will be released automatically
tracing::debug!("Reserved port block {ports_to_reserve:?}");
return Ok(ports_to_reserve);
}
Err(PyErr::new::<PyException, _>(format!(
"Failed to allocate and reserve a port block of size {block_size} from range {min}-{max} after {candidate_count} attempts"
)))
})
}
fn shutdown(&self) { fn shutdown(&self) {
self.inner.shutdown(); self.inner.shutdown();
} }
...@@ -658,33 +523,6 @@ impl DistributedRuntime { ...@@ -658,33 +523,6 @@ impl DistributedRuntime {
} }
} }
/// Bind `port` on the IPv4 wildcard address and hand back the bound socket.
///
/// The socket is a TCP stream socket with address reuse enabled. The port stays
/// bound for as long as the returned socket is alive.
///
/// # Errors
/// Propagates any I/O error from creating, configuring, or binding the socket.
fn bind_tcp_port(port: u16) -> std::io::Result<socket2::Socket> {
    let socket = socket2::Socket::new(
        socket2::Domain::IPV4,
        socket2::Type::STREAM,
        Some(socket2::Protocol::TCP),
    )?;
    socket.set_reuse_address(true)?;

    let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
    let target = socket2::SockAddr::from(SocketAddr::V4(wildcard));
    socket.bind(&target)?;

    Ok(socket)
}
/// Build the key under which a single port reservation is stored.
///
/// The key has the shape `v1/{namespace}/ports/{node_ip}/{port}`. This function
/// always returns `Ok`.
fn make_port_key(namespace: &str, node_ip: IpAddr, port: u16) -> anyhow::Result<String> {
    let ip_segment = node_ip.to_string();
    let port_segment = port.to_string();
    let key = ["v1", namespace, "ports", &ip_segment, &port_segment].join("/");
    Ok(key)
}
fn local_ip() -> Result<IpAddr, local_ip_address::Error> {
local_ip_address::local_ip().or_else(|err| match err {
local_ip_address::Error::LocalIpAddressNotFound => {
// Fall back to IPv6 if no IPv4 addresses are found
local_ip_address::local_ipv6()
}
_ => Err(err),
})
}
#[pymethods] #[pymethods]
impl CancellationToken { impl CancellationToken {
fn cancel(&self) { fn cancel(&self) {
......
...@@ -45,13 +45,6 @@ class DistributedRuntime: ...@@ -45,13 +45,6 @@ class DistributedRuntime:
""" """
... ...
def allocate_port_block(self, namespace, port_min, port_max, block_size, context=None) -> List[int]:
    """
    Allocate a contiguous block of ports from the specified range and atomically reserve them.
    Returns a list of all allocated ports in order.

    Args:
        namespace: Namespace string used in the reservation keys.
        port_min: Lowest port of the range to allocate from (inclusive).
        port_max: Highest port of the range to allocate from (inclusive).
        block_size: Number of consecutive ports to allocate; must be at least 1.
        context: Optional string stored alongside each reservation.

    Raises:
        ValueError: If block_size is 0.
        Exception: If the runtime has no etcd client (static workers), the range is
            too small for block_size, the local IP address cannot be determined, or
            no block could be bound and reserved after all attempts.

    NOTE(review): the Rust binding builds its result with `future_into_py`, so the
    call appears to return an awaitable that resolves to the list - confirm whether
    the annotated return type should reflect that.
    """
    ...
def shutdown(self) -> None: def shutdown(self) -> None:
""" """
Shutdown the runtime by triggering the cancellation token Shutdown the runtime by triggering the cancellation token
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment