#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Plot achieved vs. offered throughput from mooncake_bench sweep data.
Uses the Dynamo dark Plotly template (design_tokens.yaml + plotly_dynamo.py).
Usage:
python3 gen_throughput.py ../data/sweep_plot.json
"""
from __future__ import annotations
import argparse
import json
import math
import sys
import warnings
from pathlib import Path
import plotly.graph_objects as go
sys.path.insert(0, str(Path(__file__).parent))
from plotly_dynamo import build_template, load_tokens
BACKENDS = {
"nested-map": {
"label": "Concurrent Positional Indexer (Flash Indexer)",
"color": "#76b900", # Dynamo green -- best performer
"version": "Dynamo v1.0.0",
},
"concurrent-radix-tree": {
"label": "Concurrent Radix Tree",
"color": "#008564", # Emerald -- strong second
"version": "Dynamo v1.0.0",
},
"radix-tree": {
"label": "Radix Tree",
"color": "#fac200", # Fluorite -- middle
"version": "Dynamo v0.1.0",
},
"inverted-index": {
"label": "Inverted Index",
"color": "#969696", # Silver Gray (7.1:1 on black) -- weak
"version": "Naive",
},
"naive-nested-map": {
"label": "Naive Nested Map",
"color": "#767676", # Muted Gray (4.6:1 on black) -- worst
"version": "Naive",
},
}
PLOT_ORDER = list(BACKENDS.keys())
def main():
parser = argparse.ArgumentParser(description=__doc__)
images_dir = Path(__file__).resolve().parent.parent / "images"
default_out = str(images_dir / "fig-6-indexer-throughput.png")
parser.add_argument("json_path", help="Path to sweep_plot.json")
parser.add_argument(
"-o", "--output", default=default_out, help="Output path (.png, .svg, .pdf)"
)
parser.add_argument("--width", type=int, default=775, help="Figure width in px")
parser.add_argument("--height", type=int, default=650, help="Figure height in px")
args = parser.parse_args()
with open(args.json_path) as f:
data = json.load(f)
tokens = load_tokens(Path(__file__).parent / "design_tokens.yaml")
template = build_template(tokens)
series_colors = tokens["colors"]["chart_series"]
border_subtle = tokens["colors"]["border"]["subtle"]
ordered = [(k, data[k]) for k in PLOT_ORDER if k in data]
for k in data:
if k not in PLOT_ORDER:
ordered.append((k, data[k]))
fig = go.Figure()
axis_min = 1e5 # 100k fixed floor
x_max = 2e9 # 2.0G
y_max = 5e8 # 500M
fig.add_trace(
go.Scatter(
x=[axis_min, x_max],
y=[axis_min, x_max],
mode="lines",
line=dict(color=border_subtle, width=1, dash="dash"),
showlegend=False,
hoverinfo="skip",
)
)
for i, (name, steps) in enumerate(ordered):
meta = BACKENDS.get(name, {})
color = meta.get("color", series_colors[i % len(series_colors)])
display = meta.get("label", name.replace("-", " ").title())
offered = [s["offered_block_throughput"] for s in steps]
achieved = [s["block_throughput"] for s in steps]
fig.add_trace(
go.Scatter(
x=offered,
y=achieved,
mode="lines+markers",
name=display,
line=dict(color=color, width=2.5),
marker=dict(size=7, color=color),
hovertemplate=(
f"{display}
"
"Offered: %{x:.2s} ops/s
"
"Achieved: %{y:.2s} ops/s"
""
),
)
)
peak_annotations = []
peak_shapes = []
peak_values = {}
label_x = 1e9 # 1.0G mark
peak_header_added = False
for name, steps in ordered:
meta = BACKENDS.get(name, {})
color = meta.get("color", "#ffffff")
version = meta.get("version", "")
best = max(steps, key=lambda s: s["block_throughput"])
peak_y = best["block_throughput"]
peak_x = best["offered_block_throughput"]
if not peak_header_added:
peak_annotations.append(
dict(
x=_log10(label_x),
y=_log10(peak_y),
xref="x",
yref="y",
text="PEAK THROUGHPUT",
showarrow=False,
xanchor="center",
yanchor="bottom",
yshift=24,
font=dict(
family=tokens["typography"]["font_family"],
size=10,
color="#ffffff",
),
)
)
peak_header_added = True
label_text = f"{_fmt_si(peak_y)}"
peak_annotations.append(
dict(
x=_log10(label_x),
y=_log10(peak_y),
xref="x",
yref="y",
text=label_text,
showarrow=False,
xanchor="center",
yanchor="bottom",
yshift=6,
font=dict(
family=tokens["typography"]["font_family_mono"],
size=12,
color=color,
),
)
)
if version:
peak_annotations.append(
dict(
x=_log10(label_x),
y=_log10(peak_y),
xref="x",
yref="y",
text=f"{version}",
showarrow=False,
xanchor="center",
yanchor="top",
yshift=4,
font=dict(
family=tokens["typography"]["font_family"],
size=9,
color=color,
),
opacity=1.0,
)
)
peak_values[name] = peak_y
y_up = peak_y * 1.06
peak_shapes.append(
dict(
type="line",
xref="x",
yref="y",
x0=peak_x,
x1=label_x,
y0=y_up,
y1=y_up,
line=dict(color=color, width=2, dash="dot"),
layer="above",
)
)
# ] bracket from pixel editor (bracket_editor_px.html)
# Pixel coords on 775x650 canvas → Plotly paper-x / data-y
top_peak = peak_values.get("nested-map", 170e6)
if "nested-map" not in peak_values:
warnings.warn("nested-map not in data; using fallback 170M")
bot_peak = peak_values.get("radix-tree", 4e6)
if "radix-tree" not in peak_values:
warnings.warn("radix-tree not in data; using fallback 4M")
improvement = top_peak / bot_peak
_margin_l, _margin_r, _margin_t, _margin_b = 60, 55, 70, 60
_plot_w = args.width - _margin_l - _margin_r
_plot_h = args.height - _margin_t - _margin_b
_y_log_lo = _log10(axis_min)
_y_log_hi = _log10(y_max)
def _px_to_paper_x(px: float) -> float:
return float((px - _margin_l) / _plot_w)
def _px_to_data_y(px: float) -> float:
frac = (_margin_t + _plot_h - px) / _plot_h
return float(10 ** (_y_log_lo + frac * (_y_log_hi - _y_log_lo)))
br_x = 723
br_y_top = 131
br_y_bot = 369
br_tick = 16
br_lx_shift = 6
bx_paper = _px_to_paper_x(br_x)
tick_paper = br_tick / _plot_w
top_y = _px_to_data_y(br_y_top)
bot_y = _px_to_data_y(br_y_bot)
mid_y = _px_to_data_y((br_y_top + br_y_bot) / 2)
for y_val in [top_y, bot_y]:
peak_shapes.append(
dict(
type="line",
xref="paper",
yref="y",
x0=bx_paper,
x1=bx_paper - tick_paper,
y0=y_val,
y1=y_val,
line=dict(color="#cdcdcd", width=1.5),
layer="above",
)
)
peak_shapes.append(
dict(
type="line",
xref="paper",
yref="y",
x0=bx_paper,
x1=bx_paper,
y0=bot_y,
y1=top_y,
line=dict(color="#cdcdcd", width=1.5),
layer="above",
)
)
peak_annotations.append(
dict(
xref="paper",
yref="y",
x=bx_paper,
y=_log10(mid_y),
text=f"{improvement:.0f}×", # noqa: RUF001
showarrow=False,
xanchor="left",
xshift=br_lx_shift,
font=dict(
family=tokens["typography"]["font_family"],
size=10,
color="#ffffff",
),
)
)
fig.update_layout(
template=template,
title=dict(text="ACHIEVED VS. OFFERED THROUGHPUT (HIGHER IS BETTER)"),
xaxis=dict(
title="Offered Throughput (block ops/s)",
type="log",
range=[_log10(axis_min), _log10(x_max)],
tickformat=".2s",
),
yaxis=dict(
title="Achieved Throughput (block ops/s)",
type="log",
range=[_log10(axis_min), _log10(y_max)],
tickformat=".2s",
),
legend=dict(
x=0.02,
y=0.98,
xanchor="left",
yanchor="top",
),
width=args.width,
height=args.height,
margin=dict(l=_margin_l, r=_margin_r, t=_margin_t, b=_margin_b),
annotations=peak_annotations,
shapes=peak_shapes,
)
out = Path(args.output)
out.parent.mkdir(parents=True, exist_ok=True)
png = out.with_suffix(".png")
svg = out.with_suffix(".svg")
fig.write_image(str(png), scale=3)
fig.write_image(str(svg))
print(f"Wrote {png.name} ({args.width}x{args.height})")
print(f"Wrote {svg.name} ({args.width}x{args.height})")
def _log10(x: float) -> float:
return math.log10(x)
def _fmt_si(v: float) -> str:
if v >= 1e9:
return f"{v / 1e9:.1f}G"
if v >= 1e6:
return f"{v / 1e6:.0f}M"
if v >= 1e3:
return f"{v / 1e3:.0f}K"
return f"{v:.0f}"
if __name__ == "__main__":
main()