Unverified Commit 2c9aebea authored by Liangsheng Yin's avatar Liangsheng Yin Committed by GitHub
Browse files

Simplify watchdog (#12463)

parent bc741073
...@@ -170,7 +170,6 @@ from sglang.srt.utils import ( ...@@ -170,7 +170,6 @@ from sglang.srt.utils import (
broadcast_pyobj, broadcast_pyobj,
configure_gc_logger, configure_gc_logger,
configure_logger, configure_logger,
disable_request_logging,
freeze_gc, freeze_gc,
get_available_gpu_memory, get_available_gpu_memory,
get_bool_env_var, get_bool_env_var,
...@@ -179,7 +178,6 @@ from sglang.srt.utils import ( ...@@ -179,7 +178,6 @@ from sglang.srt.utils import (
kill_itself_when_parent_died, kill_itself_when_parent_died,
numa_bind_to_node, numa_bind_to_node,
point_to_point_pyobj, point_to_point_pyobj,
pyspy_dump_schedulers,
require_mlp_sync, require_mlp_sync,
require_mlp_tp_gather, require_mlp_tp_gather,
set_gpu_proc_affinity, set_gpu_proc_affinity,
...@@ -2295,76 +2293,6 @@ class Scheduler( ...@@ -2295,76 +2293,6 @@ class Scheduler(
self._add_request_to_queue(req) self._add_request_to_queue(req)
self.grammar_queue = self.grammar_queue[num_ready_reqs:] self.grammar_queue = self.grammar_queue[num_ready_reqs:]
def watchdog_thread(self):
    """Kill the server if one forward batch makes no progress for too long.

    Runs forever in its own thread. While ``self.cur_batch`` is set, it
    polls ``self.forward_ct``; if the counter has not changed for more than
    ``self.watchdog_timeout`` seconds, it logs memory-pool diagnostics
    (unless request logging is disabled), dumps scheduler stacks with
    ``pyspy_dump_schedulers`` and sends ``SIGQUIT`` to
    ``self.parent_process``.
    """
    self.watchdog_last_forward_ct = 0
    self.watchdog_last_time = time.perf_counter()

    while True:
        current = time.perf_counter()
        if self.cur_batch is None:
            # Idle time must not count toward a batch's timeout; otherwise
            # the first poll after a long idle period could trip the
            # watchdog before the new batch has had a chance to advance.
            self.watchdog_last_time = current
        elif self.watchdog_last_forward_ct != self.forward_ct:
            # Progress was made since the last poll: restart the timer.
            self.watchdog_last_forward_ct = self.forward_ct
            self.watchdog_last_time = current
        elif current > self.watchdog_last_time + self.watchdog_timeout:
            break
        # True division: `//` yields 0 for timeouts below 2, which would
        # turn this loop into a busy spin.
        time.sleep(self.watchdog_timeout / 2)

    # Diagnostics are best-effort. A failure here must never stop the
    # watchdog from reaching the SIGQUIT below.
    try:
        # Read the batch once so a concurrent reset to None cannot raise
        # AttributeError halfway through building the message.
        stuck_batch = self.cur_batch
        if not disable_request_logging():
            # Print batch size and memory pool info to check whether there are de-sync issues.
            if self.is_hybrid:
                (
                    _,
                    _,
                    _,
                    _,
                    full_available_size,
                    full_evictable_size,
                    swa_available_size,
                    swa_evictable_size,
                ) = self._get_swa_token_info()
                info_msg = (
                    f"{full_available_size=}, "
                    f"{full_evictable_size=}, "
                    f"{swa_available_size=}, "
                    f"{swa_evictable_size=}, "
                )
            elif self.is_hybrid_gdn and isinstance(self.tree_cache, MambaRadixCache):
                (
                    _,
                    _,
                    _,
                    _,
                    full_available_size,
                    full_evictable_size,
                    mamba_available_size,
                    mamba_evictable_size,
                ) = self._get_mamba_token_info()
                info_msg = (
                    f"{full_available_size=}, "
                    f"{full_evictable_size=}, "
                    f"{mamba_available_size=}, "
                    f"{mamba_evictable_size=}, "
                )
            else:
                _, _, available_size, evictable_size = self._get_token_info()
                info_msg = f"{available_size=}, " f"{evictable_size=}, "

            if stuck_batch is None:
                batch_msg = "self.cur_batch=None, "
            else:
                batch_msg = (
                    f"self.cur_batch.batch_size()={stuck_batch.batch_size()!r}, "
                    f"self.cur_batch.reqs={stuck_batch.reqs!r}, "
                )
            logger.error(f"{batch_msg}{info_msg}")
    except Exception:
        logger.exception("Watchdog failed to collect diagnostics")

    try:
        pyspy_dump_schedulers()
    except Exception:
        logger.exception("Watchdog failed to dump scheduler stacks")

    logger.error(f"Watchdog timeout ({self.watchdog_timeout=})")
    print(file=sys.stderr, flush=True)
    print(file=sys.stdout, flush=True)

    # Wait for some time so that the parent process can print the error.
    time.sleep(5)
    self.parent_process.send_signal(signal.SIGQUIT)
def flush_cache_wrapped(self, recv_req: FlushCacheReqInput): def flush_cache_wrapped(self, recv_req: FlushCacheReqInput):
success = self.flush_cache() success = self.flush_cache()
return FlushCacheReqOutput(success=success) return FlushCacheReqOutput(success=success)
......
from __future__ import annotations from __future__ import annotations
import logging
import signal
import sys
import time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
...@@ -7,10 +10,13 @@ from sglang.srt.disaggregation.utils import DisaggregationMode ...@@ -7,10 +10,13 @@ from sglang.srt.disaggregation.utils import DisaggregationMode
from sglang.srt.managers.schedule_batch import ScheduleBatch from sglang.srt.managers.schedule_batch import ScheduleBatch
from sglang.srt.mem_cache.mamba_radix_cache import MambaRadixCache from sglang.srt.mem_cache.mamba_radix_cache import MambaRadixCache
from sglang.srt.mem_cache.swa_radix_cache import SWARadixCache from sglang.srt.mem_cache.swa_radix_cache import SWARadixCache
from sglang.srt.utils.common import disable_request_logging, pyspy_dump_schedulers
if TYPE_CHECKING: if TYPE_CHECKING:
from sglang.srt.managers.scheduler import Scheduler from sglang.srt.managers.scheduler import Scheduler
logger = logging.getLogger(__name__)
class SchedulerRuntimeCheckerMixin: class SchedulerRuntimeCheckerMixin:
...@@ -215,3 +221,42 @@ class SchedulerRuntimeCheckerMixin: ...@@ -215,3 +221,42 @@ class SchedulerRuntimeCheckerMixin:
self.check_tree_cache() self.check_tree_cache()
self.new_token_ratio = self.init_new_token_ratio self.new_token_ratio = self.init_new_token_ratio
self.maybe_sleep_on_idle() self.maybe_sleep_on_idle()
def watchdog_thread(self: Scheduler):
    """Kill the server if one forward batch makes no progress for too long.

    Runs forever in its own thread. While ``self.cur_batch`` is set, it
    polls ``self.forward_ct``; if the counter has not changed for more than
    ``self.watchdog_timeout`` seconds, it logs memory-pool diagnostics
    (unless request logging is disabled), dumps scheduler stacks with
    ``pyspy_dump_schedulers`` and sends ``SIGQUIT`` to
    ``self.parent_process``.
    """
    self.watchdog_last_forward_ct = 0
    self.watchdog_last_time = time.perf_counter()

    while True:
        current = time.perf_counter()
        if self.cur_batch is None:
            # Idle time must not count toward a batch's timeout; otherwise
            # the first poll after a long idle period could trip the
            # watchdog before the new batch has had a chance to advance.
            self.watchdog_last_time = current
        elif self.watchdog_last_forward_ct != self.forward_ct:
            # Progress was made since the last poll: restart the timer.
            self.watchdog_last_forward_ct = self.forward_ct
            self.watchdog_last_time = current
        elif current > self.watchdog_last_time + self.watchdog_timeout:
            break
        # True division: `//` yields 0 for timeouts below 2, which would
        # turn this loop into a busy spin.
        time.sleep(self.watchdog_timeout / 2)

    # Diagnostics are best-effort. A failure here must never stop the
    # watchdog from reaching the SIGQUIT below.
    try:
        # Read the batch once so a concurrent reset to None cannot raise
        # AttributeError halfway through building the message.
        stuck_batch = self.cur_batch
        if not disable_request_logging():
            # Print batch size and memory pool info to check whether there are de-sync issues.
            if self.is_hybrid:
                _, info_msg = self._check_hybrid_memory()
            elif self.is_hybrid_gdn and isinstance(self.tree_cache, MambaRadixCache):
                _, info_msg = self._check_mamba_memory()
            else:
                _, info_msg = self._check_radix_cache_memory()

            if stuck_batch is None:
                batch_msg = "self.cur_batch=None\n"
            else:
                batch_msg = (
                    f"self.cur_batch.batch_size()={stuck_batch.batch_size()!r}\n"
                    f"self.cur_batch.reqs={stuck_batch.reqs!r}\n"
                )
            logger.error(f"{batch_msg}{info_msg}")
    except Exception:
        logger.exception("Watchdog failed to collect diagnostics")

    try:
        pyspy_dump_schedulers()
    except Exception:
        logger.exception("Watchdog failed to dump scheduler stacks")

    logger.error(f"Watchdog timeout ({self.watchdog_timeout=})")
    print(file=sys.stderr, flush=True)
    print(file=sys.stdout, flush=True)

    # Wait for some time so that the parent process can print the error.
    time.sleep(5)
    self.parent_process.send_signal(signal.SIGQUIT)
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment