Unverified Commit 4fcee92f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: use aiperf utils in prefix synthesizer (#5906)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 6cb76b96
......@@ -7,7 +7,7 @@ import os
import random
import pandas as pd
from prefix_data_generator.hasher import RollingHasher
from aiperf.dataset.synthesis import RollingHasher
from tqdm import tqdm
......@@ -213,7 +213,7 @@ def convert_to_mooncake(df, block_size, num_hash_blocks):
(random.randint(0, num_hash_blocks),) for _ in range(hash_array_length)
]
hash_ids = hasher(content_blocks)
hash_ids = hasher.hash_token_blocks(content_blocks)
mooncake_data.append(
{
......
......@@ -18,7 +18,8 @@ import os
import re
from collections import defaultdict
from prefix_data_generator.hasher import texts_to_hashes
from aiperf.common.tokenizer import Tokenizer
from aiperf.dataset.synthesis.rolling_hasher import texts_to_hashes
from tqdm import tqdm
......@@ -264,7 +265,8 @@ def convert_to_mooncake(
all_texts = [entry[3] for entry in all_entries]
print(f"Hashing {len(all_texts)} texts...")
all_hash_ids = texts_to_hashes(tokenizer_name, all_texts, block_size)
tokenizer = Tokenizer.from_pretrained(tokenizer_name)
all_hash_ids = texts_to_hashes(tokenizer, all_texts, block_size)
# Phase 3: Build mooncake entries
mooncake_data = []
......
......@@ -13,10 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License. -->
## Quickstart
`example.py` contains an example workflow guiding through synthesizing new requests based on the mooncake trace file. It touches on the core components of this directory.
## Trace File Format
The following tools help analyze and synthesize new data based on the [mooncake trace file format](https://github.com/kvcache-ai/Mooncake/blob/d21da178bae8db9651cf18a76824c084145fc725/mooncake_trace.jsonl). In this format, the first few lines would look like this, for example:
......@@ -28,16 +24,12 @@ The following tools help analyze and synthesize new data based on the [mooncake
{"timestamp": 3052, "input_length": 2287, "output_length": 316, "hash_ids": [0, 42, 43, 44, 45]}
```
**Hash ID Generation:** Each new hash ID is the next consecutive integer after the last one used. Two `hash_ids` sharing the same integers represents the prefix overlap. To generate these increasing hash IDs from a list of texts, we provide the `texts_to_hashes` function in `hasher.py`.
> [!note]The `hashes_to_texts` function can then be used to generate back random texts from these hash IDs sampling from Lorem Ipsum.
**Hash ID Generation:** Each new hash ID is the next consecutive integer after the last one used. Two `hash_ids` sharing the same integers represents the prefix overlap. To generate hash IDs from a list of texts, use `texts_to_hashes` from `aiperf.dataset.synthesis.rolling_hasher`.
**Timestamp:** The arrival time (in milliseconds) of the request since the first request, which can be the same for multiple requests arriving simultaneously.
**Block Size and Hash IDs:** In this example, the `block_size` (the page size of the KV cache) is assumed to be 512. The length of the `hash_ids` array equals `input_length // block_size`.
A general workflow can use `texts_to_hashes` to convert texts to hashes, then use `datagen synthesize` to generate new hashes, then use `hashes_to_texts` to convert them back to random texts.
## Prefix Analyzer
The Prefix Analyzer provides statistics on a trace file, such as Input Sequence Length (ISL), Output Sequence Length (OSL), and theoretical cache hit rate.
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import tempfile
import requests
from prefix_data_generator.hasher import hashes_to_texts
from prefix_data_generator.synthesizer import Synthesizer
# download the mooncake trace file
mooncake_trace_permalink = "https://raw.githubusercontent.com/kvcache-ai/Mooncake/f09c501b2a5d73e4d60cdeb612d7d0d54e1ec228/mooncake_trace.jsonl"
with tempfile.NamedTemporaryFile(delete=False, suffix=".jsonl", mode="w+b") as tmp_file:
response = requests.get(mooncake_trace_permalink)
tmp_file.write(response.content)
trace_file = tmp_file.name
# create the synthesizer
synthesizer = Synthesizer(
dataset_file=trace_file,
block_size=512, # it has to be this, as determined by the mooncake trace
speedup_ratio=2, # the requests will be sent twice as fast
prefix_root_multiplier=4, # will generate 4 separate prefix roots
prefix_len_multiplier=4, # prefix lengths 4 times as long
prompt_len_multiplier=0.5, # shorten prompt lengths to make prefix ratio even larger
)
# generate requests
requests_synth = synthesizer.synthesize_requests(
num_requests=100,
max_isl=(
16384 - 1000
), # this is what most model defaults to, leaving some room for outputs
)
# convert the hashes into random texts (lorem ipsum), respecting the prefix structure
tokenizer = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
input_texts = hashes_to_texts(
tokenizer=tokenizer,
hash_ids_list=[req["hash_ids"] for req in requests_synth],
input_lengths=[req["input_length"] for req in requests_synth],
block_size=512,
)
for i, req in enumerate(requests_synth):
req["input_text"] = input_texts[i]
del req["hash_ids"]
output_file = "synthesized_requests.jsonl"
with open("synthesized_requests.jsonl", "w") as f:
for req in requests_synth:
f.write(json.dumps(req) + "\n")
print(f"Saved {len(requests_synth)} requests to {output_file}")
......@@ -15,9 +15,13 @@
import networkx as nx
import numpy as np
from prefix_data_generator.protocols import CACHE_END, END_NODE, SUPER_ROOT
from prefix_data_generator.sampler import get_cdf
# Protocol-level constants for synthetic data graph structure
SUPER_ROOT = -1 # Dummy node preceding all real nodes; not an actual data root
CACHE_END = -2 # Special node indicating end of a path
END_NODE = -3 # Special node indicating to skip leaf sampling
def _verify_tree(G: nx.DiGraph) -> None:
invalid_nodes = [(node, d) for node, d in G.in_degree() if d > 1]
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import re
from typing import Dict, List, Sequence, Union, cast
import numpy as np
from transformers import AutoTokenizer, PreTrainedTokenizerBase
lorem_text = (
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor "
"incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis "
"nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. "
"Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore "
"eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt "
"in culpa qui officia deserunt mollit anim id est laborum."
)
words = np.array(list(set(re.findall(r"\b[a-zA-Z]+\b", lorem_text))))
class RollingHasher:
"""
A stateful rolling hasher that converts blocks of content into globally unique hash IDs.
This class maintains a mapping from content hashes to unique integer IDs across multiple
sequences. Each block's hash depends on its content and the hash of the previous block
(rolling/chained hashing).
Usage:
hasher = RollingHasher()
hash_ids = hasher(blocks) # blocks is List[List[int]] or List[tuple]
"""
def __init__(self):
"""Initialize the hasher with empty state."""
self.hash_to_int: Dict[int, int] = {}
self.next_int = 0
def __call__(self, blocks: Sequence[Sequence[int]]) -> List[int]:
"""
Convert a sequence of blocks into a sequence of unique hash IDs.
Args:
blocks: Sequence of blocks, where each block is a sequence of integers
Returns:
List of integer hash IDs, one per block
"""
parent_hash = 0
hashes: List[int] = []
for block in blocks:
# Convert block to tuple for hashing
block_tuple = tuple(block) if not isinstance(block, tuple) else block
combined = (parent_hash, hash(block_tuple))
global_hash = hash(combined)
# Map global_hash to a unique integer
if global_hash not in self.hash_to_int:
self.hash_to_int[global_hash] = self.next_int
self.next_int += 1
hashes.append(self.hash_to_int[global_hash])
parent_hash = global_hash
return hashes
def reset(self):
"""Reset the hasher state (clear all mappings)."""
self.hash_to_int.clear()
self.next_int = 0
def texts_to_hashes(
tokenizer: Union[str, PreTrainedTokenizerBase],
texts: List[str],
block_size: int = 512,
) -> List[List[int]]:
"""
Tokenizes a list of strings (without special tokens), splits tokens into blocks,
computes rolling hashes, and returns a list of lists of integer-mapped rolling hashes
for each input string.
Args:
tokenizer: Tokenizer object with a .encode method or string name to load from HuggingFace.
texts (List[str]): List of input strings.
block_size (int): Size of each token block for hashing.
Returns:
List[List[int]]: List of lists of integer-mapped rolling hashes for each block of each input string.
"""
# Load tokenizer if string is provided
if isinstance(tokenizer, str):
tokenizer = cast(
PreTrainedTokenizerBase, AutoTokenizer.from_pretrained(tokenizer)
)
# Batch tokenize for efficiency
batch_encoding = tokenizer(
texts,
add_special_tokens=False,
return_attention_mask=False,
return_token_type_ids=False,
)
# batch_encoding["input_ids"] is a List[List[int]]
all_tokens: List[List[int]] = batch_encoding["input_ids"]
# Initialize the rolling hasher
hasher = RollingHasher()
results: List[List[int]] = []
for tokens in all_tokens:
blocks: List[List[int]] = [
tokens[i : i + block_size] for i in range(0, len(tokens), block_size)
]
hashes = hasher(blocks)
results.append(hashes)
return results
def hashes_to_texts(
tokenizer: Union[str, PreTrainedTokenizerBase],
hash_ids_list: List[List[int]],
input_lengths: List[int],
block_size: int = 512,
) -> List[str]:
"""
Converts a list of hash ID sequences back to text strings using a global token mapping.
Args:
tokenizer: Tokenizer object with a .decode method or string name to load from HuggingFace.
hash_ids_list (List[List[int]]): List of hash ID sequences for each input.
input_lengths (List[int]): Target input lengths for each sequence.
block_size (int): Size of each token block for reconstruction.
Returns:
List[str]: List of reconstructed text strings.
"""
# Load tokenizer if string is provided
if isinstance(tokenizer, str):
tokenizer = cast(
PreTrainedTokenizerBase, AutoTokenizer.from_pretrained(tokenizer)
)
results: List[str] = []
_hash_id_to_tokens: Dict[int, np.ndarray] = {}
for hash_ids, input_len in zip(hash_ids_list, input_lengths):
# Verify constraint: len(hash_ids) * block_size <= input_len
if len(hash_ids) * block_size < input_len:
raise ValueError(
f"Constraint violation: len(hash_ids) * block_size ({len(hash_ids) * block_size}) > input_len ({input_len})"
)
token_arrays: List[np.ndarray] = []
for i, hash_id in enumerate(hash_ids):
# Determine the block size for this hash_id
remaining_tokens = input_len - sum(len(arr) for arr in token_arrays)
current_block_size = min(block_size, remaining_tokens)
if current_block_size <= 0:
break
# Check if hash_id already exists in global dict
if hash_id in _hash_id_to_tokens:
# Use existing array, but assert it matches current_block_size
existing_array = _hash_id_to_tokens[hash_id]
assert (
len(existing_array) == current_block_size
), f"Existing array length {len(existing_array)} does not match current block size {current_block_size}"
token_array = existing_array
else:
# Generate new random array by sampling words, tokenizing, and taking first tokens
sampled_words = np.random.choice(words, size=current_block_size)
sampled_text = " ".join(sampled_words)
tokens = tokenizer.encode(sampled_text, add_special_tokens=False)
token_array = np.array(tokens[:current_block_size], dtype=np.int32)
if getattr(tokenizer, "bos_token_id", None) is not None:
token_array[0] = tokenizer.bos_token_id
_hash_id_to_tokens[hash_id] = token_array
token_arrays.append(token_array)
all_tokens = np.concatenate(token_arrays)
# Decode to text
text = tokenizer.decode(all_tokens, skip_special_tokens=False)
results.append(text)
return results
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Protocol-level constants for synthetic data graph structure.
"""
SUPER_ROOT = -1 # Dummy node preceding all real nodes; not an actual data root
CACHE_END = -2 # Special node indicating end of a path
END_NODE = -3 # Special node indicating to skip leaf sampling
......@@ -20,15 +20,17 @@ from typing import Any, Optional
import networkx as nx
import numpy as np
import pandas as pd
from aiperf.dataset.synthesis import RollingHasher
from prefix_data_generator.graph_utils import (
CACHE_END,
END_NODE,
SUPER_ROOT,
_mark_visited,
_merge_chains,
_precompute_transition_cdfs,
_remove_leaves,
_verify_tree,
)
from prefix_data_generator.hasher import RollingHasher
from prefix_data_generator.protocols import CACHE_END, END_NODE, SUPER_ROOT
from prefix_data_generator.sampler import EmpiricalSampler, sample_from_cdf
......@@ -111,7 +113,10 @@ class Synthesizer:
# Normalize hash_ids to consecutive integers starting from 0
hasher = RollingHasher()
hash_ids_list = [hasher([(h,) for h in hash_ids]) for hash_ids in hash_ids_list]
hash_ids_list = [
hasher.hash_token_blocks([(h,) for h in hash_ids])
for hash_ids in hash_ids_list
]
# represent prefix-tree as directed graph
self.G = nx.DiGraph()
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import random
import pytest
from prefix_data_generator.hasher import hashes_to_texts, texts_to_hashes
from tokenizers import Tokenizer, decoders, models, normalizers, pre_tokenizers
from transformers import AutoTokenizer, PreTrainedTokenizerFast
@pytest.fixture(scope="module")
def dummy_tokenizer():
vocab = [chr(i) for i in range(ord("a"), ord("z") + 1)]
vocab.append("[UNK]")
vocab_dict = {token: idx for idx, token in enumerate(vocab)}
tokenizer_model = models.WordLevel(vocab=vocab_dict, unk_token="[UNK]")
tokenizer = Tokenizer(tokenizer_model)
tokenizer.normalizer = normalizers.Sequence(
[normalizers.NFD(), normalizers.Lowercase()]
)
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
tokenizer.decoder = decoders.WordPiece(prefix="")
return PreTrainedTokenizerFast(
tokenizer_object=tokenizer,
unk_token="[UNK]",
pad_token="[PAD]",
bos_token="[BOS]",
eos_token="[EOS]",
)
@pytest.fixture(scope="module")
def deepseek_tokenizer():
return AutoTokenizer.from_pretrained("deepseek-ai/deepseek-coder-1.3b-base")
def test_texts_to_hashes_blocks(dummy_tokenizer):
dum1 = "a b c d"
dum2 = "e f g h"
dum3 = "i j k l"
texts = [dum1, dum1 + " " + dum2, dum1 + " " + dum3, dum2 + " " + dum1]
expected = [[0], [0, 1], [0, 2], [3, 4]]
result = texts_to_hashes(dummy_tokenizer, texts, block_size=4)
assert result == expected, f"Expected {expected}, got {result}"
def test_hashes_to_texts_with_deepseek(deepseek_tokenizer):
"""Test hashes_to_texts with deepseek tokenizer using increasing hash IDs globally."""
# Test parameters
block_size = 64
num_entries = 100
# Generate test data
hash_ids_list = []
input_lengths = []
global_hash_id = 0
for _ in range(num_entries):
# Random input length between 1 and 20 times block_size
input_length = random.randint(block_size, 20 * block_size)
input_lengths.append(input_length)
# Calculate number of hash_ids needed (ceil div)
num_hash_ids = math.ceil(input_length / block_size)
hash_ids = list(range(global_hash_id, global_hash_id + num_hash_ids))
hash_ids_list.append(hash_ids)
global_hash_id += num_hash_ids
# Call hashes_to_texts
texts = hashes_to_texts(
deepseek_tokenizer, hash_ids_list, input_lengths, block_size
)
# Retokenize and verify input lengths are preserved
for i, (text, expected_length) in enumerate(zip(texts, input_lengths)):
tokens = deepseek_tokenizer(text, add_special_tokens=False)["input_ids"]
actual_length = len(tokens)
assert (
actual_length == expected_length
), f"Entry {i}: expected length {expected_length}, got {actual_length}"
......@@ -41,7 +41,7 @@ classifiers = [
dependencies = [
"aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@release/0.6.0",
"aiperf @ git+https://github.com/ai-dynamo/aiperf.git@54cd6dc820bff8bfebc875da104e59d745e14f75",
"aiperf @ git+https://github.com/ai-dynamo/aiperf.git@40530ed231fb01ae1cfe3c9d22e43a0e7143780b",
"matplotlib",
"networkx",
"numpy",
......
......@@ -20,7 +20,7 @@ import math
import random
import numpy as np
from prefix_data_generator.hasher import RollingHasher
from aiperf.dataset.synthesis import RollingHasher
from tqdm import tqdm
logging.basicConfig(level=logging.INFO)
......@@ -56,7 +56,7 @@ def main(args):
(random.randrange(args.total_blocks),)
for _ in range(math.ceil(isl / args.block_size))
]
rolling_hash_ids = rolling_hasher(hash_ids)
rolling_hash_ids = rolling_hasher.hash_token_blocks(hash_ids)
output_data.append(
{
"timestamp": int(t_req * 1000), # in ms, integer
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment