kubernetes.py 5.37 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
import sys
from pathlib import Path
from typing import List

21
22
# Name of the PVC access pod; every kubectl get/delete/wait call in this
# module addresses the pod by this name.
PVC_ACCESS_POD_NAME = "pvc-access-pod"

23
24

def run_command(
    cmd: List[str], capture_output: bool = True, exit_on_error: bool = True
) -> subprocess.CompletedProcess:
    """Run a command and handle errors.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).
        capture_output: When true, stdout/stderr are captured on the returned
            result; when false they are inherited from this process.
        exit_on_error: When true, any failure terminates the process with
            exit status 1; when false the underlying exception is re-raised.

    Returns:
        The ``subprocess.CompletedProcess`` of a successful run (text mode).

    Raises:
        subprocess.CalledProcessError: The command exited with a non-zero
            status and ``exit_on_error`` is false.
        OSError: The command could not be started (for example the
            executable is not installed) and ``exit_on_error`` is false.
        SystemExit: Any failure while ``exit_on_error`` is true.
    """
    try:
        return subprocess.run(
            cmd, capture_output=capture_output, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"ERROR: Command failed: {' '.join(cmd)}")
        print(f"Exit code: {e.returncode}")
        # stdout/stderr are only populated when output was captured.
        if e.stdout:
            print(f"STDOUT: {e.stdout}")
        if e.stderr:
            print(f"STDERR: {e.stderr}")
        if exit_on_error:
            sys.exit(1)
        raise
    except OSError as e:
        # The process never started (missing or non-executable binary).
        # Report it and apply the same exit/re-raise policy as a failed run
        # instead of letting a raw traceback escape.
        print(f"ERROR: Command could not be executed: {' '.join(cmd)}")
        print(f"Reason: {e}")
        if exit_on_error:
            sys.exit(1)
        raise
44
45
46
47
48
49
50
51
52


def check_kubectl_access(namespace: str) -> None:
    """Verify that kubectl can list pods in ``namespace``.

    The probe goes through ``run_command`` with its default error handling,
    so a failing ``kubectl get pods`` is reported by that helper.
    """
    print(f"Checking kubectl access to namespace '{namespace}'...")
    probe_cmd = ["kubectl", "get", "pods", "-n", namespace]
    run_command(probe_cmd, capture_output=True)
    print("✓ kubectl access confirmed")


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
def ensure_clean_access_pod(namespace: str) -> str:
    """Ensure a clean PVC access pod deployment by deleting any existing pod first.

    Args:
        namespace: Kubernetes namespace in which the access pod lives.

    Returns:
        The pod name returned by ``deploy_access_pod``.

    Raises:
        Exception: Whatever ``deploy_access_pod`` raises is re-raised after a
            manual clean-up hint has been printed.
    """

    # Check if pod exists and delete it if it does. This step is best-effort:
    # a failure here must not prevent the deployment attempt below.
    try:
        result = subprocess.run(
            [
                "kubectl",
                "get",
                "pod",
                PVC_ACCESS_POD_NAME,
                "-n",
                namespace,
                "-o",
                "jsonpath={.metadata.name}",
            ],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode == 0 and result.stdout.strip() == PVC_ACCESS_POD_NAME:
            print(f"Found existing access pod '{PVC_ACCESS_POD_NAME}', deleting it...")
            run_command(
                [
                    "kubectl",
                    "delete",
                    "pod",
                    PVC_ACCESS_POD_NAME,
                    "-n",
                    namespace,
                    "--ignore-not-found",
                ],
                capture_output=False,
                exit_on_error=False,
            )
            print("✓ Existing access pod deleted")
    except Exception as e:
        # A missing pod is signalled by a non-zero return code above, not by
        # an exception, so anything landing here is a real failure (kubectl
        # could not run, or the delete failed). Keep going, but say so.
        print(f"Warning: Could not check for or delete existing access pod: {e}")

    try:
        return deploy_access_pod(namespace)
    except Exception as e:
        print(f"Deployment failed: {e}")
        # Build the hint from the shared pod name and the actual namespace so
        # the suggested command can be copied and run as-is.
        print(
            f"Pod left running for debugging. Use 'kubectl delete pod {PVC_ACCESS_POD_NAME} -n {namespace}' to clean up manually."
        )
        raise


102
103
104
105
106
107
108
109
110
111
def deploy_access_pod(namespace: str, *, wait_timeout: str = "60s") -> str:
    """Deploy the PVC access pod and return pod name.

    If the pod already reports phase ``Running`` nothing is applied and the
    name is returned immediately. Otherwise the manifest at
    ``manifests/pvc-access-pod.yaml`` (next to this file) is applied and the
    function waits for the pod's ``Ready`` condition.

    Args:
        namespace: Kubernetes namespace to deploy into.
        wait_timeout: Value passed to ``kubectl wait --timeout`` (a duration
            such as ``"60s"`` or ``"2m"``). Defaults to the previous
            hard-coded ``"60s"``.

    Returns:
        ``PVC_ACCESS_POD_NAME``.

    Raises:
        SystemExit: The manifest file does not exist, or ``kubectl apply``
            fails (``run_command`` is called with its default
            ``exit_on_error``).
        subprocess.CalledProcessError: ``kubectl wait`` fails; it is run with
            ``exit_on_error=False`` so the caller can react.
    """

    # Check if pod already exists and is running
    try:
        result = subprocess.run(
            [
                "kubectl",
                "get",
                "pod",
                PVC_ACCESS_POD_NAME,
                "-n",
                namespace,
                "-o",
                "jsonpath={.status.phase}",
            ],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode == 0 and result.stdout.strip() == "Running":
            print(f"✓ Access pod '{PVC_ACCESS_POD_NAME}' already running")
            return PVC_ACCESS_POD_NAME
    except Exception as e:
        # With check=False a missing or non-running pod does not raise, so an
        # exception here means the status query itself failed. Fall through
        # to a normal deployment, but do not hide the failure.
        print(f"Warning: Could not query access pod status: {e}")

    print(f"Deploying access pod '{PVC_ACCESS_POD_NAME}' in namespace '{namespace}'...")

    pod_yaml_path = Path(__file__).parent / "manifests" / "pvc-access-pod.yaml"

    if not pod_yaml_path.exists():
        print(f"ERROR: Pod YAML not found at {pod_yaml_path}")
        sys.exit(1)

    run_command(
        ["kubectl", "apply", "-f", str(pod_yaml_path), "-n", namespace],
        capture_output=False,
    )

    print("Waiting for pod to be ready...")
    run_command(
        [
            "kubectl",
            "wait",
            f"pod/{PVC_ACCESS_POD_NAME}",
            "-n",
            namespace,
            "--for=condition=Ready",
            f"--timeout={wait_timeout}",
        ],
        capture_output=False,
        # Raise instead of exiting so the caller decides how to handle a pod
        # that never became ready.
        exit_on_error=False,
    )
    print("✓ Access pod is ready")
    return PVC_ACCESS_POD_NAME
156

157
158
159

def cleanup_access_pod(namespace: str) -> None:
    """Delete the PVC access pod from ``namespace``, warning on failure.

    The delete is issued with ``--ignore-not-found``. Any ``Exception``
    raised while deleting is caught and printed as a warning, not propagated.
    """
    print("Cleaning up access pod...")
    delete_cmd = [
        "kubectl",
        "delete",
        "pod",
        PVC_ACCESS_POD_NAME,
        "-n",
        namespace,
        "--ignore-not-found",
    ]
    try:
        # exit_on_error=False makes run_command raise, so the handler below
        # can turn the failure into a warning.
        run_command(delete_cmd, capture_output=False, exit_on_error=False)
        print("✓ Access pod deleted")
    except Exception as err:
        print(f"Warning: Failed to clean up access pod: {err}")