Commit b0d3eba1 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(dynamo-run): Network interface detection is Linux only (#133)

"netlink" doesn't exist on Mac. We print the primary network interface to help multi-node setup, which is also unlikely on Mac.
parent 3d9ade88
...@@ -1507,7 +1507,6 @@ dependencies = [ ...@@ -1507,7 +1507,6 @@ dependencies = [
"dynamo-runtime", "dynamo-runtime",
"futures", "futures",
"futures-util", "futures-util",
"libc",
"netlink-packet-route", "netlink-packet-route",
"rtnetlink", "rtnetlink",
"serde", "serde",
......
...@@ -41,9 +41,6 @@ clap = { version = "4.5", features = ["derive", "env"] } ...@@ -41,9 +41,6 @@ clap = { version = "4.5", features = ["derive", "env"] }
dialoguer = { version = "0.11", default-features = false, features = ["editor", "history"] } dialoguer = { version = "0.11", default-features = false, features = ["editor", "history"] }
futures = { version = "0.3" } futures = { version = "0.3" }
futures-util = "0.3" futures-util = "0.3"
libc = { version = "0.2" }
netlink-packet-route = { version = "0.19", optional = true }
rtnetlink = { version = "0.14", optional = true }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
...@@ -52,3 +49,7 @@ tracing = { version = "0.1" } ...@@ -52,3 +49,7 @@ tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] }
dynamo-runtime = { path = "../../lib/runtime" } dynamo-runtime = { path = "../../lib/runtime" }
dynamo-llm = { path = "../../lib/llm" } dynamo-llm = { path = "../../lib/llm" }
[target.x86_64-unknown-linux-gnu.dependencies]
netlink-packet-route = { version = "0.19", optional = true }
rtnetlink = { version = "0.14", optional = true }
...@@ -13,77 +13,17 @@ ...@@ -13,77 +13,17 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use futures_util::TryStreamExt; // Mac build uses none of this
use netlink_packet_route::address::AddressAttribute; #![allow(dead_code)]
use netlink_packet_route::link::LinkLayerType;
use netlink_packet_route::link::State as LinkState;
use netlink_packet_route::link::{LinkAttribute, LinkMessage};
use netlink_packet_route::AddressFamily;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::{collections::HashMap, error::Error};
#[cfg(target_os = "linux")]
pub async fn get_primary_interface() -> Result<Option<String>, LinkDataError> { pub async fn get_primary_interface() -> Result<Option<String>, LinkDataError> {
let mut candidates: VecDeque<String> = get_ipv4_interface_links() unix::get_primary_interface().await
.await?
.into_iter()
.filter(|(k, v)| v.is_ethernet() && v.link_is_up() && v.has_carrier() && k.starts_with("e"))
.map(|(k, _)| k)
.collect();
Ok(candidates.pop_front())
} }
#[derive(Clone, Debug)] #[cfg(target_os = "macos")]
// Most of the fields are Option<T> because the netlink protocol allows them pub async fn get_primary_interface() -> Result<Option<String>, LinkDataError> {
// to be absent (even though we have no reason to believe they'd ever actually Ok(None)
// be missing).
struct InterfaceLinkData {
link_type: LinkLayerType,
state: Option<LinkState>,
has_carrier: bool,
}
impl InterfaceLinkData {
pub fn link_is_up(&self) -> bool {
self.state
.map(|state| matches!(state, LinkState::Up))
.unwrap_or(false)
}
pub fn is_ethernet(&self) -> bool {
matches!(self.link_type, LinkLayerType::Ether)
}
pub fn has_carrier(&self) -> bool {
self.has_carrier
}
}
impl From<LinkMessage> for InterfaceLinkData {
fn from(link_message: LinkMessage) -> Self {
let link_type = link_message.header.link_layer_type;
let state = link_message
.attributes
.iter()
.find_map(|attribute| match attribute {
LinkAttribute::OperState(state) => Some(*state),
_ => None,
});
let has_carrier = link_message
.attributes
.iter()
.find_map(|attribute| match attribute {
LinkAttribute::Carrier(1) => Some(true),
_ => None,
})
.unwrap_or(false);
InterfaceLinkData {
link_type,
state,
has_carrier,
}
}
} }
#[derive(Debug)] #[derive(Debug)]
...@@ -117,8 +57,8 @@ impl std::fmt::Display for LinkDataError { ...@@ -117,8 +57,8 @@ impl std::fmt::Display for LinkDataError {
} }
} }
impl Error for LinkDataError { impl std::error::Error for LinkDataError {
fn source(&self) -> Option<&(dyn Error + 'static)> { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self.kind { match self.kind {
LinkDataErrorKind::Connection(ref e) => Some(e), LinkDataErrorKind::Connection(ref e) => Some(e),
LinkDataErrorKind::Communication(ref e) => Some(e), LinkDataErrorKind::Communication(ref e) => Some(e),
...@@ -132,42 +72,121 @@ pub enum LinkDataErrorKind { ...@@ -132,42 +72,121 @@ pub enum LinkDataErrorKind {
Communication(rtnetlink::Error), Communication(rtnetlink::Error),
} }
// Retrieve the link data (state, MTU, etc.) for all interfaces, and return #[cfg(target_os = "linux")]
// them as a HashMap keyed by interface name. This is roughly equivalent to `ip mod unix {
// link show` since we're using the same netlink interface under the hood as
// that command. use futures_util::TryStreamExt;
async fn get_ipv4_interface_links() -> Result<HashMap<String, InterfaceLinkData>, LinkDataError> { use netlink_packet_route::address::AddressAttribute;
let (netlink_connection, rtnetlink_handle, _receiver) = use netlink_packet_route::link::LinkLayerType;
rtnetlink::new_connection().map_err(LinkDataError::connection)?; use netlink_packet_route::link::State as LinkState;
use netlink_packet_route::link::{LinkAttribute, LinkMessage};
// We have to spawn off the netlink connection because of the architecture use netlink_packet_route::AddressFamily;
// of `netlink_proto::Connection`, which runs in the background and owns use std::collections::HashMap;
// the socket. We communicate with it via channel messages, and it will exit use std::collections::HashSet;
// when both `rtnetlink_handle` and `_receiver` go out of scope. use std::collections::VecDeque;
tokio::spawn(netlink_connection);
pub async fn get_primary_interface() -> Result<Option<String>, super::LinkDataError> {
let address_handle = rtnetlink_handle.address().get().execute(); let mut candidates: VecDeque<String> = get_ipv4_interface_links()
let ipv4s: HashSet<String> = address_handle .await?
.try_filter_map(|addr_message| async move { .into_iter()
if matches!(addr_message.header.family, AddressFamily::Inet) { .filter(|(k, v)| {
Ok(addr_message v.is_ethernet() && v.link_is_up() && v.has_carrier() && k.starts_with("e")
.attributes })
.into_iter() .map(|(k, _)| k)
.find(|attr| matches!(attr, AddressAttribute::Label(_))) .collect();
.and_then(|x| match x {
AddressAttribute::Label(label) => Some(label), Ok(candidates.pop_front())
_ => None, }
}))
} else { #[derive(Clone, Debug)]
Ok(None) // Most of the fields are Option<T> because the netlink protocol allows them
// to be absent (even though we have no reason to believe they'd ever actually
// be missing).
struct InterfaceLinkData {
link_type: LinkLayerType,
state: Option<LinkState>,
has_carrier: bool,
}
impl InterfaceLinkData {
pub fn link_is_up(&self) -> bool {
self.state
.map(|state| matches!(state, LinkState::Up))
.unwrap_or(false)
}
pub fn is_ethernet(&self) -> bool {
matches!(self.link_type, LinkLayerType::Ether)
}
pub fn has_carrier(&self) -> bool {
self.has_carrier
}
}
impl From<LinkMessage> for InterfaceLinkData {
fn from(link_message: LinkMessage) -> Self {
let link_type = link_message.header.link_layer_type;
let state = link_message
.attributes
.iter()
.find_map(|attribute| match attribute {
LinkAttribute::OperState(state) => Some(*state),
_ => None,
});
let has_carrier = link_message
.attributes
.iter()
.find_map(|attribute| match attribute {
LinkAttribute::Carrier(1) => Some(true),
_ => None,
})
.unwrap_or(false);
InterfaceLinkData {
link_type,
state,
has_carrier,
} }
}) }
.try_collect() }
.await
.map_err(LinkDataError::communication)?;
let link_handle = rtnetlink_handle.link().get().execute(); // Retrieve the link data (state, MTU, etc.) for all interfaces, and return
link_handle // them as a HashMap keyed by interface name. This is roughly equivalent to `ip
// link show` since we're using the same netlink interface under the hood as
// that command.
async fn get_ipv4_interface_links(
) -> Result<HashMap<String, InterfaceLinkData>, super::LinkDataError> {
let (netlink_connection, rtnetlink_handle, _receiver) =
rtnetlink::new_connection().map_err(super::LinkDataError::connection)?;
// We have to spawn off the netlink connection because of the architecture
// of `netlink_proto::Connection`, which runs in the background and owns
// the socket. We communicate with it via channel messages, and it will exit
// when both `rtnetlink_handle` and `_receiver` go out of scope.
tokio::spawn(netlink_connection);
let address_handle = rtnetlink_handle.address().get().execute();
let ipv4s: HashSet<String> = address_handle
.try_filter_map(|addr_message| async move {
if matches!(addr_message.header.family, AddressFamily::Inet) {
Ok(addr_message
.attributes
.into_iter()
.find(|attr| matches!(attr, AddressAttribute::Label(_)))
.and_then(|x| match x {
AddressAttribute::Label(label) => Some(label),
_ => None,
}))
} else {
Ok(None)
}
})
.try_collect()
.await
.map_err(super::LinkDataError::communication)?;
let link_handle = rtnetlink_handle.link().get().execute();
link_handle
.try_filter_map(|link_message| async { .try_filter_map(|link_message| async {
let maybe_interface_data = match extract_interface_name(&link_message) { let maybe_interface_data = match extract_interface_name(&link_message) {
Some(interface_name) => { Some(interface_name) => {
...@@ -189,15 +208,16 @@ async fn get_ipv4_interface_links() -> Result<HashMap<String, InterfaceLinkData> ...@@ -189,15 +208,16 @@ async fn get_ipv4_interface_links() -> Result<HashMap<String, InterfaceLinkData>
}) })
.try_collect() .try_collect()
.await .await
.map_err(LinkDataError::communication) .map_err(super::LinkDataError::communication)
} }
fn extract_interface_name(link_message: &LinkMessage) -> Option<String> { fn extract_interface_name(link_message: &LinkMessage) -> Option<String> {
link_message link_message
.attributes .attributes
.iter() .iter()
.find_map(|attribute| match attribute { .find_map(|attribute| match attribute {
LinkAttribute::IfName(name) => Some(name.clone()), LinkAttribute::IfName(name) => Some(name.clone()),
_ => None, _ => None,
}) })
}
} }
...@@ -339,7 +339,7 @@ async fn start_vllm( ...@@ -339,7 +339,7 @@ async fn start_vllm(
let mut log_level = line_parts.next().unwrap_or_default(); let mut log_level = line_parts.next().unwrap_or_default();
// Skip date (0) and time (1). Print last (2) which is everything else. // Skip date (0) and time (1). Print last (2) which is everything else.
let line = line_parts.nth(2).unwrap_or_default(); let line = line_parts.nth(2).unwrap_or_default();
if line.starts_with("custom_op.py:68") { if line.starts_with("custom_op.py:68") || line.trim().len() == 0 {
// Skip a noisy line // Skip a noisy line
// custom_op.py:68] custom op <the op> enabled // custom_op.py:68] custom op <the op> enabled
continue; continue;
...@@ -349,7 +349,7 @@ async fn start_vllm( ...@@ -349,7 +349,7 @@ async fn start_vllm(
} }
match log_level { match log_level {
"DEBUG" => tracing::debug!("VLLM: {line}"), "DEBUG" => tracing::debug!("VLLM: {line}"),
"INFO" => tracing::debug!("VLLM: {line}"), // VLLM is noisy "INFO" => tracing::debug!("VLLM: {line}"), // VLLM is noisy in debug mode
"WARNING" => tracing::warn!("VLLM: {line}"), "WARNING" => tracing::warn!("VLLM: {line}"),
"ERROR" => tracing::error!("VLLM: {line}"), "ERROR" => tracing::error!("VLLM: {line}"),
level => tracing::info!("VLLM: {level} {line}"), level => tracing::info!("VLLM: {level} {line}"),
...@@ -359,6 +359,9 @@ async fn start_vllm( ...@@ -359,6 +359,9 @@ async fn start_vllm(
tokio::spawn(async move { tokio::spawn(async move {
let mut lines = stderr.lines(); let mut lines = stderr.lines();
while let Ok(Some(line)) = lines.next_line().await { while let Ok(Some(line)) = lines.next_line().await {
if line.trim().len() == 0 {
continue;
}
tracing::warn!("VLLM: {line}"); tracing::warn!("VLLM: {line}");
} }
}); });
...@@ -399,7 +402,7 @@ async fn start_vllm( ...@@ -399,7 +402,7 @@ async fn start_vllm(
.extract(py) .extract(py)
.unwrap() .unwrap()
}); });
tracing::info!("vllm zmq backend is ready: {resp:?}"); tracing::debug!("vllm zmq backend is ready: {resp:?}");
Ok(proc) Ok(proc)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment