# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import time from typing import AsyncIterator, List, Optional, Protocol, Union, runtime_checkable from vllm.config import ModelConfig from vllm.engine.arg_utils import AsyncEngineArgs from vllm.entrypoints.chat_utils import ConversationMessage from vllm.entrypoints.openai.protocol import ( ChatCompletionRequest, CompletionRequest, RequestResponseMetadata, ) from vllm.entrypoints.openai.serving_chat import OpenAIServingChat from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion from vllm.entrypoints.openai.serving_engine import RequestPrompt from vllm.inputs.data import TokensPrompt from vllm.sampling_params import SamplingParams from vllm.transformers_utils.tokenizer import AnyTokenizer @runtime_checkable class ProcessMixInRequired(Protocol): engine_args: AsyncEngineArgs chat_processor: "ChatProcessor | None" completions_processor: "CompletionsProcessor | None" model_config: ModelConfig default_sampling_params: SamplingParams class ProcessMixIn(ProcessMixInRequired): """ Mixin for pre and post processing for vLLM """ engine_args: AsyncEngineArgs chat_processor: "ChatProcessor | None" completions_processor: "CompletionsProcessor | None" model_config: ModelConfig default_sampling_params: SamplingParams def __init__(self): pass def _get_processor( self, raw_request: Union[CompletionRequest, ChatCompletionRequest] ): # Determine the processor type based on the request structure return ( self.chat_processor if isinstance(raw_request, ChatCompletionRequest) else self.completions_processor ) async def _parse_raw_request( self, raw_request: Union[CompletionRequest, ChatCompletionRequest] ): processor = self._get_processor(raw_request) if processor is None: raise RuntimeError("Processor has not been initialized") request = processor.parse_raw_request(raw_request) preprocess_result = await processor.preprocess(raw_request) default_max_tokens = self.model_config.max_model_len - len( preprocess_result.engine_prompt["prompt_token_ids"] ) sampling_params = request.to_sampling_params( default_max_tokens, self.model_config.logits_processor_pattern, self.default_sampling_params, ) return ( request, preprocess_result.conversation, preprocess_result.request_prompt, preprocess_result.engine_prompt, sampling_params, ) async def _stream_response(self, request, generator, request_id, conversation): processor = self._get_processor(request) if processor is None: raise RuntimeError("processor has not been initialized") return processor.stream_response( request, generator, request_id, conversation, ) class PreprocessResult: def __init__( self, conversation: Optional[ConversationMessage], request_prompt: RequestPrompt, engine_prompt: TokensPrompt, ): self.conversation = conversation self.request_prompt = request_prompt self.engine_prompt = engine_prompt class ChatProcessor: def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig): self.tokenizer = tokenizer self.model_config = model_config self.openai_serving = OpenAIServingChat( engine_client=None, model_config=model_config, models=None, request_logger=None, response_role="assistant", chat_template=None, chat_template_content_format="auto", ) def parse_raw_request( self, raw_request: ChatCompletionRequest ) -> ChatCompletionRequest: return ChatCompletionRequest.parse_obj(raw_request) async def preprocess(self, raw_request: ChatCompletionRequest) -> PreprocessResult: request = self.parse_raw_request(raw_request) # TODO: Revisit this later when adding multi-modal support for the frontend. # If no chat template is provided and tokenizer doesn't have one, # use a simple format that just concatenates messages if not request.chat_template and not self.tokenizer.chat_template: chat_template = "{% for message in messages %}{% if message['role'] == 'user' %}User: {{ message['content'] }}\n{% elif message['role'] == 'assistant' %}Assistant: {{ message['content'] }}\n{% endif %}{% endfor %}Assistant:" else: chat_template = request.chat_template or self.tokenizer.chat_template ( conversation, request_prompts, engine_prompts, ) = await self.openai_serving._preprocess_chat( request, self.tokenizer, request.messages, chat_template=chat_template, chat_template_content_format=self.openai_serving.chat_template_content_format, add_generation_prompt=request.add_generation_prompt, continue_final_message=request.continue_final_message, tool_dicts=None, documents=request.documents, chat_template_kwargs=request.chat_template_kwargs, tool_parser=self.openai_serving.tool_parser, truncate_prompt_tokens=request.truncate_prompt_tokens, add_special_tokens=request.add_special_tokens, ) return PreprocessResult(conversation[0], request_prompts[0], engine_prompts[0]) async def stream_response( self, request: ChatCompletionRequest, result_generator: AsyncIterator, request_id: str, conversation: List, ): request_metadata = RequestResponseMetadata(request_id=request_id) if request.stream: # Handle streaming response num_output_text_so_far = 0 async for raw_response in self.openai_serving.chat_completion_stream_generator( request, result_generator, request_id, request.model, conversation, self.tokenizer, request_metadata, enable_force_include_usage=False, ): if raw_response.startswith("data: [DONE]"): yield raw_response break # Parse the response response = json.loads(raw_response.lstrip("data: ")) # Process delta content to extract only new text if "choices" in response and len(response["choices"]) > 0: if "delta" in response["choices"][0]: content = response["choices"][0]["delta"].get("content", "") if content: # Extract only the new part from the full content new_content = content[num_output_text_so_far:] response["choices"][0]["delta"]["content"] = new_content num_output_text_so_far = len(content) # Yield the processed response yield f"data: {json.dumps(response)}\n\n" else: # Handle non-streaming response # Collect all chunks into a single response full_response = None num_output_text_so_far = 0 async for raw_response in self.openai_serving.chat_completion_stream_generator( request, result_generator, request_id, request.model, conversation, self.tokenizer, request_metadata, enable_force_include_usage=False, ): if raw_response.startswith("data: [DONE]"): break response = json.loads(raw_response.lstrip("data: ")) if full_response is None: # Initialize the full response structure full_response = { "id": response.get("id", ""), "object": "chat.completion", "created": int(time.time()), "model": request.model, "choices": [ { "index": response.get("index", 0), "message": {"role": "assistant", "content": ""}, "finish_reason": None, } ], } # Concatenate content if it exists. Each delta contains the full text so far. if "choices" in response and len(response["choices"]) > 0: if "delta" in response["choices"][0]: content = response["choices"][0]["delta"].get("content", "") if content: # Extract only the new part from the full content new_content = content[num_output_text_so_far:] full_response["choices"][0]["message"][ "content" ] += new_content num_output_text_so_far = len(content) # Update finish reason if present if "finish_reason" in response["choices"][0]: full_response["choices"][0]["finish_reason"] = response[ "choices" ][0]["finish_reason"] if full_response is not None: yield json.dumps(full_response) class CompletionsProcessor: def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig): self.tokenizer = tokenizer self.model_config = model_config self.openai_serving = OpenAIServingCompletion( engine_client=None, model_config=model_config, models=None, request_logger=None, ) def parse_raw_request(self, raw_request: CompletionRequest) -> CompletionRequest: return CompletionRequest.parse_obj(raw_request) async def preprocess(self, raw_request: CompletionRequest) -> PreprocessResult: request = self.parse_raw_request(raw_request) ( request_prompts, engine_prompts, ) = await self.openai_serving._preprocess_completion( request, self.tokenizer, input_or_inputs=request.prompt, truncate_prompt_tokens=request.truncate_prompt_tokens, add_special_tokens=request.add_special_tokens, ) return PreprocessResult(None, request_prompts[0], engine_prompts[0]) async def stream_response( self, request: CompletionRequest, result_generator: AsyncIterator, request_id: str, conversation: Optional[List[ConversationMessage]] = None, ): request_metadata = RequestResponseMetadata(request_id=request_id) if not request.stream: raise ValueError("Only streaming responses are supported") async for raw_response in self.openai_serving.completion_stream_generator( request, result_generator, request_id, int(time.time()), # created_time request.model, 1, # num_prompts self.tokenizer, request_metadata, ): if raw_response.startswith("data: [DONE]"): break response = json.loads(raw_response.lstrip("data: ")) yield response