#!/usr/bin/env python3 # SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """Convert BuildKit TSV extraction output to attribution CSV files. Reads dpkg.tsv and python.tsv from a target extraction directory, writes a sorted CSV, and optionally computes a diff against a base extraction directory. Usage: python process_results.py --target-dir --output attribution.csv python process_results.py --target-dir --base-dir --output attribution.csv # Produces: attribution.csv and attribution_diff.csv """ import argparse import csv import sys from pathlib import Path def read_tsv(tsv_path: Path, pkg_type: str) -> list[dict[str, str]]: """Parse a tab-separated extraction output file into package dicts.""" packages = [] if not tsv_path.is_file(): return packages for line in tsv_path.read_text(errors="replace").strip().splitlines(): parts = line.split("\t", 2) if len(parts) != 3: continue pkg_name, version, spdx_license = parts packages.append( { "package_name": pkg_name, "version": version, "type": pkg_type, "spdx_license": spdx_license, } ) return packages def read_extraction_dir(directory: Path) -> list[dict[str, str]]: """Read dpkg.tsv and python.tsv from an extraction directory.""" packages = read_tsv(directory / "dpkg.tsv", "dpkg") packages += read_tsv(directory / "python.tsv", "python") return packages def compute_diff( target_packages: list[dict[str, str]], base_packages: list[dict[str, str]], ) -> list[dict[str, str]]: """Return packages in target that are new or have a different version vs base.""" base_lookup = { (pkg["package_name"], pkg["type"]): pkg["version"] for pkg in base_packages } return [ pkg for pkg in target_packages if base_lookup.get((pkg["package_name"], pkg["type"])) != pkg["version"] ] def write_csv(packages: list[dict[str, str]], output_path: Path | None) -> None: """Write packages to CSV sorted by (type, package_name).""" sorted_packages = sorted(packages, key=lambda p: (p["type"], p["package_name"])) fieldnames = ["package_name", "version", "type", "spdx_license"] if output_path: with open(output_path, "w", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(sorted_packages) print(f"Wrote {len(sorted_packages)} entries to {output_path}", file=sys.stderr) else: writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames) writer.writeheader() writer.writerows(sorted_packages) def main() -> None: parser = argparse.ArgumentParser( description="Convert BuildKit TSV extraction output to attribution CSV", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s --target-dir ./output --output attribution.csv %(prog)s --target-dir ./output --base-dir ./base-output --output attribution.csv """, ) parser.add_argument( "--target-dir", required=True, help="Directory containing dpkg.tsv and python.tsv from the target image extraction", ) parser.add_argument( "--base-dir", help="Directory containing dpkg.tsv and python.tsv from the base image extraction (enables diff output)", ) parser.add_argument( "--output", "-o", help="Output CSV file path (default: stdout)", ) args = parser.parse_args() target_dir = Path(args.target_dir) if not target_dir.is_dir(): print(f"ERROR: --target-dir does not exist: {target_dir}", file=sys.stderr) sys.exit(1) target_packages = read_extraction_dir(target_dir) if not target_packages: print(f"ERROR: no packages found in {target_dir}", file=sys.stderr) sys.exit(1) output_path = Path(args.output) if args.output else None write_csv(target_packages, output_path) if args.base_dir: base_dir = Path(args.base_dir) if not base_dir.is_dir(): print(f"ERROR: --base-dir does not exist: {base_dir}", file=sys.stderr) sys.exit(1) base_packages = read_extraction_dir(base_dir) diff_packages = compute_diff(target_packages, base_packages) print( f"Diff: {len(diff_packages)} new/changed packages vs base", file=sys.stderr ) if output_path: diff_path = output_path.with_stem(output_path.stem + "_diff") write_csv(diff_packages, diff_path) else: print("\n# --- DIFF (new/changed packages vs base) ---", file=sys.stderr) write_csv(diff_packages, None) if __name__ == "__main__": main()