conftest.py 2.82 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
import subprocess
import time
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

import pytest
import requests

from ..fixtures.ports import find_free_port
from ..fixtures.router_manager import RouterManager


def pytest_configure(config):
    config.addinivalue_line("markers", "integration: mark as router integration test")


@pytest.fixture
def router_manager() -> Iterable[RouterManager]:
    mgr = RouterManager()
    try:
        yield mgr
    finally:
        mgr.stop_all()


def _spawn_mock_worker(args: List[str]) -> Tuple[subprocess.Popen, str, str]:
    repo_root = Path(__file__).resolve().parents[2]
    script = repo_root / "py_test" / "fixtures" / "mock_worker.py"
    port = find_free_port()
    worker_id = f"worker-{port}"
    base_cmd = [
        "python3",
        str(script),
        "--port",
        str(port),
        "--worker-id",
        worker_id,
    ]
    cmd = base_cmd + args
    proc = subprocess.Popen(cmd)
    url = f"http://127.0.0.1:{port}"
    _wait_health(url)
    return proc, url, worker_id


def _wait_health(url: str, timeout: float = 10.0):
    start = time.time()
    with requests.Session() as s:
        while time.time() - start < timeout:
            try:
                r = s.get(f"{url}/health", timeout=1)
                if r.status_code == 200:
                    return
            except requests.RequestException:
                pass
            time.sleep(0.1)
    raise TimeoutError(f"Mock worker at {url} did not become healthy")


@pytest.fixture
def mock_worker():
    """Start a single healthy mock worker; yields (process, url, worker_id)."""
    proc, url, worker_id = _spawn_mock_worker([])
    try:
        yield proc, url, worker_id
    finally:
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()


@pytest.fixture
def mock_workers():
    """Factory to start N workers with custom args.

    Usage:
        procs, urls, ids = mock_workers(n=3, args=["--latency-ms", "5"])  # same args for all
        ...
    """

    procs: List[subprocess.Popen] = []

    def _start(n: int, args: List[str] | None = None):
        args = args or []
        new_procs: List[subprocess.Popen] = []
        urls: List[str] = []
        ids: List[str] = []
        for _ in range(n):
            p, url, wid = _spawn_mock_worker(args)
            procs.append(p)
            new_procs.append(p)
            urls.append(url)
            ids.append(wid)
        return new_procs, urls, ids

    try:
        yield _start
    finally:
        for p in procs:
            if p.poll() is None:
                p.terminate()
                try:
                    p.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    p.kill()