# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Port allocation utilities for tests.

Port allocation with flock-based locking to prevent race conditions in parallel tests.
"""

import fcntl
import inspect
import json
import os
import random
import socket
import tempfile
import time
18
from dataclasses import dataclass
19
20
21
22
23
24
25
26
27
28
29
30
from pathlib import Path

# Files used to coordinate port allocation; both live in the system temp directory.
# The .lock file is the target of fcntl.flock(); the .json file holds the port registry
# that is read and rewritten while that lock is held.
_PORT_LOCK_FILE = Path(tempfile.gettempdir()) / "pytest_port_allocations.lock"
_PORT_REGISTRY_FILE = Path(tempfile.gettempdir()) / "pytest_port_allocations.json"

# Port range for allocation (i16 range for Rust compatibility).
# Ports below 1024 are reserved for system services (root only); 32767 is the i16 maximum.
# TODO: Get Rust backend to use u16 instead of i16 so we can use full 1024-65535 range
_PORT_MIN = 1024
_PORT_MAX = 32767


31
32
33
34
35
36
37
38
39
40
41
42
@dataclass(frozen=True)
class ServicePorts:
    """Immutable bundle of ports belonging to one Dynamo service deployment.

    Lets a test hand a cohesive set of ports around as a single value: the
    frontend port plus one or more worker/system ports.
    """

    # Port for the deployment's frontend.
    frontend_port: int
    # Ports for the deployment's worker/system endpoints (one or more).
    system_ports: list[int]


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def _load_port_registry() -> dict:
    """Load the port registry from disk.

    The registry is best-effort bookkeeping, so every failure mode degrades to an
    empty registry instead of raising: a missing or unreadable file, content that
    is not valid JSON or cannot be decoded as text, and JSON whose top-level value
    is not an object.

    Returns:
        dict: Port registry mapping port numbers (as strings) to allocation info.
              Example: {
                  "30001": {
                      "timestamp": 1732647123.456,
                      "caller_file": "/workspace/tests/test_foo.py",
                      "caller_function": "test_bar",
                      "caller_line": 42
                  }
              }
    """
    try:
        # Open directly instead of checking exists() first: a missing file raises
        # FileNotFoundError (an OSError), so the pre-check was redundant and racy.
        with open(_PORT_REGISTRY_FILE, "r") as f:
            registry = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}

    # Callers iterate .items()/.keys(); valid JSON such as [] or null must not leak out.
    if not isinstance(registry, dict):
        return {}
    return registry


def _save_port_registry(registry: dict) -> None:
    """Write ``registry`` to the registry file as JSON, replacing its previous content.

    Args:
        registry: Mapping of port numbers (as strings) to allocation info.
    """
    with _PORT_REGISTRY_FILE.open("w") as registry_handle:
        json.dump(registry, registry_handle)


def _cleanup_stale_allocations(registry: dict, max_age: float = 900.0) -> dict:
    """Remove port allocations older than max_age seconds."""
    current_time = time.time()
    cleaned = {}
    for port, info in registry.items():
        # Handle both old format (timestamp only) and new format (dict with timestamp)
        if isinstance(info, dict):
            timestamp = info.get("timestamp", 0)
        else:
            timestamp = info

        if current_time - timestamp < max_age:
            cleaned[str(port)] = info

    return cleaned


def allocate_ports(count: int, start_port: int) -> list[int]:
    """Find and return available ports in i16 range with flock-based locking.

    Uses file locking (flock) to prevent race conditions when multiple test processes
    allocate ports simultaneously. While the lock is held, the on-disk registry is
    loaded, stale entries are pruned, the new ports are recorded together with the
    caller's file/function/line, and the registry is written back.

    Port range is limited to i16 (1024-32767) due to Rust backend expecting i16.

    Searches from a random offset (start_port + random(0..100)) and walks up
    incrementally. Wraps around to _PORT_MIN (1024) when exceeding _PORT_MAX, so a
    returned port may be lower than start_port. Tries at most 100 candidate ports.

    Each candidate is verified by briefly binding a TCP socket to it. The socket is
    closed again immediately, so the reservation itself exists only in the registry.

    Args:
        count: Number of unique ports to allocate
        start_port: Starting port number for allocation (required)

    Returns:
        list[int]: List of available port numbers

    Raises:
        ValueError: If start_port is outside [_PORT_MIN, _PORT_MAX].
        PermissionError: If the lock file is not writable.
        RuntimeError: If fewer than ``count`` ports were found within the retry limit.
    """
    # Get caller information for debugging
    caller_file = "unknown"
    caller_function = "unknown"
    caller_line = 0

    frame = inspect.currentframe()
    try:
        caller_frame = frame.f_back if frame else None
        # Skip wrappers defined in this same module (e.g. allocate_port) so the
        # registry names the code that actually asked for the ports.
        while (
            caller_frame is not None
            and caller_frame.f_back is not None
            and caller_frame.f_code.co_filename == frame.f_code.co_filename
        ):
            caller_frame = caller_frame.f_back
        if caller_frame is not None:
            caller_info = inspect.getframeinfo(caller_frame)
            caller_function = caller_frame.f_code.co_name
            caller_file = caller_info.filename
            caller_line = caller_info.lineno
    finally:
        # A frame stored in its own locals forms a reference cycle; break it promptly.
        del frame

    # Validate start_port is in valid i16 range. Note that <1024 is reserved for system services (root only)
    if start_port < _PORT_MIN or start_port > _PORT_MAX:
        raise ValueError(
            f"start_port must be between {_PORT_MIN} and {_PORT_MAX}, got {start_port}"
        )

    # Ensure lock file exists and is writable
    _PORT_LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
    _PORT_LOCK_FILE.touch(exist_ok=True)

    if not os.access(_PORT_LOCK_FILE, os.W_OK):
        raise PermissionError(
            f"Port allocation lock file is not writable: {_PORT_LOCK_FILE}"
        )

    with open(_PORT_LOCK_FILE, "r+") as lock_file:
        # Acquire exclusive lock
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)

        try:
            # Load registry and clean up stale allocations
            registry = _cleanup_stale_allocations(_load_port_registry())

            # Ports that must not be handed out: everything already in the registry
            # plus whatever this call picks along the way.
            taken_ports = {int(p) for p in registry}
            ports: list[int] = []

            # Start searching from desired port + random offset
            current_port = start_port + random.randint(0, 100)
            if current_port > _PORT_MAX:
                current_port = _PORT_MIN + (current_port - _PORT_MAX - 1)

            # Retry limit
            max_retries = 100
            attempts = 0

            while len(ports) < count and attempts < max_retries:
                attempts += 1

                # Try current port
                port = current_port

                # Increment and wrap around to _PORT_MIN
                current_port += 1
                if current_port > _PORT_MAX:
                    current_port = _PORT_MIN

                # Skip if already allocated or picked earlier in this call
                if port in taken_ports:
                    continue

                # Try to bind to verify it's actually free. The context manager
                # closes the socket even when bind() fails.
                try:
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.bind(("", port))
                except OSError:
                    continue

                ports.append(port)
                taken_ports.add(port)
                registry[str(port)] = {
                    "timestamp": time.time(),
                    "caller_file": caller_file,
                    "caller_function": caller_function,
                    "caller_line": caller_line,
                }

            if len(ports) < count:
                raise RuntimeError(
                    f"Could not find {count} available ports after {max_retries} retries"
                )

            # Save updated registry
            _save_port_registry(registry)

            return ports

        finally:
            # Release lock
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)


201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def allocate_contiguous_ports(
    count: int, block_size: int, start_port: int
) -> list[int]:
    """Find and return contiguous port blocks in i16 range with flock-based locking.

    While holding an exclusive flock on the lock file, the registry is loaded and
    pruned of stale entries, then candidate blocks are probed starting at
    start_port plus a random offset of 0-100 (restarting at _PORT_MIN whenever a
    block would run past _PORT_MAX). A block is accepted only if none of its ports
    is already registered or picked earlier in this call, and every port in it can
    be bound at the same time. Accepted ports are recorded in the registry with the
    caller's file/function/line. At most 500 candidate blocks are tried.

    Args:
        count: Number of contiguous blocks to allocate
        block_size: Size of each contiguous block
        start_port: Starting port number for allocation (required)

    Returns:
        list[int]: Flattened list of allocated ports grouped into contiguous blocks.
            Empty when ``count`` is zero or negative.

    Raises:
        ValueError: If block_size is not positive, start_port is outside
            [_PORT_MIN, _PORT_MAX], or a block starting at start_port would
            exceed _PORT_MAX.
        PermissionError: If the lock file is not writable.
        RuntimeError: If not enough free blocks were found within the retry limit.
    """
    if count <= 0:
        return []
    if block_size <= 0:
        raise ValueError(f"block_size must be positive, got {block_size}")

    # Get caller information for debugging
    caller_file = "unknown"
    caller_function = "unknown"
    caller_line = 0

    frame = inspect.currentframe()
    if frame and frame.f_back:
        caller_frame = frame.f_back
        caller_info = inspect.getframeinfo(caller_frame)
        caller_function = caller_frame.f_code.co_name
        caller_file = caller_info.filename
        caller_line = caller_info.lineno

    if start_port < _PORT_MIN or start_port > _PORT_MAX:
        raise ValueError(
            f"start_port must be between {_PORT_MIN} and {_PORT_MAX}, got {start_port}"
        )

    if start_port + block_size - 1 > _PORT_MAX:
        raise ValueError(
            f"start_port {start_port} with block_size {block_size} exceeds {_PORT_MAX}"
        )

    # Ensure lock file exists and is writable
    _PORT_LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
    _PORT_LOCK_FILE.touch(exist_ok=True)

    if not os.access(_PORT_LOCK_FILE, os.W_OK):
        raise PermissionError(
            f"Port allocation lock file is not writable: {_PORT_LOCK_FILE}"
        )

    with open(_PORT_LOCK_FILE, "r+") as lock_file:
        # Acquire exclusive lock
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)

        try:
            registry = _cleanup_stale_allocations(_load_port_registry())

            # Ports that must not be handed out: everything already in the registry
            # plus whatever this call picks along the way.
            taken_ports = {int(p) for p in registry}
            ports: list[int] = []
            total_needed = count * block_size

            current_port = start_port + random.randint(0, 100)
            if current_port + block_size - 1 > _PORT_MAX:
                current_port = _PORT_MIN

            max_retries = 500
            attempts = 0

            while len(ports) < total_needed and attempts < max_retries:
                attempts += 1
                base_port = current_port
                candidate_ports = range(base_port, base_port + block_size)

                # By default the next candidate starts one port higher, wrapping to
                # _PORT_MIN when a full block would no longer fit below _PORT_MAX.
                current_port = base_port + 1
                if current_port + block_size - 1 > _PORT_MAX:
                    current_port = _PORT_MIN

                # Defensive guard: never probe a block that runs past the range.
                if candidate_ports[-1] > _PORT_MAX:
                    continue

                if not taken_ports.isdisjoint(candidate_ports):
                    continue

                # Bind every port of the block at once to prove the whole block is
                # free. Each socket is tracked before bind() so that the one whose
                # bind fails is closed as well.
                sockets: list[socket.socket] = []
                block_is_free = True
                try:
                    for port in candidate_ports:
                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        sockets.append(sock)
                        sock.bind(("", port))
                except OSError:
                    block_is_free = False
                finally:
                    for sock in sockets:
                        sock.close()

                if not block_is_free:
                    continue

                ports.extend(candidate_ports)
                taken_ports.update(candidate_ports)
                timestamp = time.time()
                for port in candidate_ports:
                    registry[str(port)] = {
                        "timestamp": timestamp,
                        "caller_file": caller_file,
                        "caller_function": caller_function,
                        "caller_line": caller_line,
                    }

                # Jump past the block just taken. Stepping by one would spend an
                # attempt on each of the block_size - 1 candidates that overlap it.
                current_port = base_port + block_size
                if current_port + block_size - 1 > _PORT_MAX:
                    current_port = _PORT_MIN

            if len(ports) < total_needed:
                raise RuntimeError(
                    f"Could not find {count} contiguous port blocks of size {block_size} "
                    f"after {max_retries} retries"
                )

            _save_port_registry(registry)
            return ports

        finally:
            # Release lock
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)


321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def allocate_port(start_port: int) -> int:
    """Allocate one available port in i16 range.

    Convenience wrapper that requests exactly one port from ``allocate_ports``.

    Args:
        start_port: Starting port number for allocation (required)

    Returns:
        int: The single port number handed back by ``allocate_ports``
    """
    single_port_list = allocate_ports(1, start_port)
    return single_port_list[0]


def deallocate_ports(ports: list[int]) -> None:
    """Remove the given ports from the registry so they can be allocated again.

    Does nothing for an empty ``ports``. Otherwise the registry is loaded, the
    entries for the given ports are dropped (ports not present are ignored), and
    the registry is written back, all while holding an exclusive flock on the
    lock file.

    Args:
        ports: List of port numbers to release
    """
    if not ports:
        return

    # The lock file must exist before it can be opened in "r+" mode.
    _PORT_LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
    _PORT_LOCK_FILE.touch(exist_ok=True)

    with open(_PORT_LOCK_FILE, "r+") as lock_handle:
        fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)

        try:
            current_registry = _load_port_registry()

            # Registry keys are strings, so compare against the string form.
            released_keys = {str(released_port) for released_port in ports}
            remaining = {
                key: info
                for key, info in current_registry.items()
                if key not in released_keys
            }

            _save_port_registry(remaining)

        finally:
            fcntl.flock(lock_handle.fileno(), fcntl.LOCK_UN)


def deallocate_port(port: int) -> None:
    """Release one previously allocated port.

    Convenience wrapper that passes a one-element list to ``deallocate_ports``.

    Args:
        port: Port number to release
    """
    single_port_list = [port]
    deallocate_ports(single_port_list)