Unverified Commit dc2f159f authored by Woosuk Kwon's avatar Woosuk Kwon Committed by GitHub
Browse files

Dump input metadata on crash for async scheduling (#21258)


Signed-off-by: default avatarWoosuk Kwon <woosuk.kwon@berkeley.edu>
parent d5b981f8
...@@ -234,9 +234,14 @@ class EngineCore: ...@@ -234,9 +234,14 @@ class EngineCore:
self.scheduler.finish_requests(request_ids, self.scheduler.finish_requests(request_ids,
RequestStatus.FINISHED_ABORTED) RequestStatus.FINISHED_ABORTED)
def execute_model(self, scheduler_output: SchedulerOutput): def execute_model_with_error_logging(
self,
model_fn: Callable[[SchedulerOutput], ModelRunnerOutput],
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput:
"""Execute the model and log detailed info on failure."""
try: try:
return self.model_executor.execute_model(scheduler_output) return model_fn(scheduler_output)
except Exception as err: except Exception as err:
# We do not want to catch BaseException here since we're only # We do not want to catch BaseException here since we're only
# interested in dumping info when the exception is due to an # interested in dumping info when the exception is due to an
...@@ -259,7 +264,9 @@ class EngineCore: ...@@ -259,7 +264,9 @@ class EngineCore:
if not self.scheduler.has_requests(): if not self.scheduler.has_requests():
return {}, False return {}, False
scheduler_output = self.scheduler.schedule() scheduler_output = self.scheduler.schedule()
model_output = self.execute_model(scheduler_output) model_output = self.execute_model_with_error_logging(
self.model_executor.execute_model, # type: ignore
scheduler_output)
engine_core_outputs = self.scheduler.update_from_output( engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, model_output) # type: ignore scheduler_output, model_output) # type: ignore
...@@ -306,8 +313,11 @@ class EngineCore: ...@@ -306,8 +313,11 @@ class EngineCore:
# so we need more work. # so we need more work.
if not scheduled_batch and not self.batch_queue.empty(): if not scheduled_batch and not self.batch_queue.empty():
future, scheduler_output = self.batch_queue.get_nowait() future, scheduler_output = self.batch_queue.get_nowait()
# Blocking until the first result is available. # Blocking until the first result is available.
model_output = future.result() model_output = self.execute_model_with_error_logging(
lambda _: future.result(), scheduler_output)
self.batch_queue.task_done() self.batch_queue.task_done()
engine_core_outputs = (self.scheduler.update_from_output( engine_core_outputs = (self.scheduler.update_from_output(
scheduler_output, model_output)) scheduler_output, model_output))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment