Commit 6ace6fba (unverified), authored by Robert Shaw, committed by GitHub
Browse files

[V1] `AsyncLLM` Implementation (#9826)


Signed-off-by: Nick Hill <nickhill@us.ibm.com>
Signed-off-by: rshaw@neuralmagic.com <rshaw@neuralmagic.com>
Signed-off-by: Nick Hill <nhill@redhat.com>
Co-authored-by: Nick Hill <nickhill@us.ibm.com>
Co-authored-by: Varun Sundar Rabindranath <varun@neuralmagic.com>
Co-authored-by: Nick Hill <nhill@redhat.com>
Co-authored-by: Tyler Michael Smith <tyler@neuralmagic.com>
parent 08f93e74
import asyncio
from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union
from vllm.config import ModelConfig, VllmConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.metrics_types import StatLoggerBase
from vllm.engine.protocol import EngineClient
from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.outputs import EmbeddingRequestOutput, RequestOutput
from vllm.pooling_params import PoolingParams
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.async_stream import AsyncStream
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.processor import Processor
from vllm.v1.executor.gpu_executor import GPUExecutor
# Module-level logger, named after this module.
logger = init_logger(__name__)
class AsyncLLM(EngineClient):
    """Asyncio-facing client for the V1 engine.

    Input processing and detokenization happen in this process. The
    EngineCore is reached through an ``EngineCoreClient`` created with
    ``multiprocess_mode=True`` and ``asyncio_mode=True``. Outputs are handed
    to callers through one ``AsyncStream`` per request.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: Type[GPUExecutor],
        log_stats: bool,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
        input_registry: InputRegistry = INPUT_REGISTRY,
        use_cached_outputs: bool = False,
        log_requests: bool = True,
        start_engine_loop: bool = True,
    ) -> None:
        """Build the tokenizer, processor, detokenizer and EngineCore client.

        ``start_engine_loop`` must be truthy (asserted). ``stat_loggers`` is
        only stored and ``use_cached_outputs`` is not read in this method.
        The output handler task is NOT started here; see ``generate()``.
        """
        assert start_engine_loop

        self.log_requests = log_requests
        self.log_stats = log_stats
        self.stat_loggers = stat_loggers
        self.model_config = vllm_config.model_config

        # Tokenizer (+ ensure liveness if running in another process).
        self.tokenizer = init_tokenizer_from_configs(
            model_config=vllm_config.model_config,
            scheduler_config=vllm_config.scheduler_config,
            parallel_config=vllm_config.parallel_config,
            enable_lora=bool(vllm_config.lora_config))
        self.tokenizer.ping()

        # Request streams (map of request_id -> AsyncStream).
        self.request_streams: Dict[str, AsyncStream] = {}
        # List of cancelled request ids to be aborted.
        self.client_aborted_requests: List[str] = []

        # Processor (converts Inputs --> EngineCoreRequests).
        self.processor = Processor(vllm_config.model_config,
                                   vllm_config.lora_config, self.tokenizer,
                                   input_registry)

        # Detokenizer (converts EngineCoreOutputs --> RequestOutput).
        self.detokenizer = Detokenizer(vllm_config.model_config.tokenizer)

        # EngineCore (starts the engine in background process).
        self.engine_core = EngineCoreClient.make_client(
            vllm_config=vllm_config,
            executor_class=executor_class,
            usage_context=usage_context,
            multiprocess_mode=True,
            asyncio_mode=True,
        )

        # Created lazily on the first generate() call.
        self.output_handler = None

    def __del__(self):
        self.shutdown()

    @classmethod
    def from_engine_args(
        cls,
        engine_args: AsyncEngineArgs,
        engine_config: Optional[VllmConfig] = None,
        start_engine_loop: bool = True,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
    ) -> "AsyncLLMEngine":
        """Create an AsyncLLM from the EngineArgs.

        When ``engine_config`` is None the config is built from
        ``engine_args``; otherwise ``engine_config`` is used as-is.
        """
        # Create the engine configs.
        if engine_config is None:
            vllm_config = engine_args.create_engine_config()
        else:
            vllm_config = engine_config

        executor_class = cls._get_executor_cls(vllm_config)

        # Create the AsyncLLM.
        return cls(
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_requests=not engine_args.disable_log_requests,
            log_stats=not engine_args.disable_log_stats,
            start_engine_loop=start_engine_loop,
            usage_context=usage_context,
            stat_loggers=stat_loggers,
        )

    def shutdown(self):
        """Shutdown, cleaning up the background proc and IPC."""
        # shutdown() is also reached from __del__, which can run on an
        # instance whose __init__ raised before these attributes were set,
        # so both lookups must tolerate a missing attribute.
        engine_core = getattr(self, "engine_core", None)
        if engine_core is not None:
            engine_core.shutdown()

        if handler := getattr(self, "output_handler", None):
            handler.cancel()

    @classmethod
    def _get_executor_cls(cls, vllm_config: VllmConfig):
        """Return the executor class; currently always ``GPUExecutor``."""
        return GPUExecutor

    async def add_request(
        self,
        request_id: str,
        prompt: PromptType,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]:
        """Add new request to the AsyncLLM.

        Returns the async generator of the request's AsyncStream.

        Raises:
            KeyError: the detokenizer already tracks ``request_id``.
            ValueError: a stream for ``request_id`` is already registered.
        """
        if self.detokenizer.is_request_active(request_id):
            raise KeyError(f"Request {request_id} already exists.")

        # 1) Create a new AsyncStream for the request.
        stream = self._add_request_to_streams(request_id)

        # 2) Convert input --> DetokenizerRequest / EngineCoreRequest.
        try:
            detokenizer_req, engine_core_req = self.processor.process_inputs(
                request_id, prompt, params, arrival_time, lora_request,
                trace_headers, prompt_adapter_request, priority)
        except Exception:
            # Input processing failed: unregister the stream so the id does
            # not stay in request_streams forever.
            self.request_streams.pop(request_id, None)
            raise

        # 3) Add the request to Detokenizer (this process).
        self.detokenizer.add_request(detokenizer_req)

        # 4) Add the EngineCoreRequest to EngineCore (separate process).
        await self.engine_core.add_request_async(engine_core_req)

        # 5) Return the generator.
        return stream.generator()

    # TODO: we should support multiple prompts in one call, as you
    # can do with LLM.generate. So that for multi-prompt completion
    # requests we don't need to send multiple messages to core proc,
    # and so we don't need multiple streams which then get
    # re-multiplexed in the API server anyhow.
    async def generate(
        self,
        prompt: PromptType,
        sampling_params: SamplingParams,
        request_id: str,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> AsyncGenerator[RequestOutput, None]:
        """
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the Detokenizer.
            * 4) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of generate() iterates the returned AsyncGenerator,
        returning the RequestOutput back to the caller.
        """
        # We start the output_handler on the first call to generate() so that
        # we can call __init__ before the event loop starts, which enables us
        # to handle startup failure gracefully in the OpenAI server.
        if self.output_handler is None:
            self.output_handler = asyncio.create_task(
                self._run_output_handler())

        async for output in await self.add_request(
                request_id,
                prompt,
                sampling_params,
                lora_request=lora_request,
                trace_headers=trace_headers,
                prompt_adapter_request=prompt_adapter_request,
                priority=priority,
        ):
            yield output

    def _finish_stream(self, request_id: str):
        """Unregister the stream for ``request_id`` (if any) and finish it."""
        stream = self.request_streams.pop(request_id, None)
        if stream is not None:
            stream.finish()

    def _add_request_to_streams(
        self,
        request_id: str,
    ) -> AsyncStream:
        """Create and register the AsyncStream for ``request_id``.

        Raises:
            ValueError: a stream for ``request_id`` is already registered.
        """
        if request_id in self.request_streams:
            raise ValueError(f"Request id {request_id} already running.")

        # Avoid streams having circular ref to parent AsyncLLM object.
        aborted_reqs = self.client_aborted_requests
        stream = AsyncStream(request_id, aborted_reqs.append)
        self.request_streams[request_id] = stream

        if self.log_requests:
            logger.info("Added request %s.", request_id)

        return stream

    async def _process_cancellations(self) -> None:
        """
        Process requests cancelled from user disconnecting.

        When a client disconnects, AsyncStream._cancel() is called.
        We passed a callback to AsyncStream(), which appends to
        self.client_aborted_requests.

        As a result, if any requests are canceled from the user side
        the request_id will show up in self.client_aborted_requests.
        """
        # Nothing was cancelled since the last pass.
        if not self.client_aborted_requests:
            return

        # Snapshot and clear in place: the streams hold a reference to this
        # very list's append(), so the list object must not be replaced.
        reqs_to_abort = self.client_aborted_requests.copy()
        self.client_aborted_requests.clear()

        # Remove from Detokenizer.
        self.detokenizer.abort_requests(reqs_to_abort)

        # Remove from RequestStreams.
        for request_id in reqs_to_abort:
            if self.log_requests:
                logger.info("User-cancelled request %s.", request_id)
            self._finish_stream(request_id)

        # Remove from EngineCore.
        await self.engine_core.abort_requests_async(reqs_to_abort)

    def _process_request_outputs(self, request_outputs: List[RequestOutput]):
        """Process outputs by putting them into per-request AsyncStreams."""
        for request_output in request_outputs:
            request_id = request_output.request_id

            # Each request in the API server pulls from the per-request stream.
            stream = self.request_streams.get(request_id)
            if stream is None:
                # The stream may already have been removed (e.g. by a client
                # cancellation); skip instead of crashing the handler loop.
                continue

            stream.put(request_output)

            # If finished, remove from the tracker.
            if request_output.finished:
                if self.log_requests:
                    logger.info("Finished request %s.", request_id)
                self._finish_stream(request_id)

    async def _run_output_handler(self):
        """Background loop: pulls from EngineCore and pushes to AsyncStreams."""
        try:
            while True:
                # 1) Pull EngineCoreOutput from the EngineCore.
                outputs = await self.engine_core.get_output_async()

                # 2) Detokenize based on the output.
                request_outputs, reqs_to_abort = self.detokenizer.step(outputs)

                # 3) Put the RequestOutputs into the per-request AsyncStreams.
                self._process_request_outputs(request_outputs)

                # 4) Abort any requests that finished due to stop strings.
                await self.engine_core.abort_requests_async(reqs_to_abort)

                # 5) Abort any requests due to client cancellations.
                await self._process_cancellations()

        except asyncio.CancelledError:
            # shutdown() cancels this task; that is not an error.
            logger.debug("AsyncLLM output handler cancelled.")
            raise
        except BaseException as e:
            # Log with traceback, then propagate unchanged.
            logger.exception(e)
            raise

    # TODO: can we eliminate these?

    async def abort(self, request_id: str) -> None:
        # Note: Who Calls this? I dont think this is actually used.
        raise ValueError("Not Supported on V1 yet.")

    def encode(
        self,
        prompt: PromptType,
        pooling_params: PoolingParams,
        request_id: str,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        priority: int = 0,
    ):
        raise ValueError("Not Supported on V1 yet.")

    async def get_model_config(self) -> ModelConfig:
        return self.model_config

    async def get_decoding_config(self):
        raise ValueError("Not Supported on V1 yet.")

    async def get_tokenizer(
        self,
        lora_request: Optional[LoRARequest] = None,
    ) -> AnyTokenizer:
        """Return the detokenizer's tokenizer; ``lora_request`` must be None."""
        assert lora_request is None
        return self.detokenizer.tokenizer

    async def is_tracing_enabled(self) -> bool:
        return False

    async def do_log_stats(
        self,
        scheduler_outputs=None,
        model_output=None,
    ) -> None:
        logger.debug("Called do_log_stats.")

    async def check_health(self) -> None:
        logger.debug("Called check_health.")

    async def start_profile(self) -> None:
        raise ValueError("Not supported on V1 yet.")

    async def stop_profile(self) -> None:
        raise ValueError("Not supported on V1 yet.")

    @property
    def is_running(self) -> bool:
        return True

    @property
    def is_stopped(self) -> bool:
        return False

    @property
    def errored(self) -> bool:
        return False

    @property
    def dead_error(self) -> BaseException:
        # NOTE(review): returns the Exception class rather than an instance,
        # which does not match the annotation -- confirm what callers expect.
        return Exception
# Retain V0 name for backwards compatibility: AsyncLLMEngine is an alias
# bound to the very same class object as AsyncLLM.
AsyncLLMEngine = AsyncLLM
import asyncio
from typing import Any, AsyncGenerator, Callable, Optional, Type, Union
from vllm.outputs import EmbeddingRequestOutput, RequestOutput
class AsyncStream:
    """A stream of RequestOutputs or EmbeddingRequestOutputs for a request
    that can be iterated over asynchronously via an async generator.

    Producers call ``put()`` and finally ``finish()``; the consumer iterates
    ``generator()``. ``cancel`` is invoked with the request id when the
    consumer stops iterating before a terminal item was received.
    """

    # Unique sentinel queued by finish() to mark a clean end of stream.
    STOP_ITERATION = Exception()

    def __init__(self, request_id: str, cancel: Callable[[str], None]) -> None:
        self.request_id = request_id
        self._cancel = cancel
        self._queue: asyncio.Queue = asyncio.Queue()
        self._finished = False

    def put(
        self, item: "Union[RequestOutput, EmbeddingRequestOutput, Exception]"
    ) -> None:
        """Queue ``item`` for the consumer; ignored once finished."""
        if not self._finished:
            self._queue.put_nowait(item)

    def finish(
        self,
        exception: Optional[Union[BaseException, Type[BaseException]]] = None,
    ) -> None:
        """Terminate the stream (first call wins).

        Queues ``exception`` if it is an exception instance or class, else
        the ``STOP_ITERATION`` sentinel.
        """
        if self._finished:
            return
        self._finished = True
        terminal = (exception if self._is_raisable(exception) else
                    AsyncStream.STOP_ITERATION)
        self._queue.put_nowait(terminal)

    async def generator(
        self
    ) -> "AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]":
        """Yield queued items until the sentinel or an exception arrives.

        A queued exception (instance or class) is raised to the consumer.
        If the generator exits for any other reason (e.g. it is closed by
        the consumer), the ``cancel`` callback is called with the request id.
        """
        finished = False
        try:
            while True:
                result = await self._queue.get()
                if self._is_raisable(result):
                    finished = True
                    # Identity check: the sentinel is one specific object, and
                    # `==` would let an exception with a permissive __eq__ be
                    # mistaken for end-of-stream and silently swallowed.
                    if result is AsyncStream.STOP_ITERATION:
                        return
                    raise result
                yield result
        finally:
            self._finished = True
            if not finished:
                self._cancel(self.request_id)

    @staticmethod
    def _is_raisable(value: Any):
        """True if ``value`` is an exception instance or an exception class."""
        return isinstance(value, BaseException) or \
                (isinstance(value, type) and \
                issubclass(value, BaseException))
import multiprocessing
import queue
import threading
import time
from contextlib import contextmanager
from multiprocessing.process import BaseProcess
from multiprocessing.sharedctypes import Synchronized
from typing import Any, Iterator, List, Tuple, Type, Union
import zmq
import zmq.asyncio
from msgspec import msgpack
from vllm.config import CacheConfig, VllmConfig
from vllm.logger import init_logger
from vllm.usage.usage_lib import UsageContext
from vllm.v1.core.scheduler import Scheduler
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
EngineCoreRequest, EngineCoreRequestType)
from vllm.v1.executor.gpu_executor import GPUExecutor
from vllm.v1.request import Request, RequestStatus
from vllm.version import __version__ as VLLM_VERSION
# Module-level logger, named after this module.
logger = init_logger(__name__)
# Socket poll timeout while waiting for EngineCore startup (milliseconds).
POLLING_TIMEOUT_MS = 5000
# Same timeout in whole seconds, used for queue.get(timeout=...).
POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
# Minimum number of seconds between two stats log lines. The name carries a
# seconds unit; the previous value of 5000 (~83 minutes) looked like a
# millisecond value copied over from POLLING_TIMEOUT_MS.
LOGGING_TIME_S = 5
class EngineCore:
    """Inner loop of vLLM's Engine."""

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: Type[GPUExecutor],
        usage_context: UsageContext,
    ):
        """Override V1 config defaults, build the executor, size the KV
        cache and create the scheduler.

        Note that ``vllm_config`` is mutated in place.
        """
        scheduler_config = vllm_config.scheduler_config
        cache_config = vllm_config.cache_config
        model_config = vllm_config.model_config

        # Override the configs for V1.
        # FIXME
        if usage_context == UsageContext.LLM_CLASS:
            scheduler_config.max_num_seqs = 1024
            scheduler_config.max_num_batched_tokens = 8192
        elif usage_context == UsageContext.OPENAI_API_SERVER:
            scheduler_config.max_num_seqs = 1024
            scheduler_config.max_num_batched_tokens = 2048

        # TODO (ywang96): Enable APC by default when VLM supports it.
        if not model_config.is_multimodal_model:
            cache_config.enable_prefix_caching = True

        assert model_config.task != "embedding"

        logger.info("Initializing an LLM engine (v%s) with config: %s",
                    VLLM_VERSION, vllm_config)

        # Setup Model.
        self.model_executor = executor_class(vllm_config)

        # Setup KV Caches and update CacheConfig after profiling.
        gpu_blocks, cpu_blocks = self._initialize_kv_caches(
            vllm_config.cache_config)
        vllm_config.cache_config.num_gpu_blocks = gpu_blocks
        vllm_config.cache_config.num_cpu_blocks = cpu_blocks

        # Setup scheduler.
        self.scheduler = Scheduler(vllm_config.scheduler_config,
                                   vllm_config.cache_config,
                                   vllm_config.lora_config)

        # Timestamp of the most recent stats log line.
        self._last_logging_time = time.time()

    def _initialize_kv_caches(self,
                              cache_config: CacheConfig) -> Tuple[int, int]:
        """Determine the GPU block count, initialize the executor's cache
        and return ``(num_gpu_blocks, num_cpu_blocks)``.

        ``num_gpu_blocks_override`` takes precedence over the executor's
        estimate when set; the CPU block count returned is always 0.
        """
        profiled = self.model_executor.determine_num_available_blocks()
        num_gpu_blocks = profiled[0]

        override = cache_config.num_gpu_blocks_override
        if override is not None:
            logger.info(
                "Overriding num_gpu_blocks=%d with "
                "num_gpu_blocks_override=%d", num_gpu_blocks, override)
            num_gpu_blocks = override

        self.model_executor.initialize_cache(num_gpu_blocks)
        return num_gpu_blocks, 0

    def add_request(self, request: EngineCoreRequest):
        """Add request to the scheduler."""
        self.scheduler.add_request(Request.from_engine_core_request(request))

    def abort_requests(self, request_ids: List[str]):
        """Abort requests from the scheduler."""
        # TODO: The scheduler doesn't really need to know the
        # specific finish reason, TBD whether we propagate that
        # (i.e. client-aborted vs stop criteria met).
        self.scheduler.finish_requests(request_ids,
                                       RequestStatus.FINISHED_ABORTED)

    def step(self) -> List[EngineCoreOutput]:
        """Schedule, execute, and make output."""
        # Nothing to run: report an empty batch of outputs.
        if not self.scheduler.has_unfinished_requests():
            return []

        scheduled = self.scheduler.schedule()
        model_output = self.model_executor.execute_model(scheduled)
        return self.scheduler.update_from_output(scheduled, model_output)
class EngineCoreProc(EngineCore):
    """ZMQ-wrapper for running EngineCore in background process."""

    READY_STR = "READY"

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: Type[GPUExecutor],
        usage_context: UsageContext,
        input_path: str,
        output_path: str,
        ready_path: str,
        should_shutdown: Synchronized,
    ):
        """Initialize the EngineCore, start the socket IO threads and push
        the readiness message on ``ready_path``."""
        super().__init__(vllm_config, executor_class, usage_context)

        # Signal from main process to shutdown (multiprocessing.Value).
        self.should_shutdown = should_shutdown

        # Background Threads and Queues for IO. These enable us to
        # overlap ZMQ socket IO with GPU since they release the GIL,
        # and to overlap some serialization/deserialization with the
        # model forward pass.
        # Threads handle Socket <-> Queues and core_busy_loop uses Queue.
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        threading.Thread(target=self.process_input_socket,
                         args=(input_path, ),
                         daemon=True).start()
        threading.Thread(target=self.process_output_socket,
                         args=(output_path, ),
                         daemon=True).start()

        # Send Readiness signal to EngineClient.
        with self.make_socket(ready_path, zmq.constants.PUSH) as ready_socket:
            ready_socket.send_string(EngineCoreProc.READY_STR)

    @contextmanager
    def make_socket(self, path: str, type: Any) -> Iterator[zmq.Socket]:
        """Context manager yielding a ZMQ socket in a private context.

        PULL sockets connect to ``path`` and PUSH sockets bind to it; any
        other socket type raises ValueError. A KeyboardInterrupt raised while
        the socket is in use is logged and suppressed. The context is always
        destroyed on exit.
        """
        ctx = zmq.Context()
        try:
            socket = ctx.socket(type)

            if type == zmq.constants.PULL:
                socket.connect(path)
            elif type == zmq.constants.PUSH:
                socket.bind(path)
            else:
                raise ValueError(f"Unknown Socket Type: {type}")

            yield socket

        except KeyboardInterrupt:
            logger.debug("EngineCore had Keyboard Interrupt.")

        finally:
            ctx.destroy(linger=0)

    @staticmethod
    def wait_for_startup(
        proc: BaseProcess,
        ready_path: str,
    ) -> None:
        """Wait until the EngineCore is ready.

        Raises:
            RuntimeError: ``proc`` died before sending the ready message.
        """
        # Created outside the try block so the finally clause can never hit
        # an unbound name if context creation itself fails.
        sync_ctx = zmq.Context()  # type: ignore[attr-defined]
        try:
            socket = sync_ctx.socket(zmq.constants.PULL)
            socket.connect(ready_path)

            # Wait for EngineCore to send EngineCoreProc.READY_STR.
            while socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
                logger.debug("Waiting for EngineCoreProc to startup.")

                if not proc.is_alive():
                    raise RuntimeError("EngineCoreProc failed to start.")

            message = socket.recv_string()
            assert message == EngineCoreProc.READY_STR

        except BaseException as e:
            logger.exception(e)
            raise

        finally:
            sync_ctx.destroy(linger=0)

    @staticmethod
    def make_engine_core_process(
        vllm_config: VllmConfig,
        executor_class: Type[GPUExecutor],
        usage_context: UsageContext,
        input_path: str,
        output_path: str,
        ready_path: str,
        should_shutdown: Synchronized,
    ) -> BaseProcess:
        """Spawn the EngineCore process and block until it reports ready."""
        # The current process might have CUDA context,
        # so we need to spawn a new process.
        # NOTE(rob): this is a problem for using EngineCoreProc w/
        # LLM, since we need a if __name__ == "__main__" guard.
        context = multiprocessing.get_context("spawn")

        process_kwargs = {
            "input_path": input_path,
            "output_path": output_path,
            "ready_path": ready_path,
            "vllm_config": vllm_config,
            "executor_class": executor_class,
            "usage_context": usage_context,
            "should_shutdown": should_shutdown
        }
        # Run EngineCore busy loop in background process.
        proc = context.Process(target=EngineCoreProc.run_engine_core,
                               kwargs=process_kwargs)
        proc.start()

        # Wait for startup
        EngineCoreProc.wait_for_startup(proc, ready_path)
        return proc

    @staticmethod
    def run_engine_core(*args, **kwargs):
        """Launch EngineCore busy loop in background process."""
        try:
            engine_core = EngineCoreProc(*args, **kwargs)
            engine_core.run_busy_loop()

        except KeyboardInterrupt:
            logger.debug("EngineCore interrupted.")

        except BaseException as e:
            logger.exception(e)
            raise

    def _shutdown_requested(self) -> bool:
        """Read the shutdown flag's current value."""
        # Read `.value` rather than testing the object: a lock-wrapped
        # multiprocessing.Value is always truthy as an object. A plain bool
        # (no `.value`) is still accepted.
        flag = self.should_shutdown
        return bool(getattr(flag, "value", flag))

    def run_busy_loop(self):
        """Core busy loop of the EngineCore."""
        # Loop until we get a shutdown signal.
        while not self._shutdown_requested():

            # 1) Poll the input queue until there is work to do.
            if not self.scheduler.has_unfinished_requests():
                while True:
                    try:
                        req = self.input_queue.get(timeout=POLLING_TIMEOUT_S)
                        self._handle_client_request(req)
                        break
                    except queue.Empty:
                        self._log_stats()
                        logger.debug("EngineCore busy loop waiting.")
                        if self._shutdown_requested():
                            return

            # 2) Handle any new client requests (Abort or Add).
            while not self.input_queue.empty():
                req = self.input_queue.get_nowait()
                self._handle_client_request(req)

            # 3) Step the engine core.
            outputs = self.step()

            # 4) Put EngineCoreOutputs into the output queue.
            self.output_queue.put_nowait(outputs)

            self._log_stats()

    def _log_stats(self):
        """Log basic stats every LOGGING_TIME_S"""
        now = time.time()

        if now - self._last_logging_time > LOGGING_TIME_S:
            logger.info(
                "RUNNING: %s | WAITING: %s",
                len(self.scheduler.running),
                len(self.scheduler.waiting),
            )

            self._last_logging_time = now

    def _handle_client_request(
            self, request: Union[EngineCoreRequest, List[str]]) -> None:
        """Handle EngineCoreRequest or EngineCoreABORT from Client."""
        if isinstance(request, EngineCoreRequest):
            self.add_request(request)
        else:
            # TODO: make an EngineCoreAbort wrapper
            assert isinstance(request, list)
            self.abort_requests(request)

    def process_input_socket(self, input_path: str):
        """Input socket IO thread."""
        # Msgpack serialization decoding.
        decoder_add_req = msgpack.Decoder(EngineCoreRequest)
        decoder_abort_req = msgpack.Decoder(list[str])

        with self.make_socket(input_path, zmq.constants.PULL) as socket:
            while True:
                # (RequestType, RequestData)
                type_frame, data_frame = socket.recv_multipart(copy=False)
                request_type = type_frame.buffer
                request_data = data_frame.buffer

                # Deserialize the request data.
                if request_type == EngineCoreRequestType.ADD.value:
                    request = decoder_add_req.decode(request_data)
                elif request_type == EngineCoreRequestType.ABORT.value:
                    request = decoder_abort_req.decode(request_data)
                else:
                    raise ValueError(f"Unknown RequestType: {request_type}")

                # Push to input queue for core busy loop.
                self.input_queue.put_nowait(request)

    def process_output_socket(self, output_path: str):
        """Output socket IO thread."""
        # Msgpack serialization encoding.
        encoder = msgpack.Encoder()
        # Reuse send buffer.
        # NOTE(review): the same bytearray is re-encoded into on the next
        # iteration while the previous send used copy=False -- confirm the
        # previous message can no longer be referencing it.
        buffer = bytearray()

        with self.make_socket(output_path, zmq.constants.PUSH) as socket:
            while True:
                engine_core_outputs = self.output_queue.get()
                outputs = EngineCoreOutputs(outputs=engine_core_outputs)
                encoder.encode_into(outputs, buffer)
                socket.send_multipart((buffer, ), copy=False)
import multiprocessing
import time
from typing import List, Union
import msgspec
import zmq
import zmq.asyncio
from vllm.logger import init_logger
from vllm.utils import get_open_zmq_ipc_path
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
EngineCoreRequest, EngineCoreRequestType)
from vllm.v1.engine.core import EngineCore, EngineCoreProc
# Module-level logger, named after this module.
logger = init_logger(__name__)
class EngineCoreClient:
    """
    EngineCoreClient: subclasses handle different methods for pushing
        and pulling from the EngineCore for asyncio / multiprocessing.

    Subclasses:
    * InprocClient: In process EngineCore (for V0-style LLMEngine use)
    * SyncMPClient: ZMQ + background proc EngineCore (for LLM)
    * AsyncMPClient: ZMQ + background proc EngineCore w/ asyncio (for AsyncLLM)
    """

    @staticmethod
    def make_client(
        *args,
        multiprocess_mode: bool,
        asyncio_mode: bool,
        **kwargs,
    ) -> "EngineCoreClient":
        """Instantiate the client matching the two mode flags.

        Remaining positional/keyword arguments are forwarded to the chosen
        client's constructor. asyncio without multiprocessing is rejected
        with NotImplementedError.
        """
        if multiprocess_mode:
            client_cls = AsyncMPClient if asyncio_mode else SyncMPClient
            return client_cls(*args, **kwargs)

        # TODO: support this for debugging purposes.
        if asyncio_mode:
            raise NotImplementedError(
                "Running EngineCore in asyncio without multiprocessing "
                "is not currently supported.")

        return InprocClient(*args, **kwargs)

    def shutdown(self):
        """No-op by default; subclasses release their resources here."""
        pass

    # Synchronous interface -- implemented by the non-asyncio clients.

    def get_output(self) -> List[EngineCoreOutput]:
        raise NotImplementedError

    def add_request(self, request: EngineCoreRequest) -> None:
        raise NotImplementedError

    def abort_requests(self, request_ids: List[str]) -> None:
        raise NotImplementedError

    # Asyncio interface -- implemented by the asyncio client.

    async def get_output_async(self) -> List[EngineCoreOutput]:
        raise NotImplementedError

    async def add_request_async(self, request: EngineCoreRequest) -> None:
        raise NotImplementedError

    async def abort_requests_async(self, request_ids: List[str]) -> None:
        raise NotImplementedError
class InprocClient(EngineCoreClient):
    """
    InprocClient: client for in-process EngineCore. Intended
    for use in LLMEngine for V0-style add_request() and step()
        EngineCore setup in this process (no busy loop).

        * pushes EngineCoreRequest directly into the EngineCore
        * pulls EngineCoreOutputs by stepping the EngineCore

        TODO: support asyncio-mode for debugging.
    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded untouched to the EngineCore.
        self.engine_core = EngineCore(*args, **kwargs)

    def get_output(self) -> List[EngineCoreOutput]:
        """Run one EngineCore step and return its outputs."""
        core = self.engine_core
        return core.step()

    def add_request(self, request: EngineCoreRequest) -> None:
        """Hand ``request`` directly to the in-process EngineCore."""
        core = self.engine_core
        core.add_request(request)

    def abort_requests(self, request_ids: List[str]) -> None:
        """Abort ``request_ids`` directly on the in-process EngineCore."""
        core = self.engine_core
        core.abort_requests(request_ids)
class MPClient(EngineCoreClient):
    """
    MPClient: base client for multi-proc EngineCore.
        EngineCore runs in a background process busy loop, getting
        new EngineCoreRequests and returning EngineCoreOutputs

        * pushes EngineCoreRequests via input_socket
        * pulls EngineCoreOutputs via output_socket

        * AsyncMPClient subclass for AsyncLLM usage
        * SyncMPClient subclass for LLM usage
    """

    def __init__(
        self,
        *args,
        asyncio_mode: bool,
        **kwargs,
    ):
        """Create the ZMQ sockets and start the EngineCore process.

        ``asyncio_mode`` selects a ``zmq.asyncio.Context`` over a plain
        ``zmq.Context``. Remaining arguments are forwarded to
        ``EngineCoreProc.make_engine_core_process``.
        """
        # Serialization setup.
        self.encoder = msgspec.msgpack.Encoder()
        self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs)

        # ZMQ setup.
        self.ctx = (zmq.asyncio.Context() if asyncio_mode else zmq.Context())

        # Path for IPC.
        ready_path = get_open_zmq_ipc_path()
        output_path = get_open_zmq_ipc_path()
        input_path = get_open_zmq_ipc_path()

        # Get output (EngineCoreOutput) from EngineCore.
        self.output_socket = self.ctx.socket(zmq.constants.PULL)
        self.output_socket.connect(output_path)

        # Send input (EngineCoreRequest) to EngineCore.
        self.input_socket = self.ctx.socket(zmq.constants.PUSH)
        self.input_socket.bind(input_path)

        # Start EngineCore in background process.
        self.should_shutdown = multiprocessing.Value('b', False, lock=False)
        self.proc = EngineCoreProc.make_engine_core_process(
            *args,
            input_path=input_path,
            output_path=output_path,
            ready_path=ready_path,
            should_shutdown=self.should_shutdown,
            **kwargs,
        )

    def shutdown(self):
        """Signal the EngineCore process, close ZMQ and stop the process.

        Safe to call repeatedly and on a partially-constructed instance
        (it is also invoked from ``__del__``).
        """
        # Send shutdown signal to background process. The flag must be
        # written through `.value`: rebinding the attribute would only
        # replace the local reference and the child would never see it.
        should_shutdown = getattr(self, "should_shutdown", None)
        if should_shutdown is not None:
            should_shutdown.value = True

        # Shut down the zmq context.
        ctx = getattr(self, "ctx", None)
        if ctx is not None:
            ctx.destroy(linger=0)

        # Shutdown the process if needed.
        proc = getattr(self, "proc", None)
        if proc is not None and proc.is_alive():
            proc.terminate()
            # Wait up to 5 seconds, returning as soon as the process exits
            # instead of always sleeping for the full period.
            proc.join(5)
            if proc.is_alive():
                proc.kill()

    def __del__(self):
        self.shutdown()
class SyncMPClient(MPClient):
    """Synchronous client for multi-proc EngineCore."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, asyncio_mode=False, **kwargs)

    def get_output(self) -> List[EngineCoreOutput]:
        """Block for the next single-frame output message and decode it."""
        (frame, ) = self.output_socket.recv_multipart(copy=False)
        engine_core_outputs = self.decoder.decode(frame.buffer).outputs
        return engine_core_outputs

    def _send_input(self, request_type: EngineCoreRequestType,
                    request: Union[EngineCoreRequest, List[str]]) -> None:
        """Send one two-frame message: request type, then encoded payload."""
        # (RequestType, SerializedRequest)
        msg = (request_type.value, self.encoder.encode(request))
        self.input_socket.send_multipart(msg, copy=False)

    def add_request(self, request: EngineCoreRequest) -> None:
        self._send_input(EngineCoreRequestType.ADD, request)

    def abort_requests(self, request_ids: List[str]) -> None:
        # Same guard as AsyncMPClient.abort_requests_async: do not send an
        # IPC message when there is nothing to abort.
        if len(request_ids) > 0:
            self._send_input(EngineCoreRequestType.ABORT, request_ids)
class AsyncMPClient(MPClient):
    """Asyncio-compatible client for multi-proc EngineCore."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, asyncio_mode=True, **kwargs)

    async def get_output_async(self) -> List[EngineCoreOutput]:
        """Await the next output message and decode its first frame."""
        frames = await self.output_socket.recv_multipart(copy=False)
        first_frame = frames[0]
        decoded = self.decoder.decode(first_frame.buffer)
        return decoded.outputs

    async def _send_input(
            self, request_type: EngineCoreRequestType,
            request: Union[EngineCoreRequest, List[str]]) -> None:
        """Send one two-frame message: request type, then encoded payload."""
        type_frame = request_type.value
        payload_frame = self.encoder.encode(request)
        await self.input_socket.send_multipart((type_frame, payload_frame),
                                               copy=False)

    async def add_request_async(self, request: EngineCoreRequest) -> None:
        await self._send_input(EngineCoreRequestType.ADD, request)

    async def abort_requests_async(self, request_ids: List[str]) -> None:
        # Nothing to abort: skip the IPC round entirely.
        if len(request_ids) == 0:
            return
        await self._send_input(EngineCoreRequestType.ABORT, request_ids)
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple
from vllm.engine.output_processor.stop_checker import StopChecker
from vllm.logger import init_logger
from vllm.outputs import RequestOutput
from vllm.sampling_params import RequestOutputKind
from vllm.transformers_utils.detokenizer_utils import (
AnyTokenizer, convert_prompt_ids_to_tokens, detokenize_incrementally)
from vllm.transformers_utils.tokenizer import get_tokenizer
from vllm.v1.engine import DetokenizerRequest, EngineCoreOutput
# Module-level logger, named after this module.
logger = init_logger(__name__)
@dataclass
class IncrementalDetokenizer:
# Generation data
output_text: str
tokens: List[str]
token_ids: List[int]
# Stop strings
stop: List[str]
include_stop_str_in_output: bool
# Metadata for incremental detokenization
prefix_offset: int
read_offset: int
# Parameters for detokenization
skip_special_tokens: bool
spaces_between_special_tokens: bool
output_kind: RequestOutputKind
# TODO: Probably decouple these
request_id: str
prompt: Optional[str]
prompt_token_ids: List[int]
# Tokenizer for this request
tokenizer: AnyTokenizer
# Accounting for stop string buffering
stop_buffer_length: int
_last_output_text_offset: int = 0
@property
def output_token_ids(self) -> List[int]:
    """Token ids generated so far, i.e. ``token_ids`` without the prompt."""
    prompt_len = len(self.prompt_token_ids)
    # token_ids starts out as a copy of the prompt ids and only grows.
    assert len(self.token_ids) >= prompt_len
    return self.token_ids[prompt_len:]
@classmethod
def from_new_request(
    cls,
    tokenizer: AnyTokenizer,
    request: DetokenizerRequest,
) -> "IncrementalDetokenizer":
    """Build the per-request detokenizer state from a DetokenizerRequest.

    The prompt ids are converted to tokens up front, and the stop-string
    hold-back length is derived from the request's stop strings.
    """
    prompt_tokens, prefix_offset, read_offset = convert_prompt_ids_to_tokens(
        tokenizer=tokenizer,
        prompt_ids=request.prompt_token_ids,
        skip_special_tokens=request.skip_special_tokens,
    )

    stop_strings = request.stop
    # Number of chars to hold back when stop strings are to be excluded
    # from streamed output.
    hold_back = 0
    if stop_strings and not request.include_stop_str_in_output:
        hold_back = max(map(len, stop_strings)) - 1

    return cls(
        output_text="",
        tokens=prompt_tokens,
        # Detokenizer mutates this list, so need a unique copy.
        # NOTE(Nick): could we take ownership of it though?
        token_ids=request.prompt_token_ids.copy(),
        stop=stop_strings,
        include_stop_str_in_output=request.include_stop_str_in_output,
        prefix_offset=prefix_offset,
        read_offset=read_offset,
        skip_special_tokens=request.skip_special_tokens,
        spaces_between_special_tokens=request.spaces_between_special_tokens,
        output_kind=request.output_kind,
        request_id=request.request_id,
        prompt=request.prompt,
        prompt_token_ids=request.prompt_token_ids,
        tokenizer=tokenizer,
        stop_buffer_length=hold_back,
    )
def add_tokens(
self,
new_token_ids: List[int],
finish_reason: Optional[str],
stop_reason: Optional[str],
) -> Optional[RequestOutput]:
"""
Update RequestState for the request_id by:
1) Detokenize the new token ids incrementally.
2) Update the RequestOutput with the new text.
"""
# 1) Detokenize the new token ids incrementally.
# TODO(woosuk): This method becomes very inefficient when the number of
# new_token_ids is more than 1. We need to optimize this.
decoded_text = ""
for new_token_id in new_token_ids:
self.token_ids.append(new_token_id)
(new_tokens, new_decoded_token_text, prefix_offset,
read_offset) = detokenize_incrementally(
tokenizer=self.tokenizer,
all_input_ids=self.token_ids,
prev_tokens=self.tokens,
prefix_offset=self.prefix_offset,
read_offset=self.read_offset,
skip_special_tokens=self.skip_special_tokens,
spaces_between_special_tokens=self.
spaces_between_special_tokens,
)
self.tokens.extend(new_tokens)
self.prefix_offset = prefix_offset
self.read_offset = read_offset
self.output_text += new_decoded_token_text
decoded_text += new_decoded_token_text
# 2) Evaluate stop criteria.
if self.stop:
stop = StopChecker.check_stop_strings(
output_text=self.output_text,
new_char_count=len(decoded_text),
stop=self.stop,
include_in_output=self.include_stop_str_in_output,
)
if stop is not None:
stop_str, truncate_to = stop
if truncate_to != -1:
self.output_text = self.output_text[:truncate_to]
finish_reason = "stop" # TODO: use constant
stop_reason = stop_str
# TODO: handle stop_token_ids here too?
# 3) Update the RequestOutput object with the new text.
finished = bool(finish_reason)
if self.output_kind == RequestOutputKind.FINAL_ONLY \
and not finished:
return None
delta = self.output_kind == RequestOutputKind.DELTA
output_text = self._get_next_output_text(finished, delta)
token_ids = new_token_ids if delta else self.output_token_ids
request_output = RequestOutput.new(
self.request_id,
self.prompt,
self.prompt_token_ids,
output_text,
token_ids,
finished,
)
if finished:
completion_output = request_output.outputs[0]
completion_output.finish_reason = finish_reason
completion_output.stop_reason = stop_reason
return request_output
def _get_next_output_text(self, finished: bool, delta: bool) -> str:
"""If delta is True, only new text since the last call to
this method is returned"""
# We return the full output text if the sequence is finished.
buffer_length = 0 if finished else self.stop_buffer_length
if not delta:
return self.output_text[:-buffer_length] if buffer_length else (
self.output_text)
length = len(self.output_text) - buffer_length
last_offset = self._last_output_text_offset
if last_offset < length:
self._last_output_text_offset = length
return self.output_text[last_offset:length]
return ""
class Detokenizer:
    """Owns the ``IncrementalDetokenizer`` state of every active request."""

    def __init__(self, tokenizer_name: str):
        # TODO: once we support LoRA, we should pass the tokenizer
        # here. We currently have two copies (this + in the LLMEngine).
        self.tokenizer = get_tokenizer(tokenizer_name)

        # Request id -> IncrementalDetokenizer
        self.request_states: Dict[str, IncrementalDetokenizer] = {}

    def is_request_active(self, request_id: str):
        """Return whether state is currently held for ``request_id``."""
        return request_id in self.request_states

    def get_num_unfinished_requests(self):
        """Return the number of requests with state still being tracked."""
        return len(self.request_states)

    def has_unfinished_requests(self) -> bool:
        """Return True if at least one request is still being tracked."""
        return bool(self.request_states)

    def abort_requests(
        self,
        request_ids: Iterable[str],
    ) -> None:
        """Remove the request_ids from the Detokenizer.

        Ids that are not being tracked are ignored.
        """
        for aborted_id in request_ids:
            self.request_states.pop(aborted_id, None)

    def add_request(
        self,
        request: DetokenizerRequest,
    ):
        """Add new request to the Detokenizer."""
        new_id = request.request_id
        assert (new_id not in self.request_states)

        self.request_states[new_id] = IncrementalDetokenizer.from_new_request(
            self.tokenizer, request)

    def step(
        self, encore_core_outputs: List[EngineCoreOutput]
    ) -> Tuple[List[RequestOutput], List[str]]:
        """Update state and return the RequestOutputs for the LLMEngine.

        Returns a pair ``(request_outputs, requests_to_abort)``. The second
        list holds the ids of requests that finished in the detokenizer
        while the corresponding engine core output was not marked finished.
        """
        produced: List[RequestOutput] = []
        to_abort: List[str] = []

        for core_output in encore_core_outputs:
            req_id = core_output.request_id
            state = self.request_states.get(req_id)
            if state is None:
                # Ignore output for already-aborted request.
                continue

            # Detokenize and update state.
            result = state.add_tokens(
                new_token_ids=core_output.new_token_ids,
                finish_reason=core_output.finish_reason,
                stop_reason=core_output.stop_reason,
            )
            if result is None:
                # No output is produced for this step.
                continue

            # Add to RequestOutputs list.
            produced.append(result)
            if not result.finished:
                continue

            # Free completed requests.
            self.request_states.pop(req_id)
            if not core_output.finished:
                to_abort.append(req_id)

        # Return to EngineClient.
        return produced, to_abort
This diff is collapsed.
import time
from typing import Any, Dict, Mapping, Optional, Tuple, Union
from vllm.config import LoRAConfig, ModelConfig
from vllm.inputs import (INPUT_REGISTRY, DecoderOnlyInputs,
EncoderDecoderLLMInputs, InputRegistry, PromptType)
from vllm.inputs.preprocess import InputPreprocessor
from vllm.lora.request import LoRARequest
from vllm.pooling_params import PoolingParams
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.config import try_get_generation_config
from vllm.transformers_utils.tokenizer_group import AnyTokenizer
from vllm.v1.engine import DetokenizerRequest, EngineCoreRequest
class Processor:
    """Turns a raw prompt plus parameters into the two per-request messages
    used downstream: a ``DetokenizerRequest`` and an ``EngineCoreRequest``.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        lora_config: Optional[LoRAConfig],
        tokenizer: AnyTokenizer,
        input_registry: InputRegistry = INPUT_REGISTRY,
    ):
        self.model_config = model_config
        self.lora_config = lora_config
        self.tokenizer = tokenizer

        # Generation-config fields, later applied to each request's
        # sampling params.
        self.generation_config_fields = _load_generation_config_dict(
            model_config)
        self.input_preprocessor = InputPreprocessor(model_config,
                                                    self.tokenizer)
        self.input_processor = input_registry.create_input_processor(
            model_config)

    # TODO: run in an ThreadpoolExecutor or BackgroundProcess.
    # This ideally should releases the GIL, so we should not block the
    # asyncio loop while this is running.
    def process_inputs(
        self,
        request_id: str,
        prompt: PromptType,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: float,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> Tuple[DetokenizerRequest, EngineCoreRequest]:
        """Preprocess and validate one request.

        Returns the ``(DetokenizerRequest, EngineCoreRequest)`` pair built
        from the processed prompt and a clone of ``params``.

        Raises ``ValueError`` if a LoRA request is given while LoRA is not
        configured, or if the processed prompt fails validation.
        """
        # TODO(woosuk): Support embedding mode.
        # TODO(woosuk): Check max_logprobs
        # TODO(woosuk): Support encoder-decoder models.

        lora_enabled = bool(self.lora_config)
        if lora_request is not None and not lora_enabled:
            raise ValueError(f"Got lora_request {lora_request} but LoRA is "
                             "not enabled!")
        # A missing arrival time defaults to "now".
        if arrival_time is None:
            arrival_time = time.time()
        assert priority == 0, "vLLM V1 does not support priority at the moment."
        assert trace_headers is None, "vLLM V1 does not support tracing yet."

        # Process inputs.
        processed_inputs = self.input_processor(
            self.input_preprocessor.preprocess(
                prompt,
                request_id=request_id,
                lora_request=lora_request,
                prompt_adapter_request=prompt_adapter_request,
            ))
        self._validate_model_inputs(processed_inputs)
        eos_token_id = self.input_preprocessor.get_eos_token_id(lora_request)

        assert isinstance(params, SamplingParams)
        # TODO: can we avoid cloning here in multiproc case
        sampling_params = params.clone()
        sampling_params.update_from_generation_config(
            self.generation_config_fields, eos_token_id)

        prompt_text = processed_inputs.get("prompt")
        prompt_token_ids = processed_inputs.get("prompt_token_ids")

        # Make Request for Detokenizer.
        detokenizer_request = DetokenizerRequest(
            request_id,
            prompt_text,
            prompt_token_ids,
            sampling_params.skip_special_tokens,
            sampling_params.spaces_between_special_tokens,
            sampling_params.output_kind,
            sampling_params.stop,
            sampling_params.include_stop_str_in_output,
        )

        # Make Request for EngineCore.
        engine_core_request = EngineCoreRequest(
            request_id,
            prompt_text,
            prompt_token_ids,
            sampling_params,
            eos_token_id,
            arrival_time,
            lora_request,
        )

        return detokenizer_request, engine_core_request

    def _validate_model_inputs(self, inputs: Union[DecoderOnlyInputs,
                                                   EncoderDecoderLLMInputs]):
        """Reject empty prompts and, for multimodal models, prompts longer
        than ``max_model_len``.

        Raises ``ValueError`` on either violation.
        """
        prompt_ids = inputs.get("prompt_token_ids")

        if prompt_ids is None or len(prompt_ids) == 0:
            raise ValueError("Prompt cannot be empty")

        # The length limit is only enforced for multimodal models.
        if not self.model_config.is_multimodal_model:
            return

        max_prompt_len = self.model_config.max_model_len
        if len(prompt_ids) <= max_prompt_len:
            return

        raise ValueError(
            f"The prompt (total length {len(prompt_ids)}) is too long "
            f"to fit into the model (context length {max_prompt_len}). "
            "Make sure that `max_model_len` is no smaller than the "
            "number of text tokens plus multimodal tokens. For image "
            "inputs, the number of image tokens depends on the number "
            "of images, and possibly their aspect ratios as well.")
def _load_generation_config_dict(model_config: ModelConfig) -> Dict[str, Any]:
    """Return the model's generation config as a dict.

    The config is looked up with ``try_get_generation_config``; an empty
    dict is returned when that lookup yields ``None``.
    """
    generation_config = try_get_generation_config(
        model_config.model,
        trust_remote_code=model_config.trust_remote_code,
        revision=model_config.revision,
    )

    if generation_config is not None:
        return generation_config.to_diff_dict()
    return {}
import enum import enum
from typing import TYPE_CHECKING, List, Optional, Union from typing import TYPE_CHECKING, List, Optional, Union
from vllm.inputs.data import DecoderOnlyInputs
from vllm.lora.request import LoRARequest from vllm.lora.request import LoRARequest
from vllm.sampling_params import SamplingParams from vllm.sampling_params import SamplingParams
from vllm.sequence import RequestMetrics from vllm.sequence import RequestMetrics
from vllm.v1.engine import EngineCoreRequest
from vllm.v1.utils import ConstantList from vllm.v1.utils import ConstantList
if TYPE_CHECKING: if TYPE_CHECKING:
...@@ -43,9 +45,22 @@ class Request: ...@@ -43,9 +45,22 @@ class Request:
self.num_prompt_tokens = len(self.prompt_token_ids) self.num_prompt_tokens = len(self.prompt_token_ids)
self._output_token_ids: List[int] = [] self._output_token_ids: List[int] = []
self._all_token_ids: List[int] = self.prompt_token_ids.copy() self._all_token_ids: List[int] = self.prompt_token_ids.copy()
self.output_text = ""
self.num_computed_tokens = 0 self.num_computed_tokens = 0
@classmethod
def from_engine_core_request(cls, request: EngineCoreRequest) -> "Request":
    """Alternate constructor: build a ``Request`` from ``request``."""
    # Wrap the prompt text and token ids as "token"-type decoder-only
    # inputs.
    decoder_inputs = DecoderOnlyInputs(
        type="token",
        prompt_token_ids=request.prompt_token_ids,
        prompt=request.prompt,
    )
    return cls(
        request_id=request.request_id,
        inputs=decoder_inputs,
        sampling_params=request.sampling_params,
        eos_token_id=request.eos_token_id,
        arrival_time=request.arrival_time,
        lora_request=request.lora_request,
    )
@property @property
def output_token_ids(self) -> ConstantList[int]: def output_token_ids(self) -> ConstantList[int]:
# Prevent directly appending to the output_token_ids since # Prevent directly appending to the output_token_ids since
......
import multiprocessing
from dataclasses import dataclass
from typing import Dict, List, Optional
import msgspec
import zmq
from msgspec import msgpack
from vllm.transformers_utils.detokenizer_utils import (
convert_prompt_ids_to_tokens, detokenize_incrementally)
from vllm.transformers_utils.tokenizer import get_tokenizer
from vllm.utils import get_open_port
class DetokenizerInputs(msgspec.Struct):
    """Message sent from the engine to the detokenizer process.

    The per-request lists are parallel: entry ``i`` of each one belongs to
    ``req_ids[i]``.
    """

    # [num_reqs]
    req_ids: List[str]
    # A request's prompt token ids is sent to the detokenizer only when
    # the request is first detokenized. Otherwise, an empty list is sent.
    prompt_token_ids: List[List[int]]
    # Token ids to detokenize for each request in this message.
    new_token_ids: List[List[int]]
    # Per-request detokenization options.
    skip_special_tokens: List[bool]
    spaces_between_special_tokens: List[bool]

    # [num_free_reqs]
    # Ids of requests whose detokenizer state should be released.
    free_req_ids: List[str]
class DetokenizerOutputs(msgspec.Struct):
    """Message sent from the detokenizer process back to the engine.

    The lists are parallel: entry ``i`` of each one belongs to
    ``req_ids[i]``.
    """

    # [num_reqs]
    req_ids: List[str]
    # Text decoded from the new token ids of each request.
    detokenized_texts: List[str]
    # NOTE(woosuk): The number of the output token ids of each request
    # at the time of detokenization. The detokenizer returns this to the engine
    # because the request state (including the output token ids) is
    # asynchronously updated in the engine, while RequestOutput requires the
    # output token ids to be consistent with the detokenized text.
    num_output_token_ids: List[int]
class Detokenizer:
    """Engine-side handle to a detokenizer running in a separate process.

    Starts a ``DetokenizerProc`` and talks to it over a pair of ZMQ
    PUSH/PULL sockets using msgpack-encoded ``DetokenizerInputs`` and
    ``DetokenizerOutputs`` messages.
    """

    def __init__(self, tokenizer_name: str, tokenizer_mode: str,
                 trust_remote_code: bool):
        """Spawn the detokenizer process and connect to it.

        Args:
            tokenizer_name: Forwarded to the child process, which loads the
                tokenizer with it.
            tokenizer_mode: Forwarded to the child process.
            trust_remote_code: Forwarded to the child process.
        """
        # FIXME(woosuk): Currently, the detokenizer is just a hacky prototype.
        # For example, it does not terminate properly. We need to improve this.
        self.push_port = get_open_port()
        self.pull_port = get_open_port()
        # NOTE: The push port of the engine process should be the same as the
        # pull port of the detokenizer process. Vice versa.
        self.detokenizer = DetokenizerProc(tokenizer_name=tokenizer_name,
                                           tokenizer_mode=tokenizer_mode,
                                           trust_remote_code=trust_remote_code,
                                           push_port=self.pull_port,
                                           pull_port=self.push_port)
        self.detokenizer.start()

        self.zmq_context = zmq.Context()
        self.push_socket = self.zmq_context.socket(zmq.PUSH)
        self.push_socket.connect(f"tcp://localhost:{self.push_port}")
        self.pull_socket = self.zmq_context.socket(zmq.PULL)
        self.pull_socket.connect(f"tcp://localhost:{self.pull_port}")
        self.poller = zmq.Poller()
        self.poller.register(self.pull_socket, zmq.POLLIN)
        self.msgpack_encoder = msgpack.Encoder()
        self.msgpack_decoder = msgpack.Decoder(DetokenizerOutputs)

    def send(self, inputs: DetokenizerInputs) -> None:
        """Encode ``inputs`` and push them to the detokenizer process.

        The send is non-blocking (``zmq.NOBLOCK``).
        """
        self.push_socket.send(self.msgpack_encoder.encode(inputs),
                              flags=zmq.NOBLOCK)

    def recv(self) -> Optional[DetokenizerOutputs]:
        """Return the next decoded output message, or ``None`` if no message
        is ready. Never waits: the poll uses a zero timeout.
        """
        socks = dict(self.poller.poll(timeout=0))
        if self.pull_socket in socks and socks[self.pull_socket] == zmq.POLLIN:
            msg = self.pull_socket.recv()
            return self.msgpack_decoder.decode(msg)
        return None

    def terminate(self) -> None:
        """Kill the detokenizer process and release the ZMQ resources."""
        self.detokenizer.kill()
        self.detokenizer.join()
        # Close the sockets and the context as well; otherwise they stay
        # open after the child is gone. linger=0 discards any unsent
        # messages so that terminating the context cannot block on them.
        self.push_socket.close(linger=0)
        self.pull_socket.close(linger=0)
        self.zmq_context.term()
class DetokenizerProc(multiprocessing.Process):
    """Child process that performs incremental detokenization.

    It receives msgpack-encoded ``DetokenizerInputs`` on a ZMQ PULL socket,
    keeps one ``RequestState`` per request id, and answers every input
    message with a ``DetokenizerOutputs`` message on a ZMQ PUSH socket.
    """

    def __init__(
        self,
        tokenizer_name: str,
        tokenizer_mode: str,
        trust_remote_code: bool,
        pull_port: int,
        push_port: int,
    ):
        """Store the configuration; sockets and the tokenizer are created
        later, in ``run()``.

        Args:
            tokenizer_name: Passed to ``get_tokenizer`` in ``run()``.
            tokenizer_mode: Passed to ``get_tokenizer`` in ``run()``.
            trust_remote_code: Passed to ``get_tokenizer`` in ``run()``.
            pull_port: TCP port this process binds its PULL socket to.
            push_port: TCP port this process binds its PUSH socket to.
        """
        super().__init__()
        self.tokenizer_name = tokenizer_name
        self.tokenizer_mode = tokenizer_mode
        self.trust_remote_code = trust_remote_code
        # NOTE: The pull_port of the detokenizer process should be the same as
        # the push_port of the engine process. Vice versa.
        self.pull_port = pull_port
        self.push_port = push_port

    def run(self):
        """Process entry point: serve detokenization requests forever."""
        # Initialize these objects after the process is forked since they are
        # not picklable.
        self.msgpack_encoder = msgpack.Encoder()
        self.msgpack_decoder = msgpack.Decoder(DetokenizerInputs)
        self.tokenizer = get_tokenizer(
            tokenizer_name=self.tokenizer_name,
            tokenizer_mode=self.tokenizer_mode,
            trust_remote_code=self.trust_remote_code)
        # req_id -> RequestState
        self.request_states: Dict[str, RequestState] = {}

        self.zmq_context = zmq.Context()
        self.pull_socket = self.zmq_context.socket(zmq.PULL)
        self.pull_socket.bind(f"tcp://*:{self.pull_port}")
        self.push_socket = self.zmq_context.socket(zmq.PUSH)
        self.push_socket.bind(f"tcp://*:{self.push_port}")

        while True:
            if self.pull_socket.poll(timeout=1000) == 0:
                # Nothing to read
                continue
            message = self.pull_socket.recv()
            inputs = self.msgpack_decoder.decode(message)

            # Release finished requests before handling the new tokens.
            for req_id in inputs.free_req_ids:
                self.free(req_id)

            detokenized_texts: List[str] = []
            num_output_token_ids: List[int] = []
            for i, req_id in enumerate(inputs.req_ids):
                # State is created the first time a request id shows up.
                if req_id not in self.request_states:
                    self.add_request(
                        request_id=req_id,
                        prompt_token_ids=inputs.prompt_token_ids[i],
                        skip_special_tokens=inputs.skip_special_tokens[i],
                        spaces_between_special_tokens=inputs.
                        spaces_between_special_tokens[i],
                    )
                new_str = self.detokenize(req_id, inputs.new_token_ids[i])
                detokenized_texts.append(new_str)
                req_state = self.request_states[req_id]
                # token_ids holds prompt + output ids, so subtract the prompt.
                num_output_token_ids.append(
                    len(req_state.token_ids) - req_state.num_prompt_tokens)

            detokenized = DetokenizerOutputs(
                req_ids=inputs.req_ids,
                detokenized_texts=detokenized_texts,
                num_output_token_ids=num_output_token_ids,
            )
            self.push_socket.send(self.msgpack_encoder.encode(detokenized),
                                  flags=zmq.NOBLOCK)

    def add_request(
        self,
        request_id: str,
        prompt_token_ids: List[int],
        skip_special_tokens: bool,
        spaces_between_special_tokens: bool,
    ) -> None:
        """Create and register the ``RequestState`` for ``request_id``.

        ``prompt_token_ids`` is stored as the state's ``token_ids`` list
        (not copied) and is appended to by ``detokenize``.
        """
        tokens, prefix_offset, read_offset = convert_prompt_ids_to_tokens(
            tokenizer=self.tokenizer,
            prompt_ids=prompt_token_ids,
            skip_special_tokens=skip_special_tokens,
        )
        self.request_states[request_id] = RequestState(
            req_id=request_id,
            token_ids=prompt_token_ids,
            tokens=tokens,
            num_prompt_tokens=len(prompt_token_ids),
            prefix_offset=prefix_offset,
            read_offset=read_offset,
            skip_special_tokens=skip_special_tokens,
            spaces_between_special_tokens=spaces_between_special_tokens,
        )

    def free(self, request_id: str) -> None:
        """Drop the state kept for ``request_id``.

        Unknown ids are ignored: a request can be freed without ever having
        been detokenized, and a ``KeyError`` here would end the ``run()``
        loop and with it the whole detokenizer process.
        """
        self.request_states.pop(request_id, None)

    def detokenize(self, request_id: str, new_token_ids: List[int]) -> str:
        """Decode ``new_token_ids`` for ``request_id`` and update its state.

        Returns the text produced by these tokens only; the state's
        ``output_text`` accumulates the full text.

        Raises ``KeyError`` if no state exists for ``request_id``.
        """
        # TODO(woosuk): This method becomes very inefficient when the number of
        # new_token_ids is more than 1. We need to optimize this.
        req_state = self.request_states[request_id]
        decoded_text = ""
        for new_token_id in new_token_ids:
            req_state.token_ids.append(new_token_id)
            (new_tokens, new_decoded_token_text, prefix_offset,
             read_offset) = detokenize_incrementally(
                 tokenizer=self.tokenizer,
                 all_input_ids=req_state.token_ids,
                 prev_tokens=req_state.tokens,
                 prefix_offset=req_state.prefix_offset,
                 read_offset=req_state.read_offset,
                 skip_special_tokens=req_state.skip_special_tokens,
                 spaces_between_special_tokens=req_state.
                 spaces_between_special_tokens,
             )

            req_state.tokens.extend(new_tokens)
            req_state.prefix_offset = prefix_offset
            req_state.read_offset = read_offset
            req_state.output_text += new_decoded_token_text
            decoded_text += new_decoded_token_text
        return decoded_text
@dataclass
class RequestState:
    """Incremental detokenization state kept for one request."""

    req_id: str

    # Prompt token ids followed by the output token ids appended so far.
    token_ids: List[int]
    # Token strings for the prompt, extended as new tokens are decoded.
    tokens: List[str]
    # Length of the prompt; subtracting it from len(token_ids) gives the
    # number of output tokens.
    num_prompt_tokens: int
    # Offsets passed to and updated from detokenize_incrementally.
    prefix_offset: int
    read_offset: int

    # Detokenization options for this request.
    skip_special_tokens: bool
    spaces_between_special_tokens: bool

    # All text decoded for this request so far.
    output_text: str = ""
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment