kubernetes.py 5.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import os
17
18
19
20
21
import subprocess
import sys
from pathlib import Path
from typing import List

22
23
# Name of the pod that the functions in this module look up, wait on and delete.
# NOTE(review): presumably has to match metadata.name in manifests/pvc-access-pod.yaml — confirm.
PVC_ACCESS_POD_NAME = "pvc-access-pod"

24
25
26
27
28
29
30
31
# Filesystem path whose existence is_running_in_cluster() uses as a fallback
# signal when KUBERNETES_SERVICE_HOST is not set.
K8S_SA_TOKEN = Path("/var/run/secrets/kubernetes.io/serviceaccount/token")


def is_running_in_cluster() -> bool:
    """Report whether this process appears to be executing inside Kubernetes.

    The ``KUBERNETES_SERVICE_HOST`` environment variable is consulted first;
    the service-account token path is only probed when that variable is
    unset or empty.
    """
    if os.environ.get("KUBERNETES_SERVICE_HOST"):
        return True
    return K8S_SA_TOKEN.exists()

32
33

def run_command(
    cmd: List[str], capture_output: bool = True, exit_on_error: bool = True
) -> subprocess.CompletedProcess:
    """Run a command and handle errors.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell involved).
        capture_output: When True, stdout/stderr are captured as text on the
            returned object; when False they are inherited from this process.
        exit_on_error: When True, any failure terminates the process via
            ``sys.exit(1)``; when False the original exception is re-raised.

    Returns:
        The ``CompletedProcess`` of a command that exited with status 0.

    Raises:
        subprocess.CalledProcessError: The command exited non-zero and
            ``exit_on_error`` is False.
        OSError: The command could not be started (for example the executable
            is not on PATH) and ``exit_on_error`` is False.
        SystemExit: Any of the above failures when ``exit_on_error`` is True.
    """
    try:
        return subprocess.run(
            cmd, capture_output=capture_output, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"ERROR: Command failed: {' '.join(cmd)}")
        print(f"Exit code: {e.returncode}")
        # stdout/stderr are only populated when capture_output was requested.
        if e.stdout:
            print(f"STDOUT: {e.stdout}")
        if e.stderr:
            print(f"STDERR: {e.stderr}")
        if exit_on_error:
            sys.exit(1)
        raise
    except OSError as e:
        # The process never started (missing or non-executable binary), so
        # there is no exit code or output to report; honour exit_on_error the
        # same way as for a non-zero exit instead of leaking a raw traceback.
        print(f"ERROR: Command failed: {' '.join(cmd)}")
        print(f"Reason: {e}")
        if exit_on_error:
            sys.exit(1)
        raise
53
54
55
56
57
58
59
60
61


def check_kubectl_access(namespace: str) -> None:
    """Verify that kubectl is able to list pods in ``namespace``.

    The check is delegated to ``run_command`` with its default error
    handling, so a failing kubectl call is reported and handled there.
    """
    probe_cmd = ["kubectl", "get", "pods", "-n", namespace]
    print(f"Checking kubectl access to namespace '{namespace}'...")
    run_command(probe_cmd, capture_output=True)
    print("✓ kubectl access confirmed")


62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
def ensure_clean_access_pod(namespace: str) -> str:
    """Ensure a clean PVC access pod deployment by deleting any existing pod first.

    Args:
        namespace: Namespace in which the access pod is looked up, deleted
            and deployed.

    Returns:
        The pod name returned by ``deploy_access_pod``.

    Raises:
        Exception: Anything raised by ``deploy_access_pod`` is reported and
            re-raised unchanged. Failures while looking up or deleting a
            previous pod are only reported as warnings.
    """
    # Look the pod up by name. This step is best-effort: if kubectl cannot
    # even be started, say so and let the deployment step surface the problem.
    lookup = None
    try:
        lookup = subprocess.run(
            [
                "kubectl",
                "get",
                "pod",
                PVC_ACCESS_POD_NAME,
                "-n",
                namespace,
                "-o",
                "jsonpath={.metadata.name}",
            ],
            capture_output=True,
            text=True,
            check=False,
        )
    except (subprocess.SubprocessError, OSError) as e:
        print(f"Warning: Could not check for an existing access pod: {e}")

    # A non-zero exit from `kubectl get` is treated as "no such pod".
    pod_exists = (
        lookup is not None
        and lookup.returncode == 0
        and lookup.stdout.strip() == PVC_ACCESS_POD_NAME
    )
    if pod_exists:
        print(f"Found existing access pod '{PVC_ACCESS_POD_NAME}', deleting it...")
        try:
            run_command(
                [
                    "kubectl",
                    "delete",
                    "pod",
                    PVC_ACCESS_POD_NAME,
                    "-n",
                    namespace,
                    "--ignore-not-found",
                ],
                capture_output=False,
                exit_on_error=False,
            )
        except (subprocess.SubprocessError, OSError) as e:
            # Do not abort: report the failed delete instead of hiding it,
            # then still attempt the deployment as before.
            print(f"Warning: Failed to delete existing access pod: {e}")
        else:
            print("✓ Existing access pod deleted")

    try:
        return deploy_access_pod(namespace)
    except Exception as e:
        print(f"Deployment failed: {e}")
        # Print a directly runnable command rather than a placeholder.
        print(
            f"Pod left running for debugging. Use 'kubectl delete pod {PVC_ACCESS_POD_NAME} -n {namespace}' to clean up manually."
        )
        raise


111
112
113
114
115
116
117
118
119
120
def deploy_access_pod(namespace: str, timeout: str = "60s") -> str:
    """Deploy the PVC access pod and return pod name.

    If the pod is already reported as ``Running`` nothing is applied and the
    name is returned immediately.

    Args:
        namespace: Namespace passed to every kubectl invocation.
        timeout: Value forwarded to ``kubectl wait --timeout``. Defaults to
            ``"60s"``, the value that was previously hard-coded.

    Returns:
        ``PVC_ACCESS_POD_NAME``.

    Raises:
        SystemExit: The manifest file is missing, or ``kubectl apply`` fails
            (``run_command`` is called with its default ``exit_on_error``).
        subprocess.CalledProcessError: ``kubectl wait`` exits non-zero
            (``run_command`` is called with ``exit_on_error=False``).
    """
    # Check if pod already exists and is running. The probe is best-effort:
    # a non-zero exit or a failure to start kubectl falls through to a
    # regular deployment.
    try:
        probe = subprocess.run(
            [
                "kubectl",
                "get",
                "pod",
                PVC_ACCESS_POD_NAME,
                "-n",
                namespace,
                "-o",
                "jsonpath={.status.phase}",
            ],
            capture_output=True,
            text=True,
            check=False,
        )
    except (subprocess.SubprocessError, OSError) as e:
        print(f"Warning: Could not query access pod status: {e}")
    else:
        if probe.returncode == 0 and probe.stdout.strip() == "Running":
            print(f"✓ Access pod '{PVC_ACCESS_POD_NAME}' already running")
            return PVC_ACCESS_POD_NAME

    print(f"Deploying access pod '{PVC_ACCESS_POD_NAME}' in namespace '{namespace}'...")

    # The manifest is resolved relative to this source file.
    pod_yaml_path = Path(__file__).parent / "manifests" / "pvc-access-pod.yaml"
    if not pod_yaml_path.exists():
        print(f"ERROR: Pod YAML not found at {pod_yaml_path}")
        sys.exit(1)

    run_command(
        ["kubectl", "apply", "-f", str(pod_yaml_path), "-n", namespace],
        capture_output=False,
    )

    print("Waiting for pod to be ready...")
    # exit_on_error=False lets the caller see the failure as an exception
    # instead of the process exiting here.
    run_command(
        [
            "kubectl",
            "wait",
            f"pod/{PVC_ACCESS_POD_NAME}",
            "-n",
            namespace,
            "--for=condition=Ready",
            f"--timeout={timeout}",
        ],
        capture_output=False,
        exit_on_error=False,
    )
    print("✓ Access pod is ready")
    return PVC_ACCESS_POD_NAME
165

166
167
168

def cleanup_access_pod(namespace: str) -> None:
    """Delete the PVC access pod from ``namespace`` on a best-effort basis.

    Any exception raised while deleting is reported as a warning and not
    propagated to the caller.
    """
    print("Cleaning up access pod...")
    try:
        delete_cmd = [
            "kubectl",
            "delete",
            "pod",
            PVC_ACCESS_POD_NAME,
            "-n",
            namespace,
            "--ignore-not-found",
        ]
        # exit_on_error=False makes run_command raise rather than exit, so the
        # handler below can downgrade the failure to a warning.
        run_command(delete_cmd, capture_output=False, exit_on_error=False)
        print("✓ Access pod deleted")
    except Exception as err:
        print(f"Warning: Failed to clean up access pod: {err}")