Commit 4f7f4b40 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(mac): Fix for virtual env (#164)

On Mac embedded python interpreters don't pick up the virtual env. This seems to be a known problem. Fix the sys.path.
parent 663cde81
...@@ -50,3 +50,24 @@ impl Default for MultiNodeConfig { ...@@ -50,3 +50,24 @@ impl Default for MultiNodeConfig {
} }
} }
} }
#[cfg(feature = "python")]
use pyo3::prelude::*;
/// On Mac embedded Python interpreters do not pick up the virtual env.
#[cfg(all(target_os = "macos", feature = "python"))]
fn fix_venv(venv: String, py: pyo3::Python<'_>) -> anyhow::Result<()> {
let version_info = py.version_info();
let sys: PyObject = py.import("sys")?.into();
let sys_path = sys.getattr(py, "path")?;
let venv_path = format!(
"{venv}/lib/python{}.{}/site-packages",
version_info.major, version_info.minor
);
// TODO: This should go _before_ the site-packages
sys_path.call_method1(py, "append", (venv_path,))?;
Ok(())
}
#[cfg(all(target_os = "linux", feature = "python"))]
fn fix_venv(_venv: String, _py: Python<'_>) {}
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
// limitations under the License. // limitations under the License.
use std::ffi::CStr; use std::ffi::CStr;
use std::{path::Path, sync::Arc}; use std::{env, path::Path, sync::Arc};
use anyhow::Context; use anyhow::Context;
use dynamo_runtime::pipeline::error as pipeline_error; use dynamo_runtime::pipeline::error as pipeline_error;
...@@ -76,6 +76,9 @@ pub async fn make_string_engine( ...@@ -76,6 +76,9 @@ pub async fn make_string_engine(
py_args: Vec<String>, py_args: Vec<String>,
) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> { ) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
pyo3::prepare_freethreaded_python(); pyo3::prepare_freethreaded_python();
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| super::fix_venv(venv, py));
}
let engine = new_engine(cancel_token, py_file, py_args).await?; let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine); let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
...@@ -89,6 +92,9 @@ pub async fn make_token_engine( ...@@ -89,6 +92,9 @@ pub async fn make_token_engine(
py_args: Vec<String>, py_args: Vec<String>,
) -> pipeline_error::Result<ExecutionContext> { ) -> pipeline_error::Result<ExecutionContext> {
pyo3::prepare_freethreaded_python(); pyo3::prepare_freethreaded_python();
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| super::fix_venv(venv, py));
}
let engine = new_engine(cancel_token, py_file, py_args).await?; let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: ExecutionContext = Arc::new(engine); let engine: ExecutionContext = Arc::new(engine);
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
// limitations under the License. // limitations under the License.
use pyo3::{types::IntoPyDict, Python}; use pyo3::{types::IntoPyDict, Python};
use std::{os::fd::RawFd, path::Path}; use std::{env, os::fd::RawFd, path::Path};
use crate::engines::MultiNodeConfig; use crate::engines::MultiNodeConfig;
...@@ -93,6 +93,9 @@ pub fn run_subprocess( ...@@ -93,6 +93,9 @@ pub fn run_subprocess(
gpu_config: super::MultiGPUConfig, gpu_config: super::MultiGPUConfig,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize" pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| crate::engines::fix_venv(venv, py));
}
let dir = model_path.display().to_string(); let dir = model_path.display().to_string();
Python::with_gil(|py| { Python::with_gil(|py| {
let locals = [ let locals = [
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt, env, fmt,
os::fd::{FromRawFd as _, RawFd}, os::fd::{FromRawFd as _, RawFd},
path::Path, path::Path,
process::Stdio, process::Stdio,
...@@ -293,7 +293,10 @@ pub async fn start( ...@@ -293,7 +293,10 @@ pub async fn start(
tp_size: u32, tp_size: u32,
base_gpu_id: u32, base_gpu_id: u32,
) -> anyhow::Result<SgLangWorker> { ) -> anyhow::Result<SgLangWorker> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize" pyo3::prepare_freethreaded_python();
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| crate::engines::fix_venv(venv, py));
}
let Sockets { let Sockets {
context, context,
......
...@@ -86,20 +86,16 @@ pub fn start_leader(leader_address: SocketAddrV4) -> Result<Ray, RayError> { ...@@ -86,20 +86,16 @@ pub fn start_leader(leader_address: SocketAddrV4) -> Result<Ray, RayError> {
// Process stdout // Process stdout
if let Some(stdout) = child.stdout.take() { if let Some(stdout) = child.stdout.take() {
let reader = BufReader::new(stdout); let reader = BufReader::new(stdout);
for line in reader.lines() { for line in reader.lines().map_while(Result::ok) {
if let Ok(line) = line { tracing::info!("RAY: {line}");
tracing::info!("RAY: {line}");
}
} }
} }
// Process stderr // Process stderr
if let Some(stderr) = child.stderr.take() { if let Some(stderr) = child.stderr.take() {
let reader = BufReader::new(stderr); let reader = BufReader::new(stderr);
for line in reader.lines() { for line in reader.lines().map_while(Result::ok) {
if let Ok(line) = line { tracing::info!("RAY: {line}");
tracing::info!("RAY: {line}");
}
} }
} }
...@@ -126,20 +122,16 @@ pub fn start_follower(leader_address: SocketAddrV4) -> Result<Ray, RayError> { ...@@ -126,20 +122,16 @@ pub fn start_follower(leader_address: SocketAddrV4) -> Result<Ray, RayError> {
// Process stdout // Process stdout
if let Some(stdout) = child.stdout.take() { if let Some(stdout) = child.stdout.take() {
let reader = BufReader::new(stdout); let reader = BufReader::new(stdout);
for line in reader.lines() { for line in reader.lines().map_while(Result::ok) {
if let Ok(line) = line { tracing::info!("RAY: {line}");
tracing::info!("RAY: {line}");
}
} }
} }
// Process stderr // Process stderr
if let Some(stderr) = child.stderr.take() { if let Some(stderr) = child.stderr.take() {
let reader = BufReader::new(stderr); let reader = BufReader::new(stderr);
for line in reader.lines() { for line in reader.lines().map_while(Result::ok) {
if let Ok(line) = line { tracing::info!("RAY: {line}");
tracing::info!("RAY: {line}");
}
} }
} }
...@@ -248,6 +240,7 @@ fn parse_ray_status(output: &str) -> Option<RayStatus> { ...@@ -248,6 +240,7 @@ fn parse_ray_status(output: &str) -> Option<RayStatus> {
// Regex to match node IDs // Regex to match node IDs
let node_regex = Regex::new(r"(\d+)\s+(node_[a-f0-9]+)").unwrap(); let node_regex = Regex::new(r"(\d+)\s+(node_[a-f0-9]+)").unwrap();
let num_regex = Regex::new(r"(\d+)").unwrap();
for line in output.lines() { for line in output.lines() {
let trimmed = line.trim(); let trimmed = line.trim();
...@@ -280,7 +273,7 @@ fn parse_ray_status(output: &str) -> Option<RayStatus> { ...@@ -280,7 +273,7 @@ fn parse_ray_status(output: &str) -> Option<RayStatus> {
} }
} else if in_pending_section && trimmed != "(no pending nodes)" { } else if in_pending_section && trimmed != "(no pending nodes)" {
// Count pending nodes // Count pending nodes
if let Some(captures) = Regex::new(r"(\d+)").unwrap().captures(trimmed) { if let Some(captures) = num_regex.captures(trimmed) {
if let Some(count) = captures.get(1) { if let Some(count) = captures.get(1) {
if let Ok(count) = count.as_str().parse::<usize>() { if let Ok(count) = count.as_str().parse::<usize>() {
pending_nodes_count += count; pending_nodes_count += count;
...@@ -289,7 +282,7 @@ fn parse_ray_status(output: &str) -> Option<RayStatus> { ...@@ -289,7 +282,7 @@ fn parse_ray_status(output: &str) -> Option<RayStatus> {
} }
} else if in_failures_section && trimmed != "(no failures)" { } else if in_failures_section && trimmed != "(no failures)" {
// Count failures // Count failures
if let Some(captures) = Regex::new(r"(\d+)").unwrap().captures(trimmed) { if let Some(captures) = num_regex.captures(trimmed) {
if let Some(count) = captures.get(1) { if let Some(count) = captures.get(1) {
if let Ok(count) = count.as_str().parse::<usize>() { if let Ok(count) = count.as_str().parse::<usize>() {
recent_failures_count += count; recent_failures_count += count;
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
// limitations under the License. // limitations under the License.
use pyo3::{types::IntoPyDict, Python}; use pyo3::{types::IntoPyDict, Python};
use std::env;
use std::path::Path; use std::path::Path;
use crate::engines::MultiNodeConfig; use crate::engines::MultiNodeConfig;
...@@ -56,6 +57,9 @@ pub fn run_subprocess( ...@@ -56,6 +57,9 @@ pub fn run_subprocess(
tp_size: u32, tp_size: u32,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize" pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| crate::engines::fix_venv(venv, py));
}
let card = model_card_path.display().to_string(); let card = model_card_path.display().to_string();
let model_path_str = model_path.display().to_string(); let model_path_str = model_path.display().to_string();
Python::with_gil(|py| { Python::with_gil(|py| {
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
// limitations under the License. // limitations under the License.
use std::collections::HashMap; use std::collections::HashMap;
use std::env;
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::process::Stdio; use std::process::Stdio;
...@@ -169,6 +170,9 @@ pub async fn start( ...@@ -169,6 +170,9 @@ pub async fn start(
tensor_parallel_size: u32, tensor_parallel_size: u32,
) -> anyhow::Result<VllmWorker> { ) -> anyhow::Result<VllmWorker> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize" pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| crate::engines::fix_venv(venv, py));
}
let py_imports = Arc::new(python_imports()); let py_imports = Arc::new(python_imports());
let Sockets { let Sockets {
...@@ -339,7 +343,7 @@ async fn start_vllm( ...@@ -339,7 +343,7 @@ async fn start_vllm(
let mut log_level = line_parts.next().unwrap_or_default(); let mut log_level = line_parts.next().unwrap_or_default();
// Skip date (0) and time (1). Print last (2) which is everything else. // Skip date (0) and time (1). Print last (2) which is everything else.
let line = line_parts.nth(2).unwrap_or_default(); let line = line_parts.nth(2).unwrap_or_default();
if line.starts_with("custom_op.py:68") || line.trim().len() == 0 { if line.starts_with("custom_op.py:68") || line.trim().is_empty() {
// Skip a noisy line // Skip a noisy line
// custom_op.py:68] custom op <the op> enabled // custom_op.py:68] custom op <the op> enabled
continue; continue;
...@@ -359,7 +363,7 @@ async fn start_vllm( ...@@ -359,7 +363,7 @@ async fn start_vllm(
tokio::spawn(async move { tokio::spawn(async move {
let mut lines = stderr.lines(); let mut lines = stderr.lines();
while let Ok(Some(line)) = lines.next_line().await { while let Ok(Some(line)) = lines.next_line().await {
if line.trim().len() == 0 { if line.trim().is_empty() {
continue; continue;
} }
tracing::warn!("VLLM: {line}"); tracing::warn!("VLLM: {line}");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment