Unverified Commit 5035447f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: NAT telemetry conversion (#6022)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent b19de4ed
...@@ -85,6 +85,73 @@ python convert.py \ ...@@ -85,6 +85,73 @@ python convert.py \
| `--num-requests` | Max requests to process | All | | `--num-requests` | Max requests to process | All |
| `--skip-requests` | Skip first N requests | 0 | | `--skip-requests` | Skip first N requests | 0 |
---
# Telemetry Trace Converter
Convert OpenAI-style telemetry JSONL (e.g., from agentic research pipelines) to mooncake format for aiperf benchmarking.
## Overview
This converter transforms `telemetry.jsonl` containing `llm_call` and `tool_call` events into mooncake-style JSONL. It identifies 6 agent types from the telemetry and tags each entry accordingly.
## Input Format
Telemetry JSONL with one event per line:
```json
{"event_type": "llm_call", "timestamp": "...", "session_id": "...", "latency_priority": "HIGH", "latency_ms": 738.22, "request_payload": {"messages": [...], "model": "gpt-5.2"}, "response_payload": {"usage": {"prompt_tokens": 256, "completion_tokens": 4}}}
{"event_type": "tool_call", "tool_name": "tavily_web_search", "session_id": "...", "start_time": "...", "end_time": "...", "duration_ms": 181.08}
```
Only `llm_call` events are processed; `tool_call` events are dropped.
## Output Format
Mooncake JSONL with agent type and priority:
```json
{"session_id": "082e33c7-...", "agent_type": "deep_coordinator", "input_length": 2426, "output_length": 33, "hash_ids": [1, 2, 3], "priority": "HIGH"}
{"session_id": "082e33c7-...", "agent_type": "research_worker", "input_length": 4800, "output_length": 154, "hash_ids": [1, 2, 3, 4], "priority": "LOW"}
```
## Agent Types
Agents are identified by system prompt prefix matching:
| Agent Type | System Prompt Prefix |
|---|---|
| `deep_coordinator` | `You are a Deep Research agent` |
| `research_worker` | `Gather and synthesize comprehe` |
| `research_planner` | `For the given task, generate a` |
| `shallow_agent` | `Current date and time:` |
| `classifier` | (no system msg) — "Classify" in user msg |
| `complexity_analyzer` | (no system msg) — "complexity analyzer" in user msg |
## Usage
Basic conversion:
```bash
python convert_telemetry.py --input-file /path/to/telemetry.jsonl
```
With custom tokenizer:
```bash
python convert_telemetry.py \
--input-file /path/to/telemetry.jsonl \
--tokenizer deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--block-size 128
```
## Arguments
| Argument | Description | Default |
|---|---|---|
| `--input-file` | Path to telemetry JSONL | Required |
| `--output-file` | Output JSONL path | `<input>_mooncake.jsonl` |
| `--tokenizer` | HuggingFace tokenizer name | `deepseek-ai/DeepSeek-R1-Distill-Llama-8B` |
| `--block-size` | Block size for hash generation | 64 |
---
## Running with aiperf ## Running with aiperf
After conversion, use with aiperf: After conversion, use with aiperf:
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Shared constants and utilities for trace converters."""
from aiperf.common.tokenizer import Tokenizer
from aiperf.dataset.synthesis.rolling_hasher import RollingHasher
DEFAULT_TOKENIZER = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
DEFAULT_BLOCK_SIZE = 64
def texts_to_hashes_and_lengths(
tokenizer: Tokenizer,
texts: list[str],
block_size: int,
) -> tuple[list[list[int]], list[int]]:
"""
Convert texts to hash IDs and token lengths.
Returns:
Tuple of (hash_ids_list, token_lengths) where:
- hash_ids_list: List of hash ID sequences, one per input text
- token_lengths: List of token counts, one per input text
"""
hasher = RollingHasher(block_size=block_size)
hash_results: list[list[int]] = []
length_results: list[int] = []
for text in texts:
tokens = tokenizer.encode(text)
length_results.append(len(tokens))
blocks: list[list[int]] = [
tokens[i : i + block_size] for i in range(0, len(tokens), block_size)
]
if blocks:
hashes = hasher.hash_token_blocks(blocks)
hash_results.append(hashes)
else:
hash_results.append([])
return hash_results, length_results
...@@ -19,7 +19,7 @@ import re ...@@ -19,7 +19,7 @@ import re
from collections import defaultdict from collections import defaultdict
from aiperf.common.tokenizer import Tokenizer from aiperf.common.tokenizer import Tokenizer
from aiperf.dataset.synthesis.rolling_hasher import RollingHasher from common import DEFAULT_BLOCK_SIZE, DEFAULT_TOKENIZER, texts_to_hashes_and_lengths
from tqdm import tqdm from tqdm import tqdm
...@@ -49,8 +49,8 @@ def parse_args(): ...@@ -49,8 +49,8 @@ def parse_args():
parser.add_argument( parser.add_argument(
"--block-size", "--block-size",
type=int, type=int,
default=64, default=DEFAULT_BLOCK_SIZE,
help="Block size for hash generation (default: 64)", help=f"Block size for hash generation (default: {DEFAULT_BLOCK_SIZE})",
) )
parser.add_argument( parser.add_argument(
"--num-requests", "--num-requests",
...@@ -179,39 +179,6 @@ def extract_llm_calls(request: dict) -> list[dict]: ...@@ -179,39 +179,6 @@ def extract_llm_calls(request: dict) -> list[dict]:
return llm_calls return llm_calls
def texts_to_hashes_and_lengths(
tokenizer: Tokenizer,
texts: list[str],
block_size: int,
) -> tuple[list[list[int]], list[int]]:
"""
Convert texts to hash IDs and token lengths.
Returns:
Tuple of (hash_ids_list, token_lengths) where:
- hash_ids_list: List of hash ID sequences, one per input text
- token_lengths: List of token counts, one per input text
"""
hasher = RollingHasher(block_size=block_size)
hash_results: list[list[int]] = []
length_results: list[int] = []
for text in texts:
tokens = tokenizer.encode(text)
length_results.append(len(tokens))
blocks: list[list[int]] = [
tokens[i : i + block_size] for i in range(0, len(tokens), block_size)
]
if blocks:
hashes = hasher.hash_token_blocks(blocks)
hash_results.append(hashes)
else:
hash_results.append([])
return hash_results, length_results
def chat_inputs_to_text(chat_inputs: list) -> str: def chat_inputs_to_text(chat_inputs: list) -> str:
""" """
Convert chat_inputs array to a single text string for hashing. Convert chat_inputs array to a single text string for hashing.
...@@ -353,10 +320,8 @@ def infer_tokenizer(requests: list) -> str: ...@@ -353,10 +320,8 @@ def infer_tokenizer(requests: list) -> str:
print(f"Inferred tokenizer from model '{model_name}': {tokenizer}") print(f"Inferred tokenizer from model '{model_name}': {tokenizer}")
return tokenizer return tokenizer
# Default fallback print(f"Could not infer tokenizer, using default: {DEFAULT_TOKENIZER}")
default = "meta-llama/Llama-3.1-8B-Instruct" return DEFAULT_TOKENIZER
print(f"Could not infer tokenizer, using default: {default}")
return default
def print_statistics(mooncake_data: list): def print_statistics(mooncake_data: list):
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Convert OpenAI-style telemetry JSONL to mooncake format.
Input: telemetry.jsonl with llm_call and tool_call events
Output: mooncake-style JSONL with agent_type and priority fields
Example output:
{"session_id": "082e33c7-...", "agent_type": "deep_coordinator", "input_length": 2426, "output_length": 33, "hash_ids": [1, 2, 3], "priority": "HIGH"}
"""
import argparse
import json
import os
from collections import defaultdict
from aiperf.common.tokenizer import Tokenizer
from common import DEFAULT_BLOCK_SIZE, DEFAULT_TOKENIZER, texts_to_hashes_and_lengths
from tqdm import tqdm
AGENT_PREFIX_MAP = {
"You are a Deep Research agent": "deep_coordinator",
"Gather and synthesize comprehe": "research_worker",
"For the given task, generate a": "research_planner",
"Current date and time:": "shallow_agent",
}
def parse_args():
parser = argparse.ArgumentParser(
description="Convert telemetry JSONL to mooncake format"
)
parser.add_argument(
"--input-file",
type=str,
required=True,
help="Path to the input telemetry.jsonl file",
)
parser.add_argument(
"--output-file",
type=str,
default=None,
help="Path to the output mooncake-style JSONL file. Defaults to <input>_mooncake.jsonl",
)
parser.add_argument(
"--tokenizer",
type=str,
default=DEFAULT_TOKENIZER,
help=f"Tokenizer name/path for hashing (default: {DEFAULT_TOKENIZER})",
)
parser.add_argument(
"--block-size",
type=int,
default=DEFAULT_BLOCK_SIZE,
help=f"Block size for hash generation (default: {DEFAULT_BLOCK_SIZE})",
)
return parser.parse_args()
def load_and_sort(filepath: str) -> list[dict]:
"""Load telemetry JSONL, filter to llm_call events, sort by (session_id, timestamp)."""
events = []
with open(filepath) as f:
for line in f:
obj = json.loads(line)
if obj.get("event_type") != "llm_call":
continue
events.append(obj)
events.sort(key=lambda e: (e["session_id"], e["timestamp"]))
return events
def extract_system_prompt(event: dict) -> str | None:
"""Extract the system prompt text from an llm_call event, or None if absent."""
rp = event.get("request_payload")
if not isinstance(rp, dict):
return None
for msg in rp.get("messages", []):
if not isinstance(msg, dict) or msg.get("role") != "system":
continue
content = msg.get("content", "")
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("text"):
return item["text"]
return None
return content
return None
def extract_first_user_message(event: dict) -> str:
"""Extract the first user message text from an llm_call event."""
rp = event.get("request_payload")
if not isinstance(rp, dict):
return ""
for msg in rp.get("messages", []):
if not isinstance(msg, dict) or msg.get("role") != "user":
continue
content = msg.get("content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
return str(content)
return ""
def classify_agent(event: dict) -> str:
"""Identify agent type from system prompt prefix or user message content."""
sys_prompt = extract_system_prompt(event)
if sys_prompt:
for prefix, agent_type in AGENT_PREFIX_MAP.items():
if sys_prompt.startswith(prefix):
return agent_type
return "unknown"
user_msg = extract_first_user_message(event)
if "Classify the user message" in user_msg:
return "classifier"
if "complexity analyzer" in user_msg:
return "complexity_analyzer"
return "unknown"
def messages_to_text(event: dict) -> str:
"""Concatenate all message contents from request_payload.messages into a single string."""
rp = event.get("request_payload")
if not isinstance(rp, dict):
return ""
parts = []
for msg in rp.get("messages", []):
if not isinstance(msg, dict):
continue
content = msg.get("content")
if content is None:
continue
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("text"):
parts.append(item["text"])
elif isinstance(item, str):
parts.append(item)
return "\n".join(parts)
def get_output_tokens(event: dict) -> int:
"""Extract completion_tokens from response_payload.usage."""
rp = event.get("response_payload")
if not isinstance(rp, dict):
return 0
usage = rp.get("usage", {})
if not isinstance(usage, dict):
return 0
return usage.get("completion_tokens", 0)
def convert_to_mooncake(
events: list[dict],
tokenizer_name: str,
block_size: int,
) -> list[dict]:
"""Convert sorted llm_call events to mooncake format."""
# Phase 1: classify agents
for event in events:
event["_agent_type"] = classify_agent(event)
# Phase 2: collect texts for tokenization
all_texts = []
for event in tqdm(events, desc="Extracting messages"):
all_texts.append(messages_to_text(event))
# Phase 3: tokenize and hash
print(f"Tokenizing and hashing {len(all_texts)} texts...")
tokenizer = Tokenizer.from_pretrained(tokenizer_name)
all_hash_ids, all_input_lengths = texts_to_hashes_and_lengths(
tokenizer, all_texts, block_size
)
# Phase 4: build output entries
mooncake_data = []
for event, input_length, hash_ids in zip(
events, all_input_lengths, all_hash_ids, strict=True
):
mooncake_data.append(
{
"session_id": event["session_id"],
"agent_type": event["_agent_type"],
"input_length": input_length,
"output_length": get_output_tokens(event),
"hash_ids": hash_ids,
"priority": event.get("latency_priority", "MEDIUM"),
}
)
return mooncake_data
def print_statistics(mooncake_data: list[dict]):
"""Print statistics about the converted data."""
if not mooncake_data:
print("No data to report statistics on.")
return
print("\n" + "=" * 60)
print("CONVERSION STATISTICS")
print("=" * 60)
sessions = defaultdict(list)
for entry in mooncake_data:
sessions[entry["session_id"]].append(entry)
print(f"\nSessions: {len(sessions)}")
turns_per_session = [len(turns) for turns in sessions.values()]
print(
f"Turns per session: min={min(turns_per_session)}, "
f"max={max(turns_per_session)}, "
f"avg={sum(turns_per_session) / len(turns_per_session):.1f}"
)
print(f"Total LLM calls: {len(mooncake_data)}")
# Agent type breakdown
from collections import Counter
agent_counts = Counter(e["agent_type"] for e in mooncake_data)
print("\nAgent types:")
for agent, count in agent_counts.most_common():
print(f" {agent}: {count}")
# Priority breakdown
priority_counts = Counter(e["priority"] for e in mooncake_data)
print("\nPriorities:")
for prio, count in priority_counts.most_common():
print(f" {prio}: {count}")
# Token statistics
input_lengths = [e["input_length"] for e in mooncake_data]
output_lengths = [e["output_length"] for e in mooncake_data]
print("\nInput Length (tokens):")
print(f" Min: {min(input_lengths)}")
print(f" Max: {max(input_lengths)}")
print(f" Avg: {sum(input_lengths) / len(input_lengths):.1f}")
print("\nOutput Length (tokens):")
print(f" Min: {min(output_lengths)}")
print(f" Max: {max(output_lengths)}")
print(f" Avg: {sum(output_lengths) / len(output_lengths):.1f}")
# Hash statistics
hash_lengths = [len(e["hash_ids"]) for e in mooncake_data]
print("\nHash IDs per entry:")
print(f" Min: {min(hash_lengths)}")
print(f" Max: {max(hash_lengths)}")
print(f" Avg: {sum(hash_lengths) / len(hash_lengths):.1f}")
print("=" * 60)
def main():
args = parse_args()
print(f"Loading {args.input_file}...")
events = load_and_sort(args.input_file)
print(f"Loaded {len(events)} llm_call events")
print(f"Using tokenizer: {args.tokenizer}")
print(f"Block size: {args.block_size}")
mooncake_data = convert_to_mooncake(events, args.tokenizer, args.block_size)
print_statistics(mooncake_data)
if args.output_file is None:
base_name = os.path.splitext(args.input_file)[0]
args.output_file = base_name + "_mooncake.jsonl"
with open(args.output_file, "w") as f:
for entry in mooncake_data:
f.write(json.dumps(entry) + "\n")
print(f"\nSaved {len(mooncake_data)} entries to {args.output_file}")
return 0
if __name__ == "__main__":
exit(main())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment