Commit dd238a26 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: add routerless processor based monolith example (#180)

parent a509b8f6
...@@ -95,6 +95,12 @@ cd /workspace/deploy/examples/llm
dynamo serve monolith.routerless_deployment:Frontend -f ./configs/monolith/routerless_deployment.yaml
```
#### Routerless processor based monolith
```bash
dynamo serve monolith.routerless_processor_deployment:Frontend -f ./configs/monolith/routerless_processor_deployment.yaml
```
#### Router based disaggregated serving
```bash
cd /workspace/deploy/examples/llm
...@@ -104,7 +110,7 @@ dynamo serve disaggregated.router_based_deployment:Frontend -f ./configs/disaggr
#### Routerless disaggregated serving
```bash
cd /workspace/deploy/examples/llm
dynamo serve disaggregated.routerless_deployment:Frontend -f ./configs/disaggregated/routerless_deployment.yaml
```
### Client
......
...@@ -35,7 +35,6 @@ class FrontendConfig(BaseModel): ...@@ -35,7 +35,6 @@ class FrontendConfig(BaseModel):
resources={"cpu": "10", "memory": "20Gi"}, resources={"cpu": "10", "memory": "20Gi"},
workers=1, workers=1,
) )
# todo this should be called ApiServer # todo this should be called ApiServer
class Frontend: class Frontend:
worker = depends(VllmWorker) worker = depends(VllmWorker)
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import asyncio
import uuid import uuid
from enum import Enum from enum import Enum
from typing import AsyncIterator, Tuple, Union from typing import AsyncIterator, Tuple, Union
...@@ -29,7 +30,7 @@ from vllm.logger import logger as vllm_logger ...@@ -29,7 +30,7 @@ from vllm.logger import logger as vllm_logger
from vllm.outputs import RequestOutput from vllm.outputs import RequestOutput
from vllm.transformers_utils.tokenizer import AnyTokenizer from vllm.transformers_utils.tokenizer import AnyTokenizer
from dynamo.sdk import depends, dynamo_context, dynamo_endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service
class RequestType(Enum): class RequestType(Enum):
...@@ -63,6 +64,7 @@ class Processor(ProcessMixIn): ...@@ -63,6 +64,7 @@ class Processor(ProcessMixIn):
self.tokenizer, self.model_config self.tokenizer, self.model_config
) )
self.router_mode = self.engine_args.router self.router_mode = self.engine_args.router
self.min_workers = 1
def _create_tokenizer(self, engine_args: AsyncEngineArgs) -> AnyTokenizer: def _create_tokenizer(self, engine_args: AsyncEngineArgs) -> AnyTokenizer:
"""Create a TokenizerGroup using engine arguments similar to VLLM's approach""" """Create a TokenizerGroup using engine arguments similar to VLLM's approach"""
...@@ -78,6 +80,25 @@ class Processor(ProcessMixIn): ...@@ -78,6 +80,25 @@ class Processor(ProcessMixIn):
) )
return base_tokenizer return base_tokenizer
@async_on_start
async def async_init(self):
runtime = dynamo_context["runtime"]
comp_ns, comp_name = VllmWorker.dynamo_address() # type: ignore
print(f"[Processor] comp_ns: {comp_ns}, comp_name: {comp_name}")
self.worker_client = (
await runtime.namespace(comp_ns)
.component(comp_name)
.endpoint("generate")
.client()
)
while len(self.worker_client.endpoint_ids()) < self.min_workers:
print(
f"Waiting for workers to be ready.\n"
f" Current: {len(self.worker_client.endpoint_ids())},"
f" Required: {self.min_workers}"
)
await asyncio.sleep(2)
async def _generate( async def _generate(
self, self,
raw_request: Union[CompletionRequest, ChatCompletionRequest], raw_request: Union[CompletionRequest, ChatCompletionRequest],
...@@ -92,15 +113,6 @@ class Processor(ProcessMixIn): ...@@ -92,15 +113,6 @@ class Processor(ProcessMixIn):
engine_prompt, engine_prompt,
sampling_params, sampling_params,
) = await self._parse_raw_request(raw_request) ) = await self._parse_raw_request(raw_request)
runtime = dynamo_context["runtime"]
comp_ns, comp_name = VllmWorker.dynamo_address() # type: ignore
print(f"[Processor] comp_ns: {comp_ns}, comp_name: {comp_name}")
worker_client = (
await runtime.namespace(comp_ns)
.component(comp_name)
.endpoint("generate")
.client()
)
if self.router_mode == "kv": if self.router_mode == "kv":
async for route_response in self.router.generate( async for route_response in self.router.generate(
Tokens(tokens=engine_prompt["prompt_token_ids"]).model_dump_json() Tokens(tokens=engine_prompt["prompt_token_ids"]).model_dump_json()
...@@ -113,7 +125,7 @@ class Processor(ProcessMixIn): ...@@ -113,7 +125,7 @@ class Processor(ProcessMixIn):
break break
if worker_id == "": if worker_id == "":
engine_generator = await worker_client.generate( engine_generator = await self.worker_client.generate(
vLLMGenerateRequest( vLLMGenerateRequest(
engine_prompt=engine_prompt, engine_prompt=engine_prompt,
sampling_params=sampling_params, sampling_params=sampling_params,
...@@ -122,7 +134,7 @@ class Processor(ProcessMixIn): ...@@ -122,7 +134,7 @@ class Processor(ProcessMixIn):
).model_dump_json() ).model_dump_json()
) )
else: else:
engine_generator = await worker_client.direct( engine_generator = await self.worker_client.direct(
vLLMGenerateRequest( vLLMGenerateRequest(
engine_prompt=engine_prompt, engine_prompt=engine_prompt,
sampling_params=sampling_params, sampling_params=sampling_params,
...@@ -132,22 +144,21 @@ class Processor(ProcessMixIn): ...@@ -132,22 +144,21 @@ class Processor(ProcessMixIn):
int(worker_id), int(worker_id),
) )
elif self.router_mode == "random": elif self.router_mode == "random":
engine_generator = await worker_client.generate( engine_generator = await self.worker_client.generate(
vLLMGenerateRequest(
engine_prompt=engine_prompt,
sampling_params=sampling_params,
request_id=request_id,
).model_dump_json()
)
elif self.router_mode == "round-robin":
engine_generator = await self.worker_client.round_robin(
vLLMGenerateRequest( vLLMGenerateRequest(
engine_prompt=engine_prompt, engine_prompt=engine_prompt,
sampling_params=sampling_params, sampling_params=sampling_params,
request_id=request_id, request_id=request_id,
).model_dump_json() ).model_dump_json()
) )
# TODO: add round-robin mode
# elif self.router_mode == "round-robin":
# engine_generator = await self.worker.round_robin(
# vLLMGenerateRequest(
# engine_prompt=engine_prompt,
# sampling_params=sampling_params,
# request_id=request_id,
# ).model_dump_json()
# )
output = self._generate_responses(engine_generator, request_type) output = self._generate_responses(engine_generator, request_type)
......
...@@ -90,14 +90,14 @@ class VllmWorker: ...@@ -90,14 +90,14 @@ class VllmWorker:
os.environ["VLLM_KV_NAMESPACE"] = "dynamo-init" os.environ["VLLM_KV_NAMESPACE"] = "dynamo-init"
os.environ["VLLM_KV_COMPONENT"] = class_name os.environ["VLLM_KV_COMPONENT"] = class_name
vllm_logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}") vllm_logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}")
# note: worker_index is 1-based, but CUDA_VISIBLE_DEVICES is 0-based # note: worker_index is 1-based, but CUDA_VISIBLE_DEVICES is 0-based
gpu_idx = ( gpu_idx = (
self.engine_args.cuda_visible_device_offset self.engine_args.cuda_visible_device_offset
+ server_context.worker_index + server_context.worker_index
- 1 - 1
) )
os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_idx}" os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_idx}"
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = KvMetricsPublisher()
@async_on_start @async_on_start
async def async_init(self): async def async_init(self):
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Configuration for the routerless, processor-based monolith example.
# Each top-level key configures one component of the deployment graph
# (Frontend -> Processor -> VllmWorker).
# NOTE: the extracted text had all YAML nesting flattened; conventional
# 2-space block-mapping indentation is restored here so the file parses.

# HTTP entrypoint; forwards requests to the Processor's
# chat/completions endpoint in the dynamo-init namespace.
Frontend:
  model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
  endpoint: dynamo-init.Processor.chat/completions
  port: 8000

# Tokenizes/pre-processes requests and dispatches them to workers.
Processor:
  model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
  block-size: 64
  max-model-len: 16384
  router: random  # pick a worker at random (no KV-aware routing)

# vLLM inference worker.
VllmWorker:
  model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
  enforce-eager: true
  block-size: 64
  max-model-len: 16384
  max-num-batched-tokens: 16384
  enable-prefix-caching: true
  router: random
  tensor-parallel-size: 1
  ServiceArgs:
    workers: 1
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.processor import Processor
from components.worker import VllmWorker
# Compose the routerless deployment graph: Frontend -> Processor -> VllmWorker.
# Each .link() registers the argument as a downstream dependency; chaining
# presumably returns the just-linked component so VllmWorker attaches to
# Processor rather than Frontend — TODO confirm against the dynamo SDK docs.
Frontend.link(Processor).link(VllmWorker)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment