Unverified commit c77620d2, authored by Woosuk Kwon, committed by GitHub
Browse files

[V1][Minor] Minor code cleanup for scheduling metrics (#14800)


Signed-off-by: Woosuk Kwon <woosuk.kwon@berkeley.edu>
parent 989ecd20
...@@ -15,8 +15,8 @@ from vllm.v1.core.encoder_cache_manager import (EncoderCacheManager, ...@@ -15,8 +15,8 @@ from vllm.v1.core.encoder_cache_manager import (EncoderCacheManager,
from vllm.v1.core.kv_cache_manager import KVCacheManager from vllm.v1.core.kv_cache_manager import KVCacheManager
from vllm.v1.core.scheduler_output import (CachedRequestData, NewRequestData, from vllm.v1.core.scheduler_output import (CachedRequestData, NewRequestData,
SchedulerOutput) SchedulerOutput)
from vllm.v1.engine import (EngineCoreEvent, EngineCoreEventType, from vllm.v1.engine import (EngineCoreEventType, EngineCoreOutput,
EngineCoreOutput, EngineCoreOutputs) EngineCoreOutputs)
from vllm.v1.metrics.stats import SchedulerStats from vllm.v1.metrics.stats import SchedulerStats
from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus from vllm.v1.request import Request, RequestStatus
...@@ -178,7 +178,9 @@ class Scheduler: ...@@ -178,7 +178,9 @@ class Scheduler:
self.kv_cache_manager.free(preempted_req) self.kv_cache_manager.free(preempted_req)
preempted_req.status = RequestStatus.PREEMPTED preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0 preempted_req.num_computed_tokens = 0
self.request_preempted(preempted_req, scheduled_timestamp) if self.log_stats:
preempted_req.record_event(
EngineCoreEventType.PREEMPTED, scheduled_timestamp)
self.waiting.appendleft(preempted_req) self.waiting.appendleft(preempted_req)
preempted_reqs.append(preempted_req) preempted_reqs.append(preempted_req)
...@@ -320,7 +322,9 @@ class Scheduler: ...@@ -320,7 +322,9 @@ class Scheduler:
req_index += 1 req_index += 1
self.running.append(request) self.running.append(request)
self.scheduled_req_ids.add(request.request_id) self.scheduled_req_ids.add(request.request_id)
self.request_scheduled(request, scheduled_timestamp) if self.log_stats:
request.record_event(EngineCoreEventType.SCHEDULED,
scheduled_timestamp)
if request.status == RequestStatus.WAITING: if request.status == RequestStatus.WAITING:
scheduled_new_reqs.append(request) scheduled_new_reqs.append(request)
elif request.status == RequestStatus.PREEMPTED: elif request.status == RequestStatus.PREEMPTED:
...@@ -666,7 +670,8 @@ class Scheduler: ...@@ -666,7 +670,8 @@ class Scheduler:
def add_request(self, request: Request) -> None: def add_request(self, request: Request) -> None:
self.waiting.append(request) self.waiting.append(request)
self.requests[request.request_id] = request self.requests[request.request_id] = request
self.request_queued(request) if self.log_stats:
request.record_event(EngineCoreEventType.QUEUED)
def finish_requests( def finish_requests(
self, self,
...@@ -728,26 +733,6 @@ class Scheduler: ...@@ -728,26 +733,6 @@ class Scheduler:
def reset_prefix_cache(self) -> bool: def reset_prefix_cache(self) -> bool:
return self.kv_cache_manager.reset_prefix_cache() return self.kv_cache_manager.reset_prefix_cache()
def request_queued(self, request: Request):
# Scheduler helper: record a QUEUED event on `request`.
# Returns early without recording anything when self.log_stats is falsy.
if not self.log_stats:
return
# No timestamp argument is passed to new_event here.
# NOTE(review): presumably new_event supplies its own timestamp in that
# case -- confirm against EngineCoreEvent.new_event.
request.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.QUEUED))
def request_scheduled(self, request: Request, timestamp: float):
# Scheduler helper: record a SCHEDULED event on `request`, passing the
# caller-supplied `timestamp` through to EngineCoreEvent.new_event.
# Returns early without recording anything when self.log_stats is falsy.
if not self.log_stats:
return
request.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED,
timestamp))
def request_preempted(self, request: Request, timestamp: float):
# Scheduler helper: record a PREEMPTED event on `request`, passing the
# caller-supplied `timestamp` through to EngineCoreEvent.new_event.
# Returns early without recording anything when self.log_stats is falsy.
if not self.log_stats:
return
request.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.PREEMPTED,
timestamp))
def make_stats(self) -> Optional[SchedulerStats]: def make_stats(self) -> Optional[SchedulerStats]:
if not self.log_stats: if not self.log_stats:
return None return None
......
...@@ -88,21 +88,6 @@ class Request: ...@@ -88,21 +88,6 @@ class Request:
sampling_params=request.sampling_params), sampling_params=request.sampling_params),
) )
def queued(self, timestamp: Optional[float] = None) -> None:
# Append a QUEUED event to self.events.
# `timestamp` defaults to None and is forwarded unchanged to new_event.
self.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.QUEUED, timestamp))
def scheduled(self, timestamp: Optional[float] = None) -> None:
# Append a SCHEDULED event to self.events.
# `timestamp` defaults to None and is forwarded unchanged to new_event.
self.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED,
timestamp))
def take_events(self) -> Optional[list[EngineCoreEvent]]:
# Hand the accumulated events to the caller and reset the buffer.
# Returns None (not an empty list) when self.events is empty/falsy.
if not self.events:
return None
# Swap in a fresh empty list so the returned list is no longer the one
# referenced by self.events; later appends go to the new list.
events, self.events = self.events, []
return events
def append_output_token_ids( def append_output_token_ids(
self, self,
token_ids: Union[int, list[int]], token_ids: Union[int, list[int]],
...@@ -146,6 +131,19 @@ class Request: ...@@ -146,6 +131,19 @@ class Request:
def use_structured_output(self) -> bool: def use_structured_output(self) -> bool:
return self.sampling_params.guided_decoding is not None return self.sampling_params.guided_decoding is not None
def record_event(
self,
event_type: EngineCoreEventType,
timestamp: Optional[float] = None,
) -> None:
# Append one event of the given `event_type` to self.events.
# `timestamp` defaults to None and is forwarded unchanged to
# EngineCoreEvent.new_event.
# NOTE(review): presumably new_event supplies its own timestamp when given
# None -- confirm against EngineCoreEvent.new_event.
# This method does no log_stats check of its own; the call sites visible
# in this diff guard it with `if self.log_stats:`.
self.events.append(EngineCoreEvent.new_event(event_type, timestamp))
def take_events(self) -> Optional[list[EngineCoreEvent]]:
# Hand the accumulated events to the caller and reset the buffer.
# Returns None (not an empty list) when self.events is empty/falsy.
if not self.events:
return None
# Swap in a fresh empty list so the returned list is no longer the one
# referenced by self.events; later appends go to the new list.
events, self.events = self.events, []
return events
class RequestStatus(enum.IntEnum): class RequestStatus(enum.IntEnum):
"""Status of a request.""" """Status of a request."""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment