# "tests/schedulers/test_scheduler_flax.py" did not exist on "b0cc7c202ba0ad8839c5bd5d89aa14150f220695"
# launch_router.py 23.9 KB
# Newer Older
1
2
import argparse
import dataclasses
3
import logging
4
import sys
5
from typing import Dict, List, Optional
6
7
8
9
10

from sglang_router import Router
from sglang_router_rs import PolicyType


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def setup_logger():
    """Return the "router" logger configured to print INFO-and-above records.

    A single StreamHandler with the "[Router (Python)]" format is attached the
    first time this is called; later calls reuse it instead of stacking another
    handler (each extra handler would print every record one more time).

    Returns:
        logging.Logger: the logger named "router".
    """
    logger = logging.getLogger("router")
    logger.setLevel(logging.INFO)

    # Only attach our handler once, even if setup_logger() is called repeatedly.
    if any(getattr(h, "_sglang_router_handler", False) for h in logger.handlers):
        return logger

    formatter = logging.Formatter(
        "[Router (Python)] %(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    # Marker so the duplicate check above recognizes the handler we installed.
    handler._sglang_router_handler = True
    logger.addHandler(handler)

    return logger


27
28
29
@dataclasses.dataclass
class RouterArgs:
    """Configuration for the SGLang router.

    Every field has a matching command-line flag registered by
    ``add_cli_args`` (optionally with a ``router-`` prefix), and
    ``from_cli_args`` turns a parsed ``argparse.Namespace`` back into an
    instance.
    """

    # Worker configuration
    worker_urls: List[str] = dataclasses.field(default_factory=list)
    host: str = "127.0.0.1"
    port: int = 30000

    # PD-specific configuration
    pd_disaggregation: bool = False  # Enable PD disaggregated mode
    # List of (url, bootstrap_port) pairs; bootstrap_port is an int or None
    prefill_urls: List[tuple] = dataclasses.field(default_factory=list)
    decode_urls: List[str] = dataclasses.field(default_factory=list)

    # Routing policy
    policy: str = "cache_aware"
    prefill_policy: Optional[str] = None  # Specific policy for prefill nodes in PD mode
    decode_policy: Optional[str] = None  # Specific policy for decode nodes in PD mode
    worker_startup_timeout_secs: int = 300
    worker_startup_check_interval: int = 10
    cache_threshold: float = 0.5
    balance_abs_threshold: int = 32
    balance_rel_threshold: float = 1.0001
    eviction_interval: int = 60
    max_tree_size: int = 2**24
    max_payload_size: int = 256 * 1024 * 1024  # 256MB default for large batches
    dp_aware: bool = False
    api_key: Optional[str] = None
    log_dir: Optional[str] = None
    log_level: Optional[str] = None

    # Service discovery configuration
    service_discovery: bool = False
    selector: Dict[str, str] = dataclasses.field(default_factory=dict)
    service_discovery_port: int = 80
    service_discovery_namespace: Optional[str] = None

    # PD service discovery configuration
    prefill_selector: Dict[str, str] = dataclasses.field(default_factory=dict)
    decode_selector: Dict[str, str] = dataclasses.field(default_factory=dict)
    bootstrap_port_annotation: str = "sglang.ai/bootstrap-port"

    # Prometheus configuration
    prometheus_port: Optional[int] = None
    prometheus_host: Optional[str] = None

    # Request ID headers configuration
    request_id_headers: Optional[List[str]] = None

    # Request timeout in seconds
    request_timeout_secs: int = 600
    # Max concurrent requests for rate limiting
    max_concurrent_requests: int = 64
    # CORS allowed origins
    cors_allowed_origins: List[str] = dataclasses.field(default_factory=list)

    @staticmethod
    def add_cli_args(
        parser: argparse.ArgumentParser,
        use_router_prefix: bool = False,
        exclude_host_port: bool = False,
    ):
        """
        Add router-specific arguments to an argument parser.

        Args:
            parser: The argument parser to add arguments to
            use_router_prefix: If True, prefix all arguments with 'router-' to avoid conflicts
                (``--host``, ``--port`` and ``--worker-urls`` are never prefixed)
            exclude_host_port: If True, don't add host and port arguments (used when inheriting from server)
        """
        prefix = "router-" if use_router_prefix else ""
        # Shared by --policy, --prefill-policy and --decode-policy.
        policy_choices = ["random", "round_robin", "cache_aware", "power_of_two"]

        # Server / worker configuration
        if not exclude_host_port:
            parser.add_argument(
                "--host",
                type=str,
                default=RouterArgs.host,
                help="Host address to bind the router server",
            )
            parser.add_argument(
                "--port",
                type=int,
                default=RouterArgs.port,
                help="Port number to bind the router server",
            )

        parser.add_argument(
            "--worker-urls",
            type=str,
            nargs="*",
            default=[],
            help="List of worker URLs (e.g., http://worker1:8000 http://worker2:8000)",
        )

        # Routing policy configuration
        parser.add_argument(
            f"--{prefix}policy",
            type=str,
            default=RouterArgs.policy,
            choices=policy_choices,
            help="Load balancing policy to use. In PD mode, this is used for both prefill and decode unless overridden",
        )
        parser.add_argument(
            f"--{prefix}prefill-policy",
            type=str,
            default=None,
            choices=policy_choices,
            help="Specific policy for prefill nodes in PD mode. If not specified, uses the main policy",
        )
        parser.add_argument(
            f"--{prefix}decode-policy",
            type=str,
            default=None,
            choices=policy_choices,
            help="Specific policy for decode nodes in PD mode. If not specified, uses the main policy",
        )

        # PD-specific arguments
        parser.add_argument(
            f"--{prefix}pd-disaggregation",
            action="store_true",
            help="Enable PD (Prefill-Decode) disaggregated mode",
        )
        parser.add_argument(
            f"--{prefix}prefill",
            nargs="+",
            action="append",
            help="Prefill server URL and optional bootstrap port. Can be specified multiple times. "
            "Format: --prefill URL [BOOTSTRAP_PORT]. "
            "BOOTSTRAP_PORT can be a port number, 'none', or omitted (defaults to none).",
        )
        parser.add_argument(
            f"--{prefix}decode",
            nargs=1,
            action="append",
            metavar=("URL",),
            help="Decode server URL. Can be specified multiple times.",
        )
        parser.add_argument(
            f"--{prefix}worker-startup-timeout-secs",
            type=int,
            default=RouterArgs.worker_startup_timeout_secs,
            help="Timeout in seconds for worker startup",
        )
        parser.add_argument(
            f"--{prefix}worker-startup-check-interval",
            type=int,
            default=RouterArgs.worker_startup_check_interval,
            help="Interval in seconds between checks for worker startup",
        )
        parser.add_argument(
            f"--{prefix}cache-threshold",
            type=float,
            default=RouterArgs.cache_threshold,
            help="Cache threshold (0.0-1.0) for cache-aware routing",
        )
        parser.add_argument(
            f"--{prefix}balance-abs-threshold",
            type=int,
            default=RouterArgs.balance_abs_threshold,
            help="Load balancing is triggered when (max_load - min_load) > abs_threshold AND max_load > min_load * rel_threshold. Otherwise, use cache aware",
        )
        parser.add_argument(
            f"--{prefix}balance-rel-threshold",
            type=float,
            default=RouterArgs.balance_rel_threshold,
            help="Load balancing is triggered when (max_load - min_load) > abs_threshold AND max_load > min_load * rel_threshold. Otherwise, use cache aware",
        )
        parser.add_argument(
            f"--{prefix}eviction-interval",
            type=int,
            default=RouterArgs.eviction_interval,
            help="Interval in seconds between cache eviction operations",
        )
        parser.add_argument(
            f"--{prefix}max-tree-size",
            type=int,
            default=RouterArgs.max_tree_size,
            help="Maximum size of the approximation tree for cache-aware routing",
        )
        parser.add_argument(
            f"--{prefix}max-payload-size",
            type=int,
            default=RouterArgs.max_payload_size,
            help="Maximum payload size in bytes",
        )
        parser.add_argument(
            f"--{prefix}dp-aware",
            action="store_true",
            help="Enable data parallelism aware schedule",
        )
        parser.add_argument(
            f"--{prefix}api-key",
            type=str,
            default=None,
            help="The api key used for the authorization with the worker. Useful when the dp aware scheduling strategy is enabled.",
        )
        parser.add_argument(
            f"--{prefix}log-dir",
            type=str,
            default=None,
            help="Directory to store log files. If not specified, logs are only output to console.",
        )
        parser.add_argument(
            f"--{prefix}log-level",
            type=str,
            default="info",
            choices=["debug", "info", "warning", "error", "critical"],
            help="Set the logging level. If not specified, defaults to INFO.",
        )
        parser.add_argument(
            f"--{prefix}service-discovery",
            action="store_true",
            help="Enable Kubernetes service discovery",
        )
        parser.add_argument(
            f"--{prefix}selector",
            type=str,
            nargs="+",
            help="Label selector for Kubernetes service discovery (format: key1=value1 key2=value2)",
        )
        parser.add_argument(
            f"--{prefix}service-discovery-port",
            type=int,
            default=RouterArgs.service_discovery_port,
            help="Port to use for discovered worker pods",
        )
        parser.add_argument(
            f"--{prefix}service-discovery-namespace",
            type=str,
            help="Kubernetes namespace to watch for pods. If not provided, watches all namespaces (requires cluster-wide permissions)",
        )
        parser.add_argument(
            f"--{prefix}prefill-selector",
            type=str,
            nargs="+",
            help="Label selector for prefill server pods in PD mode (format: key1=value1 key2=value2)",
        )
        parser.add_argument(
            f"--{prefix}decode-selector",
            type=str,
            nargs="+",
            help="Label selector for decode server pods in PD mode (format: key1=value1 key2=value2)",
        )

        # Prometheus configuration
        parser.add_argument(
            f"--{prefix}prometheus-port",
            type=int,
            default=29000,
            # The flag always has a value (default 29000), so the help must not
            # claim metrics are disabled when it is omitted.
            help="Port to expose Prometheus metrics on",
        )
        parser.add_argument(
            f"--{prefix}prometheus-host",
            type=str,
            default="127.0.0.1",
            help="Host address to bind the Prometheus metrics server",
        )
        parser.add_argument(
            f"--{prefix}request-id-headers",
            type=str,
            nargs="*",
            help="Custom HTTP headers to check for request IDs (e.g., x-request-id x-trace-id). If not specified, uses common defaults.",
        )
        parser.add_argument(
            f"--{prefix}request-timeout-secs",
            type=int,
            default=RouterArgs.request_timeout_secs,
            help="Request timeout in seconds",
        )
        parser.add_argument(
            f"--{prefix}max-concurrent-requests",
            type=int,
            default=RouterArgs.max_concurrent_requests,
            help="Maximum number of concurrent requests allowed (for rate limiting)",
        )
        parser.add_argument(
            f"--{prefix}cors-allowed-origins",
            type=str,
            nargs="*",
            default=[],
            help="CORS allowed origins (e.g., http://localhost:3000 https://example.com)",
        )

    @classmethod
    def from_cli_args(
        cls, args: argparse.Namespace, use_router_prefix: bool = False
    ) -> "RouterArgs":
        """
        Create RouterArgs instance from parsed command line arguments.

        Args:
            args: Parsed command line arguments
            use_router_prefix: If True, look for arguments with 'router-' prefix
                (``host``, ``port`` and ``worker_urls`` are always read unprefixed)

        Raises:
            ValueError: If a --prefill entry is malformed (see _parse_prefill_urls).
        """
        prefix = "router_" if use_router_prefix else ""

        # Copy list-valued options: argparse hands back the very same default
        # list object on every parse, and sharing it would let a mutation of
        # one RouterArgs leak into later parses.
        worker_urls = list(getattr(args, "worker_urls", None) or [])
        cors_allowed_origins = list(
            getattr(args, f"{prefix}cors_allowed_origins", None) or []
        )

        # Parse PD URLs
        prefill_urls = cls._parse_prefill_urls(getattr(args, f"{prefix}prefill", None))
        decode_urls = cls._parse_decode_urls(getattr(args, f"{prefix}decode", None))

        return cls(
            worker_urls=worker_urls,
            host=args.host,
            port=args.port,
            pd_disaggregation=getattr(args, f"{prefix}pd_disaggregation", False),
            prefill_urls=prefill_urls,
            decode_urls=decode_urls,
            policy=getattr(args, f"{prefix}policy"),
            prefill_policy=getattr(args, f"{prefix}prefill_policy", None),
            decode_policy=getattr(args, f"{prefix}decode_policy", None),
            worker_startup_timeout_secs=getattr(
                args, f"{prefix}worker_startup_timeout_secs"
            ),
            worker_startup_check_interval=getattr(
                args, f"{prefix}worker_startup_check_interval"
            ),
            cache_threshold=getattr(args, f"{prefix}cache_threshold"),
            balance_abs_threshold=getattr(args, f"{prefix}balance_abs_threshold"),
            balance_rel_threshold=getattr(args, f"{prefix}balance_rel_threshold"),
            eviction_interval=getattr(args, f"{prefix}eviction_interval"),
            max_tree_size=getattr(args, f"{prefix}max_tree_size"),
            max_payload_size=getattr(args, f"{prefix}max_payload_size"),
            dp_aware=getattr(args, f"{prefix}dp_aware", False),
            api_key=getattr(args, f"{prefix}api_key", None),
            log_dir=getattr(args, f"{prefix}log_dir", None),
            log_level=getattr(args, f"{prefix}log_level", None),
            service_discovery=getattr(args, f"{prefix}service_discovery", False),
            selector=cls._parse_selector(getattr(args, f"{prefix}selector", None)),
            service_discovery_port=getattr(args, f"{prefix}service_discovery_port"),
            service_discovery_namespace=getattr(
                args, f"{prefix}service_discovery_namespace", None
            ),
            prefill_selector=cls._parse_selector(
                getattr(args, f"{prefix}prefill_selector", None)
            ),
            decode_selector=cls._parse_selector(
                getattr(args, f"{prefix}decode_selector", None)
            ),
            # Not exposed on the CLI; always the class default (Mooncake-specific annotation).
            bootstrap_port_annotation=cls.bootstrap_port_annotation,
            prometheus_port=getattr(args, f"{prefix}prometheus_port", None),
            prometheus_host=getattr(args, f"{prefix}prometheus_host", None),
            request_id_headers=getattr(args, f"{prefix}request_id_headers", None),
            request_timeout_secs=getattr(
                args, f"{prefix}request_timeout_secs", cls.request_timeout_secs
            ),
            max_concurrent_requests=getattr(
                args,
                f"{prefix}max_concurrent_requests",
                cls.max_concurrent_requests,
            ),
            cors_allowed_origins=cors_allowed_origins,
        )

    @staticmethod
    def _parse_selector(selector_list):
        """Turn ["k1=v1", "k2=v2", ...] into {"k1": "v1", "k2": "v2"}.

        Only the first "=" splits key from value, so values may contain "=".
        Items without any "=" are skipped. Returns {} for None or an empty list.
        """
        if not selector_list:
            return {}

        selector = {}
        for item in selector_list:
            if "=" in item:
                key, value = item.split("=", 1)
                selector[key] = value
        return selector

    @staticmethod
    def _parse_prefill_urls(prefill_list):
        """Parse prefill URLs from --prefill arguments.

        Format: --prefill URL [BOOTSTRAP_PORT]
        Example:
            --prefill http://prefill1:8080 9000  # With bootstrap port
            --prefill http://prefill2:8080 none  # Explicitly no bootstrap port
            --prefill http://prefill3:8080       # Defaults to no bootstrap port

        Args:
            prefill_list: One inner list of strings per --prefill occurrence
                (argparse ``nargs="+"`` with ``action="append"``), or None.

        Returns:
            A list of (url, bootstrap_port) tuples; bootstrap_port is an int or None.

        Raises:
            ValueError: If an entry is empty, has more than two values, or its
                bootstrap port is neither an integer nor "none".
        """
        if not prefill_list:
            return []

        prefill_urls = []
        for prefill_args in prefill_list:
            if not prefill_args:
                raise ValueError("--prefill requires a URL")
            # nargs="+" accepts any number of values; anything past the port
            # would otherwise be dropped silently (e.g. two URLs in one flag).
            if len(prefill_args) > 2:
                raise ValueError(
                    f"Too many values for --prefill: {' '.join(prefill_args)}. "
                    "Expected format: --prefill URL [BOOTSTRAP_PORT]"
                )

            url = prefill_args[0]
            bootstrap_port = None  # default when omitted or given as 'none'
            if len(prefill_args) == 2:
                bootstrap_port_str = prefill_args[1]
                if bootstrap_port_str.lower() != "none":
                    try:
                        bootstrap_port = int(bootstrap_port_str)
                    except ValueError:
                        raise ValueError(
                            f"Invalid bootstrap port: {bootstrap_port_str}. Must be a number or 'none'"
                        ) from None

            prefill_urls.append((url, bootstrap_port))

        return prefill_urls

    @staticmethod
    def _parse_decode_urls(decode_list):
        """Parse decode URLs from --decode arguments.

        Format: --decode URL
        Example: --decode http://decode1:8081 --decode http://decode2:8081
        """
        if not decode_list:
            return []

        # decode_list is a list of single-element lists due to nargs=1
        return [url[0] for url in decode_list]

441
442
443
444
445
446
447

def policy_from_str(policy_str: str) -> PolicyType:
    """Convert a policy name (as accepted by the ``--policy`` flags) to ``PolicyType``.

    Args:
        policy_str: One of "random", "round_robin", "cache_aware", "power_of_two".

    Returns:
        The matching ``PolicyType`` enum member.

    Raises:
        KeyError: If ``policy_str`` is not a known policy name; the message
            lists the accepted names.
    """
    policy_map = {
        "random": PolicyType.Random,
        "round_robin": PolicyType.RoundRobin,
        "cache_aware": PolicyType.CacheAware,
        "power_of_two": PolicyType.PowerOfTwo,
    }
    try:
        return policy_map[policy_str]
    except KeyError:
        # Keep the KeyError type callers may rely on, but say what was expected.
        raise KeyError(
            f"Unknown policy {policy_str!r}; expected one of {sorted(policy_map)}"
        ) from None


def _validate_pd_config(router_args: "RouterArgs", logger: logging.Logger) -> None:
    """Check PD-mode server lists and log how prefill/decode policies resolve.

    Raises:
        ValueError: If service discovery is off and no --prefill or no
            --decode servers were given.
    """
    # With service discovery the servers are discovered, so explicit URLs are
    # only mandatory without it.
    if not router_args.service_discovery:
        if not router_args.prefill_urls:
            raise ValueError("PD disaggregation mode requires --prefill")
        if not router_args.decode_urls:
            raise ValueError("PD disaggregation mode requires --decode")

    # Warn about policy usage in PD mode
    if (
        router_args.prefill_policy
        and router_args.decode_policy
        and router_args.policy
    ):
        logger.warning(
            "Both --prefill-policy and --decode-policy are specified. "
            "The main --policy flag will be ignored for PD mode."
        )
    elif (
        router_args.prefill_policy
        and not router_args.decode_policy
        and router_args.policy
    ):
        logger.info(
            f"Using --prefill-policy '{router_args.prefill_policy}' for prefill nodes "
            f"and --policy '{router_args.policy}' for decode nodes."
        )
    elif (
        router_args.decode_policy
        and not router_args.prefill_policy
        and router_args.policy
    ):
        logger.info(
            f"Using --policy '{router_args.policy}' for prefill nodes "
            f"and --decode-policy '{router_args.decode_policy}' for decode nodes."
        )


def launch_router(args: argparse.Namespace) -> Optional[Router]:
    """
    Launch the SGLang router with the configuration from parsed arguments.

    Args:
        args: Either a raw argparse.Namespace (converted with
            RouterArgs.from_cli_args, no 'router-' prefix) or an already
            built RouterArgs instance.

    Returns:
        The Router instance, after ``router.start()`` has been called.

    Raises:
        ValueError: If PD disaggregation mode is enabled without service
            discovery and --prefill or --decode servers are missing.
        Exception: Any error from argument conversion, Router construction or
            ``start()`` is logged on the "router" logger and re-raised.
    """
    logger = logging.getLogger("router")

    try:
        # Convert to RouterArgs if needed
        if isinstance(args, RouterArgs):
            router_args = args
        else:
            router_args = RouterArgs.from_cli_args(args)

        if router_args.pd_disaggregation:
            _validate_pd_config(router_args, logger)

        # worker_urls is replaced by [] below in these modes; say so instead of
        # silently dropping what the user passed.
        if router_args.worker_urls and (
            router_args.service_discovery or router_args.pd_disaggregation
        ):
            logger.warning(
                "--worker-urls is ignored when service discovery or PD "
                "disaggregation mode is enabled."
            )

        # Create router with unified constructor
        router = Router(
            worker_urls=(
                []
                if router_args.service_discovery or router_args.pd_disaggregation
                else router_args.worker_urls
            ),
            host=router_args.host,
            port=router_args.port,
            policy=policy_from_str(router_args.policy),
            worker_startup_timeout_secs=router_args.worker_startup_timeout_secs,
            worker_startup_check_interval=router_args.worker_startup_check_interval,
            cache_threshold=router_args.cache_threshold,
            balance_abs_threshold=router_args.balance_abs_threshold,
            balance_rel_threshold=router_args.balance_rel_threshold,
            eviction_interval_secs=router_args.eviction_interval,
            max_tree_size=router_args.max_tree_size,
            max_payload_size=router_args.max_payload_size,
            dp_aware=router_args.dp_aware,
            api_key=router_args.api_key,
            log_dir=router_args.log_dir,
            log_level=router_args.log_level,
            service_discovery=router_args.service_discovery,
            selector=router_args.selector,
            service_discovery_port=router_args.service_discovery_port,
            service_discovery_namespace=router_args.service_discovery_namespace,
            prefill_selector=router_args.prefill_selector,
            decode_selector=router_args.decode_selector,
            prometheus_port=router_args.prometheus_port,
            prometheus_host=router_args.prometheus_host,
            request_timeout_secs=router_args.request_timeout_secs,
            pd_disaggregation=router_args.pd_disaggregation,
            # Prefill/decode URL lists only apply in PD mode.
            prefill_urls=(
                router_args.prefill_urls if router_args.pd_disaggregation else None
            ),
            decode_urls=(
                router_args.decode_urls if router_args.pd_disaggregation else None
            ),
            # None means "fall back to the main policy".
            prefill_policy=(
                policy_from_str(router_args.prefill_policy)
                if router_args.prefill_policy
                else None
            ),
            decode_policy=(
                policy_from_str(router_args.decode_policy)
                if router_args.decode_policy
                else None
            ),
            request_id_headers=router_args.request_id_headers,
            max_concurrent_requests=router_args.max_concurrent_requests,
            cors_allowed_origins=router_args.cors_allowed_origins,
        )

        router.start()
        return router

    except Exception as e:
        logger.error(f"Error starting router: {e}")
        # Bare raise re-raises the active exception with its original traceback.
        raise
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588


class CustomHelpFormatter(
    argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """Help formatter combining two stdlib argparse formatters.

    RawDescriptionHelpFormatter prints the parser description verbatim (no
    re-wrapping, so hand-formatted examples survive), and
    ArgumentDefaultsHelpFormatter adds each argument's default value to its
    help message.
    """


def parse_router_args(args: List[str]) -> RouterArgs:
    """Parse command line arguments and return RouterArgs instance.

    Args:
        args: Command-line tokens, typically ``sys.argv[1:]``.

    Returns:
        RouterArgs built from the parsed flags (flags are not 'router-' prefixed).
    """
    # The examples are meant to be copy-pasted into a shell, so explanatory
    # comments sit on their own lines: a "# ..." after a trailing backslash
    # would end the command early.
    parser = argparse.ArgumentParser(
        description="""SGLang Router - High-performance request distribution across worker nodes

Usage:
This launcher enables starting a router with individual worker instances. It is useful for
multi-node setups or when you want to start workers and router separately.

Examples:
  # Regular mode
  python -m sglang_router.launch_router --worker-urls http://worker1:8000 http://worker2:8000

  # PD disaggregated mode with same policy for both
  python -m sglang_router.launch_router --pd-disaggregation \\
    --prefill http://prefill1:8000 9000 --prefill http://prefill2:8000 \\
    --decode http://decode1:8001 --decode http://decode2:8001 \\
    --policy cache_aware

  # PD mode with optional bootstrap ports:
  #   prefill1 uses bootstrap port 9000
  #   prefill2 explicitly has no bootstrap port ("none")
  #   prefill3 omits the port (defaults to no bootstrap port)
  python -m sglang_router.launch_router --pd-disaggregation \\
    --prefill http://prefill1:8000 9000 \\
    --prefill http://prefill2:8000 none \\
    --prefill http://prefill3:8000 \\
    --decode http://decode1:8001 --decode http://decode2:8001

  # PD mode with different policies for prefill and decode
  python -m sglang_router.launch_router --pd-disaggregation \\
    --prefill http://prefill1:8000 --prefill http://prefill2:8000 \\
    --decode http://decode1:8001 --decode http://decode2:8001 \\
    --prefill-policy cache_aware --decode-policy power_of_two
""",
        formatter_class=CustomHelpFormatter,
    )

    RouterArgs.add_cli_args(parser, use_router_prefix=False)
    return RouterArgs.from_cli_args(parser.parse_args(args), use_router_prefix=False)


def main() -> None:
    """Console entry point: parse the process arguments and launch the router."""
    cli_tokens = sys.argv[1:]
    launch_router(parse_router_args(cli_tokens))


if __name__ == "__main__":
    main()