# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Parallelization: Hermetic test (xdist-safe via dynamic ports).
# Tested on: Linux (Ubuntu 24.04 container), Intel(R) Core(TM) i9-14900K, 32 vCPU.
# Combined pre_merge wall time (this file + test_tensor_parameters.py):
# - Serialized: 87.48s.
# - Parallel (-n auto): 25.27s (62.21s saved, 3.46x).
# GPU Requirement: gpu_0 (CPU-only, echo worker does not use GPU)

"""gRPC tensor echo test with mocker worker."""

from __future__ import annotations

import logging
import os
import shutil

import pytest
import triton_echo_client

from tests.utils.constants import QWEN
from tests.utils.managed_process import ManagedProcess

# Module logger, named after this test module.
logger = logging.getLogger(name=__name__)

# Model identifier attached to test_echo via the ``pytest.mark.model`` marker.
TEST_MODEL = QWEN


class MockWorkerProcess(ManagedProcess):
    """Managed subprocess running the ``echo_tensor_worker.py`` script in this directory.

    The worker is started with ``DYN_LOG=debug`` and ``DYN_SYSTEM_PORT=<system_port>``.
    Its health URL ``http://localhost:<system_port>/health`` is checked with
    :meth:`is_ready`, which requires the JSON body to report ``"status": "ready"``.
    """

    def __init__(self, request, system_port: int, worker_id: str = "mocker-worker"):
        """Configure the echo worker process and hand it to ``ManagedProcess``.

        Args:
            request: The pytest ``request`` object; ``request.node.name`` is used
                to build a per-test log directory name.
            system_port: Exported to the worker as ``DYN_SYSTEM_PORT`` and used
                as the port of the polled ``/health`` URL.
            worker_id: Label used in the log directory name and in log messages.
        """
        self.worker_id = worker_id
        self.system_port = system_port

        command = [
            "python3",
            os.path.join(os.path.dirname(__file__), "echo_tensor_worker.py"),
        ]

        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        # NOTE(review): presumably makes /health reflect the status of the
        # "generate" endpoint — confirm against the runtime's configuration docs.
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        env["DYN_SYSTEM_PORT"] = str(system_port)

        log_dir = f"{request.node.name}_{worker_id}"

        # Remove any log directory left over from a previous run of this test;
        # a missing directory is the normal case and is not an error.
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        super().__init__(
            command=command,
            env=env,
            # gRPC doesn't expose an endpoint for listing models, so only the
            # worker's system /health endpoint is checked.
            health_check_urls=[
                (f"http://localhost:{system_port}/health", self.is_ready),
            ],
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=[],
            straggler_commands=["echo_tensor_worker.py"],
            log_dir=log_dir,
        )

    def is_ready(self, response) -> bool:
        """Return True when the health response's JSON body has ``status == "ready"``.

        Args:
            response: HTTP response object exposing ``.json()``.

        Returns:
            True only if the body parses as a JSON object whose ``"status"`` is
            ``"ready"``. Invalid JSON, ``null``, or any non-object JSON value
            (list, string, number) yields False instead of raising.
        """
        try:
            payload = response.json()
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        # Only a JSON object can carry a "status" key; previously a non-empty
        # list/string body made `.get` raise AttributeError.
        status = payload.get("status") if isinstance(payload, dict) else None

        is_ready = status == "ready"
        if is_ready:
            logger.info("%s status is ready", self.worker_id)
        else:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        return is_ready


@pytest.fixture(scope="function")
def start_services_with_echo_worker(request, start_services_with_grpc):
    """Start echo worker with the shared gRPC frontend.

    Function-scoped to allow parallel test execution.
    Each test gets its own gRPC frontend + echo worker on unique ports.
    No namespace conflicts because runtime_services_dynamic_ports provides isolated Etcd/NATS.

    ``start_services_with_grpc`` is unpacked as ``(frontend_port, system_port)``;
    the echo worker is started on ``system_port`` and stays running until the
    test using this fixture finishes.

    Yields:
        The gRPC frontend port for clients to connect to.
    """
    frontend_port, system_port = start_services_with_grpc
    with MockWorkerProcess(request, system_port):
        # Lazy %-style args: formatting happens only if the record is emitted.
        logger.info("gRPC Echo Worker started for test on port %s", frontend_port)
        yield frontend_port


@pytest.mark.pre_merge
@pytest.mark.gpu_0  # Echo worker is CPU-only (no GPU required)
@pytest.mark.parallel
@pytest.mark.integration
@pytest.mark.model(TEST_MODEL)
def test_echo(start_services_with_echo_worker) -> None:
    """Run the echo client's health, inference, and config calls against the frontend.

    The client is created fresh inside the test (never shared at module level)
    so that no client state carries over between tests.
    """
    echo_client = triton_echo_client.TritonEchoClient(
        grpc_port=start_services_with_echo_worker
    )

    echo_client.check_health()
    echo_client.run_infer()
    echo_client.get_config()