# args.py (23.3 KB) -- file-viewer page header, not part of the module source
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
# SPDX-License-Identifier: Apache-2.0
import argparse
import contextlib
5
import json
6
7
8
9
import logging
import os
import socket
import sys
10
import tempfile
11
12
13
from argparse import Namespace
from dataclasses import dataclass
from enum import Enum
14
from pathlib import Path
15
from typing import Any, Dict, Generator, List, Optional
16

17
import yaml
18
from sglang.srt.server_args import ServerArgs
19
from sglang.srt.server_args_config_parser import ConfigArgumentMerger
20

21
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
22
from dynamo.common.config_dump import register_encoder
23
from dynamo.llm import fetch_llm
24
from dynamo.runtime.logging import configure_dynamo_logging
25
26
from dynamo.sglang import __version__

27
28
# Invoke Dynamo's logging setup helper as soon as this module is imported,
# before any of the logging calls made further down in this file.
configure_dynamo_logging()

29
30
31
# Namespace read once, at import time, from the DYN_NAMESPACE environment
# variable; falls back to "dynamo" when the variable is unset.
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
# Example endpoint string; it is interpolated into the --endpoint help text.
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate"

32
33
34
35
36
37
38
39
40
41
42
43
# Declarative table of the Dynamo-specific CLI options.
#
# Each entry describes one argparse option:
#   "flags"   - option strings passed positionally to ``add_argument``
#   "help"    - help text (required)
#   "type", "choices", "action", "default" - forwarded to ``add_argument``
#               only when present ("default" falls back to None)
#
# ``parse_args`` iterates over the values of this mapping; the dictionary keys
# themselves are descriptive labels and are not passed to argparse.
# Environment-variable defaults below are read once, when this module is
# imported.
DYNAMO_ARGS: Dict[str, Dict[str, Any]] = {
    "endpoint": {
        "flags": ["--endpoint"],
        "type": str,
        "help": f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Example: {DEFAULT_ENDPOINT}",
    },
    "migration-limit": {
        "flags": ["--migration-limit"],
        "type": int,
        "default": 0,
        "help": "Maximum number of times a request may be migrated to a different engine worker",
    },
    "tool-call-parser": {
        "flags": ["--dyn-tool-call-parser"],
        "type": str,
        "default": None,
        "choices": get_tool_parser_names(),
        "help": "Tool call parser name for the model.",
    },
    "reasoning-parser": {
        "flags": ["--dyn-reasoning-parser"],
        "type": str,
        "default": None,
        "choices": get_reasoning_parser_names(),
        "help": "Reasoning parser name for the model. If not specified, no reasoning parsing is performed.",
    },
    "custom-jinja-template": {
        "flags": ["--custom-jinja-template"],
        "type": str,
        "default": None,
        "help": "Path to a custom Jinja template file to override the model's default chat template. This template will take precedence over any template found in the model repository. This template will be applied by Dynamo's preprocessor and cannot be used with --use-sglang-tokenizer.",
    },
    "endpoint-types": {
        "flags": ["--dyn-endpoint-types"],
        "type": str,
        "default": "chat,completions",
        "help": "Comma-separated list of endpoint types to enable. Options: 'chat', 'completions'. Default: 'chat,completions'. Use 'completions' for models without chat templates.",
    },
    "use-sglang-tokenizer": {
        "flags": ["--use-sglang-tokenizer"],
        "action": "store_true",
        "default": False,
        "help": "Use SGLang's tokenizer for pre and post processing. This bypasses Dynamo's preprocessor and only v1/chat/completions will be available through the Dynamo frontend. Cannot be used with --custom-jinja-template.",
    },
    "multimodal-processor": {
        "flags": ["--multimodal-processor"],
        "action": "store_true",
        "default": False,
        "help": "Run as multimodal processor component for handling multimodal requests",
    },
    "multimodal-encode-worker": {
        "flags": ["--multimodal-encode-worker"],
        "action": "store_true",
        "default": False,
        "help": "Run as multimodal encode worker component for processing images/videos",
    },
    "multimodal-worker": {
        "flags": ["--multimodal-worker"],
        "action": "store_true",
        "default": False,
        "help": "Run as multimodal worker component for LLM inference with multimodal data",
    },
    "embedding-worker": {
        "flags": ["--embedding-worker"],
        "action": "store_true",
        "default": False,
        "help": "Run as embedding worker component (Dynamo flag, also sets SGLang's --is-embedding)",
    },
    "dump-config-to": {
        "flags": ["--dump-config-to"],
        "type": str,
        "default": None,
        "help": "Dump debug config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
    },
    "store-kv": {
        "flags": ["--store-kv"],
        "type": str,
        "choices": ["etcd", "file", "mem"],
        "default": os.environ.get("DYN_STORE_KV", "etcd"),
        "help": "Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
    },
    "request-plane": {
        "flags": ["--request-plane"],
        "type": str,
        "choices": ["nats", "http", "tcp"],
        "default": os.environ.get("DYN_REQUEST_PLANE", "tcp"),
        "help": "Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
    },
    "event-plane": {
        "flags": ["--event-plane"],
        "type": str,
        "choices": ["nats", "zmq"],
        "default": os.environ.get("DYN_EVENT_PLANE", "nats"),
        "help": "Determines how events are published [nats|zmq]",
    },
    # Kept as a "true"/"false" string option (not store_true) so the default
    # can come from DYN_LOCAL_INDEXER; parse_args converts it to a bool.
    "enable-local-indexer": {
        "flags": ["--enable-local-indexer"],
        "type": str,
        "choices": ["true", "false"],
        "default": os.environ.get("DYN_LOCAL_INDEXER", "false"),
        "help": "Enable worker-local KV indexer for tracking this worker's own KV cache state (can also be toggled with env var DYN_LOCAL_INDEXER).",
    },
}


@dataclass
class DynamoArgs:
    """Dynamo-specific settings resolved by ``parse_args``.

    The first seven fields are required; everything after them has a default.
    ``namespace``, ``component`` and ``endpoint`` are the three dot-separated
    parts of the ``dyn://namespace.component.endpoint`` string.
    """

    namespace: str
    component: str
    endpoint: str
    migration_limit: int
    store_kv: str
    request_plane: str
    event_plane: str

    # tool and reasoning parser options
    tool_call_parser: Optional[str] = None
    reasoning_parser: Optional[str] = None
    custom_jinja_template: Optional[str] = None

    # endpoint types to enable
    dyn_endpoint_types: str = "chat,completions"

    # preprocessing options
    use_sglang_tokenizer: bool = False

    # multimodal options
    multimodal_processor: bool = False
    multimodal_encode_worker: bool = False
    multimodal_worker: bool = False

    # embedding options
    embedding_worker: bool = False

    # diffusion language model options (derived from server_args.dllm_algorithm)
    diffusion_worker: bool = False

    # config dump options
    dump_config_to: Optional[str] = None

    # local indexer option
    enable_local_indexer: bool = False

    # Whether to enable NATS for KV events (derived from server_args.kv_events_config)
    use_kv_events: bool = False
175

176
177
178
179
180
181
182
183

class DisaggregationMode(Enum):
    """Serving modes that ``Config`` derives from ``server_args.disaggregation_mode``."""

    # Chosen when disaggregation_mode is "null" or any unrecognised value.
    AGGREGATED = "agg"
    # Chosen when disaggregation_mode is "prefill".
    PREFILL = "prefill"
    # Chosen when disaggregation_mode is "decode".
    DECODE = "decode"


class Config:
    """Combined configuration container for SGLang server and Dynamo args.

    Attributes:
        server_args: The SGLang ``ServerArgs`` instance.
        dynamo_args: The Dynamo-specific ``DynamoArgs`` instance.
        serving_mode: ``DisaggregationMode`` derived from
            ``server_args.disaggregation_mode`` at construction time.
    """

    def __init__(self, server_args: ServerArgs, dynamo_args: DynamoArgs) -> None:
        self.server_args = server_args
        self.dynamo_args = dynamo_args
        # Must run after server_args is assigned; it reads self.server_args.
        self.serving_mode = self._set_serving_strategy()

    def _set_serving_strategy(self) -> DisaggregationMode:
        """Map ``server_args.disaggregation_mode`` onto a ``DisaggregationMode``.

        Returns:
            ``PREFILL`` for "prefill", ``DECODE`` for "decode", and
            ``AGGREGATED`` for everything else (including "null").
        """
        mode = self.server_args.disaggregation_mode
        if mode == "prefill":
            return DisaggregationMode.PREFILL
        if mode == "decode":
            return DisaggregationMode.DECODE
        # "null" and any unrecognised value both mean aggregated serving.
        return DisaggregationMode.AGGREGATED
200
201


202
203
204
205
206
# Register SGLang-specific encoders with the shared system
@register_encoder(Config)
def _preprocess_for_encode_config(
    config: Config,
) -> Dict[str, Any]:  # pyright: ignore[reportUnusedFunction]
    """Convert Config object to dictionary for encoding.

    Args:
        config: The combined configuration to convert.

    Returns:
        A dict with the keys "server_args", "dynamo_args" and "serving_mode".
        The first two hold the original objects unchanged; "serving_mode" is
        the enum's string value, or the literal string "None" when
        ``config.serving_mode`` is None.
    """
    serving_mode = config.serving_mode
    return {
        "server_args": config.server_args,
        "dynamo_args": config.dynamo_args,
        "serving_mode": serving_mode.value if serving_mode is not None else "None",
    }


217
218
219
220
221
def _set_parser(
    sglang_str: Optional[str],
    dynamo_str: Optional[str],
    arg_name: str = "tool-call-parser",
) -> Optional[str]:
222
223
224
225
226
227
228
229
230
231
232
233
234
    """Resolve parser name from SGLang and Dynamo arguments.

    Args:
        sglang_str: Parser value from SGLang argument.
        dynamo_str: Parser value from Dynamo argument.
        arg_name: Name of the parser argument for logging.

    Returns:
        Resolved parser name, preferring Dynamo's value if both set.

    Raises:
        ValueError: If parser name is not valid.
    """
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
    # If both are present, give preference to dynamo_str
    if sglang_str is not None and dynamo_str is not None:
        logging.warning(
            f"--dyn-{arg_name} and --{arg_name} are both set. Giving preference to --dyn-{arg_name}"
        )
        return dynamo_str
    # If dynamo_str is not set, use try to use sglang_str if it matches with the allowed parsers
    elif sglang_str is not None:
        logging.warning(f"--dyn-{arg_name} is not set. Using --{arg_name}.")
        if arg_name == "tool-call-parser" and sglang_str not in get_tool_parser_names():
            raise ValueError(
                f"--{arg_name} is not a valid tool call parser. Valid parsers are: {get_tool_parser_names()}"
            )
        elif (
            arg_name == "reasoning-parser"
            and sglang_str not in get_reasoning_parser_names()
        ):
            raise ValueError(
                f"--{arg_name} is not a valid reasoning parser. Valid parsers are: {get_reasoning_parser_names()}"
            )
        return sglang_str
    else:
        return dynamo_str


260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def _extract_config_section(
    args: List[str], config_path: str, config_key: str
) -> tuple[List[str], str]:
    """
    Extract a section from nested YAML and create temp flat file.

    Args:
        args: CLI arguments list
        config_path: Path to the YAML config file
        config_key: Key to extract from nested YAML

    Returns:
        tuple: (modified args with temp file path, temp file path for cleanup)

    Raises:
        ValueError: If config file not found, key missing, or invalid format
    """
    logging.info(f"Extracting config section '{config_key}' from {config_path}")

    path = Path(config_path)
    if not path.exists():
        raise ValueError(f"Config file not found: {config_path}")

    with open(config_path, "r") as f:
        config_data = yaml.safe_load(f)

    if not isinstance(config_data, dict):
        raise ValueError(
            f"Config file must contain a dictionary, got {type(config_data).__name__}"
        )

    available_keys = list(config_data.keys())
    logging.info(f"Available config keys in {config_path}: {available_keys}")

    if config_key not in config_data:
        raise ValueError(
            f"Config key '{config_key}' not found in {config_path}. "
            f"Available keys: {available_keys}"
        )

    section_data = config_data[config_key]

    if not isinstance(section_data, dict):
        raise ValueError(
            f"Config section '{config_key}' must be a dictionary, got {type(section_data).__name__}"
        )

    temp_fd, temp_path = tempfile.mkstemp(suffix=".yaml", prefix="dynamo_config_")

    try:
        with os.fdopen(temp_fd, "w") as f:
            yaml.dump(section_data, f)
        logging.info(f"Successfully wrote config section '{config_key}' to temp file")
    except Exception:
        os.unlink(temp_path)
        raise

    config_index = args.index("--config")
    args = list(args)
    args[config_index + 1] = temp_path

    return args, temp_path


324
async def parse_args(args: list[str]) -> Config:
    """Parse CLI arguments and return combined configuration.

    Download the model if necessary.

    The steps are, in order: build a parser containing the Dynamo options, the
    ``--config-key`` option and SGLang's own options; fold an optional
    ``--config`` file (optionally narrowed by ``--config-key``) into the
    argument list; parse; derive the Dynamo endpoint and parser settings;
    fetch the model when ``model_path`` is not a local path; build
    ``ServerArgs`` and ``DynamoArgs``.

    Args:
        args: Command-line argument strings.

    Returns:
        Config object with server_args and dynamo_args.

    Raises:
        SystemExit: If arguments are invalid or incompatible.
        ValueError: If the config file/section or a parser name is invalid.
        FileNotFoundError: If ``--custom-jinja-template`` does not point at a file.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--version", action="version", version=f"Dynamo Backend SGLang {__version__}"
    )

    # Dynamo args
    for info in DYNAMO_ARGS.values():
        kwargs = {
            "default": info.get("default"),
            "help": info["help"],
        }
        # Forward the optional argparse settings only when the entry has them.
        for optional_key in ("type", "choices", "action"):
            if optional_key in info:
                kwargs[optional_key] = info[optional_key]

        parser.add_argument(*info["flags"], **kwargs)

    # Config key argument (for nested configs)
    parser.add_argument(
        "--config-key",
        type=str,
        default=None,
        help="Key to select from nested config file (e.g., 'prefill', 'decode')",
    )

    # SGLang args
    bootstrap_port = _reserve_disaggregation_bootstrap_port()
    ServerArgs.add_cli_args(parser)

    # Add "gms" to --load-format choices so it passes argparse validation.
    # The actual loader class is set in main.py when load_format == "gms".
    for action in parser._actions:
        if getattr(action, "dest", None) == "load_format" and action.choices:
            action.choices = list(action.choices) + ["gms"]
            break

    # Handle config file if present. The temp file (if any) is removed in the
    # ``finally`` clause so it is cleaned up even when merging or argparse
    # raises or exits.
    temp_config_file: Optional[str] = None
    try:
        if "--config" in args:
            # Check if --config-key is also present
            if "--config-key" in args:
                key_index = args.index("--config-key")
                config_index = args.index("--config")
                if key_index + 1 >= len(args) or config_index + 1 >= len(args):
                    parser.error("--config and --config-key each require a value")
                config_key = args[key_index + 1]
                config_path = args[config_index + 1]

                # Extract nested section to temp file
                args, temp_config_file = _extract_config_section(
                    args, config_path, config_key
                )

                # Remove --config-key from args (not recognized by SGLang).
                # Indices are still valid: the extraction step only replaced
                # one element in place.
                args = args[:key_index] + args[key_index + 2 :]

            # Merge config file arguments with CLI arguments.
            # ConfigArgumentMerger API changed after SGLang v0.5.7:
            # - New API (post-v0.5.7): accepts parser= for proper store_true detection
            # - Old API (v0.5.7 and earlier): only accepts boolean_actions=
            # We use inspect.signature to detect the API rather than version checking
            # since unreleased builds may have the new API while still reporting v0.5.7.
            # Related upstream issue: https://github.com/sgl-project/sglang/issues/16256
            # Upstream fix PR: https://github.com/sgl-project/sglang/pull/16638
            import inspect

            sig = inspect.signature(ConfigArgumentMerger.__init__)
            if "parser" in sig.parameters:
                config_merger = ConfigArgumentMerger(parser=parser)
            else:
                # Legacy path: extract store_true actions manually
                boolean_actions = [
                    action.dest
                    for action in parser._actions
                    if isinstance(action, argparse._StoreTrueAction)
                ]
                config_merger = ConfigArgumentMerger(boolean_actions=boolean_actions)
            args = config_merger.merge_config_with_args(args)

        parsed_args = parser.parse_args(args)
    finally:
        # Clean up temp file if created
        if temp_config_file and os.path.exists(temp_config_file):
            try:
                os.unlink(temp_config_file)
            except OSError:
                logging.warning(
                    f"Failed to clean up temp config file: {temp_config_file}"
                )

    # Auto-set bootstrap port if not provided (checked against the merged
    # argument list, so a value coming from the config file also counts).
    if not any(arg.startswith("--disaggregation-bootstrap-port") for arg in args):
        parsed_args.disaggregation_bootstrap_port = bootstrap_port

    # Dynamo argument processing
    # If an endpoint is provided, validate and use it
    # otherwise fall back to default endpoints
    namespace = os.environ.get("DYN_NAMESPACE", "dynamo")

    # If --embedding-worker is set, also set SGLang's --is-embedding flag
    if parsed_args.embedding_worker:
        parsed_args.is_embedding = True

    endpoint = parsed_args.endpoint
    if endpoint is None:
        # Pick the default component from the worker role. Order matters:
        # an embedding worker always registers as "backend", and prefill mode
        # takes precedence over the multimodal roles.
        disaggregation_mode = getattr(parsed_args, "disaggregation_mode", None)
        if parsed_args.embedding_worker:
            component = "backend"
        elif disaggregation_mode == "prefill":
            component = "prefill"
        elif parsed_args.multimodal_processor:
            component = "processor"
        elif parsed_args.multimodal_encode_worker:
            component = "encoder"
        else:
            component = "backend"
        endpoint = f"dyn://{namespace}.{component}.generate"

    # Always parse the endpoint (whether auto-generated or user-provided)
    try:
        parsed_namespace, parsed_component_name, parsed_endpoint_name = parse_endpoint(
            endpoint
        )
    except ValueError:
        # parse_endpoint has already logged the reason.
        sys.exit(1)

    tool_call_parser = _set_parser(
        parsed_args.tool_call_parser,
        parsed_args.dyn_tool_call_parser,
        "tool-call-parser",
    )
    reasoning_parser = _set_parser(
        parsed_args.reasoning_parser,
        parsed_args.dyn_reasoning_parser,
        "reasoning-parser",
    )

    if parsed_args.custom_jinja_template and parsed_args.use_sglang_tokenizer:
        logging.error(
            "Cannot use --custom-jinja-template and --use-sglang-tokenizer together. "
            "--custom-jinja-template requires Dynamo's preprocessor to apply the template, "
            "while --use-sglang-tokenizer bypasses Dynamo's preprocessor entirely. "
            "If you want to use the SGLang tokenizer with a custom chat template, "
            "please use the --chat-template argument from SGLang."
        )
        sys.exit(1)

    # Replaces any environment variables or home dir (~) to get absolute path
    expanded_template_path = None
    if parsed_args.custom_jinja_template:
        expanded_template_path = os.path.expandvars(
            os.path.expanduser(parsed_args.custom_jinja_template)
        )
        # Validate custom Jinja template file exists
        if not os.path.isfile(expanded_template_path):
            raise FileNotFoundError(
                f"Custom Jinja template file not found: {expanded_template_path}"
            )

    model_path = parsed_args.model_path
    # Name the model
    if not parsed_args.served_model_name:
        parsed_args.served_model_name = model_path

    # Download the model if necessary using modelexpress.
    # We don't set `parsed_args.model_path` to the local path fetch_llm returns
    # because sglang will send this to its pipeline-parallel workers, which may
    # not have the local path.
    # sglang will attempt to download the model again, but find it in the HF cache.
    # For non-HF models use a path instead of an HF name, and ensure all workers have
    # that path (ideally via a shared folder).
    if not os.path.exists(model_path):
        await fetch_llm(model_path)

    # TODO: sglang downloads the model in `from_cli_args`, which means we had to
    # fetch_llm (download the model) here, in `parse_args`. `parse_args` should not
    # contain code to download a model, it should only parse the args.
    server_args = ServerArgs.from_cli_args(parsed_args)

    # Dynamo's streaming handlers expect disjoint output_ids from SGLang (only new
    # tokens since last output), not cumulative tokens. When stream_output=True,
    # SGLang sends disjoint segments which Dynamo passes through directly.
    # Force stream_output=True for optimal streaming performance.
    server_args.stream_output = True

    if parsed_args.use_sglang_tokenizer:
        logging.info(
            "Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False"
        )
        server_args.skip_tokenizer_init = False
    else:
        logging.info(
            "Using dynamo's built in tokenizer. Setting skip_tokenizer_init to True"
        )
        server_args.skip_tokenizer_init = True

    # Derive use_kv_events from server_args.kv_events_config
    # Check that kv_events_config exists AND publisher is not "null" ("zmq" or any future publishers)
    use_kv_events = False
    if server_args.kv_events_config:
        try:
            kv_cfg = json.loads(server_args.kv_events_config)
        except json.JSONDecodeError:
            kv_cfg = None
        if isinstance(kv_cfg, dict):
            use_kv_events = kv_cfg.get("publisher", "null") != "null"
        else:
            # Invalid JSON, or valid JSON that is not an object.
            logging.warning(
                f"Failed to parse kv_events_config: {server_args.kv_events_config}"
            )
    logging.info(
        f"Derived use_kv_events={use_kv_events} from kv_events_config={server_args.kv_events_config}"
    )

    # Auto-detect diffusion worker mode if dllm_algorithm
    diffusion_worker = server_args.dllm_algorithm is not None

    dynamo_args = DynamoArgs(
        namespace=parsed_namespace,
        component=parsed_component_name,
        endpoint=parsed_endpoint_name,
        migration_limit=parsed_args.migration_limit,
        store_kv=parsed_args.store_kv,
        request_plane=parsed_args.request_plane,
        event_plane=parsed_args.event_plane,
        tool_call_parser=tool_call_parser,
        reasoning_parser=reasoning_parser,
        custom_jinja_template=expanded_template_path,
        dyn_endpoint_types=parsed_args.dyn_endpoint_types,
        use_sglang_tokenizer=parsed_args.use_sglang_tokenizer,
        multimodal_processor=parsed_args.multimodal_processor,
        multimodal_encode_worker=parsed_args.multimodal_encode_worker,
        multimodal_worker=parsed_args.multimodal_worker,
        embedding_worker=parsed_args.embedding_worker,
        diffusion_worker=diffusion_worker,
        dump_config_to=parsed_args.dump_config_to,
        # The CLI option is a "true"/"false" string; convert it to a bool.
        enable_local_indexer=str(parsed_args.enable_local_indexer).lower() == "true",
        use_kv_events=use_kv_events,
    )
    logging.debug(f"Dynamo args: {dynamo_args}")

    return Config(server_args, dynamo_args)


@contextlib.contextmanager
def reserve_free_port(host: str = "localhost") -> Generator[int, None, None]:
    """Find and reserve a free port until context exits.

    A TCP socket is bound to port 0 on ``host`` so the operating system picks
    an unused port. The socket stays open (holding the port) for the duration
    of the ``with`` block and is closed when the block exits, even on error.

    Args:
        host: Host address to bind to.

    Yields:
        Available port number.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        _, port = sock.getsockname()
        yield port


607
def parse_endpoint(endpoint: str) -> List[str]:
    """Parse endpoint string into namespace, component, and endpoint parts.

    The optional ``dyn://`` scheme is stripped only when it is a prefix, and
    every one of the three dot-separated segments must be non-empty.

    Args:
        endpoint: Endpoint string in 'dyn://namespace.component.endpoint' format
            (the 'dyn://' prefix may be omitted).

    Returns:
        List of [namespace, component, endpoint] strings.

    Raises:
        ValueError: If endpoint format is invalid.
    """
    endpoint_str = endpoint.removeprefix("dyn://")
    endpoint_parts = endpoint_str.split(".")
    # Exactly three segments, none of them empty (rejects e.g. "dyn://a..c").
    if len(endpoint_parts) != 3 or not all(endpoint_parts):
        error_msg = (
            f"Invalid endpoint format: '{endpoint}'. "
            f"Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
        )
        logging.error(error_msg)
        raise ValueError(error_msg)

    return endpoint_parts


632
633
634
635
636
def _reserve_disaggregation_bootstrap_port() -> int:
    """Pick a free port number for the disaggregation bootstrap.

    The port comes from ``reserve_free_port``; the reservation context has
    already exited by the time this function returns, so only the port number
    is handed back to the caller.

    Returns:
        Available port number.
    """
    with reserve_free_port() as free_port:
        bootstrap_port = free_port
    return bootstrap_port