Unverified Commit 095ea3e7 authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

chore: updating and removing tests (#2130)

parent fdcf611f
......@@ -8,11 +8,10 @@ This document outlines the testing framework for the Dynamo runtime system, incl
```bash
tests/
├── serve/ # E2E tests using dynamo serve
│ ├── conftest.py # test fixtures as needed for specific test area
├── run/ # E2E tests using dynamo run
├── serve/ # E2E tests
│ ├── conftest.py # test fixtures as needed for specific test area
├── conftest.py # Shared fixtures and configuration
├── utils # Common utils accross tests
└── README.md # This file
```
......
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
router-num-threads: 4
common-configs: [model, block-size, max-model-len]
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len]
Planner:
environment: local
no-operation: true
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
router-num-threads: 4
common-configs: [model, block-size, max-model-len]
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
ServiceArgs:
workers: 4
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
router-num-threads: 4
common-configs: [model, block-size, max-model-len]
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
ServiceArgs:
workers: 8
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
router-num-threads: 4
common-configs: [model, block-size, max-model-len]
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
tensor-parallel-size: 2
ServiceArgs:
workers: 1
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
ServiceArgs:
workers: 1
resources:
cpu: "10"
memory: "20Gi"
Processor:
router: round-robin
router-num-threads: 4
common-configs: [model, block-size, max-model-len]
ServiceArgs:
workers: 2
resources:
cpu: "10"
memory: "20Gi"
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
tensor-parallel-size: 2
ServiceArgs:
workers: 2
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
router-num-threads: 4
common-configs: [model, block-size, max-model-len]
ServiceArgs:
workers: 2
resources:
cpu: "10"
memory: "20Gi"
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
tensor-parallel-size: 2
ServiceArgs:
workers: 4
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
common-configs: [model, block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
common-configs: [model, block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
tensor-parallel-size: 2
ServiceArgs:
workers: 1
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
tensor-parallel-size: 1
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
common-configs: [model, block-size]
ServiceArgs:
workers: 2
resources:
cpu: "10"
memory: "20Gi"
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
tensor-parallel-size: 2
ServiceArgs:
workers: 1
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
tensor-parallel-size: 1
ServiceArgs:
workers: 2
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
common-configs: [model, block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
tensor-parallel-size: 4
ServiceArgs:
workers: 1
resources:
gpu: '4'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
ServiceArgs:
workers: 4
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
common-configs: [model, block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
tensor-parallel-size: 4
ServiceArgs:
workers: 1
resources:
gpu: '4'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
tensor-parallel-size: 2
ServiceArgs:
workers: 1
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: round-robin
common-configs: [model, block-size]
ServiceArgs:
workers: 2
resources:
cpu: "10"
memory: "20Gi"
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
tensor-parallel-size: 4
ServiceArgs:
workers: 1
resources:
gpu: '4'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
tensor-parallel-size: 2
ServiceArgs:
workers: 2
resources:
gpu: '2'
common-configs: [model, block-size, max-model-len, kv-transfer-config]
Planner:
environment: local
no-operation: true
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
def pytest_addoption(parser):
parser.addoption("--requests-per-client", type=int, default=100)
parser.addoption("--clients", type=int, default=10)
parser.addoption("--no-respawn", action="store_true", default=False)
parser.addoption("--input-token-length", type=int, default=100)
parser.addoption("--output-token-length", type=int, default=100)
parser.addoption("--max-num-seqs", type=int, default=None)
parser.addoption("--max-retries", type=int, default=1)
parser.addoption("--display-dynamo-output", action="store_true", default=False)
parser.addoption("--combine-process-logs", action="store_true", default=False)
parser.addoption("--hf-hub-offline", action="store_true", default=False)
@pytest.fixture
def display_dynamo_output(request):
return request.config.getoption("--display-dynamo-output")
@pytest.fixture
def max_retries(request):
return request.config.getoption("--max-retries")
@pytest.fixture
def max_num_seqs(request):
return request.config.getoption("--max-num-seqs")
@pytest.fixture
def num_clients(request):
return request.config.getoption("--clients")
@pytest.fixture
def input_token_length(request):
return request.config.getoption("--input-token-length")
@pytest.fixture
def output_token_length(request):
return request.config.getoption("--output-token-length")
@pytest.fixture
def requests_per_client(request):
return request.config.getoption("--requests-per-client")
@pytest.fixture
def respawn(request):
return not request.config.getoption("--no-respawn")
@pytest.fixture
def separate_process_logs(request):
return not request.config.getoption("--combine-process-logs")
@pytest.fixture
def hf_hub_offline(request):
return request.config.getoption("--hf-hub-offline")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import os
import re
from datetime import datetime
from typing import Any
import pandas as pd
from tabulate import tabulate
def parse_test_log(file_path):
start_time = None
ready_time = None
fault_time = None
start_cmd = None
if not os.path.isfile(file_path):
return None, None, None
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if "Running command: dynamo serve" in line:
start_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
start_cmd = line.split("Running command:")[1]
elif "Deployment Ready" in line:
ready_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
elif "Injecting failure for:" in line:
fault_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
startup_time = (
(ready_time - start_time).total_seconds() if start_time and ready_time else None
)
return startup_time, fault_time, start_cmd
def parse_client_logs(test_dir, expected_length=100):
all_logs = []
for file in os.listdir(test_dir):
if file.startswith("client_") and file.endswith(".log.txt"):
with open(os.path.join(test_dir, file), "r") as f:
request_number = 0
for line in f:
request_number += 1
data = json.loads(line.strip())
for result in data["results"]:
log_entry = {
"time": datetime.fromisoformat(
data["time"].replace("T", " ")
),
"status": result["status"],
"request_elapsed_time": result["request_elapsed_time"],
"request_number": request_number - 1,
"client": file.split("_")[1].split(".")[0],
}
if (
"result" in result
and result["result"]
and "choices" in result["result"]
and result["result"]["choices"]
):
log_entry["success"] = True
content = result["result"]["choices"][0]["message"][
"content"
]
if not content or len(content) < expected_length:
log_entry["success"] = False
else:
log_entry["success"] = False
all_logs.append(log_entry)
if len(all_logs):
df = pd.DataFrame(all_logs)
df.sort_values("time", inplace=True)
return df
return None
def calculate_metrics(df, fault_time, sla=2.1):
success = df["success"].sum()
failure = len(df) - success
if fault_time:
before_fault = df[df["time"] <= fault_time]
after_fault = df[df["time"] > fault_time]
else:
before_fault = df
after_fault = None
# Existing latency metrics (only successful requests)
successful_before = before_fault[before_fault["success"]]
avg_before = successful_before["request_elapsed_time"].mean()
std_before = successful_before["request_elapsed_time"].std()
avg_after, std_after = None, None
if after_fault is not None and not after_fault.empty:
successful_after = after_fault[after_fault["success"]]
avg_after = successful_after["request_elapsed_time"].mean()
std_after = successful_after["request_elapsed_time"].std()
# SLA violations (only successful requests exceeding the SLA)
violations_before = (successful_before["request_elapsed_time"] > sla).sum()
violations_after = (
(successful_after["request_elapsed_time"] > sla).sum()
if after_fault is not None and not after_fault.empty
else None
)
return (
success,
failure,
avg_before,
std_before,
avg_after,
std_after,
violations_before,
violations_after,
)
def parse_process_log(log_dir, process_name):
process_ready_line = {
"dynamo_Frontend": "added model",
"dynamo_VllmWorker": "Starting VllmWorker instance with all registered endpoints",
"dynamo_Processor": "Starting Processor instance with all registered endpoints",
"dynamo_PrefillWorker": "Starting PrefillWorker instance with all registered endpoints",
}
process_shutdown_line = {
"dynamo_Frontend": "SIGTERM received, starting graceful shutdown",
"dynamo_VllmWorker": "Received shutdown signal, shutting down DistributedRuntime",
"dynamo_Processor": "Received signal 15, initiating graceful shutdown",
"dynamo_PrefillWorker": "Shutdown hooks completed successfully",
}
process_log_path = os.path.join(log_dir, "error.log")
if not os.path.isfile(process_log_path):
return None, None
process_ready = []
process_shutdown = []
process_start_time = None
with open(process_log_path, "r") as f:
for line in f:
clean_line = re.sub(r"\x1b\[.*?m", "", line.strip()) # Remove ANSI codes
if not clean_line:
continue
parts = clean_line.split()
if len(parts) < 2:
continue
try:
# Parse timestamp (remove 'Z' for naive datetime)
timestamp = datetime.fromisoformat(parts[0].replace("Z", ""))
except ValueError:
continue
if not process_start_time:
process_start_time = timestamp
log_message = " ".join(parts[1:])
relative_time = (timestamp - process_start_time).total_seconds()
# Check for process start lines
if process_name in process_ready_line:
if process_ready_line[process_name] in log_message:
process_ready.append((timestamp, log_message, relative_time))
# Check for process end lines
if process_name in process_shutdown_line:
if process_shutdown_line[process_name] in log_message:
process_shutdown.append((timestamp, log_message, relative_time))
return process_ready, process_shutdown
def parse_watcher_log(test_dir, fault_time):
before_requests = []
after_requests = []
watcher_log_path = os.path.join(test_dir, "watcher.log.txt")
if not os.path.isfile(watcher_log_path):
return None, None
with open(watcher_log_path, "r") as f:
for line in f:
try:
data = json.loads(line.strip())
except json.JSONDecodeError:
continue
if "metrics" not in data:
continue
entry_time = datetime.fromisoformat(data["time"].replace("T", " "))
for metric in data["metrics"]:
if len(metric) != 2:
continue
_, metric_data = metric
if (
"num_requests_waiting" in metric_data
and "request_active_slots" in metric_data
and metric_data["request_active_slots"] > 0
):
if fault_time is None or entry_time <= fault_time:
before_requests.append(metric_data["num_requests_waiting"])
else:
after_requests.append(metric_data["num_requests_waiting"])
avg_before = (
sum(before_requests) / len(before_requests) if before_requests else None
)
avg_after = sum(after_requests) / len(after_requests) if after_requests else None
return avg_before, avg_after
def calculate_recovery_time(test_dir, failure_type, fault_time):
processes = [
"dynamo_Frontend",
"dynamo_Processor",
"dynamo_VllmWorker",
"dynamo_PrefillWorker",
]
process_start_ends = {}
start_time = None
for process in processes:
starts, ends = parse_process_log(os.path.join(test_dir, process), process)
if starts:
process_start_ends[process] = (starts, ends)
if failure_type == "processor":
start_time = process_start_ends["dynamo_Processor"][0][-1][0]
elif failure_type == "frontend":
start_time = process_start_ends["dynamo_Frontend"][0][-1][0]
elif failure_type == "decode_worker":
start_times = [
x
for x in process_start_ends["dynamo_VllmWorker"][0]
if "VllmWorker:1" in x[1]
]
if not start_times:
return None
start_time = start_times[-1][0]
elif failure_type == "prefill_worker":
if "dynamo_PrefillWorker" not in process_start_ends:
return None
start_times = [
x
for x in process_start_ends["dynamo_PrefillWorker"][0]
if "PrefillWorker:1" in x[1]
]
start_time = start_times[-1][0]
if not start_time:
return None
if fault_time > start_time:
return None
return (start_time - fault_time).total_seconds()
def process_test_directory(test_dir):
test_name = test_dir.split("test_worker_failure[", 1)[1].rstrip("]")
failure_type = test_name.split("-")[-1]
test_prefix = "-".join(test_name.split("-")[:-1])
startup_time, fault_time, start_cmd = parse_test_log(
os.path.join(test_dir, "test.log.txt")
)
df = parse_client_logs(test_dir)
if df is None or df.empty:
return None
pending_requests_before, pending_requests_after = parse_watcher_log(
test_dir, fault_time
)
(
success,
failure,
avg_before,
std_before,
avg_after,
std_after,
violations_before,
violations_after,
) = calculate_metrics(df, fault_time)
recovery_time = calculate_recovery_time(test_dir, failure_type, fault_time)
return {
"test": test_prefix,
"cmd": start_cmd,
"failure": failure_type,
"start_time": startup_time,
"success_requests": success,
"failed_requests": failure,
"avg_latency_before": avg_before,
"std_latency_before": std_before,
"avg_latency_after": avg_after,
"std_latency_after": std_after,
"pending_requests_before": pending_requests_before,
"pending_requests_after": pending_requests_after,
"violations_before": violations_before,
"violations_after": violations_after,
"recovery_time": recovery_time,
}
def main(logs_dir, tablefmt, log_paths=[]):
results = []
if log_paths:
for log_path in log_paths:
result = process_test_directory(log_path)
if result:
results.append(result)
elif logs_dir:
for entry in os.listdir(logs_dir):
if entry.startswith("test_worker_failure[") and os.path.isdir(
os.path.join(logs_dir, entry)
):
result = process_test_directory(os.path.join(logs_dir, entry))
if result:
results.append(result)
# Group results by test prefix
grouped: dict[str, list[dict[str, Any]]] = {}
commands = {}
for res in results:
test_prefix = res["test"]
if test_prefix not in grouped:
grouped[test_prefix] = []
commands[test_prefix] = res["cmd"]
grouped[test_prefix].append(res)
order = [
"none",
"frontend",
"processor",
"decode_worker",
"prefill_worker",
"vllm_worker",
]
# Print grouped tables
for test_prefix, group in grouped.items():
new_group = []
for failure in order:
for res in group:
if failure == res["failure"]:
new_group.append(res)
group = new_group
headers = [
"Failure",
"Startup Time",
"Success",
"Failed",
"Latency Before",
"Latency After",
"Pending Before",
"Pending After",
"Violations Before",
"Violations After",
"Recovery Time",
]
rows = []
for res in group:
row = [
res["failure"],
res["start_time"], # if res["start_time"] is not None else "N/A",
res["success_requests"],
res["failed_requests"],
res["avg_latency_before"],
res["avg_latency_after"],
res["pending_requests_before"],
res["pending_requests_after"],
res["violations_before"],
res["violations_after"],
res["recovery_time"],
]
rows.append(row)
print(f"\nTest Group: {test_prefix}")
print(f"\nTest Command: {commands[test_prefix]}")
print(
tabulate(
rows,
headers,
tablefmt=tablefmt,
floatfmt=".2f",
missingval="N/A",
numalign="right",
stralign="center",
)
)
print("\n" + "=" * 80)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse test results")
parser.add_argument("--log-dir", default=".", help="Path to the logs directory")
parser.add_argument(
"--format", choices=["fancy", "markdown"], default="fancy", help="Table format"
)
args = parser.parse_args()
# Map format choices to tabulate formats
tablefmt = (
"fancy_grid" if args.format == "fancy" else "pipe"
) # Using pipe for markdown compatibility
main(args.log_dir, tablefmt)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from tests.utils.deployment_graph import (
DeploymentGraph,
Payload,
chat_completions_response_handler,
)
# Initial payload used for testing
# initial deployment readiness.
text_prompt = "Tell me a short joke about AI."
text_payload = Payload(
payload_chat={
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"messages": [
{
"role": "user",
"content": text_prompt, # Shorter prompt
}
],
"max_tokens": 150,
"temperature": 0.1,
# "seed": 10,
"ignore_eos": True,
"min_tokens": 150,
"stream": False,
},
expected_log=[],
expected_response=["AI"],
)
# Each Deployment Graph contains
# the dynamo serve module and configuration as well
# as the endpoint for interaction
deployment_graphs = {
"agg-tp-1-dp-1": (
DeploymentGraph(
module="graphs.agg:Frontend",
config="/workspace/tests/fault_tolerance/configs/agg_tp_1_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_1, pytest.mark.vllm],
),
text_payload,
),
"agg-tp-1-dp-8": (
DeploymentGraph(
module="graphs.agg:Frontend",
config="/workspace/tests/fault_tolerance/configs/agg_tp_1_dp_8.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_8, pytest.mark.vllm],
),
text_payload,
),
"agg-tp-1-dp-4": (
DeploymentGraph(
module="graphs.agg:Frontend",
config="/workspace/tests/fault_tolerance/configs/agg_tp_1_dp_4.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_4, pytest.mark.vllm],
),
text_payload,
),
"agg-tp-2-dp-1": (
DeploymentGraph(
module="graphs.agg:Frontend",
config="/workspace/tests/fault_tolerance/configs/agg_tp_2_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_2, pytest.mark.vllm],
),
text_payload,
),
"agg-tp-2-dp-2": (
DeploymentGraph(
module="graphs.agg:Frontend",
config="/workspace/tests/fault_tolerance/configs/agg_tp_2_dp_2.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_4, pytest.mark.vllm],
),
text_payload,
),
"agg-tp-2-dp-4": (
DeploymentGraph(
module="graphs.agg:Frontend",
config="/workspace/tests/fault_tolerance/configs/agg_tp_2_dp_4.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_8, pytest.mark.vllm],
),
text_payload,
),
"disagg-p-tp-1-dp-1-d-tp-1-dp-1": (
DeploymentGraph(
module="graphs.disagg:Frontend",
config="/workspace/tests/fault_tolerance/configs/disagg_p_tp_1_dp_1_d_tp_1_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_2, pytest.mark.vllm],
),
text_payload,
),
"disagg-p-tp-1-dp-4-d-tp-4-dp-1": (
DeploymentGraph(
module="graphs.disagg:Frontend",
config="/workspace/tests/fault_tolerance/configs/disagg_p_tp_1_dp_4_d_tp_4_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_8, pytest.mark.vllm],
),
text_payload,
),
"disagg-p-tp-2-dp-2-d-tp-4-dp-1": (
DeploymentGraph(
module="graphs.disagg:Frontend",
config="/workspace/tests/fault_tolerance/configs/disagg_p_tp_2_dp_2_d_tp_4_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_8, pytest.mark.vllm],
),
text_payload,
),
"disagg-p-tp-2-dp-1-d-tp-4-dp-1": (
DeploymentGraph(
module="graphs.disagg:Frontend",
config="/workspace/tests/fault_tolerance/configs/disagg_p_tp_2_dp_1_d_tp_4_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_8, pytest.mark.vllm],
),
text_payload,
),
"disagg-p-tp-1-dp-2-d-tp-2-dp-1": (
DeploymentGraph(
module="graphs.disagg:Frontend",
config="/workspace/tests/fault_tolerance/configs/disagg_p_tp_1_dp_2_d_tp_2_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_4, pytest.mark.vllm],
),
text_payload,
),
"disagg-p-tp-1-dp-1-d-tp-2-dp-1": (
DeploymentGraph(
module="graphs.disagg:Frontend",
config="/workspace/tests/fault_tolerance/configs/disagg_p_tp_1_dp_1_d_tp_2_dp_1.yaml",
directory="/workspace/examples/llm",
endpoints=["v1/chat/completions"],
response_handlers=[chat_completions_response_handler],
marks=[pytest.mark.gpu_4, pytest.mark.vllm],
),
text_payload,
),
}
# Each failure scenaro contains a list of failure injections
# Each failure injection has a time in seconds after the pervious injection and
# a list of failures to inject including the number of failures for each type.
# Failures are currently process termination.
#
# Example:
#
# "prefill_worker": [[30, [("dynamo_prefillworker", 1)]]],
#
# terminates 1 prefill worker after 30 seconds
failure_scenarios = {
"decode_worker": [[30, [("dynamo_vllmworker", 1)]]],
"prefill_worker": [[30, [("dynamo_prefillworker", 1)]]],
"frontend": [[30, [("dynamo_frontend", 1)]]],
"processor": [[30, [("dynamo_processor", 1)]]],
"vllm_worker": [[30, [("vllm_worker", 1)]]],
"none": [],
}
@pytest.fixture(params=list(failure_scenarios.keys()))
def failures(request):
return failure_scenarios[request.param]
@pytest.fixture(params=list(deployment_graphs.keys()))
def deployment_graph_test(request):
"""
Fixture that provides different deployment graph test configurations.
"""
return deployment_graphs[request.param]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
from contextlib import contextmanager
from multiprocessing import Process
import psutil
import pytest
from tests.fault_tolerance.client import client
from tests.fault_tolerance.parse_results import main as parse_results
from tests.fault_tolerance.scenarios import ( # noqa: F401
deployment_graph_test,
failures,
)
from tests.fault_tolerance.utils.circus_controller import CircusController
from tests.fault_tolerance.utils.metrics import nvidia_smi # noqa: F401
from tests.fault_tolerance.utils.metrics import worker_metrics # noqa: F401
from tests.serve.test_dynamo_serve import DynamoServeProcess
from tests.utils.managed_process import terminate_process_tree
def _set_deployment_args(request, max_num_seqs):
decode_worker_name = "VllmWorker"
args = {}
if max_num_seqs is not None:
args[f"--{decode_worker_name}.max_num_seqs"] = max_num_seqs
return args
def _list_vllm_worker_processes():
processes = []
for ps_process in psutil.process_iter(["name", "cmdline"]):
try:
if "from multiprocessing.spawn import spawn_main;" in " ".join(
ps_process.cmdline()
):
processes.append(ps_process.pid)
except Exception:
pass
return processes
@contextmanager
def _clients(
logger,
num_clients,
request,
deployment_graph,
server_process,
payload,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
):
procs = []
for i in range(num_clients):
procs.append(
Process(
target=client,
args=(
deployment_graph,
server_process,
payload,
request.node.name,
i,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
),
)
)
procs[-1].start()
yield procs
for proc in procs:
logger.debug(f"{proc} waiting for join")
proc.join()
logger.debug(f"{proc} joined")
def _inject_failures(failures, logger): # noqa: F811
circus_controller = CircusController.from_state_file("dynamo")
for failure_time, component in failures:
time.sleep(failure_time)
for component_name, number in component:
logger.info(f"Injecting failure for: {component_name}")
if "dynamo" in component_name:
result = circus_controller.client.call(
{"command": "list", "properties": {"name": f"{component_name}"}}
)
if result["status"] == "error":
logger.warning(f"component {component_name} not found {result}")
continue
num_processes = len(result["pids"])
if number is None:
number = num_processes
for x in range(number):
pid = result["pids"][x % num_processes]
logger.info(f"Terminating {component_name} Pid {pid}")
terminate_process_tree(pid, logger, immediate_kill=True)
elif "vllm" in component_name:
vllm_processes = _list_vllm_worker_processes()
num_processes = len(vllm_processes)
if number is None:
number = len(vllm_processes)
for x in range(number):
pid = vllm_processes[x % num_processes]
terminate_process_tree(pid, logger, immediate_kill=True)
circus_controller.close()
global_result_list = []
@pytest.fixture(autouse=True)
def results_table(request):
yield
parse_results(logs_dir=None, log_paths=[request.node.name], tablefmt="fancy")
global_result_list.append(request.node.name)
@pytest.fixture(autouse=True, scope="session")
def results_summary():
yield
parse_results(logs_dir=None, log_paths=global_result_list, tablefmt="fancy")
@pytest.mark.e2e
@pytest.mark.slow
def test_worker_failure(
deployment_graph_test, # noqa: F811
request,
runtime_services,
num_clients,
requests_per_client,
worker_metrics, # noqa: F811
respawn,
failures, # noqa: F811
input_token_length,
output_token_length,
max_num_seqs,
max_retries,
display_dynamo_output,
nvidia_smi, # noqa: F811
separate_process_logs,
hf_hub_offline,
):
"""
Test dynamo serve deployments with injected failures
"""
# runtime_services is used to start nats and etcd
logger = logging.getLogger(request.node.name)
logger.info("Starting test_deployment")
deployment_graph, payload = deployment_graph_test
if hf_hub_offline:
os.environ["HF_HUB_OFFLINE"] = "1"
else:
if "HF_HUB_OFFLINE" in os.environ:
del os.environ["HF_HUB_OFFLINE"]
if respawn:
os.environ["DYN_CIRCUS_RESPAWN"] = "1"
else:
if "DYN_CIRCUS_RESPAWN" in os.environ:
del os.environ["DYN_CIRCUS_RESPAWN"]
if separate_process_logs:
os.environ["DYN_CIRCUS_LOG_DIR"] = os.path.abspath(request.node.name)
deployment_args = _set_deployment_args(request, max_num_seqs)
with DynamoServeProcess(
deployment_graph,
request,
display_output=display_dynamo_output,
args=deployment_args,
) as server_process:
server_process.wait_for_ready(payload)
with _clients(
logger,
num_clients,
request,
deployment_graph,
server_process,
payload,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
):
_inject_failures(failures, logger)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
from pathlib import Path
from typing import List, Optional
from circus.client import CircusClient
from circus.exc import CallError
logger = logging.getLogger(__name__)
class CircusController:
"""A circus client implementation for Dynamo"""
def __init__(self, endpoint: str):
"""Initialize connection to arbiter.
Args:
endpoint: The circus endpoint (e.g., tcp://127.0.0.1:54927)
"""
self.endpoint = endpoint
self.client = CircusClient(endpoint=endpoint, timeout=15.0)
@classmethod
def from_state_file(cls, namespace: str) -> "CircusController":
"""
Create a CircusController from a Dynamo state file.
Args:
namespace: The Dynamo namespace
Returns:
CircusController instance
Raises:
FileNotFoundError: If state file doesn't exist
ValueError: If no endpoint found in state file
"""
state_file = (
Path(
os.environ.get("DYN_LOCAL_STATE_DIR", Path.home() / ".dynamo" / "state")
)
/ f"{namespace}.json"
)
if not state_file.exists():
raise FileNotFoundError(f"State file not found: {state_file}")
with open(state_file, "r") as f:
state = json.load(f)
endpoint = state.get("circus_endpoint")
if not endpoint:
raise ValueError(f"No endpoint found in state file: {state_file}")
return cls(endpoint)
async def _get_watcher_processes(self, name: str) -> Optional[int]:
"""
Get number of processes for a watcher.
Args:
name: The name of the watcher
Returns:
Number of processes for the watcher. Returns None operation fails.
"""
try:
response = self.client.send_message("numprocesses", name=name)
return int(response.get("numprocesses", 0))
except (CallError, Exception) as e:
logger.error(f"Failed to get process count for {name}: {e}")
return None
async def _list_watchers(self) -> List[str]:
"""
List all watchers managed by circus.
Returns:
List of watcher names. Returns None if the list operation fails.
"""
try:
response = self.client.send_message("list")
return response.get("watchers", [])
except (CallError, Exception) as e:
logger.error(f"Failed to list watchers: {e}")
return []
def close(self) -> None:
"""Close the connection to the arbiter."""
if hasattr(self, "client"):
self.client.stop()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import os
from datetime import datetime
from multiprocessing import Process
import psutil
import pytest
from dynamo.runtime import dynamo_worker
from tests.fault_tolerance.utils.circus_controller import CircusController
from tests.utils.managed_process import ManagedProcess
def run_metrics_process(log_dir):
asyncio.run(get_metrics(log_dir))
@dynamo_worker()
async def get_metrics(runtime, log_dir):
# Log # processes
# Log # metrics per vllm worker
circus_controller = None
pipeline = None
log_path = os.path.join(log_dir, "watcher.log.txt")
with open(log_path, "w") as log:
while True:
try:
await asyncio.sleep(0.5)
if not circus_controller:
circus_controller = CircusController.from_state_file("dynamo")
if not pipeline:
pipeline = (
await runtime.namespace("dynamo")
.component("VllmWorker")
.endpoint("load_metrics")
.client()
)
watchers = []
for x in await circus_controller._list_watchers():
result = circus_controller.client.call(
{"command": "list", "properties": {"name": f"{x}"}}
)
watchers.append((x, result))
metrics = []
for x in pipeline.instance_ids():
async for worker_metric in await pipeline.direct(None, x):
metrics.append((x, worker_metric.data()))
vllm_processes = []
for ps_process in psutil.process_iter(["name", "cmdline"]):
try:
if "from multiprocessing.spawn import spawn_main;" in " ".join(
ps_process.cmdline()
):
vllm_processes.append(ps_process.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Process may have terminated or become inaccessible during iteration
pass
record = {
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"watchers": watchers,
"metrics": metrics,
"vllm_processes": vllm_processes,
}
log.write(json.dumps(record) + "\n")
log.flush()
except Exception as e:
record = {
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"watchers": [],
"metrics": [],
"vllm_processes": [],
"error": str(e),
}
log.write(json.dumps(record) + "\n")
log.flush()
@pytest.fixture
def worker_metrics(request):
process = Process(target=run_metrics_process, args=(request.node.name,))
process.start()
yield
process.kill()
class NvidiaSMI(ManagedProcess):
def __init__(self, request):
super().__init__(
command=[
"nvidia-smi",
"dmon",
"--select=puc",
],
health_check_ports=[],
terminate_existing=True,
display_output=False,
data_dir=None,
log_dir=request.node.name,
)
@pytest.fixture
def nvidia_smi(request):
with NvidiaSMI(request) as nvidia_smi_process:
yield nvidia_smi_process
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment