"vscode:/vscode.git/clone" did not exist on "a0fa0454b5e44b3fc91f74215989e1f005ea3676"
conftest.py 2.81 KB
Newer Older
1
2
3
import subprocess
import time
from pathlib import Path
4
from typing import Iterable, List, Optional, Tuple
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

import pytest
import requests

from ..fixtures.ports import find_free_port
from ..fixtures.router_manager import RouterManager


def pytest_configure(config):
    config.addinivalue_line("markers", "integration: mark as router integration test")


@pytest.fixture
def router_manager() -> Iterable[RouterManager]:
    mgr = RouterManager()
    try:
        yield mgr
    finally:
        mgr.stop_all()


def _spawn_mock_worker(args: List[str]) -> Tuple[subprocess.Popen, str, str]:
    repo_root = Path(__file__).resolve().parents[2]
    script = repo_root / "py_test" / "fixtures" / "mock_worker.py"
    port = find_free_port()
    worker_id = f"worker-{port}"
    base_cmd = [
        "python3",
        str(script),
        "--port",
        str(port),
        "--worker-id",
        worker_id,
    ]
    cmd = base_cmd + args
    proc = subprocess.Popen(cmd)
    url = f"http://127.0.0.1:{port}"
    _wait_health(url)
    return proc, url, worker_id


def _wait_health(url: str, timeout: float = 10.0):
    start = time.time()
    with requests.Session() as s:
        while time.time() - start < timeout:
            try:
                r = s.get(f"{url}/health", timeout=1)
                if r.status_code == 200:
                    return
            except requests.RequestException:
                pass
            time.sleep(0.1)
    raise TimeoutError(f"Mock worker at {url} did not become healthy")


@pytest.fixture
def mock_worker():
    """Start a single healthy mock worker; yields (process, url, worker_id)."""
    proc, url, worker_id = _spawn_mock_worker([])
    try:
        yield proc, url, worker_id
    finally:
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()


@pytest.fixture
def mock_workers():
    """Factory to start N workers with custom args.

    Usage:
        procs, urls, ids = mock_workers(n=3, args=["--latency-ms", "5"])  # same args for all
        ...
    """

    procs: List[subprocess.Popen] = []

86
    def _start(n: int, args: Optional[List[str]] = None):
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
        args = args or []
        new_procs: List[subprocess.Popen] = []
        urls: List[str] = []
        ids: List[str] = []
        for _ in range(n):
            p, url, wid = _spawn_mock_worker(args)
            procs.append(p)
            new_procs.append(p)
            urls.append(url)
            ids.append(wid)
        return new_procs, urls, ids

    try:
        yield _start
    finally:
        for p in procs:
            if p.poll() is None:
                p.terminate()
                try:
                    p.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    p.kill()