port_utils.py 7.42 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Port allocation utilities for tests.

Port allocation with flock-based locking to prevent race conditions in parallel tests.
"""

import fcntl
import inspect
import json
import os
import random
import socket
import tempfile
import time
from pathlib import Path

# Coordination files in the system temp directory, shared by every test
# process that allocates ports: the lock file serializes access and the JSON
# file records which ports are currently handed out.
_PORT_LOCK_FILE = Path(tempfile.gettempdir(), "pytest_port_allocations.lock")
_PORT_REGISTRY_FILE = Path(tempfile.gettempdir(), "pytest_port_allocations.json")

# Allocatable port window. Ports below 1024 are reserved for system services;
# the upper bound is the i16 maximum because the Rust backend expects an i16.
# TODO: Get Rust backend to use u16 instead of i16 so we can use full 1024-65535 range
_PORT_MIN = 1024
_PORT_MAX = 32767


def _load_port_registry() -> dict:
    """Load the port registry from disk.

    A missing, unreadable, or corrupt registry file yields an empty registry,
    so a single bad file cannot permanently break port allocation. Corrupt
    means invalid UTF-8, invalid JSON, or JSON whose top level is not an object.

    Returns:
        dict: Port registry mapping port numbers (as strings) to allocation info.
              Example: {
                  "30001": {
                      "timestamp": 1732647123.456,
                      "caller_file": "/workspace/tests/test_foo.py",
                      "caller_function": "test_bar",
                      "caller_line": 42
                  }
              }
    """
    try:
        with open(_PORT_REGISTRY_FILE, "r", encoding="utf-8") as f:
            registry = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers FileNotFoundError (no registry written yet).
        return {}
    # Callers iterate the result with .items()/.keys(); any other JSON type
    # (list, number, null, ...) would make every allocation crash.
    return registry if isinstance(registry, dict) else {}


def _save_port_registry(registry: dict) -> None:
    """Save the port registry to disk as JSON.

    The registry is serialized before the file is opened for writing, so a
    serialization error (e.g. a value that is not JSON-serializable) raises
    without truncating the existing registry file.

    Args:
        registry: Port registry mapping port numbers (as strings) to allocation info.

    Raises:
        TypeError: If ``registry`` contains values ``json`` cannot serialize.
        OSError: If the registry file cannot be written.
    """
    payload = json.dumps(registry)
    with open(_PORT_REGISTRY_FILE, "w", encoding="utf-8") as f:
        f.write(payload)


def _cleanup_stale_allocations(registry: dict, max_age: float = 900.0) -> dict:
    """Return a copy of ``registry`` without entries older than ``max_age`` seconds.

    Entries may use the current format or the legacy format:
      - Current format: a dict with a ``"timestamp"`` key. A missing key counts
        as timestamp 0, i.e. stale.
      - Legacy format: a bare timestamp.

    Entries whose timestamp is not a number are dropped too. Keeping them would
    make the age comparison raise TypeError on every allocation.

    Args:
        registry: Port registry mapping port numbers to allocation info.
        max_age: Maximum allocation age in seconds (default 900).

    Returns:
        dict: New registry containing only fresh entries, with port keys
        converted to strings.
    """
    now = time.time()
    cleaned = {}
    for port, info in registry.items():
        # Handle both old format (timestamp only) and new format (dict with timestamp)
        timestamp = info.get("timestamp", 0) if isinstance(info, dict) else info
        if not isinstance(timestamp, (int, float)):
            continue
        if now - timestamp < max_age:
            cleaned[str(port)] = info
    return cleaned


def _port_is_bindable(port: int) -> bool:
    """Return True if a fresh IPv4 TCP socket can bind ``port`` on all interfaces."""
    try:
        # The context manager closes the socket whether or not bind() succeeds.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("", port))
    except OSError:
        return False
    return True


def allocate_ports(count: int, start_port: int) -> list[int]:
    """Find and return available ports in i16 range with flock-based locking.

    Uses file locking (flock) on the lock file to serialize registry access when
    multiple test processes allocate ports simultaneously. Each returned port is
    recorded in the registry with the caller's file, function and line number
    for debugging.

    Port range is limited to i16 (1024-32767) due to Rust backend expecting i16.

    The search starts at a random offset (start_port + random 0..100) and walks
    up one port at a time, wrapping around to _PORT_MIN (1024) after _PORT_MAX.
    A candidate is rejected if it is already in the registry (after stale
    entries are dropped) or cannot be bound. Up to 100 rejections are tolerated
    before giving up; successfully allocated ports do not count against that
    budget.

    Args:
        count: Number of unique ports to allocate
        start_port: Starting port number for allocation (required)

    Returns:
        list[int]: List of available port numbers, in the order they were found

    Raises:
        ValueError: If start_port is outside [_PORT_MIN, _PORT_MAX].
        PermissionError: If the lock file is not writable.
        RuntimeError: If ``count`` ports could not be found within the
            rejection budget.
    """
    # Get caller information for debugging
    caller_file = "unknown"
    caller_function = "unknown"
    caller_line = 0

    frame = inspect.currentframe()
    try:
        if frame and frame.f_back:
            caller_frame = frame.f_back
            caller_info = inspect.getframeinfo(caller_frame)
            caller_function = caller_frame.f_code.co_name
            caller_file = caller_info.filename
            caller_line = caller_info.lineno
    finally:
        # Drop our own frame reference to avoid a reference cycle (inspect docs).
        del frame

    # Validate start_port is in valid i16 range. Note that <1024 is reserved for system services (root only)
    if start_port < _PORT_MIN or start_port > _PORT_MAX:
        raise ValueError(
            f"start_port must be between {_PORT_MIN} and {_PORT_MAX}, got {start_port}"
        )

    # Ensure lock file exists and is writable
    _PORT_LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
    _PORT_LOCK_FILE.touch(exist_ok=True)

    if not os.access(_PORT_LOCK_FILE, os.W_OK):
        raise PermissionError(
            f"Port allocation lock file is not writable: {_PORT_LOCK_FILE}"
        )

    with open(_PORT_LOCK_FILE, "r+") as lock_file:
        # Acquire exclusive lock
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)

        try:
            # Load registry and clean up stale allocations
            registry = _load_port_registry()
            registry = _cleanup_stale_allocations(registry)

            # Ports held by other callers plus the ones picked in this call
            # (set for O(1) membership tests).
            taken = {int(p) for p in registry}
            ports: list[int] = []

            # Start searching from desired port + random offset
            current_port = start_port + random.randint(0, 100)
            if current_port > _PORT_MAX:
                current_port = _PORT_MIN + (current_port - _PORT_MAX - 1)

            # Only rejected candidates consume the budget. Counting successes too
            # would make any request for more than max_retries ports always fail.
            max_retries = 100
            rejections = 0

            while len(ports) < count and rejections < max_retries:
                port = current_port

                # Increment and wrap around to _PORT_MIN
                current_port += 1
                if current_port > _PORT_MAX:
                    current_port = _PORT_MIN

                # Skip if already allocated (by anyone) or not actually free
                if port in taken or not _port_is_bindable(port):
                    rejections += 1
                    continue

                ports.append(port)
                taken.add(port)
                registry[str(port)] = {
                    "timestamp": time.time(),
                    "caller_file": caller_file,
                    "caller_function": caller_function,
                    "caller_line": caller_line,
                }

            if len(ports) < count:
                raise RuntimeError(
                    f"Could not find {count} available ports after {max_retries} retries"
                )

            # Save updated registry
            _save_port_registry(registry)

            return ports

        finally:
            # Release lock
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)


def allocate_port(start_port: int) -> int:
    """Find and return a single available port in i16 range.

    Thin wrapper around ``allocate_ports(1, start_port)``, which handles the
    locking and registry bookkeeping.

    Args:
        start_port: Starting port number for allocation (required); must lie
            in [1024, 32767].

    Returns:
        int: An available port number in [1024, 32767]. The search begins at a
            random offset of 0-100 above start_port and wraps around to 1024, so
            the result is NOT guaranteed to be >= start_port.

    Raises:
        ValueError: If start_port is out of range.
        RuntimeError: If no free port could be found.
    """
    return allocate_ports(1, start_port)[0]


def deallocate_ports(ports: list[int]) -> None:
    """Return ports to the shared pool by deleting their registry entries.

    The registry is modified under the same exclusive flock used for allocation.
    Ports that are not currently registered are ignored. An empty ``ports`` is a
    no-op that touches no files.

    Args:
        ports: List of port numbers to release
    """
    if not ports:
        return

    lock_path = _PORT_LOCK_FILE
    # Make sure there is a lock file to flock on
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_path.touch(exist_ok=True)

    with open(lock_path, "r+") as handle:
        fd = handle.fileno()
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            registry = _load_port_registry()
            # Registry keys are the string form of the port number
            for key in map(str, ports):
                registry.pop(key, None)
            _save_port_registry(registry)
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)


def deallocate_port(port: int) -> None:
    """Return one previously allocated port to the pool.

    Convenience wrapper that delegates to ``deallocate_ports`` with a
    single-element list.

    Args:
        port: Port number to release
    """
    single = [port]
    deallocate_ports(single)