Unverified Commit 795668dc authored by Yingchun Lai, committed by GitHub

feat: add tp_rank, pp_rank and dp_rank labels for scheduler metrics (#7597)


Co-authored-by: Stefan He <hebiaobuaa@gmail.com>
parent 4395c87a
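In short: scheduler Prometheus metrics now carry `tp_rank` and `pp_rank` labels (plus `dp_rank` when data parallelism is on), and a new `--enable-metrics-for-all-schedulers` flag lets schedulers on non-zero attention-TP ranks export metrics instead of everything being attributed to TP 0. As a rough sketch of the resulting series shape, assuming `prometheus_client` and a gauge name in sglang's style (the exact metric name is not part of this diff):

```python
# Hedged sketch: per-rank labels keep one series per scheduler distinct.
# The gauge name and model name are illustrative assumptions, not from this commit.
from prometheus_client import Gauge, generate_latest

labels = {
    "model_name": "meta-llama/Llama-3.1-8B-Instruct",  # hypothetical
    "engine_type": "unified",
    "tp_rank": "1",
    "pp_rank": "0",
    "dp_rank": "2",  # only present when data parallelism is enabled
}
running_reqs = Gauge(
    "sglang:num_running_reqs",  # assumed name; colons are legal in prometheus_client
    "The number of running requests.",
    labelnames=list(labels.keys()),
)
running_reqs.labels(**labels).set(3)
print(generate_latest().decode())
# sglang:num_running_reqs{dp_rank="2",engine_type="unified",...,tp_rank="1"} 3.0
```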
@@ -252,6 +252,9 @@ class Scheduler(
         self.enable_overlap = not server_args.disable_overlap_schedule
         self.skip_tokenizer_init = server_args.skip_tokenizer_init
         self.enable_metrics = server_args.enable_metrics
+        self.enable_metrics_for_all_schedulers = (
+            server_args.enable_metrics_for_all_schedulers
+        )
         self.enable_kv_cache_events = server_args.kv_events_config is not None
         self.stream_interval = server_args.stream_interval
         self.spec_algorithm = SpeculativeAlgorithm.from_string(
@@ -281,9 +284,6 @@ class Scheduler(
         self.send_to_tokenizer = get_zmq_socket(
             context, zmq.PUSH, port_args.tokenizer_ipc_name, False
         )
-        self.send_metrics_from_scheduler = get_zmq_socket(
-            context, zmq.PUSH, port_args.metrics_ipc_name, False
-        )

         if server_args.skip_tokenizer_init:
             # Directly send to the TokenizerManager
@@ -309,10 +309,14 @@ class Scheduler(
         else:
             self.recv_from_tokenizer = None
             self.recv_from_rpc = None
+            self.send_metrics_from_scheduler = None
             self.send_to_tokenizer = SimpleNamespace(send_pyobj=lambda x: None)
             self.send_to_detokenizer = SimpleNamespace(send_pyobj=lambda x: None)

+        if self.current_scheduler_metrics_enabled():
+            self.send_metrics_from_scheduler = get_zmq_socket(
+                context, zmq.PUSH, port_args.metrics_ipc_name, False
+            )

         # Init tokenizer
         self.init_tokenizer()
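The metrics IPC socket thus moves out of the unconditional attn-TP-0 block and behind a guard, so each scheduler only opens the PUSH socket when it will actually publish. A minimal sketch of that shape, assuming pyzmq and that the `False` argument to `get_zmq_socket` means connect rather than bind (`make_metrics_socket` is an illustrative stand-in, not a function in this diff):

```python
# Minimal sketch of the conditional metrics socket setup, assuming pyzmq.
import zmq

def make_metrics_socket(context: zmq.Context, metrics_ipc_name: str,
                        reports_metrics: bool):
    # Non-reporting ranks keep the attribute as None instead of holding a socket.
    if not reports_metrics:
        return None
    sock = context.socket(zmq.PUSH)
    sock.connect(metrics_ipc_name)  # assumed: bind=False in get_zmq_socket means connect
    return sock
```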
@@ -495,7 +499,7 @@ class Scheduler(
         self.init_profier()

         # Init metrics stats
-        self.init_metrics()
+        self.init_metrics(tp_rank, pp_rank, dp_rank)
         self.init_kv_events(server_args.kv_events_config)

         # Init request dispatcher
@@ -537,6 +541,9 @@ class Scheduler(
         if get_bool_env_var("SGLANG_GC_LOG"):
             configure_gc_logger()

+    def current_scheduler_metrics_enabled(self):
+        return self.attn_tp_rank == 0 or self.enable_metrics_for_all_schedulers
+
     def maybe_sleep_on_idle(self):
         if self.idle_sleeper is not None:
             self.idle_sleeper.maybe_sleep()
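All former `attn_tp_rank == 0` checks in the logging paths now route through this one predicate, so the behavior is easy to state: TP 0 always reports, other ranks report only under the new flag. A self-contained sketch, with `SimpleNamespace` standing in for the scheduler:

```python
# Sketch of the new gating predicate; SimpleNamespace replaces the Scheduler.
from types import SimpleNamespace

def current_scheduler_metrics_enabled(self):
    return self.attn_tp_rank == 0 or self.enable_metrics_for_all_schedulers

rank0 = SimpleNamespace(attn_tp_rank=0, enable_metrics_for_all_schedulers=False)
rank1 = SimpleNamespace(attn_tp_rank=1, enable_metrics_for_all_schedulers=False)
rank1_all = SimpleNamespace(attn_tp_rank=1, enable_metrics_for_all_schedulers=True)

assert current_scheduler_metrics_enabled(rank0)      # TP 0 always reports
assert not current_scheduler_metrics_enabled(rank1)  # other ranks are silent by default
assert current_scheduler_metrics_enabled(rank1_all)  # unless the new flag is set
```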
@@ -660,7 +667,7 @@ class Scheduler(
         self.profile_in_progress: bool = False
         self.rpd_profiler = None

-    def init_metrics(self):
+    def init_metrics(self, tp_rank: int, pp_rank: int, dp_rank: Optional[int]):
         self.last_gen_throughput: float = 0.0
         self.last_input_throughput: float = 0.0
         self.step_time_dict = defaultdict(list)  # Dict[batch size -> step time]
@@ -671,12 +678,15 @@ class Scheduler(
         self.stats = SchedulerStats()
         if self.enable_metrics:
             engine_type = "unified"
-            self.metrics_collector = SchedulerMetricsCollector(
-                labels={
-                    "model_name": self.server_args.served_model_name,
-                    "engine_type": engine_type,
-                },
-            )
+            labels = {
+                "model_name": self.server_args.served_model_name,
+                "engine_type": engine_type,
+                "tp_rank": tp_rank,
+                "pp_rank": pp_rank,
+            }
+            if dp_rank is not None:
+                labels["dp_rank"] = dp_rank
+            self.metrics_collector = SchedulerMetricsCollector(labels=labels)

     def init_kv_events(self, kv_events_config: Optional[str]):
         if self.enable_kv_cache_events:
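Note the `dp_rank` label is attached only when a data-parallel rank exists; with DP off, the series simply has no `dp_rank` key rather than a dummy value, which avoids a constant label on single-DP deployments. Sketched with a hypothetical `build_labels` helper mirroring the new `init_metrics`:

```python
# Hypothetical helper mirroring the new label construction in init_metrics.
from typing import Optional

def build_labels(model_name: str, tp_rank: int, pp_rank: int,
                 dp_rank: Optional[int]) -> dict:
    labels = {
        "model_name": model_name,
        "engine_type": "unified",
        "tp_rank": tp_rank,
        "pp_rank": pp_rank,
    }
    if dp_rank is not None:  # dp_rank is omitted entirely when DP is off
        labels["dp_rank"] = dp_rank
    return labels

print(build_labels("my-model", 1, 0, None))  # no 'dp_rank' key
print(build_labels("my-model", 1, 0, 2))     # includes 'dp_rank': 2
```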
@@ -1519,7 +1529,7 @@ class Scheduler(
         if (
             self.enable_metrics
-            and self.attn_tp_rank == 0
+            and self.current_scheduler_metrics_enabled()
             and time.perf_counter() > self.metrics_collector.last_log_time + 30
         ):
             # During idle time, also collect metrics every 30 seconds.
@@ -1755,7 +1765,7 @@ class Scheduler(
             self.chunked_req.is_chunked += 1

         # Print stats
-        if self.attn_tp_rank == 0:
+        if self.current_scheduler_metrics_enabled():
             self.log_prefill_stats(adder, can_run_list, running_bs)

         # Create a new batch
...
@@ -290,7 +290,7 @@ class SchedulerOutputProcessorMixin:
         self.forward_ct_decode = (self.forward_ct_decode + 1) % (1 << 30)
         if (
-            self.attn_tp_rank == 0
+            self.current_scheduler_metrics_enabled()
             and self.forward_ct_decode % self.server_args.decode_log_interval == 0
         ):
             self.log_decode_stats(can_run_cuda_graph, running_batch=batch)
...
@@ -105,6 +105,7 @@ class ServerArgs:
     crash_dump_folder: Optional[str] = None
     show_time_cost: bool = False
     enable_metrics: bool = False
+    enable_metrics_for_all_schedulers: bool = False
     bucket_time_to_first_token: Optional[List[float]] = None
     bucket_e2e_request_latency: Optional[List[float]] = None
     bucket_inter_token_latency: Optional[List[float]] = None
@@ -1002,6 +1003,13 @@ class ServerArgs:
             action="store_true",
             help="Enable log prometheus metrics.",
         )
+        parser.add_argument(
+            "--enable-metrics-for-all-schedulers",
+            action="store_true",
+            help="Enable --enable-metrics-for-all-schedulers when you want schedulers on all TP ranks (not just TP 0) "
+            "to record request metrics separately. This is especially useful when dp_attention is enabled, as "
+            "otherwise all metrics appear to come from TP 0.",
+        )
         parser.add_argument(
             "--bucket-time-to-first-token",
             type=float,
...
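To exercise the whole change end to end, a hypothetical launch (the entry point and the other flags are standard sglang CLI; the model path is illustrative):

```python
# Hypothetical launch: every TP rank's scheduler now exports labeled metrics.
import subprocess

subprocess.run([
    "python", "-m", "sglang.launch_server",
    "--model-path", "meta-llama/Llama-3.1-8B-Instruct",  # illustrative model
    "--tp-size", "2",
    "--enable-metrics",                     # existing flag: expose Prometheus metrics
    "--enable-metrics-for-all-schedulers",  # new flag added by this commit
])
# Scraping /metrics should then show one series per (tp_rank, pp_rank[, dp_rank]).
```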