Commit efe82b86 authored by Alec, committed by GitHub

feat: make vllm baseline support both chat and completions (#70)

parent 5944dbed
@@ -63,7 +63,8 @@ By default the server will run on port 8080.

 Add model to the server:
 ```bash
-llmctl http add chat-models deepseek-ai/DeepSeek-R1-Distill-Llama-8B dynamo.vllm.generate
+llmctl http add chat deepseek-ai/DeepSeek-R1-Distill-Llama-8B dynamo.vllm.chat/completions
+llmctl http add completions deepseek-ai/DeepSeek-R1-Distill-Llama-8B dynamo.vllm.completions
 ```
 ##### Example Output
......
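Once both endpoints are registered with llmctl, they can be exercised through the HTTP frontend. Below is a minimal smoke test, assuming the frontend listens on localhost:8080 (as the README states) and exposes the standard OpenAI-compatible `/v1/chat/completions` and `/v1/completions` routes; the exact base URL is a deployment assumption, not something this commit fixes.

```python
# Smoke-test sketch for the two registered endpoints.
# Assumes the HTTP frontend is on localhost:8080 with OpenAI-compatible routes.
import json

import requests

BASE = "http://localhost:8080/v1"
MODEL = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"


def stream_sse(url: str, payload: dict) -> None:
    """POST a streaming request and print each decoded SSE chunk."""
    with requests.post(url, json=payload, stream=True, timeout=300) as r:
        r.raise_for_status()
        for line in r.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            data = line.removeprefix("data: ")
            if data == "[DONE]":
                break
            print(json.loads(data))


# Chat endpoint (handled by generate_chat in the worker)
stream_sse(f"{BASE}/chat/completions", {
    "model": MODEL,
    "messages": [{"role": "user", "content": "Hello!"}],
    "stream": True,
})

# Completions endpoint (handled by generate_completions)
stream_sse(f"{BASE}/completions", {
    "model": MODEL,
    "prompt": "Hello, my name is",
    "stream": True,
})
```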
@@ -13,63 +13,120 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import uuid
+import asyncio
+import json
+from typing import AsyncGenerator, AsyncIterator

 import uvloop
-from common.base_engine import BaseVllmEngine
-from common.chat_processor import ProcessMixIn
 from common.parser import parse_vllm_args
+from vllm.config import ModelConfig
 from vllm.engine.arg_utils import AsyncEngineArgs
+from vllm.engine.protocol import EngineClient
+from vllm.entrypoints.openai.api_server import (
+    build_async_engine_client_from_engine_args,
+)
 from vllm.entrypoints.openai.protocol import (
     ChatCompletionRequest,
+    ChatCompletionResponse,
     ChatCompletionStreamResponse,
+    CompletionRequest,
+    CompletionResponse,
+    CompletionStreamResponse,
+    ErrorResponse,
 )
 from vllm.logger import logger as vllm_logger
+from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
+from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
+from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels

 from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
-class VllmEngine(BaseVllmEngine, ProcessMixIn):
-    """
-    Request handler for the generate endpoint
-    """
-
-    def __init__(self, engine_args: AsyncEngineArgs):
-        super().__init__(engine_args)
+class VllmEngine:
+    def __init__(
+        self, engine_client: AsyncIterator[EngineClient], model_config: ModelConfig
+    ):
+        self.engine_client = engine_client
+        self.model_config = model_config
+        # Ensure served_model_name matches the openai model name
+        # Use --served-model-name to explicitly set this or it will fallback to --model
+        models = OpenAIServingModels(
+            engine_client=engine_client,
+            model_config=model_config,
+            base_model_paths=[
+                BaseModelPath(
+                    name=model_config.served_model_name,
+                    model_path=model_config.model,
+                )
+            ],
+        )
+        self.chat_serving = OpenAIServingChat(
+            engine_client=self.engine_client,
+            model_config=self.model_config,
+            models=models,
+            response_role="assistant",
+            request_logger=None,
+            chat_template=None,
+            chat_template_content_format="auto",
+        )
+        self.completion_serving = OpenAIServingCompletion(
+            engine_client=self.engine_client,
+            model_config=self.model_config,
+            models=models,
+            request_logger=None,
+        )
     @dynamo_endpoint(ChatCompletionRequest, ChatCompletionStreamResponse)
-    async def generate(self, raw_request):
-        if self.engine_client is None:
-            await self.initialize()
-        vllm_logger.debug(f"Got raw request: {raw_request}")
-        (
-            request,
-            conversation,
-            _,
-            engine_prompt,
-            sampling_params,
-        ) = await self._parse_raw_request(raw_request)
-        request_id = str(uuid.uuid4())
-        vllm_logger.debug(
-            f"Running generate with engine_prompt: {engine_prompt}, sampling_params: {sampling_params}, request_id: {request_id}"
-        )
-        if self.engine_client is None:
-            raise RuntimeError("Engine client not initialized")
-        else:
-            generator = self.engine_client.generate(
-                engine_prompt, sampling_params, request_id
-            )
-        async for response in await self._stream_response(
-            request, generator, request_id, conversation
-        ):
-            vllm_logger.debug(f"Generated response: {response}")
-            yield response
+    async def generate_chat(self, request):
+        result = await self.chat_serving.create_chat_completion(request)
+        if isinstance(result, AsyncGenerator):
+            async for raw_response in result:
+                if raw_response.startswith("data: [DONE]"):
+                    break
+                response = json.loads(raw_response.lstrip("data: "))
+                yield response
+        # We should always be streaming so should never get here
+        elif isinstance(result, ChatCompletionResponse):
+            raise RuntimeError("ChatCompletionResponse support not implemented")
+        elif isinstance(result, ErrorResponse):
+            error = result.dict()
+            raise RuntimeError(
+                f"Error {error['code']}: {error['message']} "
+                f"(type: {error['type']}, param: {error['param']})"
+            )
+        else:
+            raise TypeError(f"Unexpected response type: {type(result)}")
+    @dynamo_endpoint(CompletionRequest, CompletionStreamResponse)
+    async def generate_completions(self, request):
+        result = await self.completion_serving.create_completion(request)
+        if isinstance(result, AsyncGenerator):
+            async for raw_response in result:
+                if raw_response.startswith("data: [DONE]"):
+                    break
+                response = json.loads(raw_response.lstrip("data: "))
+                yield response
+        # We should always be streaming so should never get here
+        elif isinstance(result, CompletionResponse):
+            raise RuntimeError("CompletionResponse support not implemented")
+        elif isinstance(result, ErrorResponse):
+            error = result.dict()
+            raise RuntimeError(
+                f"Error {error['code']}: {error['message']} "
+                f"(type: {error['type']}, param: {error['param']})"
+            )
+        else:
+            raise TypeError(f"Unexpected response type: {type(result)}")


 @dynamo_worker()
 async def worker(runtime: DistributedRuntime, engine_args: AsyncEngineArgs):
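One subtlety in both handlers above: `str.lstrip("data: ")` strips a *character set* (`d`, `a`, `t`, `:`, space) from the left, not the literal prefix. It works here only because the JSON payload begins with `{`, which is not in that set. A sketch of the equivalent parsing step using `str.removeprefix` (Python 3.9+), which states the intent directly:

```python
# Sketch: prefix handling for SSE frames emitted by vLLM's OpenAI serving layer.
# lstrip("data: ") removes any leading run of {d, a, t, ':', ' '}, which only
# succeeds by accident because JSON bodies start with '{'.
import json

raw_response = 'data: {"id": "cmpl-1", "choices": []}\n\n'

line = raw_response.strip()
if line.startswith("data: ") and not line.endswith("[DONE]"):
    response = json.loads(line.removeprefix("data: "))
    assert response["id"] == "cmpl-1"
```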
@@ -80,10 +80,17 @@ async def worker(runtime: DistributedRuntime, engine_args: AsyncEngineArgs):
     component = runtime.namespace("dynamo").component("vllm")
     await component.create_service()

-    endpoint = component.endpoint("generate")
+    chat_endpoint = component.endpoint("chat/completions")
+    completions_endpoint = component.endpoint("completions")

-    async with VllmEngine(engine_args) as engine:
-        await endpoint.serve_endpoint(engine.generate)
+    async with build_async_engine_client_from_engine_args(engine_args) as engine_client:
+        model_config = await engine_client.get_model_config()
+        engine = VllmEngine(engine_client, model_config)
+        await asyncio.gather(
+            chat_endpoint.serve_endpoint(engine.generate_chat),
+            completions_endpoint.serve_endpoint(engine.generate_completions),
+        )


 if __name__ == "__main__":
......
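The "Ensure served_model_name matches the openai model name" comment in `VllmEngine.__init__` matters for routing: llmctl registers the model under its OpenAI-facing name, and vLLM's serving layer rejects requests whose `model` field does not match what `OpenAIServingModels` advertises. A sketch of how the name is resolved via vLLM's `AsyncEngineArgs` (the local path and alias below are illustrative, not from this commit):

```python
# Sketch: how served_model_name falls back to --model.
from vllm.engine.arg_utils import AsyncEngineArgs

# Without --served-model-name, clients must send the exact --model value.
args = AsyncEngineArgs(model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B")

# With --served-model-name, clients use the alias instead of the local path.
args = AsyncEngineArgs(
    model="/models/DeepSeek-R1-Distill-Llama-8B",  # hypothetical local path
    served_model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
)

# model_config.served_model_name is what the OpenAI serving layer reports,
# so the llmctl registration and this value must agree.
```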
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# LIMITATIONS:
# - Must use a single GPU for workers as CUDA_VISIBLE_DEVICES is set to a fixed value
# - Must use a single node
set -xe
if [ $# -lt 3 ]; then
    echo "Usage: $0 <number_of_workers> <routing_strategy> <log_dir_name> [model_name] [model_args] [chat_endpoint_name] [completions_endpoint_name]"
    echo "Error: must specify number_of_workers, routing_strategy, and log_dir_name"
echo "Optional: model_name (default: deepseek-ai/DeepSeek-R1-Distill-Llama-8B)"
echo "Optional: model_args (quoted string with model arguments)"
echo "Optional: chat_endpoint_name (default: dynamo.vllm.chat/completions)"
echo "Optional: completions_endpoint_name (default: dynamo.vllm.completions)"
exit 1
fi
# If using a local cache, set this to skip Hugging Face Hub checks
export HF_HUB_OFFLINE=1
export GLOO_SOCKET_IFNAME=lo
# FlashInfer is required for the Qwen2.5-based R1 Distill models in order to set --block-size 64 and --kv-cache-dtype fp8
uv pip install flashinfer-python -i https://flashinfer.ai/whl/cu124/torch2.5/
export VLLM_ATTENTION_BACKEND=FLASHINFER
NUM_WORKERS=$1
ROUTING_STRATEGY=$2
LOG_DIR_NAME=$3
MODEL_NAME=${4:-"deepseek-ai/DeepSeek-R1-Distill-Llama-8B"}
CUSTOM_MODEL_ARGS=$5
CHAT_ENDPOINT_NAME=${6:-"dynamo.vllm.chat/completions"}
COMPLETIONS_ENDPOINT_NAME=${7:-"dynamo.vllm.completions"}
VALID_STRATEGIES=("random")
SESSION_NAME="v"
WORKDIR="/workspace/examples/python_rs/llm/vllm"
INIT_CMD="source /opt/dynamo/venv/bin/activate && cd $WORKDIR"
# Default model args
DEFAULT_MODEL_ARGS="--model $MODEL_NAME \
--tokenizer $MODEL_NAME \
--enable-prefix-caching \
--block-size 64"
# Use custom model args if provided, otherwise use default
if [ -n "$CUSTOM_MODEL_ARGS" ]; then
MODEL_ARGS="$CUSTOM_MODEL_ARGS"
echo "Using custom model arguments"
else
MODEL_ARGS="$DEFAULT_MODEL_ARGS"
echo "Using default model arguments"
fi
# Create logs directory if it doesn't exist
LOGS_DIR="/logs/$LOG_DIR_NAME"
mkdir -p $LOGS_DIR
chmod -R 775 $LOGS_DIR
if [[ ! " ${VALID_STRATEGIES[@]} " =~ " ${ROUTING_STRATEGY} " ]]; then
echo "Error: Invalid routing strategy. Must be one of: ${VALID_STRATEGIES[*]}"
exit 1
fi
########################################################
# HTTP Server
########################################################
HTTP_CMD="DYN_LOG=DEBUG http |& tee $LOGS_DIR/http.log"
tmux new-session -d -s "$SESSION_NAME-http"
tmux send-keys -t "$SESSION_NAME-http" "$INIT_CMD && $HTTP_CMD" C-m
########################################################
# LLMCTL
########################################################
LLMCTL_CMD="sleep 5 && \
llmctl http remove chat $MODEL_NAME && \
llmctl http remove completions $MODEL_NAME && \
llmctl http add chat $MODEL_NAME $CHAT_ENDPOINT_NAME && \
llmctl http add completions $MODEL_NAME $COMPLETIONS_ENDPOINT_NAME && \
llmctl http list |& tee $LOGS_DIR/llmctl.log"
tmux new-session -d -s "$SESSION_NAME-llmctl"
tmux send-keys -t "$SESSION_NAME-llmctl" "$INIT_CMD && $LLMCTL_CMD" C-m
########################################################
# Workers
########################################################
WORKER_CMD="RUST_LOG=info python3 -m monolith.worker $MODEL_ARGS"
for i in $(seq 1 $NUM_WORKERS); do
tmux new-session -d -s "$SESSION_NAME-$i"
done
for i in $(seq 1 $NUM_WORKERS); do
tmux send-keys -t "$SESSION_NAME-$i" "$INIT_CMD && CUDA_VISIBLE_DEVICES=$((i-1)) $WORKER_CMD |& tee $LOGS_DIR/worker-$i.log" C-m
done
\ No newline at end of file
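Because the script installs flashinfer and pins `VLLM_ATTENTION_BACKEND=FLASHINFER` at launch time, a worker started before the install finishes may fall back to a different attention backend or fail outright. A small preflight check, a sketch to be run inside the same venv the workers use (activated by `INIT_CMD`):

```python
# Preflight sketch: confirm the flashinfer wheel and the attention-backend
# environment variable are in place before launching workers.
import os

try:
    import flashinfer  # fails if the `uv pip install flashinfer-python` step did not run
except ImportError as exc:
    raise SystemExit(f"flashinfer not installed: {exc}")

backend = os.environ.get("VLLM_ATTENTION_BACKEND")
if backend != "FLASHINFER":
    raise SystemExit(f"VLLM_ATTENTION_BACKEND is {backend!r}, expected 'FLASHINFER'")

print("flashinfer available; attention backend pinned to FLASHINFER")
```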