Commit 30c5a79f authored by Hongkuan Zhou, committed by GitHub

feat: unified entry point for vllm-nixl (#83)


Co-authored-by: hongkuanz <hongkuanz@nvidia.com>
parent 2340751b
@@ -49,115 +49,93 @@ All of the commands below are run inside the same container.

## Run deployment

This figure shows an overview of the major components to deploy:

```
                                         +----------------+
                                  +------| prefill worker |--------+
                           notify |      |   (optional)   |        |
                         finished |      +----------------+        | pull
                                  v                                v
+------+      +-----------+      +------------------+    push     +---------------+
| HTTP |----->| processor |----->| decode/monolith  |------------>| prefill queue |
|      |<-----|           |<-----|      worker      | (if disagg) |   (optional)  |
+------+      +-----------+      +------------------+             +---------------+
                  |    ^                  |
       query best |    | return           | publish kv events
           worker |    | worker_id        v
                  |    |         +------------------+
                  |    +---------|     kv-router    |
                  +------------->|    (optional)    |
                                 +------------------+
```

Add model to dynamo and start http server.
```
llmctl http add chat-models deepseek-ai/DeepSeek-R1-Distill-Llama-8B dynamo-init.process.chat/completions
TRT_LOG=DEBUG http --port 8181
```
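Once the model is registered and the HTTP server is up, you can send it a request. Below is a minimal client sketch, assuming the front end exposes an OpenAI-compatible `/v1/chat/completions` route on port 8181; the route and payload shape are assumptions for illustration, not confirmed by this README:
```
# Hypothetical client example: adjust the route/payload if your front end differs.
import requests

payload = {
    "model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    "messages": [{"role": "user", "content": "Hello, what is disaggregated serving?"}],
    "max_tokens": 64,
    "stream": False,
}

# Assumes an OpenAI-compatible chat completions route served on port 8181.
resp = requests.post("http://localhost:8181/v1/chat/completions", json=payload, timeout=120)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])
```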
### Processor

The processor routes requests to the (decode) workers. Three scheduling strategies are supported: random, round-robin, and kv-aware.
```
# Processor must take the same args as the (decoder) worker
# This is temporary until we communicate the ModelDeploymentCard over etcd
RUST_LOG=info python3 processor.py \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --tokenizer deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --block-size 64 \
    --max-model-len 16384 \
    <--random-router / --round-robin-router / --kv-router>
```

Alternatively, the processor can be bypassed by directly hitting the worker endpoints. In the disaggregated variant, the decode worker sends every request to a random prefill worker for remote prefill.
```
llmctl http add chat-models deepseek-ai/DeepSeek-R1-Distill-Llama-8B dynamo-init.vllm.generate

# monolithic
CUDA_VISIBLE_DEVICES=0 python3 routerless/worker.py \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --enforce-eager

# disaggregated
CUDA_VISIBLE_DEVICES=0 python routerless/prefill_worker.py \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --enforce-eager \
    --kv-transfer-config '{"kv_connector":"DynamoNixlConnector"}'
CUDA_VISIBLE_DEVICES=1 python3 routerless/worker.py \
    --remote-prefill \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --enforce-eager \
    --kv-transfer-config '{"kv_connector":"DynamoNixlConnector"}'
```
### KV Router

The KV Router is a component that aggregates KV Events from all the workers and maintains a prefix tree of the cached tokens. It makes decisions on which worker to route requests to based on the length of the prefix match and the load on the workers.

There are three steps needed to enable the kv router:
1. Use `--kv-router` in the processor.
2. Use `--kv-router` and `--enable-prefix-caching` in all the (decode) workers.
3. Launch the kv router in a separate terminal:
```
RUST_LOG=info python3 kv_router.py \
    --routing-strategy prefix \
    --model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --block-size 64 \
    --min-workers 1
```
where `--min-workers` is the number of (decode) workers.

There is also a Python-based custom router that can be enabled with `--custom-router`.

Only the prefix strategy is available for now:
- `prefix`: Route requests to the worker that has the longest prefix match.
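To make the routing decision concrete, here is a minimal sketch of the kind of cost function a custom router could use, combining prefix-match length with worker load. The function and field names below are illustrative assumptions, not the actual `CustomRouter` implementation:
```
# Hypothetical scoring helper for a custom router (names are illustrative).
from dataclasses import dataclass


@dataclass
class WorkerState:
    worker_id: int
    matched_blocks: int      # prefix blocks already cached on this worker
    active_requests: int     # current load reported by the metrics aggregator


def pick_worker(workers: list[WorkerState], total_blocks: int, load_weight: float = 0.5) -> int:
    """Return the worker id with the best prefix-match vs. load trade-off."""
    def score(w: WorkerState) -> float:
        overlap = w.matched_blocks / max(total_blocks, 1)   # higher is better
        load = load_weight * w.active_requests               # lower is better
        return overlap - load

    return max(workers, key=score).worker_id


# Example: worker 1 has a longer cached prefix but is busier than worker 0.
workers = [WorkerState(0, matched_blocks=2, active_requests=1),
           WorkerState(1, matched_blocks=6, active_requests=4)]
print(pick_worker(workers, total_blocks=8))
```
In the real code, the prefix-overlap information comes from `KvIndexer` and the load information from `KvMetricsAggregator`, as shown in the `kv_router.py` changes later in this diff.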
### Disaggregated Router

The disaggregated router determines whether a request should be sent to a
remote prefill engine or a local prefill engine for prefilling based on the
@@ -185,20 +163,39 @@ There are two types of disaggregated router implementations:
kv router as the rust kv router does not report kv cache hit ratio.

To use the python disaggregated router, add the following flags when launching
the decode worker:
```
python worker.py \
    --custom-disagg-router \
    --max-local-prefill-length <length> \
    --max-remote-prefill-cache-hit-ratio <ratio>
```

To enable the disaggregated router, add the following flags to the decode workers:
```
python worker.py \
    ...
    --conditional-disagg \
    <optional: --custom-disagg-router> \
    --max-local-prefill-length <length>
```
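As a rough illustration of the decision these flags control, the sketch below prefills locally when the un-cached portion of the prompt is short or the prefix-cache hit ratio is high, and otherwise pushes the request to the remote prefill queue. This is a simplified assumption about the policy, not the actual `PyDisaggregatedRouter` logic:
```
# Hypothetical local-vs-remote prefill decision (simplified, for illustration only).
def should_prefill_remotely(prompt_len: int,
                            cached_prefix_len: int,
                            max_local_prefill_length: int,
                            max_remote_prefill_cache_hit_ratio: float) -> bool:
    """Return True if the request should go to the remote prefill queue."""
    uncached = prompt_len - cached_prefix_len
    hit_ratio = cached_prefix_len / max(prompt_len, 1)

    # Short remaining prefill work, or mostly cached already -> prefill locally.
    if uncached <= max_local_prefill_length:
        return False
    if hit_ratio > max_remote_prefill_cache_hit_ratio:
        return False
    return True


# Example: a 12k-token prompt with only 1k tokens cached goes to remote prefill.
print(should_prefill_remotely(12000, 1024, max_local_prefill_length=2048,
                              max_remote_prefill_cache_hit_ratio=0.5))
```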
### Worker

#### Monolithic
Only the kv router is supported for monolithic deployment.
```
CUDA_VISIBLE_DEVICES=0 python3 worker.py \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --enforce-eager \
    --block-size 64 \
    --max-model-len 16384 \
    <optional kv router args: --kv-router --enable-prefix-caching>
```

#### Disaggregated
The kv router and the disaggregated router are both supported and can be turned on/off individually. Remote prefill requests are pushed to a global prefill queue, which balances the prefill load across prefill workers.
```
# start prefill worker in one terminal
# Note: prefix caching is not supported in the prefill for now
CUDA_VISIBLE_DEVICES=0 python3 prefill_worker.py \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --enforce-eager \
    --kv-transfer-config '{"kv_connector":"DynamoNixlConnector"}' \
@@ -206,25 +203,18 @@ CUDA_VISIBLE_DEVICES=0 python3 router/prefill_worker.py \
    --max-num-batched-tokens 16384 \
    --max-model-len 16384

# start decode worker in another terminal
CUDA_VISIBLE_DEVICES=1 python3 worker.py \
    --remote-prefill \
    --model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
    --enforce-eager \
    --tensor-parallel-size 1 \
    --kv-transfer-config '{"kv_connector":"DynamoNixlConnector"}' \
    --block-size 64 \
    --max-num-batched-tokens 16384 \
    --max-model-len 16384 \
    <optional kv router args: --kv-router --enable-prefix-caching>
    <optional disaggregated router args: --conditional-disagg --custom-disagg-router --max-local-prefill-length <length>>
```

Alternatively, we also provide a script to launch all workers in one go (with the python custom router):
```
# TODO: change to dynamo-deploy functionality
./start_single_node.sh
# Usage: [--model <model>] [--p_tensor_parallel_size <size>] [--d_tensor_parallel_size <size>] [--max_model_len <len>] [--max_num_batched_tokens <tokens>] [--max_num_seqs <seqs>] [--gpu_memory_utilization <utilization>] [--enable_chunked_prefill <True/False>] [--num_p <p>] [--num_d <d>]
```
### Common Issues
@@ -294,7 +284,6 @@ pkill -9 -f python
- [ ] Add etcd for discovery
- [ ] Multi-node deployment support
- [ ] Enable chunked prefill
- [ ] Process many remote prefill in one iteration
- [ ] Support recompute preemption
- [ ] Make sure decode does not preempt blocks before xfer finishes
@@ -304,6 +293,7 @@ pkill -9 -f python
- [ ] Support pp > 1
- [ ] Check why adding extra seed input is crashing vllm with remote prefill
- [ ] Unified worker for both prefill and decode
- [x] Support mixed tp
- [x] Require sending two parallel requests to start decode for the first time
- [x] Concurrency > 2 is not working
- [x] Parse cmdline args
...
@@ -173,13 +173,14 @@ async def worker(runtime: DistributedRuntime, args: Namespace):
    endpoint = router_component.endpoint("generate")

    if args.custom_router:
        indexer = KvIndexer(kv_listener, args.block_size)
        metrics_aggregator = KvMetricsAggregator(kv_listener)
        await endpoint.serve_endpoint(
            CustomRouter(indexer, metrics_aggregator).generate
        )
    else:
        # TODO Read block_size from MDC
        router = KvRouter(runtime, kv_listener, args.block_size)
        await endpoint.serve_endpoint(Router(router, args.routing_strategy).generate)

@@ -208,6 +209,11 @@ if __name__ == "__main__":
        default="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
        help="Model that is being served",
    )
    parser.add_argument(
        "--block-size",
        type=int,
        help="KV block size",
    )
    parser.add_argument(
        "--custom-router",
        type=bool,
...
@@ -62,6 +62,7 @@ class Processor(ProcessMixIn):
        )
        self.router_client = router_client
        self.workers_client = workers_client
        self.router_mode = engine_args.router

    def _create_tokenizer(self, engine_args: AsyncEngineArgs) -> AnyTokenizer:
        """Create a TokenizerGroup using engine arguments similar to VLLM's approach"""

@@ -91,17 +92,36 @@ class Processor(ProcessMixIn):
            engine_prompt,
            sampling_params,
        ) = await self._parse_raw_request(raw_request)

        if self.router_mode == "kv":
            worker_id_generator: AsyncIterator = await self.router_client.generate(
                Tokens(tokens=engine_prompt["prompt_token_ids"]).model_dump_json()
            )

            worker_id = (
                await worker_id_generator.__anext__()
            )  # only one worker id is returned
            worker_id = worker_id.data()
            vllm_logger.info(f"Worker ID: {worker_id}")

            if worker_id == "":
                engine_generator = await self.workers_client.random(
                    vLLMGenerateRequest(
                        engine_prompt=engine_prompt,
                        sampling_params=sampling_params,
                        request_id=request_id,
                    ).model_dump_json()
                )
            else:
                engine_generator = await self.workers_client.direct(
                    vLLMGenerateRequest(
                        engine_prompt=engine_prompt,
                        sampling_params=sampling_params,
                        request_id=request_id,
                    ).model_dump_json(),
                    int(worker_id),
                )
        elif self.router_mode == "random":
            engine_generator = await self.workers_client.random(
                vLLMGenerateRequest(
                    engine_prompt=engine_prompt,
@@ -109,14 +129,13 @@ class Processor(ProcessMixIn):
                    request_id=request_id,
                ).model_dump_json()
            )
        elif self.router_mode == "round-robin":
            engine_generator = await self.workers_client.round_robin(
                vLLMGenerateRequest(
                    engine_prompt=engine_prompt,
                    sampling_params=sampling_params,
                    request_id=request_id,
                ).model_dump_json()
            )

        output = self._generate_responses(engine_generator, request_type)
...
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# default values
model=deepseek-ai/DeepSeek-R1-Distill-Llama-8B
p_tensor_parallel_size=1
d_tensor_parallel_size=1
max_model_len=16384
max_num_batched_tokens=16384
max_num_seqs=1024
gpu_memory_utilization=0.9
enable_chunked_prefill=False
block_size=64
num_p=2
num_d=2
total_rank=$((num_p + num_d))
curr_kv_rank=0
# Function to display usage
usage() {
echo "Usage: $0 [--model <model>] [--p_tensor_parallel_size <size>] [--d_tensor_parallel_size <size>] [--max_model_len <len>] [--max_num_batched_tokens <tokens>] [--max_num_seqs <seqs>] [--gpu_memory_utilization <utilization>] [--enable_chunked_prefill <True/False>] [--num_p <p>] [--num_d <d>]"
exit 1
}
# Parse the command-line arguments
while [[ $# -gt 0 ]]; do
case "$1" in
--model)
model="$2"
shift 2
;;
--p_tensor_parallel_size)
p_tensor_parallel_size="$2"
shift 2
;;
--d_tensor_parallel_size)
d_tensor_parallel_size="$2"
shift 2
;;
--max_model_len)
max_model_len="$2"
shift 2
;;
--max_num_batched_tokens)
max_num_batched_tokens="$2"
shift 2
;;
--max_num_seqs)
max_num_seqs="$2"
shift 2
;;
--gpu_memory_utilization)
gpu_memory_utilization="$2"
shift 2
;;
--enable_chunked_prefill)
enable_chunked_prefill="$2"
shift 2
;;
--num_p)
num_p="$2"
shift 2
;;
--num_d)
num_d="$2"
shift 2
;;
--total_rank)
total_rank="$2"
shift 2
;;
--curr_kv_rank)
curr_kv_rank="$2"
shift 2
;;
--block_size)
block_size="$2"
shift 2
;;
*)
usage
;;
esac
done
# rank here is GPU rank
curr_rank=0
echo "total rank: "${total_rank}
for (( i=1; i<=num_d; i++ )); do
    cuda_devices=$(seq $curr_rank $(($curr_rank + $d_tensor_parallel_size - 1)))
    cuda_devices=$(echo $cuda_devices | tr ' ' ',')
    echo "starting gpu rank "${cuda_devices}" (decode)"
    CUDA_VISIBLE_DEVICES=${cuda_devices} python3 worker.py \
        --remote-prefill \
        --model ${model} \
        --max-model-len ${max_model_len} \
        --max-num-batched-tokens ${max_num_batched_tokens} \
        --enable-chunked-prefill ${enable_chunked_prefill} \
        --gpu-memory-utilization ${gpu_memory_utilization} \
        --enforce-eager \
        --enable-prefix-caching \
        --tensor-parallel-size ${d_tensor_parallel_size} \
        --block-size ${block_size} \
        --kv-transfer-config '{"kv_connector":"DynamoNixlConnector"}' & disown
    curr_rank=$((curr_rank + d_tensor_parallel_size))
    curr_kv_rank=$((curr_kv_rank + 1))
done
for (( i=1; i<=num_p; i++ )); do
    cuda_devices=$(seq $curr_rank $(($curr_rank + $p_tensor_parallel_size - 1)))
    cuda_devices=$(echo $cuda_devices | tr ' ' ',')
    echo "starting gpu rank "${cuda_devices}" (prefill)"
    CUDA_VISIBLE_DEVICES=${cuda_devices} python3 prefill_worker.py \
        --model ${model} \
        --max-model-len ${max_model_len} \
        --max-num-batched-tokens ${max_num_batched_tokens} \
        --enable-chunked-prefill ${enable_chunked_prefill} \
        --gpu-memory-utilization ${gpu_memory_utilization} \
        --enforce-eager \
        --tensor-parallel-size ${p_tensor_parallel_size} \
        --block-size ${block_size} \
        --kv-transfer-config '{"kv_connector":"DynamoNixlConnector"}' & disown
    curr_rank=$((curr_rank + p_tensor_parallel_size))
    curr_kv_rank=$((curr_kv_rank + 1))
done
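For example, `./start_single_node.sh --num_p 1 --num_d 1` launches one decode worker on GPU 0 and one prefill worker on GPU 1 with the default model; any option that is not passed falls back to the defaults defined at the top of the script.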
@@ -20,6 +20,13 @@ from vllm.utils import FlexibleArgumentParser

def parse_vllm_args() -> AsyncEngineArgs:
    parser = FlexibleArgumentParser()
    parser.add_argument(
        "--router",
        type=str,
        choices=["random", "round-robin", "kv"],
        default="random",
        help="Router type to use for scheduling requests to workers",
    )
    parser.add_argument(
        "--remote-prefill", action="store_true", help="Enable remote prefill"
    )
@@ -49,6 +56,7 @@ def parse_vllm_args() -> AsyncEngineArgs:
    parser = AsyncEngineArgs.add_cli_args(parser)
    args = parser.parse_args()
    engine_args = AsyncEngineArgs.from_cli_args(args)
    engine_args.router = args.router
    engine_args.remote_prefill = args.remote_prefill
    engine_args.conditional_disagg = args.conditional_disagg
    engine_args.custom_disagg_router = args.custom_disagg_router
...
@@ -54,10 +54,6 @@ class RequestHandler:
            do_remote_prefill  # remote prefill is still controlled by the router
        )
        self.disaggregated_router = disaggregated_router

        self._prefill_queue_nats_server = os.getenv(
            "NATS_SERVER", "nats://localhost:4222"
@@ -90,6 +86,7 @@ class RequestHandler:
        else:
            # always prefill remotely if no disaggregated router is provided
            disagg_router_decision = True

        if self.do_remote_prefill and disagg_router_decision:
            remote_prefill_params = RemotePrefillParams(
                is_remote_prefill=True,
@@ -130,25 +127,29 @@ async def worker(runtime: DistributedRuntime, engine_args: AsyncEngineArgs):
    endpoint = component.endpoint("generate")

    if engine_args.remote_prefill:
        prefill_client = (
            await runtime.namespace("dynamo-init")
            .component("prefill")
            .endpoint("generate")
            .client()
        )
    else:
        prefill_client = None

    if engine_args.kv_router:
        # TODO: do we need these env vars?
        VLLM_WORKER_ID = endpoint.lease_id()
        os.environ["VLLM_WORKER_ID"] = str(VLLM_WORKER_ID)
        vllm_logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}")

        VLLM_KV_NAMESPACE = "dynamo-init"
        os.environ["VLLM_KV_NAMESPACE"] = str(VLLM_KV_NAMESPACE)

        VLLM_KV_COMPONENT = "vllm"
        os.environ["VLLM_KV_COMPONENT"] = str(VLLM_KV_COMPONENT)

    metrics_publisher = KvMetricsPublisher()
    async with build_async_engine_client_from_engine_args(engine_args) as engine_client:
        served_model_name = (
@@ -156,57 +157,66 @@ async def worker(runtime: DistributedRuntime, engine_args: AsyncEngineArgs):
            if engine_args.served_model_name is not None
            else "vllm"
        )

        if engine_args.kv_router:
            engine_client.set_metrics_publisher(metrics_publisher)
            # Initially send dummy metrics to kick start,
            # vLLM will not update stat until forward pass is triggered
            metrics_publisher.publish(
                0,
                1024,
                0,
                1024,
            )

        if engine_args.remote_prefill:
            metadata = engine_client.nixl_metadata
            metadata_store = NixlMetadataStore("dynamo-init", runtime)
            await metadata_store.put(metadata.engine_id, metadata)

        if engine_args.conditional_disagg:
            disaggregated_router = PyDisaggregatedRouter(
                runtime,
                served_model_name,
                custom_disagg_router=engine_args.custom_disagg_router,
                max_local_prefill_length=engine_args.max_local_prefill_length,
                max_remote_prefill_cache_hit_ratio=engine_args.max_remote_prefill_cache_hit_ratio,
            )
        else:
            disaggregated_router = None

        endpoints = [
            endpoint.serve_endpoint(
                RequestHandler(
                    model_name=served_model_name,
                    engine_client=engine_client,
                    prefill_client=prefill_client,
                    do_remote_prefill=engine_args.remote_prefill,
                    disaggregated_router=disaggregated_router,
                ).generate
            )
        ]
        if engine_args.kv_router:
            endpoints.append(metrics_publisher.create_endpoint(component))

        await asyncio.gather(*endpoints)
if __name__ == "__main__":
    uvloop.install()
    engine_args = parse_vllm_args()
    if engine_args.remote_prefill:
        if engine_args.enable_chunked_prefill is not False:
            print("Chunked prefill is not supported yet, setting to False")
            engine_args.enable_chunked_prefill = False
    if engine_args.preemption_mode != "swap":
        print("Preemption mode is not supported yet, setting to swap")
        engine_args.preemption_mode = "swap"
    if engine_args.pipeline_parallel_size != 1:
        print("Pipeline parallel size is not supported yet, setting to 1")
        engine_args.pipeline_parallel_size = 1
    asyncio.run(worker(engine_args))