Unverified Commit e2159c0d authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

fix: add event-plane argument and nats initialization (#5717)

parent eee7ec41
......@@ -300,6 +300,13 @@ def parse_args():
default=os.environ.get("DYN_REQUEST_PLANE", "tcp"),
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
)
parser.add_argument(
"--event-plane",
type=str,
choices=["nats", "zmq"],
default=os.environ.get("DYN_EVENT_PLANE", "nats"),
help="Determines how events are published [nats|zmq]",
)
parser.add_argument(
"--exp-python-factory",
action="store_true",
......@@ -334,7 +341,7 @@ async def async_main():
os.environ.pop("DYN_SYSTEM_PORT", None)
flags = parse_args()
dump_config(flags.dump_config_to, flags)
os.environ["DYN_EVENT_PLANE"] = flags.event_plane
# Warn if DYN_SYSTEM_PORT is set (frontend doesn't use system metrics server)
if os.environ.get("DYN_SYSTEM_PORT"):
logger.warning(
......@@ -351,8 +358,14 @@ async def async_main():
if prefix:
os.environ["DYN_METRICS_PREFIX"] = flags.metrics_prefix
# Enable NATS for KV router mode when kv_events are used (when --no-kv-events is not set)
enable_nats = (flags.router_mode == "kv") and flags.use_kv_events
# NATS is needed when:
# 1. Request plane is NATS, OR
# 2. Event plane is NATS AND KV router mode AND (KV events OR replica sync enabled)
enable_nats = flags.request_plane == "nats" or (
flags.event_plane == "nats"
and flags.router_mode == "kv"
and (flags.use_kv_events or flags.router_replica_sync)
)
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, flags.store_kv, flags.request_plane, enable_nats)
......
......@@ -117,6 +117,13 @@ DYNAMO_ARGS: Dict[str, Dict[str, Any]] = {
"default": os.environ.get("DYN_REQUEST_PLANE", "tcp"),
"help": "Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
},
"event-plane": {
"flags": ["--event-plane"],
"type": str,
"choices": ["nats", "zmq"],
"default": os.environ.get("DYN_EVENT_PLANE", "nats"),
"help": "Determines how events are published [nats|zmq]",
},
"enable-local-indexer": {
"flags": ["--enable-local-indexer"],
"type": str,
......@@ -135,6 +142,7 @@ class DynamoArgs:
migration_limit: int
store_kv: str
request_plane: str
event_plane: str
# tool and reasoning parser options
tool_call_parser: Optional[str] = None
......@@ -550,6 +558,7 @@ async def parse_args(args: list[str]) -> Config:
migration_limit=parsed_args.migration_limit,
store_kv=parsed_args.store_kv,
request_plane=parsed_args.request_plane,
event_plane=parsed_args.event_plane,
tool_call_parser=tool_call_parser,
reasoning_parser=reasoning_parser,
custom_jinja_template=expanded_template_path,
......
......@@ -71,12 +71,22 @@ async def worker():
dump_config(config.dynamo_args.dump_config_to, config)
loop = asyncio.get_running_loop()
# Enable NATS based on use_kv_events flag (derived from kv_events_config)
# Set DYN_EVENT_PLANE environment variable based on config
os.environ["DYN_EVENT_PLANE"] = config.dynamo_args.event_plane
# NATS is needed when:
# 1. Request plane is NATS, OR
# 2. Event plane is NATS AND use_kv_events is True
enable_nats = config.dynamo_args.request_plane == "nats" or (
config.dynamo_args.event_plane == "nats" and config.dynamo_args.use_kv_events
)
runtime = DistributedRuntime(
loop,
config.dynamo_args.store_kv,
config.dynamo_args.request_plane,
config.dynamo_args.use_kv_events,
enable_nats,
)
def signal_handler():
......
......@@ -132,9 +132,18 @@ async def worker():
# Create shutdown event
shutdown_event = asyncio.Event()
# Enable NATS based on use_kv_events flag (derived from publish_events_and_metrics)
# Set DYN_EVENT_PLANE environment variable based on config
os.environ["DYN_EVENT_PLANE"] = config.event_plane
# NATS is needed when:
# 1. Request plane is NATS, OR
# 2. Event plane is NATS AND use_kv_events is True
enable_nats = config.request_plane == "nats" or (
config.event_plane == "nats" and config.use_kv_events
)
runtime = DistributedRuntime(
loop, config.store_kv, config.request_plane, config.use_kv_events
loop, config.store_kv, config.request_plane, enable_nats
)
# Set up signal handler for graceful shutdown
......
......@@ -61,6 +61,7 @@ class Config:
self.dyn_endpoint_types: str = "chat,completions"
self.store_kv: str = ""
self.request_plane: str = ""
self.event_plane: str = ""
self.enable_local_indexer: bool = False
# Whether to enable NATS for KV events (derived from publish_events_and_metrics)
self.use_kv_events: bool = False
......@@ -97,6 +98,7 @@ class Config:
f"custom_jinja_template={self.custom_jinja_template}, "
f"store_kv={self.store_kv}, "
f"request_plane={self.request_plane}, "
f"event_plane={self.event_plane}, "
f"enable_local_indexer={self.enable_local_indexer}, "
f"use_kv_events={self.use_kv_events}"
)
......@@ -333,6 +335,13 @@ def cmd_line_args():
default=os.environ.get("DYN_REQUEST_PLANE", "tcp"),
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
)
parser.add_argument(
"--event-plane",
type=str,
choices=["nats", "zmq"],
default=os.environ.get("DYN_EVENT_PLANE", "nats"),
help="Determines how events are published [nats|zmq]",
)
parser.add_argument(
"--enable-local-indexer",
type=str,
......@@ -402,6 +411,7 @@ def cmd_line_args():
config.dyn_endpoint_types = args.dyn_endpoint_types
config.store_kv = args.store_kv
config.request_plane = args.request_plane
config.event_plane = args.event_plane
config.enable_local_indexer = str(args.enable_local_indexer).lower() == "true"
# Derive use_kv_events from publish_events_and_metrics
config.use_kv_events = config.publish_events_and_metrics
......
......@@ -40,6 +40,7 @@ class Config:
custom_jinja_template: Optional[str] = None
store_kv: str
request_plane: str
event_plane: str
enable_local_indexer: bool = False
# mirror vLLM
......@@ -258,6 +259,13 @@ def parse_args() -> Config:
default=os.environ.get("DYN_REQUEST_PLANE", "tcp"),
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
)
parser.add_argument(
"--event-plane",
type=str,
choices=["nats", "zmq"],
default=os.environ.get("DYN_EVENT_PLANE", "nats"),
help="Determines how events are published [nats|zmq]",
)
parser.add_argument(
"--enable-local-indexer",
type=str,
......@@ -401,6 +409,7 @@ def parse_args() -> Config:
config.ec_consumer_mode = args.ec_consumer_mode
config.store_kv = args.store_kv
config.request_plane = args.request_plane
config.event_plane = args.event_plane
config.enable_local_indexer = args.enable_local_indexer
config.use_vllm_tokenizer = args.use_vllm_tokenizer
# use_kv_events is set later in overwrite_args() based on kv_events_config
......@@ -578,6 +587,7 @@ def overwrite_args(config):
defaults["kv_events_config"] = kv_cfg
# Derive use_kv_events from whether kv_events_config is set AND enable_kv_cache_events is True
config.use_kv_events = kv_cfg is not None and kv_cfg.enable_kv_cache_events
logger.info(
f"Using kv_events_config for publishing vLLM kv events over zmq: {kv_cfg} "
f"(use_kv_events={config.use_kv_events})"
......
......@@ -79,9 +79,18 @@ async def worker():
loop = asyncio.get_running_loop()
overwrite_args(config)
# Enable NATS based on use_kv_events flag (derived from kv_events_config)
# Set DYN_EVENT_PLANE environment variable based on config
os.environ["DYN_EVENT_PLANE"] = config.event_plane
# NATS is needed when:
# 1. Request plane is NATS, OR
# 2. Event plane is NATS AND use_kv_events is True
enable_nats = config.request_plane == "nats" or (
config.event_plane == "nats" and config.use_kv_events
)
runtime = DistributedRuntime(
loop, config.store_kv, config.request_plane, config.use_kv_events
loop, config.store_kv, config.request_plane, enable_nats
)
# Set up signal handler for graceful shutdown
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment