Unverified commit fe921763, authored by Yinghai Lu and committed by GitHub
Browse files

Add collective_rpc to llm engine (#16999)


Signed-off-by: Yinghai Lu <yinghai@thinkingmachines.ai>
parent 6d0df0eb
...@@ -528,6 +528,13 @@ class _AsyncLLMEngine(LLMEngine): ...@@ -528,6 +528,13 @@ class _AsyncLLMEngine(LLMEngine):
async def check_health_async(self) -> None: async def check_health_async(self) -> None:
self.model_executor.check_health() self.model_executor.check_health()
async def collective_rpc_async(
    self,
    method: str,
    timeout: Optional[float] = None,
    args: tuple = (),
    kwargs: Optional[dict] = None,
):
    """Placeholder for the asynchronous collective RPC entry point.

    The signature accepts ``method``, ``timeout``, ``args`` and ``kwargs``,
    but none of them is used: the coroutine raises unconditionally.

    Raises:
        NotImplementedError: always, as soon as the coroutine is awaited.
    """
    # No implementation is provided on this engine class; awaiting callers
    # receive the exception.
    raise NotImplementedError
async def build_guided_decoding_logits_processor_async( async def build_guided_decoding_logits_processor_async(
sampling_params: SamplingParams, tokenizer: AnyTokenizer, sampling_params: SamplingParams, tokenizer: AnyTokenizer,
...@@ -1236,6 +1243,17 @@ class AsyncLLMEngine(EngineClient): ...@@ -1236,6 +1243,17 @@ class AsyncLLMEngine(EngineClient):
async def add_lora(self, lora_request: LoRARequest) -> None: async def add_lora(self, lora_request: LoRARequest) -> None:
self.engine.add_lora(lora_request) self.engine.add_lora(lora_request)
async def collective_rpc(
    self,
    method: str,
    timeout: Optional[float] = None,
    args: tuple = (),
    kwargs: Optional[dict] = None,
):
    """Forward a collective RPC request to the wrapped engine.

    ``method``, ``timeout``, ``args`` and ``kwargs`` are handed over
    positionally and unchanged to ``self.engine.collective_rpc_async``.

    Returns:
        Whatever the awaited ``self.engine.collective_rpc_async`` call
        returns.
    """
    rpc_result = await self.engine.collective_rpc_async(
        method, timeout, args, kwargs)
    return rpc_result
# TODO(v1): Remove this class proxy when V1 goes default. # TODO(v1): Remove this class proxy when V1 goes default.
if envs.is_set("VLLM_USE_V1") and envs.VLLM_USE_V1: if envs.is_set("VLLM_USE_V1") and envs.VLLM_USE_V1:
......
...@@ -492,6 +492,17 @@ class AsyncLLM(EngineClient): ...@@ -492,6 +492,17 @@ class AsyncLLM(EngineClient):
"""Prevent an adapter from being evicted.""" """Prevent an adapter from being evicted."""
return await self.engine_core.pin_lora_async(lora_id) return await self.engine_core.pin_lora_async(lora_id)
async def collective_rpc(
    self,
    method: str,
    timeout: Optional[float] = None,
    args: tuple = (),
    kwargs: Optional[dict] = None,
):
    """Forward a collective RPC request to the engine core client.

    ``method``, ``timeout``, ``args`` and ``kwargs`` are handed over
    positionally and unchanged to ``self.engine_core.collective_rpc_async``.

    Returns:
        Whatever the awaited ``self.engine_core.collective_rpc_async`` call
        returns.
    """
    rpc_result = await self.engine_core.collective_rpc_async(
        method, timeout, args, kwargs)
    return rpc_result
@property @property
def is_running(self) -> bool: def is_running(self) -> bool:
# Is None before the loop is started. # Is None before the loop is started.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment