scenarios.py 9.56 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import re
17
from dataclasses import dataclass
18
from typing import Dict, Optional, Pattern
19
20
21

from tests.utils.managed_deployment import DeploymentSpec

22
# Worker name mapping for different backends
23
24
25
26
27
28
29
30
31
32
33
# Backend name -> role ("decode" / "prefill") -> worker name.
# The worker names are used in this module as keys into the deployment spec,
# as keys of WORKER_READY_PATTERNS, and as the target name of Failure entries.
WORKER_MAP = {
    # vLLM deployments use distinct, backend-prefixed worker names
    "vllm": {
        "decode": "VllmDecodeWorker",
        "prefill": "VllmPrefillWorker",
    },
    # SGLang deployments name the workers after their role
    "sglang": {
        "decode": "decode",
        "prefill": "prefill",
    },
}

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Process ready patterns for recovery detection.
# Keys are component/worker names ("Frontend" plus the names in WORKER_MAP);
# values are compiled regexes looked up via get_worker_ready_pattern().
# NOTE(review): presumably these are searched against component log lines —
# confirm against the code that consumes them.
WORKER_READY_PATTERNS: Dict[str, Pattern] = {
    # Frontend
    "Frontend": re.compile(r"added model"),
    # vLLM workers (decode and prefill share the same message; the model name
    # is exposed through the "model_name" named group)
    "VllmDecodeWorker": re.compile(
        r"VllmWorker for (?P<model_name>.*?) has been initialized"
    ),
    "VllmPrefillWorker": re.compile(
        r"VllmWorker for (?P<model_name>.*?) has been initialized"
    ),
    # SGLang workers - look for their specific initialization messages
    # (any one of the alternatives counts as a match)
    "decode": re.compile(
        r"Model registration succeeded|Decode worker handler initialized|Worker handler initialized"
    ),
    "prefill": re.compile(
        r"Model registration succeeded|Prefill worker handler initialized|Worker handler initialized"
    ),
}


def get_all_worker_types() -> list[str]:
    """Return every known component name exactly once.

    The list starts with "Frontend", followed by the worker names of each
    backend in WORKER_MAP, in the order they are first encountered.
    """
    candidates = ["Frontend"]
    for role_to_worker in WORKER_MAP.values():
        candidates += role_to_worker.values()
    # dict keys are unique and keep insertion order, so round-tripping through
    # a dict drops repeats without reordering the survivors.
    return list(dict.fromkeys(candidates))


def get_worker_ready_pattern(worker_name: str) -> Optional[Pattern]:
    """Look up the compiled ready pattern registered for ``worker_name``.

    Returns ``None`` when WORKER_READY_PATTERNS has no entry for that name.
    """
    try:
        return WORKER_READY_PATTERNS[worker_name]
    except KeyError:
        return None


def get_backend_workers(backend: str) -> Dict[str, str]:
    """Return the role -> worker-name mapping of ``backend``.

    Unknown backends yield a fresh empty dict instead of raising.
    """
    if backend in WORKER_MAP:
        return WORKER_MAP[backend]
    return {}

79
80
81
82
83
84
85

@dataclass
class Load:
    """Load parameters attached to every scenario defined in this module.

    All fields have defaults, so ``Load()`` yields the standard profile.
    Field meanings below are inferred from their names; confirm against the
    load generator that consumes this object.
    """

    # presumably the number of concurrent clients — TODO confirm
    clients: int = 10
    # presumably how many requests each client issues — TODO confirm
    requests_per_client: int = 150
    # presumably prompt length in tokens — TODO confirm
    input_token_length: int = 100
    # presumably generated length in tokens — TODO confirm
    output_token_length: int = 100
    max_retries: int = 3  # Increased for fault tolerance
    # Optional SLA value; None means no value is set (units not shown here)
    sla: Optional[float] = None


@dataclass
class Failure:
    """One failure injection step of a scenario."""

    # Delay in seconds, counted from the previous injection
    time: int
    # Name of the targeted component, e.g. "Frontend" or a WORKER_MAP worker name
    pod_name: str
    # Either a process name (e.g. "dynamo.frontend", "VLLM::EngineCore") or
    # the literal "delete_pod"
    command: str
    # Signal name; presumably only relevant when `command` names a process —
    # TODO confirm against the code that performs the injection
    signal: str = "SIGINT"
    # presumably how many instances of the target are affected — TODO confirm
    replicas: int = 1


@dataclass
class Scenario:
    """A single test scenario: one deployment, one load, one failure list.

    Instances are created at module import time by crossing every deployment
    spec with every failure set of the matching backend.
    """

    # Deployment specification the scenario runs against
    deployment: DeploymentSpec
    # Load parameters applied during the scenario
    load: Load
    # Failure injections for the scenario; may be empty (the "none" case)
    failures: list[Failure]
    # Optional model name; None leaves it unset
    model: Optional[str] = None
    backend: str = "vllm"  # Backend type for tracking
106
107


108
109
110
111
# Helpers that build and tweak deployment entries
def _create_deployment_spec(backend, deploy_type, yaml_path):
    """Load ``yaml_path`` into a DeploymentSpec and tag it with its backend.

    Returns a dict with the keys "spec" (the DeploymentSpec) and "backend".
    ``deploy_type`` is accepted for signature symmetry with the other helpers
    but is not used here.
    """
    loaded_spec = DeploymentSpec(yaml_path)
    entry = {"spec": loaded_spec, "backend": backend}
    return entry
112
113


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def _set_replicas(deployment_spec, backend, deploy_type, replicas):
    """Apply ``replicas`` to the components of a deployment entry.

    The Frontend is always updated. For a backend listed in WORKER_MAP the
    decode worker is updated as well, and the prefill worker only when
    ``deploy_type`` is "disagg". Unknown backends get the Frontend change only.
    """
    components = deployment_spec["spec"]

    # Every backend shares the Frontend component
    components["Frontend"].replicas = replicas

    worker_names = WORKER_MAP.get(backend)
    if worker_names is None:
        return

    # Decode workers exist in both aggregated and disaggregated layouts
    components[worker_names["decode"]].replicas = replicas

    # Prefill workers only exist in the disaggregated layout
    if deploy_type == "disagg":
        components[worker_names["prefill"]].replicas = replicas


def _set_tensor_parallel(deployment_spec, backend, deploy_type, tp_size):
    """Apply ``tp_size`` to the worker components of a deployment entry.

    For "agg" only the decode worker is configured, going through the spec's
    ``set_tensor_parallel`` method when it has one and falling back to the
    per-component ``tensor_parallel_size`` attribute otherwise. For "disagg"
    both prefill and decode workers get ``tensor_parallel_size`` assigned.
    Backends missing from WORKER_MAP and other deploy types are left untouched.
    """
    components = deployment_spec["spec"]

    worker_names = WORKER_MAP.get(backend)
    if worker_names is None:
        return

    decode_name = worker_names["decode"]
    prefill_name = worker_names["prefill"]

    if deploy_type == "disagg":
        components[prefill_name].tensor_parallel_size = tp_size
        components[decode_name].tensor_parallel_size = tp_size
        return

    if deploy_type != "agg":
        return

    if not hasattr(components, "set_tensor_parallel"):
        # No spec-level helper available: set the attribute on the component
        components[decode_name].tensor_parallel_size = tp_size
    else:
        components.set_tensor_parallel(tp_size, [decode_name])


def _create_deployments_for_backend(backend):
    """Build the named deployment entries for one backend.

    Every (deploy type, tp, dp) combination below becomes one entry, keyed as
    ``<backend>-agg-tp-<tp>-dp-<dp>`` or
    ``<backend>-disagg-prefill-tp-<tp>-decode-tp-<tp>-dp-<dp>``.
    Each value is the dict produced by ``_create_deployment_spec``.
    """
    # Manifest for each deployment flavour; iteration order is agg, then disagg
    yaml_files = {
        "agg": f"components/backends/{backend}/deploy/agg.yaml",
        "disagg": f"components/backends/{backend}/deploy/disagg.yaml",
    }

    # (tensor-parallel size, replica count) pairs to generate
    tp_dp_pairs = ((1, 1), (1, 2), (2, 1), (4, 1))

    deployments = {}
    for deploy_type, yaml_path in yaml_files.items():
        for tp_size, dp_replicas in tp_dp_pairs:
            scales_tp = tp_size > 1
            scales_dp = dp_replicas > 1

            # Disaggregated deployments that scale both TP and DP are an
            # uncommon case and are not generated
            if deploy_type == "disagg" and scales_tp and scales_dp:
                continue

            if deploy_type == "agg":
                tp_label = f"tp-{tp_size}"
            else:
                tp_label = f"prefill-tp-{tp_size}-decode-tp-{tp_size}"
            scenario_name = "-".join(
                [backend, deploy_type, tp_label, f"dp-{dp_replicas}"]
            )

            # Fresh spec per entry, adjusted only where it departs from 1
            entry = _create_deployment_spec(backend, deploy_type, yaml_path)
            if scales_tp:
                _set_tensor_parallel(entry, backend, deploy_type, tp_size)
            if scales_dp:
                _set_replicas(entry, backend, deploy_type, dp_replicas)

            deployments[scenario_name] = entry

    return deployments


# All deployment entries, vLLM first and SGLang second, keyed by deployment name
deployment_specs = {
    **_create_deployments_for_backend("vllm"),
    **_create_deployments_for_backend("sglang"),
}
203
204
205
206
207
208
209
210
211


# Each failure scenario contains a list of failure injections.
# Each failure injection has a time in seconds after the previous injection and
# a list of failures to inject, including the number of failures for each type.
# Failures are currently process termination or pod deletion.
#
# Example:
#
#   "prefill_worker": [Failure(30, "VllmPrefillWorker", "dynamo.vllm", "SIGKILL")],
#
# terminates 1 prefill worker after 30 seconds
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def _create_backend_failures(backend):
    """Build the failure-name -> failure-list table for one backend.

    The table always contains the frontend, decode-worker and prefill-worker
    entries (process kill and pod deletion variants) plus an empty "none"
    entry. vLLM and SGLang each add entries that kill their own engine
    sub-processes. Raises KeyError when ``backend`` is not in WORKER_MAP.
    """
    role_names = WORKER_MAP[backend]
    decode_worker = role_names["decode"]
    prefill_worker = role_names["prefill"]
    process_name = f"dynamo.{backend}"

    def _sigkill(worker, process):
        # Single-step failure list: SIGKILL `process` on `worker` at t=30
        return [Failure(30, worker, process, "SIGKILL")]

    def _delete_pod(worker):
        # Single-step failure list: delete the pod of `worker` at t=30
        return [Failure(30, worker, "delete_pod")]

    failures = {
        "frontend": [Failure(30, "Frontend", "dynamo.frontend")],
        "frontend_pod": _delete_pod("Frontend"),
        "decode_worker": _sigkill(decode_worker, process_name),
        "decode_worker_pod": _delete_pod(decode_worker),
        "prefill_worker": _sigkill(prefill_worker, process_name),
        "prefill_worker_pod": _delete_pod(prefill_worker),
        "none": [],
    }

    if backend == "vllm":
        failures.update(
            {
                "vllm_decode_engine_core": _sigkill(
                    decode_worker, "VLLM::EngineCore"
                ),
                "vllm_prefill_engine_core": _sigkill(
                    prefill_worker, "VLLM::EngineCore"
                ),
            }
        )
    elif backend == "sglang":
        failures.update(
            {
                "sglang_decode_scheduler": _sigkill(
                    decode_worker, "sglang::scheduler"
                ),
                "sglang_decode_detokenizer": _sigkill(
                    decode_worker, "sglang::detokenizer"
                ),
                "sglang_prefill_scheduler": _sigkill(
                    prefill_worker, "sglang::scheduler"
                ),
                "sglang_prefill_detokenizer": _sigkill(
                    prefill_worker, "sglang::detokenizer"
                ),
            }
        )

    return failures
254
255
256
257
258
259
260
261
262
263
264
265


# Default load profile; this one instance is passed to every Scenario
# created in this module.
load = Load()

# model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"

# Passed as Scenario.model for every scenario; None leaves it unset.
model = None

# Populate Scenarios

# Scenario name ("<deployment_name>-<failure_name>") -> Scenario
scenarios = {}

266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# Map of backend to failure definitions
backend_failure_map = {
    "vllm": _create_backend_failures("vllm"),
    "sglang": _create_backend_failures("sglang"),
}

# Cross every deployment with every failure set of its backend. Each
# combination is registered in `scenarios` as "<deployment_name>-<failure_name>".
for deployment_name, deployment_info in deployment_specs.items():
    backend = deployment_info["backend"]

    # Validate backend
    if backend not in backend_failure_map:
        raise ValueError(
            f"Unsupported backend: {backend}. Supported backends are: {list(backend_failure_map.keys())}"
        )

    # Get the appropriate failure set for this backend
    failure_set = backend_failure_map[backend]

    for failure_name, failure in failure_set.items():
        # Skip prefill failures for aggregated deployments: only deployment
        # names containing "disagg" have a prefill worker to target
        if "prefill" in failure_name and "disagg" not in deployment_name:
            continue

        scenario_name = f"{deployment_name}-{failure_name}"
        scenarios[scenario_name] = Scenario(
            deployment=deployment_info["spec"],
            load=load,
            failures=failure,
            model=model,
            backend=backend,
        )