"...ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "9ea3acad1d05d92824a27cccfd961db2be6594eb"
Unverified commit 35b0ce62, authored by Graham King, committed by GitHub
Browse files

chore(frontend): Remove the multi-processing vllm processor path (#7005)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent 8381e28a
...@@ -228,7 +228,7 @@ async def async_main(): ...@@ -228,7 +228,7 @@ async def async_main():
if config.chat_processor == "vllm": if config.chat_processor == "vllm":
assert ( assert (
vllm_flags is not None vllm_flags is not None
), "vllm_flags is required when chat_processor is vllm" ), "vllm_flags is required when chat processor is vllm"
chat_engine_factory = setup_engine_factory( chat_engine_factory = setup_engine_factory(
runtime, router_config, config, vllm_flags runtime, router_config, config, vllm_flags
).chat_engine_factory ).chat_engine_factory
......
...@@ -187,43 +187,6 @@ async def preprocess_chat_request( ...@@ -187,43 +187,6 @@ async def preprocess_chat_request(
) )
def preprocess_chat_request_sync(
    request: dict[str, Any] | ChatCompletionRequest,
    *,
    tokenizer: TokenizerLike,
    renderer,
    tool_parser_class: type[ToolParser] | None,
) -> PreprocessResult:
    """Blocking variant of preprocess_chat_request, intended for worker processes.

    Args:
        request: The chat completion request, as a dict or a ChatCompletionRequest.
        tokenizer: Tokenizer used to encode the rendered prompt when the
            renderer did not already produce token ids.
        renderer: Object exposing ``render_messages(messages, chat_params)``;
            the second element of its return value is used as the engine prompt.
        tool_parser_class: Optional tool parser class, forwarded to
            ``_prepare_request``.

    Returns:
        A PreprocessResult carrying the sampling request, tool parser, chat
        template kwargs, engine prompt and prompt token ids.
    """
    prepared = _prepare_request(
        request, tokenizer=tokenizer, tool_parser_class=tool_parser_class
    )
    (
        request_for_sampling,
        tool_parser,
        chat_template_kwargs,
        messages,
        chat_params,
    ) = prepared

    _, engine_prompt = renderer.render_messages(messages, chat_params)

    if "prompt_token_ids" not in engine_prompt:
        # The renderer only produced text, so tokenize it here.
        prompt_token_ids = tokenizer.encode(
            engine_prompt["prompt"],
            add_special_tokens=request_for_sampling.add_special_tokens,
        )
    else:
        # Copy into a plain list so the result does not alias the prompt's value.
        prompt_token_ids = list(engine_prompt["prompt_token_ids"])

    return PreprocessResult(
        request_for_sampling=request_for_sampling,
        tool_parser=tool_parser,
        chat_template_kwargs=chat_template_kwargs,
        engine_prompt=engine_prompt,
        prompt_token_ids=prompt_token_ids,
    )
class StreamingPostProcessor: class StreamingPostProcessor:
def __init__( def __init__(
self, self,
......
...@@ -12,9 +12,6 @@ import time ...@@ -12,9 +12,6 @@ import time
import uuid import uuid
from argparse import Namespace from argparse import Namespace
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait as _futures_wait
from dataclasses import dataclass
from typing import Any from typing import Any
from vllm.config import CacheConfig, LoadConfig, ModelConfig, VllmConfig from vllm.config import CacheConfig, LoadConfig, ModelConfig, VllmConfig
...@@ -39,11 +36,7 @@ from dynamo.llm import ( ...@@ -39,11 +36,7 @@ from dynamo.llm import (
) )
from dynamo.runtime import Client, DistributedRuntime from dynamo.runtime import Client, DistributedRuntime
from .prepost import ( from .prepost import StreamingPostProcessor, preprocess_chat_request
StreamingPostProcessor,
preprocess_chat_request,
preprocess_chat_request_sync,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -79,194 +72,6 @@ def map_finish_reason(raw_reason: str | None) -> FinishReason | None: ...@@ -79,194 +72,6 @@ def map_finish_reason(raw_reason: str | None) -> FinishReason | None:
return mapped return mapped
# --- Worker process globals (initialized once per process by _init_worker) ---
# These hold per-process state for preprocess workers and stay None until
# _init_worker has run in that process.
# InputProcessor built by _init_worker from the worker's own VllmConfig.
_w_input_processor: InputProcessor | None = None
# Tokenizer obtained from _w_input_processor.get_tokenizer().
_w_tokenizer: Any = None
# Tool parser class looked up by name in _init_worker; None when no name was given.
_w_tool_parser_class: type[ToolParser] | None = None
class _PreprocessError(Exception):
"""Raised by _preprocess_worker for user-facing errors (e.g., n!=1)."""
def __init__(self, error_dict: dict[str, Any]):
self.error_dict = error_dict
super().__init__(str(error_dict))
@dataclass
class PreprocessWorkerResult:
    """Value handed back from a preprocess worker process to the main process.

    Instances cross a process boundary, so every field has to be picklable.
    """

    # Request dict assembled by the worker (model, token ids, stop conditions,
    # sampling and output options, ...).
    dynamo_preproc: dict[str, Any]
    # Prompt token ids.
    tokens: list[int]
    # Result of InputProcessor.process_inputs for this request.
    vllm_preproc: EngineCoreRequest
    # SamplingParams the worker built for this request.
    sampling_params: SamplingParams
    # ChatCompletionRequest (Pydantic model, picklable).
    request_for_sampling: Any
    # Chat template kwargs produced during preprocessing.
    chat_template_kwargs: dict[str, Any]
def _init_worker(
    model_path: str,
    tokenizer_mode: str,
    config_format: str,
    load_format: str,
    tool_parser_name: str | None,
) -> None:
    """Initialize a worker process with its own VllmConfig and InputProcessor.

    Populates the module-level ``_w_input_processor``, ``_w_tokenizer`` and
    ``_w_tool_parser_class`` globals for this process.

    Args:
        model_path: Passed to ModelConfig as ``model``.
        tokenizer_mode: Passed to ModelConfig as ``tokenizer_mode``.
        config_format: Passed to ModelConfig as ``config_format``.
        load_format: Passed to LoadConfig as ``load_format``.
        tool_parser_name: Name looked up through ToolParserManager; when empty
            or None, no tool parser class is configured.
    """
    # Only the names actually rebound below are declared global; the previous
    # extra declaration of a reasoning-parser global referred to a variable
    # that is never defined or assigned.
    global _w_input_processor, _w_tokenizer, _w_tool_parser_class

    model_config = ModelConfig(
        model=model_path,
        tokenizer_mode=tokenizer_mode,
        config_format=config_format,
    )
    vllm_config = VllmConfig(
        model_config=model_config,
        load_config=LoadConfig(load_format=load_format),
        cache_config=CacheConfig(),
    )

    _w_input_processor = InputProcessor(vllm_config)
    _w_tokenizer = _w_input_processor.get_tokenizer()
    _w_tool_parser_class = (
        ToolParserManager.get_tool_parser(tool_parser_name)
        if tool_parser_name
        else None
    )
def _worker_warmup() -> bool:
"""Dummy task to ensure worker process is fully initialized."""
return True
def _worker_build_sampling_params(
    request_for_sampling: Any,
    generation_config_fields: Any,
) -> SamplingParams:
    """Build the SamplingParams for one chat request inside a worker process."""
    # max_completion_tokens wins over max_tokens; both may be None.
    max_tokens = request_for_sampling.max_completion_tokens
    if max_tokens is None:
        max_tokens = request_for_sampling.max_tokens

    sampling_params = SamplingParams(
        output_kind=RequestOutputKind.DELTA,
        max_tokens=max_tokens,
    )

    # Apply generation-config defaults first so request values can override them.
    # NOTE(review): if generation_config_fields has a "max_tokens" entry it
    # replaces the request value set above, and max_tokens is excluded from the
    # override loop below -- confirm this precedence is intended.
    for name, value in generation_config_fields.items():
        if hasattr(sampling_params, name):
            setattr(sampling_params, name, value)

    # Copy every field that both SamplingParams and the request model declare,
    # except the ones handled explicitly. Sorted for a deterministic order.
    overridable = (
        set(getattr(SamplingParams, "__annotations__", ()))
        & set(type(request_for_sampling).model_fields)
    ) - {"max_tokens", "logprobs", "output_kind"}
    for name in sorted(overridable):
        value = getattr(request_for_sampling, name, None)
        if value is not None:
            setattr(sampling_params, name, value)

    # logprobs may be a bool flag or an int count; top_logprobs is the fallback.
    logprobs = request_for_sampling.logprobs
    top_logprobs = request_for_sampling.top_logprobs
    if logprobs is True:
        sampling_params.logprobs = top_logprobs or 1
    elif isinstance(logprobs, int) and not isinstance(logprobs, bool):
        sampling_params.logprobs = logprobs
    elif top_logprobs not in (None, 0):
        sampling_params.logprobs = top_logprobs

    return sampling_params


def _worker_build_dynamo_preproc(
    model_name: str,
    tokens: list[int],
    vllm_preproc: EngineCoreRequest,
) -> dict[str, Any]:
    """Assemble the request dict from the processed vLLM request."""
    sp = vllm_preproc.sampling_params
    eos_token_id = vllm_preproc.eos_token_id
    return {
        "model": model_name,
        "token_ids": tokens,
        "stop_conditions": {
            "max_tokens": sp.max_tokens,
            "stop": sp.stop,
            "stop_token_ids": sp.stop_token_ids,
            "min_tokens": sp.min_tokens,
            "ignore_eos": sp.ignore_eos,
        },
        "sampling_options": {
            "n": sp.n,
            "presence_penalty": sp.presence_penalty,
            "frequency_penalty": sp.frequency_penalty,
            "repetition_penalty": sp.repetition_penalty,
            "temperature": sp.temperature,
            "top_p": sp.top_p,
            "top_k": sp.top_k,
            "min_p": sp.min_p,
            "seed": sp.seed,
        },
        "output_options": {
            "logprobs": sp.logprobs,
            "prompt_logprobs": sp.prompt_logprobs,
            "skip_special_tokens": sp.skip_special_tokens,
        },
        "eos_token_ids": [eos_token_id] if eos_token_id is not None else [],
        "annotations": [],
    }


def _preprocess_worker(
    request: dict[str, Any],
    request_id: str,
    model_name: str,
) -> PreprocessWorkerResult:
    """Preprocess a request in a worker process and return a picklable result.

    Args:
        request: Raw chat completion request dict.
        request_id: Identifier passed to InputProcessor.process_inputs.
        model_name: Value stored under ``"model"`` in the returned request dict.

    Returns:
        A PreprocessWorkerResult bundling the request dict, prompt tokens, the
        processed vLLM request, the sampling params, the sampling request and
        the chat template kwargs.

    Raises:
        RuntimeError: If this process was not initialized by ``_init_worker``.
        _PreprocessError: If the processed sampling params ask for ``n != 1``.
    """
    input_processor = _w_input_processor
    if input_processor is None:
        # Explicit check instead of assert so it survives ``python -O``.
        raise RuntimeError(
            "_preprocess_worker called before _init_worker initialized this process"
        )

    pre = preprocess_chat_request_sync(
        request,
        tokenizer=_w_tokenizer,
        renderer=input_processor.renderer,
        tool_parser_class=_w_tool_parser_class,
    )
    request_for_sampling = pre.request_for_sampling
    engine_prompt = pre.engine_prompt
    tokens = pre.prompt_token_ids

    sampling_params = _worker_build_sampling_params(
        request_for_sampling, input_processor.generation_config_fields
    )

    # Forward the optional multimodal / caching inputs only when present.
    prompt_inputs = TokensPrompt(prompt_token_ids=tokens)
    if "multi_modal_data" in engine_prompt:
        prompt_inputs["multi_modal_data"] = engine_prompt["multi_modal_data"]
    if "multi_modal_uuids" in engine_prompt:
        prompt_inputs["multi_modal_uuids"] = engine_prompt["multi_modal_uuids"]
    if request_for_sampling.cache_salt is not None:
        prompt_inputs["cache_salt"] = request_for_sampling.cache_salt
    if request_for_sampling.mm_processor_kwargs is not None:
        prompt_inputs["mm_processor_kwargs"] = request_for_sampling.mm_processor_kwargs

    vllm_preproc: EngineCoreRequest = input_processor.process_inputs(
        request_id,
        prompt_inputs,
        sampling_params,
    )
    InputProcessor.assign_request_id(vllm_preproc)

    # Validate against the processed params, which are what gets sent on.
    sp = vllm_preproc.sampling_params
    if sp.n != 1:
        raise _PreprocessError(
            {
                "error": {
                    "message": (
                        f"Unsupported value: 'n={sp.n}'. "
                        "This endpoint currently supports only n=1."
                    ),
                    "type": "invalid_request_error",
                    "param": "n",
                    "code": "unsupported_value",
                }
            }
        )

    return PreprocessWorkerResult(
        dynamo_preproc=_worker_build_dynamo_preproc(model_name, tokens, vllm_preproc),
        tokens=tokens,
        vllm_preproc=vllm_preproc,
        sampling_params=sampling_params,
        request_for_sampling=request_for_sampling,
        chat_template_kwargs=pre.chat_template_kwargs,
    )
class VllmProcessor: class VllmProcessor:
def __init__( def __init__(
self, self,
...@@ -277,8 +82,6 @@ class VllmProcessor: ...@@ -277,8 +82,6 @@ class VllmProcessor:
tool_parser_class: type[ToolParser] | None, tool_parser_class: type[ToolParser] | None,
reasoning_parser_class: type[ReasoningParser] | None, reasoning_parser_class: type[ReasoningParser] | None,
debug_perf: bool = False, debug_perf: bool = False,
preprocess_pool: ProcessPoolExecutor | None = None,
preprocess_workers: int = 0,
): ):
self.tokenizer = tokenizer self.tokenizer = tokenizer
self.input_processor = input_processor self.input_processor = input_processor
...@@ -288,15 +91,6 @@ class VllmProcessor: ...@@ -288,15 +91,6 @@ class VllmProcessor:
self.tool_parser_class = tool_parser_class self.tool_parser_class = tool_parser_class
self.reasoning_parser_class = reasoning_parser_class self.reasoning_parser_class = reasoning_parser_class
self.debug_perf = debug_perf self.debug_perf = debug_perf
self.preprocess_pool = preprocess_pool
if preprocess_pool is not None:
# Allow a small buffer beyond the worker count so the pool's
# internal queue always has work ready when a worker finishes.
self._worker_semaphore: asyncio.Semaphore | None = asyncio.Semaphore(
preprocess_workers + 2
)
else:
self._worker_semaphore = None
# Ideally we would map NVCreateChatCompletionRequest into Python so it can be type checked, but # Ideally we would map NVCreateChatCompletionRequest into Python so it can be type checked, but
# it has a lot of fields. # it has a lot of fields.
...@@ -306,7 +100,7 @@ class VllmProcessor: ...@@ -306,7 +100,7 @@ class VllmProcessor:
) -> AsyncGenerator[dict[str, Any], None]: ) -> AsyncGenerator[dict[str, Any], None]:
""" """
Run a single request through the engine. Does pre and post processing on this machine, delegates Run a single request through the engine. Does pre and post processing on this machine, delegates
model inference to a worker using the router. model inference to a backend using the router.
""" """
# ** VllmProcessor.generator called: {'messages': [{'role': 'user', 'content': 'What is the capital of Tuvalu?'}], 'model': '/home/grahamk/llms/Qwen3-0.6B', 'max_completion_tokens': 1000, 'stream': False} # ** VllmProcessor.generator called: {'messages': [{'role': 'user', 'content': 'What is the capital of Tuvalu?'}], 'model': '/home/grahamk/llms/Qwen3-0.6B', 'max_completion_tokens': 1000, 'stream': False}
...@@ -319,14 +113,8 @@ class VllmProcessor: ...@@ -319,14 +113,8 @@ class VllmProcessor:
logger.info("[perf] generator enter: active_requests=%d", active) logger.info("[perf] generator enter: active_requests=%d", active)
try: try:
if self.preprocess_pool is None: async for item in self._generator_inner(request):
# Single process yield item
async for item in self._generator_inner(request):
yield item
else:
# Multi process
async for item in self._generator_inner_pool(request):
yield item
finally: finally:
if self.debug_perf: if self.debug_perf:
active = exit_generator() active = exit_generator()
...@@ -525,7 +313,6 @@ class VllmProcessor: ...@@ -525,7 +313,6 @@ class VllmProcessor:
vllm_preproc: EngineCoreRequest, vllm_preproc: EngineCoreRequest,
post: StreamingPostProcessor, post: StreamingPostProcessor,
) -> AsyncGenerator[dict[str, Any], None]: ) -> AsyncGenerator[dict[str, Any], None]:
"""Shared streaming logic for both single-process and pool paths."""
self.output_processor.add_request(vllm_preproc, None) self.output_processor.add_request(vllm_preproc, None)
token_count = 0 token_count = 0
...@@ -632,77 +419,6 @@ class VllmProcessor: ...@@ -632,77 +419,6 @@ class VllmProcessor:
post_proc_total_ms / token_count, post_proc_total_ms / token_count,
) )
async def _generator_inner_pool(
    self, request: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:
    """Serve one request with preprocessing offloaded to the worker pool.

    Steps:
      1. Preprocess in a worker process while holding the semaphore.
      2. Run remote inference through the router (no worker is held).
      3. Post-process the returned tokens in the main process.

    Preprocessing failures are reported by yielding a single error dict and
    then ending the stream.
    """
    request_id = random_uuid()

    # --- Step 1: preprocess (semaphore held for the duration) ---
    try:
        assert self._worker_semaphore is not None
        async with self._worker_semaphore:
            assert self.preprocess_pool is not None
            pending = self.preprocess_pool.submit(
                _preprocess_worker, request, request_id, request["model"]
            )
            worker_out: PreprocessWorkerResult = await asyncio.wrap_future(pending)
        # Leaving the ``async with`` releases the semaphore.
    except _PreprocessError as user_error:
        # User-facing error: pass the prepared payload straight through.
        yield user_error.error_dict
        return
    except Exception as exc:
        logger.exception("Worker preprocessing failed for request %s", request_id)
        yield {
            "error": {
                "message": f"Worker error: {exc}",
                "type": "internal_error",
            }
        }
        return

    # --- Between steps: rebuild the objects that live in the main process ---
    sampling_request = worker_out.request_for_sampling
    prompt_tokens = worker_out.tokens

    tool_parser = None
    if self.tool_parser_class and sampling_request.tools:
        if sampling_request.tool_choice != "none":
            tool_parser = self.tool_parser_class(self.tokenizer)

    post = StreamingPostProcessor(
        tokenizer=self.tokenizer,
        request_for_sampling=sampling_request,
        sampling_params=worker_out.sampling_params,
        prompt_token_ids=prompt_tokens,
        tool_parser=tool_parser,
        reasoning_parser_class=self.reasoning_parser_class,
        chat_template_kwargs=worker_out.chat_template_kwargs,
    )

    # --- Steps 2 and 3: inference and streaming post-processing ---
    stream = self._generate_and_stream(
        request_id,
        request,
        worker_out.dynamo_preproc,
        prompt_tokens,
        worker_out.vllm_preproc,
        post,
    )
    async for item in stream:
        yield item
class EngineFactory: class EngineFactory:
def __init__( def __init__(
...@@ -713,6 +429,11 @@ class EngineFactory: ...@@ -713,6 +429,11 @@ class EngineFactory:
flags: Namespace, flags: Namespace,
debug_perf: bool = False, debug_perf: bool = False,
): ):
if config.preprocess_workers != 0:
raise RuntimeError(
"preprocess_workers > 0 is not supported by vllm preprocessor"
)
self.runtime = runtime self.runtime = runtime
self.router_config = router_config self.router_config = router_config
self.config = config self.config = config
...@@ -808,46 +529,6 @@ class EngineFactory: ...@@ -808,46 +529,6 @@ class EngineFactory:
router_mode=self.router_config.router_mode router_mode=self.router_config.router_mode
) )
preprocess_pool = None
preprocess_workers = self.config.preprocess_workers
if preprocess_workers > 0:
logger.info(
"Creating preprocess worker pool with %d workers for model %s",
preprocess_workers,
source_path,
)
preprocess_pool = ProcessPoolExecutor(
max_workers=preprocess_workers,
initializer=_init_worker,
initargs=(
source_path,
tokenizer_mode,
config_format,
load_format,
tool_parser_name,
),
)
# Warm up all workers to ensure initialization completes
futures = [
preprocess_pool.submit(_worker_warmup)
for _ in range(preprocess_workers)
]
done, not_done = _futures_wait(futures, timeout=120)
if not_done:
for f in not_done:
f.cancel()
preprocess_pool.shutdown(wait=False, cancel_futures=True)
raise RuntimeError(
"Timed out waiting for preprocess worker pool warmup"
)
try:
for f in done:
f.result() # Raises if initializer failed
except Exception:
preprocess_pool.shutdown(wait=False, cancel_futures=True)
raise
logger.info("Preprocess worker pool ready (%d workers)", preprocess_workers)
gen = VllmProcessor( gen = VllmProcessor(
tokenizer, tokenizer,
input_processor, input_processor,
...@@ -856,8 +537,6 @@ class EngineFactory: ...@@ -856,8 +537,6 @@ class EngineFactory:
tool_parser_class, tool_parser_class,
reasoning_parser_class, reasoning_parser_class,
debug_perf=self.debug_perf, debug_perf=self.debug_perf,
preprocess_pool=preprocess_pool,
preprocess_workers=preprocess_workers,
) )
return PythonAsyncEngine(gen.generator, loop) return PythonAsyncEngine(gen.generator, loop)
...@@ -566,75 +566,3 @@ class TestVllmRendererApi: ...@@ -566,75 +566,3 @@ class TestVllmRendererApi:
"ReasoningParser.is_reasoning_end_streaming signature changed; " "ReasoningParser.is_reasoning_end_streaming signature changed; "
f"expected ['self', 'input_ids', 'delta_ids'], got {end_params}" f"expected ['self', 'input_ids', 'delta_ids'], got {end_params}"
) )
def test_preprocess_worker_result_picklability(self):
    """Check that PreprocessWorkerResult round-trips through pickle.

    _preprocess_worker hands this dataclass back through a
    ProcessPoolExecutor Future, so an unpicklable field would break the
    pool path.
    """
    import pickle

    from dynamo.frontend.vllm_processor import PreprocessWorkerResult

    stop_conditions = {
        "max_tokens": 100,
        "stop": [],
        "stop_token_ids": [2],
        "min_tokens": 0,
        "ignore_eos": False,
    }
    sampling_options = {
        "n": 1,
        "presence_penalty": 0.0,
        "frequency_penalty": 0.0,
        "repetition_penalty": 1.0,
        "temperature": 1.0,
        "top_p": 1.0,
        "top_k": 0,
        "min_p": 0.0,
        "seed": None,
    }
    output_options = {
        "logprobs": None,
        "prompt_logprobs": None,
        "skip_special_tokens": True,
    }
    dynamo_preproc = {
        "model": "test-model",
        "token_ids": [1, 2, 3],
        "stop_conditions": stop_conditions,
        "sampling_options": sampling_options,
        "output_options": output_options,
        "eos_token_ids": [2],
        "annotations": [],
    }
    engine_request = EngineCoreRequest(
        request_id="test-123",
        prompt_token_ids=[1, 2, 3],
        mm_features=None,
        sampling_params=SamplingParams(),
        pooling_params=None,
        eos_token_id=2,
        arrival_time=0.0,
        lora_request=None,
        cache_salt=None,
        data_parallel_rank=None,
        prompt_embeds=None,
        client_index=0,
        current_wave=0,
        priority=0,
        trace_headers=None,
    )
    result = PreprocessWorkerResult(
        dynamo_preproc=dynamo_preproc,
        tokens=[1, 2, 3],
        vllm_preproc=engine_request,
        sampling_params=SamplingParams(),
        request_for_sampling={"model": "test-model", "tools": None},
        chat_template_kwargs={"reasoning_effort": None},
    )

    # Serialize and immediately deserialize, as a process boundary would.
    restored = pickle.loads(pickle.dumps(result))

    assert restored.dynamo_preproc == result.dynamo_preproc
    assert restored.tokens == result.tokens
    assert restored.vllm_preproc.request_id == "test-123"
    assert restored.request_for_sampling == result.request_for_sampling
    assert restored.chat_template_kwargs == result.chat_template_kwargs
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment