Unverified Commit 761e848b authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

fix: Fix mocker+router segfaults + reliability fixes (#7162)


Signed-off-by: default avatarjthomson04 <jwillthomson19@gmail.com>
parent a31cbd0d
......@@ -284,6 +284,7 @@ pub struct EventPublisher {
sequence: AtomicU64,
tx: Arc<dyn EventTransportTx>,
codec: Arc<Codec>,
runtime_handle: tokio::runtime::Handle,
/// Discovery client and registered instance for unregistration on drop
discovery_client: Option<Arc<dyn Discovery>>,
discovery_instance: Option<crate::discovery::DiscoveryInstance>,
......@@ -335,6 +336,7 @@ impl EventPublisher {
) -> Result<Self> {
let publisher_id = drt.discovery().instance_id();
let discovery = Some(drt.discovery());
let runtime_handle = drt.runtime().secondary();
// Use Msgpack codec for all transports
enum TransportSetup {
......@@ -464,6 +466,7 @@ impl EventPublisher {
sequence: AtomicU64::new(0),
tx,
codec,
runtime_handle,
discovery_client: discovery,
discovery_instance,
})
......@@ -515,9 +518,12 @@ impl Drop for EventPublisher {
{
let topic = self.topic.clone();
let instance_id = instance.instance_id();
let runtime_handle = self.runtime_handle.clone();
// Spawn background task for async unregister since Drop is sync
tokio::spawn(async move {
// Drop can run outside any Tokio context (notably via PyO3 finalizers), so use
// the runtime that created the publisher rather than the ambient thread state.
let spawn_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
runtime_handle.spawn(async move {
match discovery.unregister(instance).await {
Ok(()) => {
tracing::info!(
......@@ -536,6 +542,15 @@ impl Drop for EventPublisher {
}
}
});
}));
if spawn_result.is_err() {
tracing::warn!(
topic = %self.topic,
instance_id = %instance_id,
"Skipping EventPublisher unregister during drop because the runtime is unavailable"
);
}
}
}
}
......
......@@ -190,6 +190,7 @@ filterwarnings = [
"ignore:The pynvml package is deprecated.*:FutureWarning", # Ignore pynvml deprecation warning, temporary until upstream library updates to nvidia-ml-py
"ignore:The behavior of DataFrame concatenation with empty or all-NA entries is deprecated.*:FutureWarning", # pandas 2.x concat deprecation in AIC SDK TODO: fix in AIC
"ignore:Automatic KV events configuration is deprecated.*:FutureWarning", # Ignore Dynamo's own KV events deprecation warning in tests
"ignore:builtin type (SwigPyPacked|SwigPyObject|swigvarlink) has no __module__ attribute:DeprecationWarning", # Python 3.12 SWIG extension warning from third-party tokenizer deps
# Pydantic V2 deprecation warnings from TRTLLM dependencies (raised at import time during collection)
"ignore:Support for class-based `config`.*:pydantic.warnings.PydanticDeprecatedSince20",
"ignore:Using extra keyword arguments on `Field`.*:pydantic.warnings.PydanticDeprecatedSince20",
......@@ -330,4 +331,3 @@ extra_content_footer = [
<script type="text/javascript">if (typeof _satellite !== "undefined") {_satellite.pageBottom();}</script>
''',
]
......@@ -459,14 +459,7 @@ class NatsServer(ManagedProcess):
def stop(self):
"""Stop the NATS server for restart. Does not release port or clean up fully."""
_logger.info(f"Stopping NATS server on port {self.port}")
self._terminate_process_group()
proc = self.proc # type: ignore[has-type]
if proc is not None:
try:
proc.wait(timeout=10)
except Exception as e:
_logger.warning(f"Error waiting for NATS process to stop: {e}")
self.proc = None
self._stop_started_processes()
def start(self):
"""Restart a stopped NATS server with fresh state."""
......
......@@ -46,7 +46,6 @@ pytestmark = [
pytest.mark.gpu_0,
pytest.mark.integration,
pytest.mark.model(MODEL_NAME),
pytest.mark.skip(reason="DYN-2365 - Flaky, temporarily disabled"),
]
NUM_MOCKERS = 2
SPEEDUP_RATIO = 10.0
......@@ -820,7 +819,7 @@ def test_kv_router_bindings(
],
indirect=["request_plane", "durable_kv_events"],
)
@pytest.mark.timeout(180)
@pytest.mark.timeout(300)
def test_indexers_sync(
request,
runtime_services_dynamic_ports,
......@@ -927,7 +926,7 @@ def test_query_instance_id_returns_worker_and_tokens(
)
@pytest.mark.timeout(90) # bumped for xdist contention (was 29s; ~9.55s serial avg)
@pytest.mark.timeout(300) # bumped for xdist contention (was 29s; ~9.55s serial avg)
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.parametrize(
"durable_kv_events,use_kv_events,zmq_kv_events",
......
......@@ -94,6 +94,33 @@ def terminate_process_tree(
psutil.wait_procs(alive, timeout=timeout)
class ManagedProcessStopError(RuntimeError):
"""Raised when a ManagedProcess subprocess cannot be stopped cleanly."""
def __init__(self, attr_name: str, pid: int, detail: str):
super().__init__(
f"Failed to stop managed process '{attr_name}' (pid={pid}): {detail}"
)
self.attr_name = attr_name
self.pid = pid
self.detail = detail
class ManagedProcessStopTimeoutError(ManagedProcessStopError):
"""Raised when a subprocess remains alive after forceful shutdown."""
def __init__(self, attr_name: str, pid: int, wait_timeout: float):
super().__init__(
attr_name,
pid,
(
"process did not exit after forceful shutdown "
f"within {wait_timeout:.1f}s"
),
)
self.wait_timeout = wait_timeout
@dataclass
class ManagedProcess:
"""Manages a subprocess with health checks and automatic cleanup.
......@@ -296,20 +323,7 @@ class ManagedProcess:
except psutil.NoSuchProcess:
pass
self._terminate_process_group()
process_list = [self.proc, self._tee_proc, self._sed_proc]
for process in process_list:
if process:
try:
if process.stdout:
process.stdout.close()
if process.stdin:
process.stdin.close()
terminate_process_tree(process.pid, self._logger)
process.wait()
except Exception as e:
self._logger.warning("Error terminating process: %s", e)
self._stop_started_processes()
# Kill any child processes that survived the process group kill.
# This catches children in different PGIDs (e.g. MPI workers, engine
......@@ -332,6 +346,114 @@ class ManagedProcess:
# Always run straggler cleanup, even if interrupted
self._cleanup_stragglers()
def _stop_started_processes(self, wait_timeout: float = 10.0):
"""Terminate launched subprocesses and close any open pipe handles.
This is used both during normal teardown and when a managed service
needs to stop and restart in-place without releasing higher-level
resources such as ports.
"""
self._terminate_process_group()
process_entries = [
("proc", self.proc),
("_tee_proc", self._tee_proc),
("_sed_proc", self._sed_proc),
]
for attr_name, process in process_entries:
if process is None:
continue
try:
for stream_name in ("stdout", "stdin", "stderr"):
stream = getattr(process, stream_name, None)
if stream is not None:
stream.close()
except OSError as e:
self._logger.warning("Error closing process streams: %s", e)
try:
terminate_process_tree(process.pid, self._logger)
process.wait(timeout=wait_timeout)
except (psutil.TimeoutExpired, subprocess.TimeoutExpired) as e:
self._logger.warning(
"Process '%s' (pid=%s) timed out during shutdown: %s. "
"Retrying with forceful termination.",
attr_name,
process.pid,
e,
)
self._force_stop_process(process, attr_name, wait_timeout)
except OSError as e:
if process.poll() is None:
self._logger.warning(
"Process '%s' (pid=%s) hit an OS error during shutdown: %s. "
"Retrying with forceful termination.",
attr_name,
process.pid,
e,
)
self._force_stop_process(process, attr_name, wait_timeout)
if process.poll() is None:
raise ManagedProcessStopTimeoutError(
attr_name, process.pid, wait_timeout
)
setattr(self, attr_name, None)
self._pgid = None
def _force_stop_process(
self,
process: subprocess.Popen,
attr_name: str,
wait_timeout: float,
) -> None:
"""Forcefully stop a process tree and confirm the launched child exits."""
try:
terminate_process_tree(
process.pid,
self._logger,
immediate_kill=True,
timeout=0,
)
except (psutil.TimeoutExpired, OSError) as e:
self._logger.warning(
"Forceful tree termination failed for process '%s' (pid=%s): %s. "
"Falling back to process.kill().",
attr_name,
process.pid,
e,
)
try:
process.kill()
except OSError as kill_err:
if process.poll() is None:
raise ManagedProcessStopError(
attr_name,
process.pid,
f"forceful shutdown failed: {kill_err}",
) from kill_err
try:
process.wait(timeout=wait_timeout)
except subprocess.TimeoutExpired as e:
if process.poll() is None:
raise ManagedProcessStopTimeoutError(
attr_name, process.pid, wait_timeout
) from e
except OSError as e:
if process.poll() is None:
raise ManagedProcessStopError(
attr_name,
process.pid,
f"error waiting for exit after forceful shutdown: {e}",
) from e
if process.poll() is None:
raise ManagedProcessStopTimeoutError(attr_name, process.pid, wait_timeout)
def _start_process(self):
assert self._command_name
assert self._log_path
......@@ -434,6 +556,11 @@ class ManagedProcess:
poll_interval = 0.1
elapsed = 0.0
while elapsed < timeout:
# Reap the launched child if it has already exited. Without this,
# the child can remain as a zombie and keep killpg(..., 0) reporting
# the process group as alive until the timeout expires.
if self.proc is not None:
self.proc.poll()
try:
# Check if any process in the group is still alive
os.killpg(self._pgid, 0) # Signal 0 = check existence (no kill)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment