Unverified commit cda10fa3, authored by jennyyyyzhen, committed by GitHub
Browse files

[Multi Modal] Add an env var for message queue max chunk bytes (#19242)


Signed-off-by: yZhen <yZhen@fb.com>
Co-authored-by: yZhen <yZhen@fb.com>
parent c123bc33
......@@ -123,6 +123,7 @@ if TYPE_CHECKING:
VLLM_MAX_TOKENS_PER_EXPERT_FP4_MOE: int = 163840
VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1
VLLM_SLEEP_WHEN_IDLE: bool = False
VLLM_MQ_MAX_CHUNK_BYTES_MB: int = 16
def get_default_cache_root():
......@@ -847,6 +848,12 @@ environment_variables: dict[str, Callable[[], Any]] = {
# latency penalty when a request eventually comes.
"VLLM_SLEEP_WHEN_IDLE":
lambda: bool(int(os.getenv("VLLM_SLEEP_WHEN_IDLE", "0"))),
# Control the max chunk bytes (in MB) for the rpc message queue.
# Objects larger than this threshold will be broadcast to worker
# processes via zmq.
"VLLM_MQ_MAX_CHUNK_BYTES_MB":
lambda: int(os.getenv("VLLM_MQ_MAX_CHUNK_BYTES_MB", "16")),
}
# --8<-- [end:env-vars-definition]
......
......@@ -20,6 +20,7 @@ from typing import Any, Callable, Optional, Union, cast
import cloudpickle
import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed import (destroy_distributed_environment,
destroy_model_parallel)
......@@ -72,7 +73,10 @@ class MultiprocExecutor(Executor):
# Initialize worker and set up message queues for SchedulerOutputs
# and ModelRunnerOutputs
self.rpc_broadcast_mq = MessageQueue(self.world_size, self.world_size)
max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
self.rpc_broadcast_mq = MessageQueue(self.world_size,
self.world_size,
max_chunk_bytes=max_chunk_bytes)
scheduler_output_handle = self.rpc_broadcast_mq.export_handle()
# Create workers
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment