Unverified Commit 07a64744 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

refactor: remove vllm prefill worker background loop check (#3529)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 9414e3be
...@@ -129,43 +129,6 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -129,43 +129,6 @@ class DecodeWorkerHandler(BaseWorkerHandler):
super().__init__(runtime, component, engine, default_sampling_params) super().__init__(runtime, component, engine, default_sampling_params)
self.prefill_worker_client = prefill_worker_client self.prefill_worker_client = prefill_worker_client
self.prefill_router_client = prefill_router_client self.prefill_router_client = prefill_router_client
self.can_prefill = 0
self._prefill_check_task = None
if self.prefill_worker_client or self.prefill_router_client:
self._prefill_check_task = asyncio.create_task(self._prefill_check_loop())
async def _prefill_check_loop(self):
"""Background task that checks prefill router/worker availability every 5 seconds."""
while True:
try:
router_count = (
len(self.prefill_router_client.instance_ids())
if self.prefill_router_client is not None
else 0
)
worker_count = (
len(self.prefill_worker_client.instance_ids())
if self.prefill_worker_client is not None
else 0
)
self.can_prefill = max(router_count, worker_count)
logger.debug(
f"Prefill availability - Routers: {router_count}, Workers: {worker_count}"
)
except asyncio.CancelledError:
logger.warning("Prefill check loop cancelled.")
raise
except Exception as e:
logger.error(f"Error in prefill check loop: {e}")
await asyncio.sleep(5)
def cleanup(self):
"""Cancel background tasks."""
if self._prefill_check_task is not None:
self._prefill_check_task.cancel()
super().cleanup()
async def generate(self, request, context): async def generate(self, request, context):
request_id = str(uuid.uuid4().hex) request_id = str(uuid.uuid4().hex)
...@@ -185,7 +148,11 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -185,7 +148,11 @@ class DecodeWorkerHandler(BaseWorkerHandler):
setattr(sampling_params, key, value) setattr(sampling_params, key, value)
# Use prefill router or worker if available # Use prefill router or worker if available
if self.can_prefill: can_prefill = (
self.prefill_worker_client is not None
) and self.prefill_worker_client.instance_ids()
if can_prefill:
# Create prefill sampling params with modifications # Create prefill sampling params with modifications
prefill_sampling_params = deepcopy(sampling_params) prefill_sampling_params = deepcopy(sampling_params)
if prefill_sampling_params.extra_args is None: if prefill_sampling_params.extra_args is None:
...@@ -217,15 +184,13 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -217,15 +184,13 @@ class DecodeWorkerHandler(BaseWorkerHandler):
prefill_request, context=context prefill_request, context=context
) )
) )
elif self.prefill_worker_client is not None: else:
# Fallback to direct worker with same format # Fallback to direct worker with same format
prefill_response = await anext( prefill_response = await anext(
await self.prefill_worker_client.round_robin( await self.prefill_worker_client.round_robin(
prefill_request, context=context prefill_request, context=context
) )
) )
else:
raise ValueError("No prefill router or worker available")
prefill_output = prefill_response.data() prefill_output = prefill_response.data()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment