metrics.py 4.62 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Prometheus metrics capture for k8s sweeps.

Captures pre/post frontend /metrics snapshots for delta analysis.
Supports direct HTTP (when the endpoint is reachable), kubectl-exec into a
running pod, and a one-shot kubectl-run pod (when only in-cluster DNS is
available).
"""

from __future__ import annotations

import shlex
import subprocess
import time
import urllib.request
from pathlib import Path
from typing import Optional


def capture_metrics(
    endpoint: str,
    dest: Path,
    namespace: Optional[str] = None,
    pod_label: Optional[str] = None,
) -> None:
    """Snapshot the frontend's /metrics output into ``dest``.

    Fetch strategies are attempted in order, stopping at the first one that
    returns a result:

    1. Direct HTTP to ``endpoint``.
    2. ``kubectl exec`` into a pod matching ``pod_label`` (only when both
       ``namespace`` and ``pod_label`` are given).
    3. A temporary ``kubectl run`` pod (only when ``namespace`` is given).

    When nothing usable comes back, ``dest`` is still written -- with a
    single timestamped failure comment -- and a warning is printed.

    Args:
        endpoint: Frontend endpoint (host:port) -- may be in-cluster DNS.
        dest: Destination file path; parent directories are created.
        namespace: K8s namespace (enables the kubectl fallbacks).
        pod_label: Pod label selector (enables the kubectl exec fallback).
    """
    dest.parent.mkdir(parents=True, exist_ok=True)

    # Ordered, lazily evaluated strategies; later ones only run if the
    # earlier ones returned None.
    fetchers = [lambda: _try_http(endpoint)]
    if namespace:
        if pod_label:
            fetchers.append(
                lambda: _try_kubectl_exec(endpoint, namespace, pod_label)
            )
        fetchers.append(lambda: _try_kubectl_run(endpoint, namespace))

    payload = None
    for fetch in fetchers:
        payload = fetch()
        if payload is not None:
            break

    # Empty or whitespace-only output counts as a failed capture.
    if not payload or not payload.strip():
        dest.write_text(
            f"# metrics capture failed at {time.strftime('%Y-%m-%dT%H:%M:%S')}\n"
        )
        print(f"  WARNING: could not capture metrics from {endpoint}")
        return

    dest.write_text(payload)
    line_count = payload.strip().count("\n") + 1
    print(f"  Metrics captured -> {dest.name} ({line_count} lines)")


def _try_http(endpoint: str) -> Optional[str]:
    """Fetch ``http://<endpoint>/metrics`` directly.

    Returns the decoded response body, or None if anything goes wrong
    (bad URL, connection failure, HTTP error, timeout, decode error).
    """
    try:
        url = f"http://{endpoint}/metrics"
        with urllib.request.urlopen(url, timeout=10) as response:
            raw = response.read()
        return raw.decode()
    except Exception:
        # Best-effort probe: the caller falls back to other strategies.
        return None


def _try_kubectl_exec(
    endpoint: str,
    namespace: str,
    pod_label: str,
) -> Optional[str]:
    """Fetch metrics from inside an existing pod via ``kubectl exec``.

    Looks up the first pod matching ``pod_label`` in ``namespace`` and runs
    a small shell command in it that tries curl, then wget, then python3 to
    read ``http://<endpoint>/metrics``.

    Returns the command's stdout on success, or None when no pod is found,
    the command fails or prints nothing, or kubectl itself cannot be run.
    """
    kubectl = ["kubectl", "-n", namespace]
    try:
        # Resolve the label selector to a concrete pod name.
        lookup = subprocess.run(
            kubectl
            + [
                "get",
                "pod",
                "-l",
                pod_label,
                "-o",
                "jsonpath={.items[0].metadata.name}",
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        target_pod = lookup.stdout.strip()
        if target_pod:
            # The endpoint is shell-quoted because it is interpolated into
            # an `sh -c` script. The image may lack curl, so wget and
            # python3 are tried as fallbacks.
            url = f"http://{shlex.quote(endpoint)}/metrics"
            script = " || ".join(
                (
                    f"curl -sf {url} 2>/dev/null",
                    f"wget -qO- {url} 2>/dev/null",
                    f'python3 -c "import urllib.request,sys; print(urllib.request.urlopen(sys.argv[1]).read().decode())" {url} 2>/dev/null',
                )
            )
            fetched = subprocess.run(
                kubectl + ["exec", target_pod, "--", "sh", "-c", script],
                capture_output=True,
                text=True,
                timeout=15,
            )
            if fetched.returncode == 0 and fetched.stdout.strip():
                return fetched.stdout
    except Exception:
        # Best-effort: any failure just means this strategy did not work.
        return None
    return None


def _try_kubectl_run(endpoint: str, namespace: str) -> Optional[str]:
    """Fetch metrics via a one-shot ``kubectl run --rm`` curl pod.

    The temporary pod gets a per-call unique name. With a fixed name, a pod
    left behind by an earlier call (e.g. when the timeout killed kubectl
    before ``--rm`` could clean up) or a concurrent capture in the same
    namespace would make every later call fail with "already exists".

    Args:
        endpoint: Frontend endpoint (host:port) as reachable from a pod.
        namespace: K8s namespace in which the temporary pod is created.

    Returns:
        The metrics text, or None if kubectl cannot be run, times out,
        exits non-zero, or yields no output.
    """
    # A hex nanosecond timestamp keeps the name short and limited to
    # lowercase alphanumerics and '-', as pod names require.
    pod_name = f"metrics-fetch-{time.time_ns():x}"
    try:
        result = subprocess.run(
            [
                "kubectl",
                "run",
                pod_name,
                "--rm",
                "-i",
                "--restart=Never",
                "-n",
                namespace,
                "--image=curlimages/curl:latest",
                "--",
                "-sf",
                f"http://{endpoint}/metrics",
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except Exception:
        # Best-effort fallback: a missing kubectl binary or a timeout simply
        # means this strategy produced nothing.
        return None

    if result.returncode != 0:
        return None

    # `kubectl run --rm` can append its own 'pod "<name>" deleted' status
    # line to stdout after the container output; keep it out of the
    # metrics snapshot.
    status_prefix = f'pod "{pod_name}" deleted'
    body = "".join(
        line
        for line in result.stdout.splitlines(keepends=True)
        if not line.startswith(status_prefix)
    )
    return body if body.strip() else None