#!/usr/bin/env python3 # SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """Generate a dependency CSV (and optional attribution markdown) for Go modules. Extracts Go module dependencies from one or more Go module directories using ``go list -deps`` for names/versions and ``go-licenses report`` for SPDX license identifiers. Falls back gracefully when ``go-licenses`` is not installed. Optionally generates an attribution markdown file with full license texts. Usage: python generate_go_deps.py -o go_deps.csv python generate_go_deps.py --attributions ATTRIBUTIONS-Go.md python generate_go_deps.py --module-dirs deploy/operator -o operator_deps.csv -v """ import argparse import csv import io import json import logging import re import subprocess import sys from pathlib import Path _SCRIPT_DIR = Path(__file__).resolve().parent _REPO_ROOT = _SCRIPT_DIR.parent.parent log = logging.getLogger(__name__) FIELDNAMES = ["dependency_type", "package_name", "version", "spdx_license", "repo_url"] DEFAULT_MODULE_DIRS = [ "deploy/operator", "deploy/snapshot", ] DEFAULT_IGNORE_PREFIXES = "github.com/ai-dynamo/dynamo" # Transitive test/indirect deps that go-licenses requires to be fetched # before it can scan certain modules. Keyed by module directory (relative to # repo root). See deploy/snapshot/deps.md for background. _PREFETCH_DEPS: dict[str, list[str]] = { "deploy/snapshot": [ "github.com/opencontainers/runtime-spec/schema@v1.2.0", "github.com/evanphx/json-patch", "github.com/jessevdk/go-flags", ], } _LICENSE_NAMES = [ "LICENSE", "LICENSE.md", "LICENSE.txt", "LICENSE-MIT", "LICENSE-APACHE", "LICENCE", "LICENCE.md", "LICENCE.txt", "COPYING", "COPYING.md", ] _ATTRIBUTION_PREAMBLE = """\ # Third-Party Software Attributions This project uses the following third-party libraries. Each library is \ open-source and licensed under the terms indicated below. This file is automatically generated. Please do not edit it directly. ## Dependencies """ # Well-known Go module path -> repository URL mappings _WELLKNOWN_REPOS: dict[str, str] = { "google.golang.org/grpc": "https://github.com/grpc/grpc-go", "google.golang.org/protobuf": "https://github.com/protocolbuffers/protobuf-go", "google.golang.org/genproto": "https://github.com/googleapis/go-genproto", "google.golang.org/api": "https://github.com/googleapis/google-api-go-client", "google.golang.org/appengine": "https://github.com/golang/appengine", "cloud.google.com/go": "https://github.com/googleapis/google-cloud-go", } # Prefix-based mappings (checked in order) _PREFIX_REPOS: list[tuple[str, str]] = [ ("golang.org/x/", "https://github.com/golang/"), ("k8s.io/", "https://github.com/kubernetes/"), ("sigs.k8s.io/", "https://github.com/kubernetes-sigs/"), ("go.uber.org/", "https://github.com/uber-go/"), ("go.opentelemetry.io/", "https://github.com/open-telemetry/"), ("go.etcd.io/", "https://github.com/etcd-io/"), ] def _parse_json_stream(text: str) -> list[dict]: """Parse a stream of concatenated JSON objects (as emitted by ``go list -json``).""" decoder = json.JSONDecoder() results = [] idx = 0 length = len(text) while idx < length: # Skip whitespace while idx < length and text[idx] in " \t\n\r": idx += 1 if idx >= length: break obj, end = decoder.raw_decode(text, idx) results.append(obj) idx = end return results def prefetch_deps(module_dir: Path, go_cmd: str, deps: list[str]) -> None: """Run ``go get`` to fetch transitive deps that go-licenses needs present.""" cmd = [go_cmd, "get"] + deps log.debug("Pre-fetching in %s: %s", module_dir, " ".join(cmd)) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, cwd=str(module_dir) ) if result.returncode != 0: log.warning( "go get pre-fetch failed (exit %d) in %s: %s", result.returncode, module_dir, result.stderr.strip(), ) else: log.info("Pre-fetched %d transitive deps in %s", len(deps), module_dir) def get_go_modules( module_dir: Path, go_cmd: str, packages: list[str] ) -> dict[str, str]: """Return a {module_path: version} map for modules actually imported by *packages*. Uses ``go list -deps`` to walk the transitive import graph so that modules which are in the module graph (``go list -m all``) but never imported are excluded. """ cmd = [ go_cmd, "list", "-deps", "-f", "{{if .Module}}{{.Module.Path}}\t{{.Module.Version}}{{end}}", ] + packages log.debug("Running in %s: %s ...", module_dir, " ".join(cmd[:6])) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, cwd=str(module_dir) ) if result.returncode != 0: log.error( "go list -deps failed (exit %d) in %s: %s", result.returncode, module_dir, result.stderr, ) raise RuntimeError(f"go list -deps failed in {module_dir}: {result.stderr}") modules: dict[str, str] = {} for line in result.stdout.strip().splitlines(): if not line or "\t" not in line: continue path, version = line.split("\t", 1) if path and version: modules[path] = version return modules def resolve_go_packages(module_dir: Path, go_cmd: str) -> list[str]: """Resolve the list of Go packages in *module_dir*. Finds subdirectories containing ``.go`` files and runs ``go list`` on them, skipping stale/non-module directories (e.g. ``srcs/``, ``vendor/``). """ go_dirs: set[str] = set() for go_file in module_dir.rglob("*.go"): rel = go_file.parent.relative_to(module_dir) parts = rel.parts if parts and any( p in ("vendor", "srcs", "testdata") or "@" in p for p in parts ): continue go_dirs.add(str(rel)) if go_dirs: top_dirs = {d.split("/")[0] if "/" in d else d for d in go_dirs if d != "."} if "." in go_dirs: top_dirs.add(".") patterns = [f"./{d}/..." for d in sorted(top_dirs)] if top_dirs else ["./..."] else: patterns = ["./..."] list_result = subprocess.run( [go_cmd, "list"] + patterns, capture_output=True, text=True, timeout=300, cwd=str(module_dir), ) if list_result.returncode != 0: log.warning("go list failed in %s: %s", module_dir, list_result.stderr.strip()) return patterns packages = [p for p in list_result.stdout.strip().splitlines() if p] if not packages: return patterns log.debug("Resolved %d packages in %s", len(packages), module_dir) return packages def get_go_licenses( module_dir: Path, go_licenses_cmd: str, ignore_prefix: str, packages: list[str] ) -> dict[str, tuple[str, str]]: """Run ``go-licenses report`` and return a {package_path: (license_url, spdx)} map.""" cmd = [go_licenses_cmd, "report"] + packages if ignore_prefix: cmd.extend(["--ignore", ignore_prefix]) log.debug("Running in %s: %s", module_dir, " ".join(cmd[:5]) + " ...") result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, cwd=str(module_dir) ) if result.returncode != 0: log.warning( "go-licenses failed (exit %d) in %s: %s", result.returncode, module_dir, result.stderr.strip(), ) raise RuntimeError(f"go-licenses failed in {module_dir}") licenses = {} reader = csv.reader(io.StringIO(result.stdout)) for row in reader: if len(row) < 3: continue pkg_path, license_url, spdx = row[0], row[1], row[2] licenses[pkg_path] = (license_url, spdx) return licenses def find_license_for_module( module_path: str, licenses: dict[str, tuple[str, str]] ) -> tuple[str, str]: """Find the SPDX license and license URL for a module using longest prefix match. Returns (license_url, spdx_license) or ("", "UNKNOWN") if not found. """ best_match = "" best_value = ("", "UNKNOWN") for pkg_path, (license_url, spdx) in licenses.items(): # Check if the package path starts with the module path if pkg_path == module_path or pkg_path.startswith(module_path + "/"): if len(module_path) > len(best_match): best_match = module_path best_value = (license_url, spdx) # Also check if the module path starts with the package path # (for cases where go-licenses reports a parent package) elif module_path.startswith(pkg_path + "/") or module_path == pkg_path: if len(pkg_path) > len(best_match): best_match = pkg_path best_value = (license_url, spdx) return best_value def _strip_version_suffix(module_path: str) -> str: """Strip Go major version suffix like /v2, /v3, etc.""" return re.sub(r"/v\d+$", "", module_path) def derive_repo_url(module_path: str, license_url: str = "") -> str: """Derive a repository URL from a Go module path and optional license URL. Tries to extract the repo URL from the license URL first (most reliable), then falls back to heuristics based on the module path. """ # Try to extract repo URL from license URL (e.g., https://github.com/X/Y/blob/vN/LICENSE) if license_url: m = re.match(r"(https://github\.com/[^/]+/[^/]+)", license_url) if m: return m.group(1) m = re.match(r"(https://gitlab\.com/[^/]+/[^/]+)", license_url) if m: return m.group(1) clean_path = _strip_version_suffix(module_path) # Check well-known exact mappings for prefix, url in _WELLKNOWN_REPOS.items(): if clean_path == prefix or clean_path.startswith(prefix + "/"): return url # Check prefix-based mappings for prefix, url_base in _PREFIX_REPOS: if clean_path.startswith(prefix): # Extract the first path component after the prefix remainder = clean_path[len(prefix) :] name = remainder.split("/")[0] return url_base + name # github.com/X/Y/... -> https://github.com/X/Y if clean_path.startswith("github.com/"): parts = clean_path.split("/") if len(parts) >= 3: return f"https://github.com/{parts[1]}/{parts[2]}" # gitlab.com/X/Y/... -> https://gitlab.com/X/Y if clean_path.startswith("gitlab.com/"): parts = clean_path.split("/") if len(parts) >= 3: return f"https://gitlab.com/{parts[1]}/{parts[2]}" # gopkg.in/X.vN -> https://github.com/go-X/X (single element) # gopkg.in/USER/X.vN -> https://github.com/USER/X (two elements) if clean_path.startswith("gopkg.in/"): remainder = clean_path[len("gopkg.in/") :] # Strip .vN suffix remainder = re.sub(r"\.v\d+$", "", remainder) parts = remainder.split("/") if len(parts) == 1: return f"https://github.com/go-{parts[0]}/{parts[0]}" elif len(parts) >= 2: return f"https://github.com/{parts[0]}/{parts[1]}" # Fallback: link to pkg.go.dev return f"https://pkg.go.dev/{module_path}" def deduplicate(entries: list[dict[str, str]]) -> list[dict[str, str]]: """Deduplicate entries by (package_name, version).""" seen: dict[tuple[str, str], dict[str, str]] = {} for entry in entries: key = (entry["package_name"], entry["version"]) if key not in seen: seen[key] = entry return list(seen.values()) def get_module_cache_dirs(module_dir: Path, go_cmd: str) -> dict[str, str]: """Run ``go list -m -json all`` and return a {module_path: dir} map. Only includes modules that have a local ``Dir`` (i.e. are cached). """ cmd = [go_cmd, "list", "-m", "-json", "all"] log.debug("Fetching module cache dirs in %s ...", module_dir) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, cwd=str(module_dir) ) if result.returncode != 0: log.warning("go list -m -json all failed: %s", result.stderr.strip()) return {} dirs: dict[str, str] = {} for obj in _parse_json_stream(result.stdout): path = obj.get("Path", "") mod_dir = obj.get("Dir", "") if path and mod_dir and not obj.get("Main"): dirs[path] = mod_dir return dirs def _find_license_text(directory: Path) -> str: """Find and read a license file in *directory*.""" for name in _LICENSE_NAMES: path = directory / name if path.is_file(): return path.read_text(errors="replace").rstrip() return "" def write_attributions_md( packages: list[dict[str, str]], output_path: str, module_dirs_map: dict[str, str], ) -> None: """Write a markdown attribution file following the ATTRIBUTIONS-Go.md style.""" sorted_packages = sorted( packages, key=lambda p: (p["package_name"].lower(), p["version"]) ) lines = [_ATTRIBUTION_PREAMBLE] found = 0 for pkg in sorted_packages: mod_dir = module_dirs_map.get(pkg["package_name"], "") license_text = _find_license_text(Path(mod_dir)) if mod_dir else "" lines.append(f"### {pkg['package_name']}") lines.append("") lines.append(f"License Identifier: {pkg['spdx_license']}") lines.append("License Text:") if license_text: lines.append(f"```\n{license_text}\n```") found += 1 else: lines.append("```\nLicense text not available locally.\n```") lines.append("") with open(output_path, "w") as f: f.write("\n".join(lines) + "\n") log.info( "Wrote attribution markdown to %s (%d/%d with license text)", output_path, found, len(sorted_packages), ) def write_csv(packages: list[dict[str, str]], output_path: str | None) -> None: """Write packages to CSV, sorted by package_name.""" sorted_packages = sorted(packages, key=lambda p: p["package_name"]) if output_path: with open(output_path, "w", newline="") as f: writer = csv.DictWriter(f, fieldnames=FIELDNAMES) writer.writeheader() writer.writerows(sorted_packages) log.info("Wrote %d entries to %s", len(sorted_packages), output_path) else: writer = csv.DictWriter(sys.stdout, fieldnames=FIELDNAMES) writer.writeheader() writer.writerows(sorted_packages) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Generate a dependency CSV for Go modules", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s -o go_deps.csv %(prog)s --module-dirs deploy/operator -o operator_deps.csv %(prog)s -v """, ) parser.add_argument( "--output", "-o", help="Output CSV file path (default: stdout)", ) parser.add_argument( "--attributions", help="Output attribution markdown file path (e.g. ATTRIBUTIONS-Go.md)", ) parser.add_argument( "--module-dirs", default=",".join(DEFAULT_MODULE_DIRS), help=( "Comma-separated Go module directories relative to repo root " f"(default: {','.join(DEFAULT_MODULE_DIRS)})" ), ) parser.add_argument( "--go-cmd", default="go", help="Path to go binary (default: go)", ) parser.add_argument( "--go-licenses-cmd", default="go-licenses", help="Path to go-licenses binary (default: go-licenses)", ) parser.add_argument( "--ignore-prefixes", default=DEFAULT_IGNORE_PREFIXES, help=( "Comma-separated module prefixes to exclude from license scan " f"(default: {DEFAULT_IGNORE_PREFIXES})" ), ) parser.add_argument( "--verbose", "-v", action="store_true", help="Enable verbose logging", ) return parser.parse_args() def main() -> None: args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(levelname)s: %(message)s", stream=sys.stderr, ) module_dirs = [d.strip() for d in args.module_dirs.split(",") if d.strip()] ignore_prefix = args.ignore_prefixes.strip() all_entries: list[dict[str, str]] = [] for rel_dir in module_dirs: abs_dir = _REPO_ROOT / rel_dir if not (abs_dir / "go.mod").is_file(): log.error("go.mod not found in %s", abs_dir) sys.exit(1) # Resolve packages (skips stale dirs like srcs/) packages = resolve_go_packages(abs_dir, args.go_cmd) # Pre-fetch transitive deps that go-licenses requires prefetch = _PREFETCH_DEPS.get(rel_dir, []) if prefetch: prefetch_deps(abs_dir, args.go_cmd, prefetch) # Pass 1: get only transitively-imported modules log.info("Listing imported modules in %s ...", rel_dir) try: modules = get_go_modules(abs_dir, args.go_cmd, packages) except (RuntimeError, FileNotFoundError) as exc: log.error("Failed to list modules in %s: %s", rel_dir, exc) sys.exit(1) log.info("Found %d imported modules in %s", len(modules), rel_dir) # Pass 2: get license info via go-licenses licenses: dict[str, tuple[str, str]] = {} try: licenses = get_go_licenses( abs_dir, args.go_licenses_cmd, ignore_prefix, packages ) log.info("Found %d license entries in %s", len(licenses), rel_dir) except FileNotFoundError: log.warning( "go-licenses not found on PATH. Install with: " "go install github.com/google/go-licenses@v1.6.0" ) log.warning("License info will be UNKNOWN for all modules.") except RuntimeError: log.warning( "go-licenses failed for %s, license info will be UNKNOWN.", rel_dir ) # Merge: build entries for each module for mod_path, version in modules.items(): license_url, spdx = find_license_for_module(mod_path, licenses) repo_url = derive_repo_url(mod_path, license_url) all_entries.append( { "dependency_type": "go", "package_name": mod_path, "version": version, "spdx_license": spdx, "repo_url": repo_url, } ) deduplicated = deduplicate(all_entries) log.info("Total unique Go dependencies: %d", len(deduplicated)) if args.output or not args.attributions: write_csv(deduplicated, args.output) if args.attributions: # Fetch local cache dirs from each Go module directory and merge. unique_mods = {e["package_name"] for e in deduplicated} module_dirs_map: dict[str, str] = {} for rel_dir in module_dirs: abs_dir = _REPO_ROOT / rel_dir partial = get_module_cache_dirs(abs_dir, args.go_cmd) # Only keep modules we actually need for mod_path, mod_dir in partial.items(): if mod_path in unique_mods and mod_path not in module_dirs_map: module_dirs_map[mod_path] = mod_dir log.info( "Resolved %d/%d module cache dirs", len(module_dirs_map), len(unique_mods) ) write_attributions_md(deduplicated, args.attributions, module_dirs_map) if __name__ == "__main__": main()