Unverified commit 02a58803, authored by Wentao Ye, committed by GitHub
Browse files

[CI] Fix mypy for vllm/v1/executor (#30517)


Signed-off-by: yewentao256 <zhyanwentao@126.com>
parent d2c919dc
...@@ -43,6 +43,7 @@ FILES = [ ...@@ -43,6 +43,7 @@ FILES = [
"vllm/worker", "vllm/worker",
"vllm/v1/core", "vllm/v1/core",
"vllm/v1/engine", "vllm/v1/engine",
"vllm/v1/executor",
"vllm/v1/metrics", "vllm/v1/metrics",
"vllm/v1/pool", "vllm/v1/pool",
"vllm/v1/sample", "vllm/v1/sample",
...@@ -60,7 +61,6 @@ SEPARATE_GROUPS = [ ...@@ -60,7 +61,6 @@ SEPARATE_GROUPS = [
"vllm/model_executor", "vllm/model_executor",
# v1 related # v1 related
"vllm/v1/attention", "vllm/v1/attention",
"vllm/v1/executor",
"vllm/v1/kv_offload", "vllm/v1/kv_offload",
"vllm/v1/spec_decode", "vllm/v1/spec_decode",
"vllm/v1/structured_output", "vllm/v1/structured_output",
......
...@@ -219,7 +219,7 @@ class Executor(ABC): ...@@ -219,7 +219,7 @@ class Executor(ABC):
def sample_tokens( def sample_tokens(
self, grammar_output: GrammarOutput | None, non_block: bool = False self, grammar_output: GrammarOutput | None, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]: ) -> ModelRunnerOutput | Future[ModelRunnerOutput]:
output = self.collective_rpc( # type: ignore[call-overload] output = self.collective_rpc( # type: ignore[call-overload]
"sample_tokens", args=(grammar_output,), non_block=non_block "sample_tokens", args=(grammar_output,), non_block=non_block
) )
......
...@@ -294,8 +294,8 @@ class MultiprocExecutor(Executor): ...@@ -294,8 +294,8 @@ class MultiprocExecutor(Executor):
kwargs: dict | None = None, kwargs: dict | None = None,
non_block: bool = False, non_block: bool = False,
unique_reply_rank: int | None = None, unique_reply_rank: int | None = None,
kv_output_aggregator: KVOutputAggregator = None, kv_output_aggregator: KVOutputAggregator | None = None,
) -> Any | list[Any] | Future[Any | list[Any]]: ) -> Any:
"""Returns single result if unique_reply_rank and/or kv_output_aggregator """Returns single result if unique_reply_rank and/or kv_output_aggregator
is provided, otherwise list.""" is provided, otherwise list."""
assert self.rpc_broadcast_mq is not None, ( assert self.rpc_broadcast_mq is not None, (
...@@ -476,6 +476,8 @@ class WorkerProc: ...@@ -476,6 +476,8 @@ class WorkerProc:
"""Wrapper that runs one Worker in a separate process.""" """Wrapper that runs one Worker in a separate process."""
READY_STR = "READY" READY_STR = "READY"
rpc_broadcast_mq: MessageQueue | None
worker_response_mq: MessageQueue | None
def _init_message_queues( def _init_message_queues(
self, input_shm_handle: Handle, vllm_config: VllmConfig self, input_shm_handle: Handle, vllm_config: VllmConfig
...@@ -487,7 +489,7 @@ class WorkerProc: ...@@ -487,7 +489,7 @@ class WorkerProc:
) )
# Initializes a message queue for sending the model output # Initializes a message queue for sending the model output
self.worker_response_mq: MessageQueue = MessageQueue(1, 1) self.worker_response_mq = MessageQueue(1, 1)
self.peer_response_handles = [] self.peer_response_handles = []
else: else:
# Initialize remote MessageQueue for receiving SchedulerOutput across nodes # Initialize remote MessageQueue for receiving SchedulerOutput across nodes
...@@ -720,6 +722,7 @@ class WorkerProc: ...@@ -720,6 +722,7 @@ class WorkerProc:
try: try:
reader.close() reader.close()
worker = WorkerProc(*args, **kwargs) worker = WorkerProc(*args, **kwargs)
assert worker.worker_response_mq is not None
# Send READY once we know everything is loaded # Send READY once we know everything is loaded
ready_writer.send( ready_writer.send(
...@@ -804,6 +807,7 @@ class WorkerProc: ...@@ -804,6 +807,7 @@ class WorkerProc:
def worker_busy_loop(self, cancel: threading.Event | None = None): def worker_busy_loop(self, cancel: threading.Event | None = None):
"""Main busy loop for Multiprocessing Workers""" """Main busy loop for Multiprocessing Workers"""
assert self.rpc_broadcast_mq is not None
while True: while True:
method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue( method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
cancel=cancel, indefinite=True cancel=cancel, indefinite=True
......
...@@ -413,7 +413,7 @@ class RayDistributedExecutor(Executor): ...@@ -413,7 +413,7 @@ class RayDistributedExecutor(Executor):
self, self,
grammar_output: "GrammarOutput | None", grammar_output: "GrammarOutput | None",
non_block: bool = False, non_block: bool = False,
) -> ModelRunnerOutput | Future[ModelRunnerOutput]: ) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
"""Execute the model on the Ray workers. """Execute the model on the Ray workers.
The scheduler output to use should have been provided in The scheduler output to use should have been provided in
...@@ -428,7 +428,7 @@ class RayDistributedExecutor(Executor): ...@@ -428,7 +428,7 @@ class RayDistributedExecutor(Executor):
""" """
scheduler_output = self.scheduler_output scheduler_output = self.scheduler_output
if scheduler_output is None: if scheduler_output is None:
return COMPLETED_NONE_FUTURE if non_block else None # noqa return COMPLETED_NONE_FUTURE if non_block else None
self.scheduler_output = None self.scheduler_output = None
...@@ -439,7 +439,7 @@ class RayDistributedExecutor(Executor): ...@@ -439,7 +439,7 @@ class RayDistributedExecutor(Executor):
scheduler_output: SchedulerOutput, scheduler_output: SchedulerOutput,
grammar_output: "GrammarOutput | None", grammar_output: "GrammarOutput | None",
non_block: bool = False, non_block: bool = False,
) -> ModelRunnerOutput | Future[ModelRunnerOutput]: ) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
# Build the compiled DAG for the first time. # Build the compiled DAG for the first time.
if self.forward_dag is None: # type: ignore if self.forward_dag is None: # type: ignore
self.forward_dag = self._compiled_ray_dag(enable_asyncio=False) self.forward_dag = self._compiled_ray_dag(enable_asyncio=False)
......
...@@ -67,7 +67,7 @@ class UniProcExecutor(Executor): ...@@ -67,7 +67,7 @@ class UniProcExecutor(Executor):
kwargs: dict | None = None, kwargs: dict | None = None,
non_block: bool = False, non_block: bool = False,
single_value: bool = False, single_value: bool = False,
) -> Any | list[Any] | Future[Any | list[Any]]: ) -> Any:
if kwargs is None: if kwargs is None:
kwargs = {} kwargs = {}
...@@ -79,10 +79,13 @@ class UniProcExecutor(Executor): ...@@ -79,10 +79,13 @@ class UniProcExecutor(Executor):
result = run_method(self.driver_worker, method, args, kwargs) result = run_method(self.driver_worker, method, args, kwargs)
if isinstance(result, AsyncModelRunnerOutput): if isinstance(result, AsyncModelRunnerOutput):
if (async_thread := self.async_output_thread) is not None: if (async_thread := self.async_output_thread) is not None:
get_output = result.get_output if single_value:
if not single_value: return async_thread.submit(result.get_output)
get_output = lambda go=result.get_output: [go()]
return async_thread.submit(get_output) def get_output_list() -> list[Any]:
return [result.get_output()]
return async_thread.submit(get_output_list)
result = result.get_output() result = result.get_output()
future = Future[Any]() future = Future[Any]()
future.set_result(result if single_value else [result]) future.set_result(result if single_value else [result])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment