process_results.py 4.85 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Convert BuildKit TSV extraction output to attribution CSV files.

Reads dpkg.tsv and python.tsv from a target extraction directory, writes a
sorted CSV, and optionally computes a diff against a base extraction directory.

Usage:
    python process_results.py --target-dir <dir> --output attribution.csv
    python process_results.py --target-dir <dir> --base-dir <dir> --output attribution.csv
    # Produces: attribution.csv and attribution_diff.csv
"""

import argparse
import csv
import sys
from pathlib import Path


def read_tsv(tsv_path: Path, pkg_type: str) -> list[dict[str, str]]:
    """Parse a tab-separated extraction output file into package dicts."""
    packages = []
    if not tsv_path.is_file():
        return packages
    for line in tsv_path.read_text(errors="replace").strip().splitlines():
        parts = line.split("\t", 2)
        if len(parts) != 3:
            continue
        pkg_name, version, spdx_license = parts
        packages.append(
            {
                "package_name": pkg_name,
                "version": version,
                "type": pkg_type,
                "spdx_license": spdx_license,
            }
        )
    return packages


def read_extraction_dir(directory: Path) -> list[dict[str, str]]:
    """Read dpkg.tsv and python.tsv from an extraction directory."""
    packages = read_tsv(directory / "dpkg.tsv", "dpkg")
    packages += read_tsv(directory / "python.tsv", "python")
    return packages


def compute_diff(
    target_packages: list[dict[str, str]],
    base_packages: list[dict[str, str]],
) -> list[dict[str, str]]:
    """Return packages in target that are new or have a different version vs base."""
    base_lookup = {
        (pkg["package_name"], pkg["type"]): pkg["version"] for pkg in base_packages
    }
    return [
        pkg
        for pkg in target_packages
        if base_lookup.get((pkg["package_name"], pkg["type"])) != pkg["version"]
    ]


def write_csv(packages: list[dict[str, str]], output_path: Path | None) -> None:
    """Write packages to CSV sorted by (type, package_name)."""
    sorted_packages = sorted(packages, key=lambda p: (p["type"], p["package_name"]))
    fieldnames = ["package_name", "version", "type", "spdx_license"]
    if output_path:
        with open(output_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(sorted_packages)
        print(f"Wrote {len(sorted_packages)} entries to {output_path}", file=sys.stderr)
    else:
        writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sorted_packages)


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Convert BuildKit TSV extraction output to attribution CSV",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --target-dir ./output --output attribution.csv
  %(prog)s --target-dir ./output --base-dir ./base-output --output attribution.csv
        """,
    )
    parser.add_argument(
        "--target-dir",
        required=True,
        help="Directory containing dpkg.tsv and python.tsv from the target image extraction",
    )
    parser.add_argument(
        "--base-dir",
        help="Directory containing dpkg.tsv and python.tsv from the base image extraction (enables diff output)",
    )
    parser.add_argument(
        "--output",
        "-o",
        help="Output CSV file path (default: stdout)",
    )
    args = parser.parse_args()

    target_dir = Path(args.target_dir)
    if not target_dir.is_dir():
        print(f"ERROR: --target-dir does not exist: {target_dir}", file=sys.stderr)
        sys.exit(1)

    target_packages = read_extraction_dir(target_dir)
    if not target_packages:
        print(f"ERROR: no packages found in {target_dir}", file=sys.stderr)
        sys.exit(1)

    output_path = Path(args.output) if args.output else None
    write_csv(target_packages, output_path)

    if args.base_dir:
        base_dir = Path(args.base_dir)
        if not base_dir.is_dir():
            print(f"ERROR: --base-dir does not exist: {base_dir}", file=sys.stderr)
            sys.exit(1)
        base_packages = read_extraction_dir(base_dir)
        diff_packages = compute_diff(target_packages, base_packages)
        print(
            f"Diff: {len(diff_packages)} new/changed packages vs base", file=sys.stderr
        )

        if output_path:
            diff_path = output_path.with_stem(output_path.stem + "_diff")
            write_csv(diff_packages, diff_path)
        else:
            print("\n# --- DIFF (new/changed packages vs base) ---", file=sys.stderr)
            write_csv(diff_packages, None)


if __name__ == "__main__":
    main()