kubectl.py 4.94 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Safe kubectl subprocess helpers.

All k8s interactions go through this module for consistent error handling
and namespace scoping.
"""

from __future__ import annotations

import json
import subprocess
import time
from typing import Any, Dict, List, Optional


def run_kubectl(
    args: List[str],
    namespace: Optional[str] = None,
    capture: bool = True,
    check: bool = True,
    timeout: int = 60,
    input_data: Optional[str] = None,
) -> subprocess.CompletedProcess:
    """Invoke the ``kubectl`` binary, optionally scoped to a namespace.

    Args:
        args: Arguments passed to kubectl after any namespace flag,
            e.g. ``["get", "pods"]``.
        namespace: When truthy, ``-n <namespace>`` is inserted before ``args``.
        capture: Capture stdout/stderr (as text) instead of inheriting them.
        check: When True, a non-zero exit is reported on stdout and then
            raised as ``subprocess.CalledProcessError``.
        timeout: Seconds before the subprocess is abandoned
            (``subprocess.TimeoutExpired`` propagates).
        input_data: Text fed to kubectl's stdin, if any.

    Returns:
        The ``subprocess.CompletedProcess`` for the invocation.
    """
    namespace_flags = ["-n", namespace] if namespace else []
    command = ["kubectl", *namespace_flags, *args]

    proc = subprocess.run(
        command,
        capture_output=capture,
        text=True,
        # Never let subprocess raise on its own: we want to print a short
        # diagnostic before raising when the caller asked for checking.
        check=False,
        timeout=timeout,
        input=input_data,
    )

    failed = proc.returncode != 0
    if not (check and failed):
        return proc

    # Only the first few args are echoed to keep the log line short.
    summary = " ".join(args[:4])
    err_text = (proc.stderr or "").strip()
    print(f"  kubectl error (rc={proc.returncode}): {summary}")
    if err_text:
        print(f"    {err_text}")
    proc.check_returncode()  # always raises here, since returncode != 0
    return proc


def get_json(
    resource: str,
    name: str,
    namespace: str,
    timeout: int = 30,
) -> Dict[str, Any]:
    """Fetch one k8s object with ``kubectl get -o json`` and decode it.

    Args:
        resource: Resource type, e.g. ``"pod"`` or ``"deployment"``.
        name: Object name.
        namespace: K8s namespace to query.
        timeout: Subprocess timeout in seconds.

    Returns:
        The decoded JSON document.

    Raises:
        subprocess.CalledProcessError: kubectl exited non-zero.
        json.JSONDecodeError: kubectl's stdout was not valid JSON.
    """
    argv = ["get", resource, name, "-o", "json"]
    raw = run_kubectl(argv, namespace=namespace, timeout=timeout).stdout
    return json.loads(raw)


def patch_json(
    resource: str,
    name: str,
    namespace: str,
    patch: List[Dict[str, Any]],
    timeout: int = 30,
) -> None:
    """Send a JSON Patch (a list of operations) via ``kubectl patch --type=json``.

    Args:
        resource: Resource type to patch.
        name: Object name.
        namespace: K8s namespace of the object.
        patch: List of JSON Patch operation dicts; serialized with json.dumps.
        timeout: Subprocess timeout in seconds.

    Raises:
        subprocess.CalledProcessError: kubectl exited non-zero.
    """
    encoded = json.dumps(patch)
    argv = ["patch", resource, name, "--type=json", "-p=" + encoded]
    run_kubectl(argv, namespace=namespace, timeout=timeout)


def patch_merge(
    resource: str,
    name: str,
    namespace: str,
    patch: Dict[str, Any],
    timeout: int = 30,
) -> None:
    """Apply a JSON merge patch to a k8s resource.

    This runs ``kubectl patch --type=merge``, which is a JSON merge patch
    (RFC 7386), not a strategic merge patch (``--type=strategic``). The
    difference matters for lists: a merge patch replaces a list wholesale,
    while a strategic merge patch can merge list entries by key.

    Args:
        resource: Resource type to patch.
        name: Object name.
        namespace: K8s namespace of the object.
        patch: Partial object to merge; serialized with json.dumps.
        timeout: Subprocess timeout in seconds.

    Raises:
        subprocess.CalledProcessError: kubectl exited non-zero.
    """
    patch_str = json.dumps(patch)
    run_kubectl(
        ["patch", resource, name, "--type=merge", f"-p={patch_str}"],
        namespace=namespace,
        timeout=timeout,
    )


def wait_pod(
    label_selector: str,
    namespace: str,
    condition: str = "Ready",
    timeout: int = 300,
) -> None:
    """Block until pods selected by ``label_selector`` satisfy ``condition``.

    Args:
        label_selector: Selector passed to ``kubectl wait -l``.
        namespace: K8s namespace to search.
        condition: Pod condition name for ``--for=condition=...``.
        timeout: Seconds kubectl itself waits (``--timeout``).

    Raises:
        subprocess.CalledProcessError: kubectl exited non-zero (e.g. its
            own wait timed out).
    """
    wait_flags = [
        f"--for=condition={condition}",
        f"--timeout={timeout}s",
    ]
    run_kubectl(
        ["wait", "pod", "-l", label_selector, *wait_flags],
        namespace=namespace,
        # A 10 s margin over kubectl's own --timeout lets kubectl report the
        # timeout itself instead of the subprocess being killed first.
        timeout=timeout + 10,
    )


def delete_pod(
    name: str,
    namespace: str,
    grace_period: int = 5,
) -> None:
    """Delete a pod by name.

    A non-zero kubectl exit (e.g. the pod does not exist) is tolerated: the
    command runs with ``check=False`` so no CalledProcessError is raised.

    Args:
        name: Pod name.
        namespace: K8s namespace of the pod.
        grace_period: Seconds passed to ``--grace-period``.

    Raises:
        subprocess.TimeoutExpired: kubectl did not return within the
            subprocess timeout computed below.
    """
    # `kubectl delete` waits for the pod to finish terminating by default,
    # so a long grace period could outlast run_kubectl's default 60 s
    # subprocess timeout. Keep 60 s for short grace periods and leave
    # headroom beyond the grace period otherwise.
    run_kubectl(
        ["delete", "pod", name, f"--grace-period={grace_period}"],
        namespace=namespace,
        check=False,
        timeout=max(60, grace_period + 30),
    )


def get_pod_name(
    label_selector: str,
    namespace: str,
) -> Optional[str]:
    """Return the name of the first pod selected by ``label_selector``.

    Runs with ``check=False``. When kubectl prints nothing (presumably
    because no pod matched, or because kubectl failed), returns None.

    Args:
        label_selector: Selector passed to ``kubectl get -l``.
        namespace: K8s namespace to search.

    Returns:
        The pod name, or None when stdout is empty/whitespace.
    """
    first_name_path = "jsonpath={.items[0].metadata.name}"
    proc = run_kubectl(
        ["get", "pod", "-l", label_selector, "-o", first_name_path],
        namespace=namespace,
        check=False,
    )
    first = proc.stdout.strip()
    return first or None


def pod_exists(name: str, namespace: str) -> bool:
    """Report whether ``kubectl get pod <name>`` succeeds.

    Note: any non-zero exit is reported as False, not only "not found"
    (e.g. an API or auth error would also yield False).

    Args:
        name: Pod name.
        namespace: K8s namespace to query.

    Returns:
        True when kubectl exits with status 0.
    """
    proc = run_kubectl(["get", "pod", name], namespace=namespace, check=False)
    succeeded = proc.returncode == 0
    return succeeded


def apply_yaml(yaml_content: str, namespace: str) -> None:
    """Pipe a manifest into ``kubectl apply`` for the given namespace.

    Args:
        yaml_content: Manifest text, delivered to kubectl on stdin.
        namespace: K8s namespace passed as ``-n``.

    Raises:
        subprocess.CalledProcessError: kubectl exited non-zero.
    """
    from_stdin = ["apply", "-f", "-"]  # "-f -" makes kubectl read stdin
    run_kubectl(from_stdin, namespace=namespace, input_data=yaml_content)


def apply_secret_literal(name: str, namespace: str, key: str, value: str) -> None:
    """Create or update an opaque Secret holding one literal key/value pair.

    The manifest is serialized as JSON (which ``kubectl apply -f -`` accepts)
    instead of being interpolated into a YAML template. As a result, ``name``
    and ``key`` are always emitted as quoted strings. Unquoted in YAML, plain
    scalars such as ``123``, ``null``, ``yes`` or ``on`` may be parsed as
    numbers, nulls or booleans, and YAML-special characters (``: ``, ``#``,
    newlines) would corrupt the document.

    The value travels to kubectl via stdin (through apply_yaml), not on the
    command line.

    Args:
        name: Secret name.
        namespace: K8s namespace for the Secret.
        key: Key under ``stringData``.
        value: Plain-text value for ``key``.

    Raises:
        subprocess.CalledProcessError: kubectl exited non-zero.
    """
    manifest = {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "Opaque",
        "stringData": {key: value},
    }
    apply_yaml(json.dumps(manifest), namespace)


def wait_for_pod_deletion(
    name: str,
    namespace: str,
    timeout: int = 120,
    poll_interval: float = 5,
) -> None:
    """Poll until a pod is gone, or give up after ``timeout`` seconds.

    The deadline uses a monotonic clock, so time spent inside each
    ``kubectl get`` call counts against ``timeout``; only counting sleep
    time would let the real wait run far past it. The final sleep is
    clamped to the remaining time, and the pod is re-checked after it.

    Note: pod_exists returns False on any kubectl failure, so a transient
    API error also ends the wait.

    Args:
        name: Pod name.
        namespace: K8s namespace of the pod.
        timeout: Maximum seconds to wait.
        poll_interval: Seconds between existence checks (should be > 0).

    Returns:
        None. A timeout is not raised; a warning is printed instead.
    """
    deadline = time.monotonic() + timeout
    while pod_exists(name, namespace):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            print(f"  WARNING: pod {name} still present after {timeout}s")
            return
        time.sleep(min(poll_interval, remaining))