Unverified commit da436f86, authored by Nick Hill, committed by GitHub
Browse files

[Minor] Small pooler output processing optimization (#31667)


Signed-off-by: njhill <nickhill123@gmail.com>
parent f099cd55
...@@ -263,7 +263,6 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput): ...@@ -263,7 +263,6 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput):
async_output_copy_stream: torch.cuda.Stream, async_output_copy_stream: torch.cuda.Stream,
): ):
self._model_runner_output = model_runner_output self._model_runner_output = model_runner_output
self._finished_mask = finished_mask
# Event on the copy stream so we can synchronize the non-blocking copy. # Event on the copy stream so we can synchronize the non-blocking copy.
self.async_copy_ready_event = torch.Event() self.async_copy_ready_event = torch.Event()
...@@ -276,11 +275,15 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput): ...@@ -276,11 +275,15 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput):
default_stream = torch.cuda.current_stream() default_stream = torch.cuda.current_stream()
with torch.cuda.stream(async_output_copy_stream): with torch.cuda.stream(async_output_copy_stream):
async_output_copy_stream.wait_stream(default_stream) async_output_copy_stream.wait_stream(default_stream)
self._raw_pooler_output_cpu = json_map_leaves( raw_pooler_output_cpu = json_map_leaves(
lambda x: None if x is None else x.to("cpu", non_blocking=True), lambda x: None if x is None else x.to("cpu", non_blocking=True),
self._raw_pooler_output, self._raw_pooler_output,
) )
self.async_copy_ready_event.record() self.async_copy_ready_event.record()
self._model_runner_output.pooler_output = [
out if include else None
for out, include in zip(raw_pooler_output_cpu, finished_mask)
]
def get_output(self) -> ModelRunnerOutput: def get_output(self) -> ModelRunnerOutput:
"""Copy the device tensors to the host and return a ModelRunnerOutput. """Copy the device tensors to the host and return a ModelRunnerOutput.
...@@ -290,11 +293,6 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput): ...@@ -290,11 +293,6 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput):
# Release the device tensors once the copy has completed. # Release the device tensors once the copy has completed.
del self._raw_pooler_output del self._raw_pooler_output
self._model_runner_output.pooler_output = [
out if include else None
for out, include in zip(self._raw_pooler_output_cpu, self._finished_mask)
]
return self._model_runner_output return self._model_runner_output
...@@ -2537,8 +2535,7 @@ class GPUModelRunner( ...@@ -2537,8 +2535,7 @@ class GPUModelRunner(
model = cast(VllmModelForPooling, self.model) model = cast(VllmModelForPooling, self.model)
raw_pooler_output: PoolerOutput = model.pooler( raw_pooler_output: PoolerOutput = model.pooler(
hidden_states=hidden_states, hidden_states=hidden_states, pooling_metadata=pooling_metadata
pooling_metadata=pooling_metadata,
) )
finished_mask = [ finished_mask = [
...@@ -2568,12 +2565,12 @@ class GPUModelRunner( ...@@ -2568,12 +2565,12 @@ class GPUModelRunner(
lambda x: None if x is None else x.to("cpu", non_blocking=True), lambda x: None if x is None else x.to("cpu", non_blocking=True),
raw_pooler_output, raw_pooler_output,
) )
self._sync_device()
model_runner_output.pooler_output = [ model_runner_output.pooler_output = [
out if include else None out if include else None
for out, include in zip(raw_pooler_output, finished_mask) for out, include in zip(raw_pooler_output, finished_mask)
] ]
self._sync_device()
return model_runner_output return model_runner_output
def _pad_for_sequence_parallelism(self, num_scheduled_tokens: int) -> int: def _pad_for_sequence_parallelism(self, num_scheduled_tokens: int) -> int:
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment