Commit 909abb58 authored by maxiao's avatar maxiao
Browse files

adapt to sglang v0.5.2rc1 on dcu

parents
import os
import sys
from typing import List
import av
from datasets import load_dataset
def find_video_files(video_dir) -> List[str]:
if os.path.isfile(video_dir):
return [video_dir]
video_files = []
for root, dirs, files in os.walk(video_dir):
for file in files:
if file.endswith((".mp4", ".avi", ".mov")):
video_files.append(os.path.join(root, file))
# if file is dir
elif os.path.isdir(file):
video_files.extend(find_video_files(file))
return video_files
def video_frames(video_path, max_frames) -> int:
container = av.open(video_path)
total_frames = container.streams.video[0].frames
return min(total_frames, max_frames)
class Video:
def __init__(self, video_path, num_frames):
self.path = video_path
self.num_frames = num_frames
def __str__(self):
return f"Video({self.path}, {self.num_frames})"
def __iter__(self):
return iter((self.path, self.num_frames))
class VideoPrompt(Video):
def __init__(self, video_path, num_frames, prompt):
super().__init__(video_path, num_frames)
self.prompt = prompt
def __str__(self):
return f"VideoPrompt({self.path}, {self.num_frames}, {self.prompt})"
def __iter__(self):
return iter((self.path, self.num_frames, self.prompt))
class VideoLoader:
pass
class VideoFileLoader(VideoLoader):
"""
Load all the videos in a directory
"""
def __init__(self, video_dir, batch_size=1, max_frames=sys.maxsize):
super().__init__()
self.video_dir = video_dir
self.video_files = find_video_files(video_dir)
self.batch_size = batch_size
self.max_frames = max_frames
print(f"batch_size: {batch_size}, max_frames: {max_frames}")
def __iter__(self): # (file, number of frames)
if self.batch_size == 1:
for video_file in self.video_files:
yield Video(video_file, video_frames(video_file, self.max_frames))
else:
batch = []
for video_file in self.video_files:
video = Video(video_file, video_frames(video_file, self.max_frames))
batch.append(video)
if len(batch) == self.batch_size:
yield batch
batch = []
class NExTQALoader(VideoLoader):
"""
Load vdideos and prompts from NExT dataset
set: train, test or validation
"""
def __init__(
self, video_dir, batch_size=1, max_frames=sys.maxsize, dset="test", task="OE"
):
"""
task: 'MV' or 'OE'
"""
super().__init__()
self.task = task
print(f"Loading the {dset} data of {task} from lmms-lab/NExTQA")
self.ds = load_dataset("lmms-lab/NExTQA", task)
self.ds = self.ds[dset]
# self.n = ds.num_rows
self.video_dir = video_dir
self.video_files = find_video_files(video_dir)
self.video_to_path = dict()
for video_file in self.video_files:
video_id = video_file.split("/")[-1].split(".")[0]
self.video_to_path[video_id] = video_file
self.batch_size = batch_size
self.max_frames = max_frames
def get_video_prompt(self, entry, max_frames) -> VideoPrompt:
# Get video
video_id = entry["video"]
video_path = self.video_to_path[video_id]
assert os.path.exists(video_path), f"Video not found: {video_path}"
num_frames = min(entry["frame_count"], max_frames)
video = Video(video_path, num_frames)
prompt = entry["question"] + "?"
if self.task == "MC": # add choices
prompt += f' a0: {entry["a0"]}, a1: {entry["a1"]}, a2: {entry["a2"]}, a3: {entry["a3"]}'
return VideoPrompt(video_path, num_frames, prompt)
def __iter__(self):
if self.batch_size == 1:
for entry in self.ds:
yield self.get_video_prompt(entry, self.max_frames)
else:
batch = []
for entry in self.ds:
video = self.get_video_prompt(entry, self.max_frames)
batch.append(video)
if len(batch) == self.batch_size:
yield batch
batch = []
# main
if __name__ == "__main__":
video_dir = "./videos"
# video_loader = VideoFileLoader(video_dir, batch_size=16)
# for batch in video_loader:
# print(f"Number of videos in batch: {len(batch)}")
# for video_file, num_frames in batch:
# print(f"Video: {video_file} number of frames: {num_frames}")
video_loader = NExTQALoader(video_dir, batch_size=16, dset="test", task="OE")
for batch in video_loader:
print(f"Number of videos in batch: {len(batch)}")
for video_file, num_frames, prompt in batch:
print(
f"Video: {video_file} number of frames: {num_frames}, prompt: {prompt}"
)
# break
# for video_file, prompt in batch:
# print(f"Video: {video_file} prompt: {prompt}")
# break
## Run benchmark
### Build dataset
```
pip install wikipedia
python3 build_dataset.py
```
### Dependencies
```
llama_cpp_python 0.2.19
guidance 0.1.10
vllm 0.2.5
outlines 0.0.22
```
### Benchmark sglang
Run Llama-7B
```
python3 -m sglang.launch_server --model-path meta-llama/Llama-2-7b-chat-hf --port 30000
```
Run Mixtral-8x7B
```
python3 -m sglang.launch_server --model-path mistralai/Mixtral-8x7B-Instruct-v0.1 --port 30000 --tp-size 8
```
Benchmark
```
python3 bench_sglang.py --num-questions 10
```
### Benchmark Outlines + vLLM
Run Llama-7B
```
python3 -m outlines.serve.serve --tokenizer-mode auto --model meta-llama/Llama-2-7b-chat-hf --disable-log-requests --port 21000
```
Benchmark
```
python3 bench_other.py --backend outlines --num-questions 10
```
### Benchmark guidance
Run Llama-7B and benchmark
```
python3 bench_other.py --backend guidance --num-questions 10 --parallel 1 --n-ctx 4096 --model-path path/to/gguf
```
import argparse
import json
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from tqdm import tqdm
from sglang.lang.ir import REGEX_FLOAT, REGEX_INT, REGEX_STR
from sglang.test.test_utils import add_common_other_args_and_parse, get_call_generate
from sglang.utils import dump_state_text, read_jsonl
REGEX_LIST = r"\[(" + REGEX_STR + ", )*" + REGEX_STR + r"\]"
# fmt: off
def json_decode(document, generate):
s = "Please extract the information of a city from the following wikipedia page.\n"
s += "Page begin.\n" + document + "Page end.\n"
s += "Here is the name, country, and symbol of the city in JSON format.\n"
s += "{\n"
s += ' "name": '
s += generate(s, max_tokens=8, regex=REGEX_STR + ",") + "\n"
s += ' "country": '
s += generate(s, max_tokens=8, regex=REGEX_STR + ",") + "\n"
s += ' "latitude": '
s += generate(s, max_tokens=8, regex=REGEX_FLOAT + ",") + "\n"
s += ' "population": '
s += generate(s, max_tokens=8, regex=REGEX_INT + ",") + "\n"
s += ' "top 3 landmarks": '
s += generate(s, max_tokens=24, regex=REGEX_LIST) + "\n"
s += "}\n"
return s
# fmt: on
def main(args):
lines = read_jsonl(args.data_path)
arguments = []
for i in range(len(lines[: args.num_questions])):
arguments.append(
{
"document": lines[i]["document"],
}
)
states = [None] * len(arguments)
# Select backend
call_generate = partial(get_call_generate(args), temperature=0)
# Run requests
def get_one_answer(i):
states[i] = json_decode(generate=call_generate, **arguments[i])
tic = time.perf_counter()
if args.parallel == 1:
for i in tqdm(range(len(arguments))):
get_one_answer(i)
else:
with ThreadPoolExecutor(args.parallel) as executor:
rets = list(
tqdm(
executor.map(get_one_answer, list(range(len(arguments)))),
total=len(arguments),
)
)
for _ in rets:
pass
latency = time.perf_counter() - tic
# Compute accuracy
print(f"Latency: {latency:.3f}")
# Write results
dump_state_text(f"tmp_output_{args.backend}.txt", states)
with open(args.result_file, "a") as fout:
value = {
"task": "json_decode_regex",
"backend": args.backend,
"num_gpus": 1,
"latency": round(latency, 3),
"num_requests": args.num_questions,
"other": {
"parallel": args.parallel,
},
}
fout.write(json.dumps(value) + "\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", type=str, default="questions.jsonl")
parser.add_argument("--num-questions", type=int, default=20)
args = add_common_other_args_and_parse(parser)
main(args)
import argparse
import json
import time
import sglang as sgl
from sglang.lang.ir import REGEX_FLOAT, REGEX_INT, REGEX_STR
from sglang.test.test_utils import (
add_common_sglang_args_and_parse,
select_sglang_backend,
)
from sglang.utils import dump_state_text, read_jsonl
REGEX_LIST = r"\[(" + REGEX_STR + ", )*" + REGEX_STR + r"\]"
# fmt: off
@sgl.function
def json_warm_up(s):
s += "The information about Hogwarts is in the following JSON format.\n"
with s.var_scope("json_output"):
s += "{\n"
s += ' "name": ' + sgl.gen("name", max_tokens=8, regex=REGEX_STR + ",") + "\n"
s += ' "country": ' + sgl.gen("country", max_tokens=8, regex=REGEX_STR + ",") + "\n"
s += ' "latitude": ' + sgl.gen("latitude", max_tokens=8, regex=REGEX_FLOAT + ",") + "\n"
s += ' "population": ' + sgl.gen("population", max_tokens=8, regex=REGEX_INT + ",") + "\n"
s += ' "top 3 landmarks": ' + sgl.gen( "landmarks", max_tokens=24, regex=REGEX_LIST) + "\n"
s += "}\n"
print(f'The warmp up json result is:\n{s["json_output"]}')
# fmt: on
# fmt: off
@sgl.function
def json_decode(s, document):
s += "Please extract the information of a city from the following wikipedia page.\n"
s += "Page begin.\n" + document + "Page end.\n"
s += "Here is the name, country, and symbol of the city in JSON format.\n"
with s.var_scope("json_output"):
s += "{\n"
s += ' "name": ' + sgl.gen("name", max_tokens=8, regex=REGEX_STR + ",") + "\n"
s += ' "country": ' + sgl.gen("country", max_tokens=8, regex=REGEX_STR + ",") + "\n"
s += ' "latitude": ' + sgl.gen("latitude", max_tokens=8, regex=REGEX_FLOAT + ",") + "\n"
s += ' "population": ' + sgl.gen("population", max_tokens=8, regex=REGEX_INT + ",") + "\n"
s += ' "top 3 landmarks": ' + sgl.gen( "landmarks", max_tokens=24, regex=REGEX_LIST) + "\n"
s += "}\n"
# fmt: on
def main(args):
lines = read_jsonl(args.data_path)
lines = list(lines)
arguments = []
for i in range(len(lines[: args.num_questions])):
arguments.append(
{
"document": lines[i]["document"],
}
)
# Select backend
backend = select_sglang_backend(args)
sgl.set_default_backend(backend)
# Warm up
json_warm_up.run().sync()
# Run requests
tic = time.perf_counter()
states = json_decode.run_batch(
arguments, temperature=0, num_threads=args.parallel, progress_bar=True
)
latency = time.perf_counter() - tic
# Compute accuracy
print(f"Latency: {latency:.3f}")
# Write results
dump_state_text(f"tmp_output_{args.backend}.txt", states)
with open(f"tmp_{args.backend}_json_results.txt", "w") as fout:
for state in states:
fout.write(state["json_output"] + "\n")
with open(args.result_file, "a") as fout:
value = {
"task": "json_decode_regex",
"backend": args.backend,
"num_gpus": 1,
"latency": round(latency, 3),
"num_requests": args.num_questions,
"other": {
"parallel": args.parallel,
},
}
fout.write(json.dumps(value) + "\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", type=str, default="questions.jsonl")
parser.add_argument("--num-questions", type=int, default=20)
args = add_common_sglang_args_and_parse(parser)
main(args)
import json
import transformers
import wikipedia
model_path = "meta-llama/Llama-2-7b-chat-hf"
t = transformers.AutoTokenizer.from_pretrained(model_path)
city_names = [
"los angles",
"london",
"tokyo",
"beijing",
"singapore",
"paris",
"dubai",
"sydney",
"moscow",
"rome",
"toronto",
"rio de janeiro",
"istanbul",
"berlin",
"auckland",
"buenos aires",
"mexico city",
"mumbai",
"seoul",
"bangkok",
"cairo",
"athens",
"jerusalem",
]
def get_content(city_name):
content = str(wikipedia.page(city_name).content)
content = content.replace("\n\n", "\n")
tokens = t.encode(content)
expected_tokens = 3000
truncate_len = int((expected_tokens / len(tokens)) * len(content))
truncate_content = content[:truncate_len]
truncate_tokens = t.encode(truncate_content)
# Count token
print(
f"city_name: {city_name}, #tokens: {len(tokens)}, #truncate tokens: {len(truncate_tokens)}"
)
return truncate_content
if __name__ == "__main__":
with open("questions.jsonl", "w") as fout:
for city_name in city_names:
truncate_content = get_content(city_name)
fout.write(json.dumps({"document": truncate_content}) + "\n")
## Run benchmark
### Dependencies
```
llama_cpp_python 0.2.38
guidance 0.1.10
vllm 0.2.7
outlines 0.0.25
```
### Build dataset
When benchmarking long document information retrieval, run the following command to build the dataset:
```bash
pip install wikipedia
python3 build_dataset.py
```
### Benchmark sglang
Run Llama-7B
```bash
python3 -m sglang.launch_server --model-path meta-llama/Llama-2-7b-chat-hf --port 30000
```
Benchmark Character Generation
```bash
python3 bench_sglang.py --mode character
```
Benchmark City Information Retrieval
```bash
python3 bench_sglang.py --mode city
```
### Benchmark Outlines + vLLM
Run Llama-7B
```bash
python3 -m outlines.serve.serve --tokenizer-mode auto --model meta-llama/Llama-2-7b-chat-hf --disable-log-requests --port 21000
```
Benchmark Character Generation
```bash
python3 bench_other.py --mode character --backend outlines
```
Benchmark City Information Retrieval
```bash
python3 bench_other.py --mode city --backend outlines
```
### Benchmark guidance
Run Llama-7B and benchmark character generation
```bash
python3 bench_other.py --mode character --backend guidance --parallel 1 --n-ctx 4096 --model-path path/to/gguf
```
Run Llama-7B and benchmark city information retrieval
```bash
python3 bench_other.py --mode city --backend guidance --parallel 1 --n-ctx 4096 --model-path path/to/gguf
```
### Benchmark lmql
Run Llama-7B and benchmark character generation
```
python3 bench_other.py --mode character --backend lmql --parallel 1
```
Run Llama-7B and benchmark city information retrieval
```
python3 bench_other.py --mode city --backend lmql --parallel 1
```
import argparse
import json
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import guidance
from tqdm import tqdm
from sglang.test.test_utils import add_common_other_args_and_parse, get_call_generate
from sglang.utils import dump_state_text, read_jsonl
# there are some FSM bugs with json regex converted from pydantic model
# here use a string regex instead
# regex_string = build_regex_from_object(HarryPoterRole)
character_regex = (
r"""\{\n"""
+ r""" "name": "[\w\d\s]{1,16}",\n"""
+ r""" "house": "(Gryffindor|Slytherin|Ravenclaw|Hufflepuff)",\n"""
+ r""" "blood status": "(Pure-blood|Half-blood|Muggle-born)",\n"""
+ r""" "occupation": "(student|teacher|auror|ministry of magic|death eater|order of the phoenix)",\n"""
+ r""" "wand": \{\n"""
+ r""" "wood": "[\w\d\s]{1,16}",\n"""
+ r""" "core": "[\w\d\s]{1,16}",\n"""
+ r""" "length": [0-9]{1,2}\.[0-9]{0,2}\n"""
+ r""" \},\n"""
+ r""" "alive": "(Alive|Deceased)",\n"""
+ r""" "patronus": "[\w\d\s]{1,16}",\n"""
+ r""" "bogart": "[\w\d\s]{1,16}"\n"""
+ r"""\}"""
)
city_regex = (
r"""\{\n"""
+ r""" "name": "[\w\d\s]{1,16}",\n"""
+ r""" "country": "[\w\d\s]{1,16}",\n"""
+ r""" "latitude": [-+]?[0-9]*\.?[0-9]{0,2},\n"""
+ r""" "population": [-+]?[0-9]{1,9},\n"""
+ r""" "top 3 landmarks": \["[\w\d\s]{1,16}", "[\w\d\s]{1,16}", "[\w\d\s]{1,16}"\]\n"""
+ r"""\}"""
)
# fmt: off
def character_gen(name, generate):
s = name + " is a character in Harry Potter. Please fill in the following information about this character.\n"
s += generate(s, max_tokens=256, regex=character_regex)
return s
# fmt: on
# fmt: off
def city_gen(document, generate):
s = "Please extract the information of a city from the following wikipedia page.\n"
s += "Page begin.\n" + document + "Page end.\n"
s += "Here is the name, country, and symbol of the city in JSON format.\n"
s += generate(s, max_tokens=256, regex=city_regex)
return s
# fmt: on
@guidance
def character_maker(lm, name):
regex_str_no_quote = r"[\w\d\s]+"
regex_float = r"[0-9]+\.[0-9]+"
lm += f"""\
{name} is a character in Harry Potter. Please fill in the following information about this character.
{{
"name": "{guidance.gen("name", max_tokens=16, regex=regex_str_no_quote)}",
"house": "{guidance.select(options=['Gryffindor', 'Slytherin', 'Ravenclaw', 'Hufflepuff'], name='house')}",
"blood status": "{guidance.select(options=['Pure-blood', 'Half-blood', 'Muggle-born'], name='blood status')}",
"occupation": "{guidance.select(options=['student', 'teacher', 'auror', 'ministry of magic', 'death eater', 'order of the phoenix'], name='occupation')}",
"wand": {{
"wood": "{guidance.gen("wood", max_tokens=16, regex=regex_str_no_quote)}",
"core": "{guidance.gen('core', max_tokens=16, regex=regex_str_no_quote)}",
"length": {guidance.gen('length', max_tokens=10, regex=regex_float)}
}},
"alive": "{guidance.select(options=['Alive', 'Deceased'], name='alive')}",
"patronus": "{guidance.gen('patronus', max_tokens=16, regex=regex_str_no_quote)}",
"bogart": "{guidance.gen('bogart', max_tokens=16, regex=regex_str_no_quote)}"
}}
"""
return lm
async def call_generate_lmql(
prompt, temperature, max_tokens, regex, max_len=4096, model=None, **kwargs
):
assert model is not None
import lmql
@lmql.query(model=model)
async def program(question, max_tokens, regex):
'''lmql
"""{question}[ANSWER]""" where len(TOKENS(ANSWER)) < max_tokens and REGEX(ANSWER, regex)
return ANSWER
'''
return await program(
question=prompt,
temperature=temperature,
max_tokens=max_tokens,
max_len=max_len,
regex=regex,
**kwargs,
)
@guidance
def city_maker(lm, document):
regex_str_no_quote = r"[\w\d\s]+"
regex_float = r"[0-9]+\.[0-9]+"
lm += f"""\
Please extract the information of a city from the following wikipedia page.
Page begin.
{document}
Page end.
Here is the name, country, and symbol of the city in JSON format.
{{
"name": "{guidance.gen("name", max_tokens=16, regex=regex_str_no_quote)}",
"country": "{guidance.gen("country", max_tokens=16, regex=regex_str_no_quote)}",
"latitude": {guidance.gen("latitude", max_tokens=10, regex=regex_float)},
"population": {guidance.gen("population", max_tokens=10, regex=r"[0-9]+")},
"top 3 landmarks": [
"{guidance.gen("landmark1", max_tokens=16, regex=regex_str_no_quote)}", "{guidance.gen("landmark2", max_tokens=16, regex=regex_str_no_quote)}", "{guidance.gen("landmark3", max_tokens=16, regex=regex_str_no_quote)}"
]
}}
"""
return lm
def bench_character(args):
arguments = []
with open(args.data_path, "r") as f:
for line in f:
arguments.append({"name": line.strip()})
arguments = arguments[: args.num_jsons]
states = [None] * len(arguments)
# Select backend
if args.backend == "outlines":
call_generate = partial(get_call_generate(args), temperature=0)
def get_one_answer(i):
states[i] = character_gen(**arguments[i], generate=call_generate)
elif args.backend == "guidance":
model = guidance.models.LlamaCpp(
args.model_path,
n_gpu_layers=-1,
n_ctx=args.n_ctx,
)
def get_one_answer(i):
lm = model + character_maker(**arguments[i])
states[i] = lm
elif args.backend == "lmql":
import asyncio
import lmql
model = lmql.model(args.model_path, endpoint=f"{args.host}:{args.port}")
call_generate = partial(
call_generate_lmql,
model=model,
max_tokens=256,
regex=character_regex,
)
async def get_one_answer_async(i):
states[i] = await call_generate(prompt=arguments[i]["name"], temperature=0)
else:
raise ValueError(f"Invalid backend: {args.backend}")
tic = time.perf_counter()
if args.backend != "lmql":
if args.parallel == 1:
for i in tqdm(range(len(arguments))):
get_one_answer(i)
else:
with ThreadPoolExecutor(args.parallel) as executor:
rets = list(
tqdm(
executor.map(get_one_answer, list(range(len(arguments)))),
total=len(arguments),
)
)
for _ in rets:
pass
else:
batches = []
for i in range(0, len(arguments), args.parallel):
batches.append(list(range(i, min(i + args.parallel, len(arguments)))))
loop = asyncio.get_event_loop()
for bt in tqdm(batches):
loop.run_until_complete(
asyncio.gather(*[get_one_answer_async(i) for i in bt])
)
latency = time.perf_counter() - tic
return states, latency
def bench_city_doc(args):
arguments = []
for line in read_jsonl(args.data_path):
arguments.append({"document": line["document"]})
arguments = arguments[: args.num_jsons]
states = [None] * len(arguments)
# Select backend
if args.backend == "outlines":
call_generate = partial(get_call_generate(args), temperature=0)
def get_one_answer(i):
states[i] = city_gen(**arguments[i], generate=call_generate)
elif args.backend == "guidance":
model = guidance.models.LlamaCpp(
args.model_path,
n_gpu_layers=-1,
n_ctx=args.n_ctx,
)
def get_one_answer(i):
lm = model + city_maker(**arguments[i])
states[i] = lm
else:
raise ValueError(f"Invalid backend: {args.backend}")
tic = time.perf_counter()
if args.parallel == 1:
for i in tqdm(range(len(arguments))):
get_one_answer(i)
else:
with ThreadPoolExecutor(args.parallel) as executor:
rets = executor.map(get_one_answer, list(range(len(arguments))))
for _ in rets:
pass
latency = time.perf_counter() - tic
return states, latency
def main(args):
if args.mode == "character":
args.data_path = "dataset.txt"
states, latency = bench_character(args)
elif args.mode == "city":
args.data_path = "questions.jsonl"
states, latency = bench_city_doc(args)
# Compute accuracy
print(f"Latency: {latency:.3f}")
# Write results
dump_state_text(f"tmp_output_{args.backend}_{args.mode}.txt", states)
with open(args.result_file, "a") as fout:
value = {
"task": "json_jump_forward",
"backend": args.backend,
"latency": round(latency, 3),
"num_jsons": args.num_jsons,
"mode": args.mode,
"parallel": args.parallel,
}
fout.write(json.dumps(value) + "\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", type=str)
parser.add_argument("--num-jsons", type=int, default=50)
parser.add_argument(
"--mode", type=str, default="character", choices=["character", "city"]
)
args = add_common_other_args_and_parse(parser)
main(args)
import argparse
import json
import time
import sglang as sgl
from sglang.test.test_utils import (
add_common_sglang_args_and_parse,
select_sglang_backend,
)
from sglang.utils import dump_state_text, read_jsonl
# there are some FSM bugs with json regex converted from pydantic model
# here use a string regex instead
# regex_string = build_regex_from_object(HarryPoterRole)
character_regex = (
r"""\{\n"""
+ r""" "name": "[\w\d\s]{1,16}",\n"""
+ r""" "house": "(Gryffindor|Slytherin|Ravenclaw|Hufflepuff)",\n"""
+ r""" "blood status": "(Pure-blood|Half-blood|Muggle-born)",\n"""
+ r""" "occupation": "(student|teacher|auror|ministry of magic|death eater|order of the phoenix)",\n"""
+ r""" "wand": \{\n"""
+ r""" "wood": "[\w\d\s]{1,16}",\n"""
+ r""" "core": "[\w\d\s]{1,16}",\n"""
+ r""" "length": [0-9]{1,2}\.[0-9]{0,2}\n"""
+ r""" \},\n"""
+ r""" "alive": "(Alive|Deceased)",\n"""
+ r""" "patronus": "[\w\d\s]{1,16}",\n"""
+ r""" "bogart": "[\w\d\s]{1,16}"\n"""
+ r"""\}"""
)
city_regex = (
r"""\{\n"""
+ r""" "name": "[\w\d\s]{1,16}",\n"""
+ r""" "country": "[\w\d\s]{1,16}",\n"""
+ r""" "latitude": [-+]?[0-9]*\.?[0-9]{0,2},\n"""
+ r""" "population": [-+]?[0-9]{1,9},\n"""
+ r""" "top 3 landmarks": \["[\w\d\s]{1,16}", "[\w\d\s]{1,16}", "[\w\d\s]{1,16}"\]\n"""
+ r"""\}"""
)
# fmt: off
@sgl.function
def character_gen(s, name):
s += name + " is a character in Harry Potter. Please fill in the following information about this character.\n"
s += sgl.gen("json_output", max_tokens=256, regex=character_regex)
# fmt: on
# fmt: off
@sgl.function
def city_gen(s, document):
s += "Please extract the information of a city from the following wikipedia page.\n"
s += "Page begin.\n" + document + "Page end.\n"
s += "Here is the name, country, and symbol of the city in JSON format.\n"
s += sgl.gen("json_output",max_tokens=256, regex=city_regex)
# fmt: on
def bench_city_doc(args):
arguments = []
for line in read_jsonl(args.data_path):
arguments.append({"document": line["document"]})
arguments = arguments[: args.num_jsons]
# Select backend
backend = select_sglang_backend(args)
sgl.set_default_backend(backend)
# Run requests
tic = time.perf_counter()
states = city_gen.run_batch(
arguments,
temperature=0,
num_threads=args.parallel,
progress_bar=True,
)
latency = time.perf_counter() - tic
return states, latency
def bench_character(args):
arguments = []
with open(args.data_path, "r") as f:
for line in f:
arguments.append({"name": line.strip()})
arguments = arguments[: args.num_jsons]
# Select backend
backend = select_sglang_backend(args)
sgl.set_default_backend(backend)
# Run requests
tic = time.perf_counter()
states = character_gen.run_batch(
arguments,
temperature=0,
num_threads=args.parallel,
progress_bar=True,
)
latency = time.perf_counter() - tic
return states, latency
def main(args):
if args.mode == "character":
args.data_path = "dataset.txt"
states, latency = bench_character(args)
elif args.mode == "city":
args.data_path = "questions.jsonl"
states, latency = bench_city_doc(args)
# Compute accuracy
print(f"Latency: {latency:.3f}")
# Write results
dump_state_text(f"tmp_output_{args.backend}_{args.mode}.txt", states)
with open(f"{args.backend}_{args.mode}.json", "w") as fout:
for state in states:
fout.write(state["json_output"] + "\n")
with open(args.result_file, "a") as fout:
value = {
"task": "json_jump_forward",
"backend": args.backend,
"latency": round(latency, 3),
"num_jsons": args.num_jsons,
"mode": args.mode,
"parallel": args.parallel,
}
fout.write(json.dumps(value) + "\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", type=str)
parser.add_argument("--num-jsons", type=int, default=50)
parser.add_argument(
"--mode", type=str, default="character", choices=["character", "city"]
)
args = add_common_sglang_args_and_parse(parser)
main(args)
import json
import transformers
import wikipedia
model_path = "meta-llama/Llama-2-7b-chat-hf"
t = transformers.AutoTokenizer.from_pretrained(model_path)
city_names = [
"los angles",
"london",
"tokyo",
"beijing",
"singapore",
"paris",
"dubai",
"sydney",
"moscow",
"rome",
"toronto",
"rio de janeiro",
"istanbul",
"berlin",
"auckland",
"buenos aires",
"mexico city",
"mumbai",
"seoul",
"bangkok",
"cairo",
"athens",
"jerusalem",
]
def get_content(city_name):
content = str(wikipedia.page(city_name).content)
content = content.replace("\n\n", "\n")
tokens = t.encode(content)
expected_tokens = 3000
truncate_len = int((expected_tokens / len(tokens)) * len(content))
truncate_content = content[:truncate_len]
truncate_tokens = t.encode(truncate_content)
# Count token
print(
f"city_name: {city_name}, #tokens: {len(tokens)}, #truncate tokens: {len(truncate_tokens)}"
)
return truncate_content
if __name__ == "__main__":
with open("questions.jsonl", "w") as fout:
for city_name in city_names:
truncate_content = get_content(city_name)
fout.write(json.dumps({"document": truncate_content}) + "\n")
Harry Potter
Hermione Granger
Ron Weasley
Albus Dumbledore
Severus Snape
Rubeus Hagrid
Draco Malfoy
Ginny Weasley
Fred Weasley
George Weasley
Percy Weasley
Sirius Black
Remus Lupin
Neville Longbottom
Luna Lovegood
Cedric Diggory
Cho Chang
Lord Voldemort
Minerva McGonagall
Filius Flitwick
Dolores Umbridge
Bellatrix Lestrange
Lucius Malfoy
Molly Weasley
Arthur Weasley
Nymphadora Tonks
Dobby
Moaning Myrtle
Peter Pettigrew
Alastor 'Mad-Eye' Moody
Horace Slughorn
Vernon Dursley
Petunia Dursley
Dudley Dursley
Argus Filch
Sybill Trelawney
Gilderoy Lockhart
Fleur Delacour
Viktor Krum
Bill Weasley
Oliver Wood
Cornelius Fudge
Barty Crouch Sr.
Barty Crouch Jr.
Kingsley Shacklebolt
Quirinus Quirrell
Nearly Headless Nick
Aunt Marge
Griphook
Ludo Bagman
## Run benchmark
### Benchmark sglang
Run Llama-8b
```bash
python3 -m sglang.launch_server --model-path meta-llama/Llama-3.1-8B-Instruct --port 30000
```
Benchmark
```bash
python3 bench_sglang.py
```
import argparse
import json
import time
from typing import List, Tuple
import jsonschema
from datasets import load_dataset
import sglang as sgl
from sglang.global_config import global_config
from sglang.srt.hf_transformers_utils import get_tokenizer
from sglang.test.test_utils import (
add_common_sglang_args_and_parse,
select_sglang_backend,
)
from sglang.utils import dump_state_text
@sgl.function
def schema_gen(s, message: Tuple[str, str], json_schema: str):
system, user = message
s += sgl.system(system)
s += sgl.user(user)
s += sgl.assistant(
sgl.gen("json_output", temperature=0, max_tokens=256, json_schema=json_schema)
)
def contains_formats(schema, formats: List[str]):
if isinstance(schema, dict):
if schema.get("format", None) in formats:
return True
for value in schema.values():
if contains_formats(value, formats):
return True
elif isinstance(schema, list):
for item in schema:
if contains_formats(item, formats):
return True
return False
def convert_dataset(path: str):
raw_dataset = load_dataset(path)
dataset = []
for data in raw_dataset["train"]:
messages = data["prompt"]
schema = data["schema"]
obj = json.loads(schema)
# skip some corrupted examples
if obj.get("type", None) is None:
continue
# skip schema with format "email"
# which is not supported by outlines for now
if contains_formats(obj, ["email"]):
continue
system = messages[0]
user = messages[1]
assert system["role"] == "system", "invalid role"
assert user["role"] == "user", "invalid role"
assert len(messages) == 2, "invalid message length"
message = json.dumps(system["content"]), json.dumps(user["content"])
dataset.append(
{
"message": message,
"json_schema": schema,
}
)
return dataset
def bench_schema(args):
arguments = convert_dataset(args.data_path)
if args.num_jsons < 0 or args.num_jsons > len(arguments):
args.num_jsons = len(arguments)
arguments = arguments[: args.num_jsons]
# Select backend
backend = select_sglang_backend(args)
sgl.set_default_backend(backend)
# Run requests
tic = time.perf_counter()
states = schema_gen.run_batch(
arguments,
temperature=0,
num_threads=args.parallel,
progress_bar=True,
)
latency = time.perf_counter() - tic
# Check if the outputs are valid
indexes = []
for i, state in enumerate(states):
try:
schema = json.loads(arguments[i]["json_schema"])
obj = json.loads(state["json_output"])
assert jsonschema.validate(obj, schema) is None
except Exception as e:
print(e)
indexes.append(i)
return states, latency
def main(args):
states, latency = bench_schema(args)
# Compute accuracy
tokenizer = get_tokenizer(
global_config.default_backend.get_server_info()["tokenizer_path"]
)
output_jsons = [state["json_output"] for state in states]
num_output_tokens = sum(len(tokenizer.encode(x)) for x in output_jsons)
print(f"Latency: {latency:.3f}")
print(f"Output throughput: {num_output_tokens / latency:.3f} token/s")
print(f"#output tokens: {num_output_tokens}")
# Write results
dump_state_text(f"tmp_output_{args.backend}.txt", states)
with open(f"{args.backend}.jsonl", "w") as fout:
for state in states:
fout.write(state["json_output"] + "\n")
with open(args.result_file, "a") as fout:
value = {
"task": "json_schema",
"backend": args.backend,
"latency": round(latency, 3),
"num_jsons": args.num_jsons,
"parallel": args.parallel,
}
fout.write(json.dumps(value) + "\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", type=str, default="NousResearch/json-mode-eval")
parser.add_argument("--num-jsons", type=int, default=-1)
args = add_common_sglang_args_and_parse(parser)
main(args)
"""For Now, MSCCL is only supported on TP16 and TP8 case
export WORLD_SIZE=1
export RANK=0
export MASTER_ADDR=127.0.0.1
export MASTER_PORT=12345
torchrun --nproc_per_node gpu \
--nnodes $WORLD_SIZE \
--node_rank $RANK \
--master_addr $MASTER_ADDR \
--master_port $MASTER_PORT benchmark/kernels/all_reduce/benchmark_mscclpp.py
"""
import os
from contextlib import nullcontext
from typing import List
import torch
import torch.distributed as dist
from torch.distributed import ProcessGroup
from sglang.srt.distributed import init_distributed_environment
from sglang.srt.distributed.device_communicators.pymscclpp import PyMscclppCommunicator
from sglang.srt.distributed.device_communicators.pynccl import PyNcclCommunicator
from sglang.srt.distributed.parallel_state import (
get_tensor_model_parallel_group,
graph_capture,
initialize_model_parallel,
set_mscclpp_all_reduce,
)
def torch_allreduce(torch_input: torch.Tensor, group: ProcessGroup) -> torch.Tensor:
dist.all_reduce(torch_input, group=group)
return torch_input
def msccl_allreduce(
msccl_input: torch.Tensor, msccl_comm: PyMscclppCommunicator
) -> torch.Tensor:
return msccl_comm.all_reduce(msccl_input)
def pynccl_allreduce(
msccl_input: torch.Tensor, pynccl_comm: PyNcclCommunicator
) -> torch.Tensor:
pynccl_comm.all_reduce(msccl_input)
return msccl_input
def _bench_graph_time(func, inp_randn, warmup_loop=2, graph_loop=10, test_loop=10):
graph_input = inp_randn.clone()
with graph_capture() as graph_capture_context:
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph, stream=graph_capture_context.stream):
for _ in range(graph_loop):
graph_out = func(graph_input)
graph.replay()
func_output = graph_out.clone()
for _ in range(warmup_loop):
graph.replay()
torch.cuda.synchronize()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
latencies: List[float] = []
for _ in range(test_loop):
torch.cuda.synchronize()
dist.barrier()
start_event.record()
graph.replay()
end_event.record()
end_event.synchronize()
latencies.append(start_event.elapsed_time(end_event))
func_cost_us = sum(latencies) / len(latencies) / graph_loop * 1000
graph.reset()
return func_output, func_cost_us
def _bench_eager_time(func, inp_randn, warmup_loop=2, test_loop=10):
eager_input = inp_randn.clone()
eager_output = func(eager_input)
func_output = eager_output.clone()
for _ in range(warmup_loop):
func(eager_input)
torch.cuda.synchronize()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
torch.cuda.synchronize()
start_event.record()
for _ in range(test_loop):
func(eager_input)
end_event.record()
torch.cuda.synchronize()
func_cost_us = start_event.elapsed_time(end_event) / test_loop * 1000
return func_output, func_cost_us
def get_torch_prof_ctx(do_prof: bool):
ctx = (
torch.profiler.profile(
activities=[
torch.profiler.ProfilerActivity.CPU,
torch.profiler.ProfilerActivity.CUDA,
],
record_shapes=True,
with_stack=True,
)
if do_prof
else nullcontext()
)
return ctx
def human_readable_size(size, decimal_places=1):
for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]:
if size < 1024.0 or unit == "PiB":
break
size /= 1024.0
return f"{size:.{decimal_places}f} {unit}"
try:
from tabulate import tabulate
except ImportError:
print("tabulate not installed, skipping table printing")
tabulate = None
def print_markdown_table(data):
if tabulate is not None:
print(tabulate(data, headers="keys", tablefmt="github"))
return
headers = data[0].keys()
header_row = "| " + " | ".join(headers) + " |"
separator = "| " + " | ".join(["---"] * len(headers)) + " |"
rows = []
for item in data:
row = "| " + " | ".join(str(item[key]) for key in headers) + " |"
rows.append(row)
markdown_table = "\n".join([header_row, separator] + rows)
print(markdown_table)
if __name__ == "__main__":
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
force=True,
)
if not dist.is_initialized():
dist.init_process_group(backend="nccl")
world, world_size = dist.group.WORLD, dist.get_world_size()
rank = dist.get_rank()
torch.cuda.set_device(rank % 8)
device = torch.cuda.current_device()
set_mscclpp_all_reduce(True)
init_distributed_environment(
world_size=world_size,
rank=rank,
local_rank=rank % 8,
)
initialize_model_parallel(tensor_model_parallel_size=world_size)
group = get_tensor_model_parallel_group().device_group
cpu_group = get_tensor_model_parallel_group().cpu_group
pynccl_comm = get_tensor_model_parallel_group().pynccl_comm
pymscclpp_comm = get_tensor_model_parallel_group().pymscclpp_comm
dist.barrier()
profile = False
dtype = torch.bfloat16
ctx = get_torch_prof_ctx(profile)
result = []
with ctx:
for i in range(10, 20):
sz = 2**i
if sz * dtype.itemsize > 2**20:
break
inp_randn = torch.randint(1, 16, (sz,), dtype=dtype, device=device)
memory = torch.empty_like(inp_randn)
memory_out = torch.empty_like(memory)
torch_eager_output, torch_eager_time = _bench_eager_time(
lambda inp: torch_allreduce(inp, group), inp_randn
)
msccl_eager_output, msccl_eager_time = _bench_eager_time(
lambda inp: msccl_allreduce(inp, pymscclpp_comm), inp_randn
)
msccl_graph_output, msccl_graph_time = _bench_graph_time(
lambda inp: msccl_allreduce(inp, pymscclpp_comm), inp_randn
)
# since pynccl is inplace op, this return result is not correct if graph loop > 1
_, pynccl_graph_time = _bench_graph_time(
lambda inp: pynccl_allreduce(inp, pynccl_comm), inp_randn
)
torch.testing.assert_close(torch_eager_output, msccl_graph_output)
torch.testing.assert_close(torch_eager_output, msccl_eager_output)
result.append(
{
"msg_size": human_readable_size(inp_randn.nbytes),
"torch eager time": torch_eager_time,
"msccl eager time": msccl_eager_time,
"msccl graph time": msccl_graph_time,
"pynccl graph time": pynccl_graph_time,
}
)
if rank == 0:
print(f"sz={sz}, dtype={dtype}: correctness check PASS!")
if rank == 0:
print_markdown_table(result)
if profile:
prof_dir = f"prof/msccl"
os.makedirs(prof_dir, exist_ok=True)
ctx.export_chrome_trace(f"{prof_dir}/trace_rank{dist.get_rank()}.json.gz")
import itertools
import math
import cudnn
import torch
import torch.utils.benchmark as benchmark
from flashinfer import BatchDecodeWithPagedKVCacheWrapper
from sglang.srt.layers.attention.triton_ops.decode_attention import decode_attention_fwd
from sglang.srt.utils import should_use_tensor_core
def benchmark_forward(
fn,
*inputs,
repeats=10,
amp=False,
amp_dtype=torch.float16,
**kwinputs,
):
def amp_wrapper(*inputs, **kwinputs):
with torch.autocast(device_type="cuda", dtype=amp_dtype, enabled=amp):
fn(*inputs, **kwinputs)
t = benchmark.Timer(
stmt="fn_amp(*inputs, **kwinputs)",
globals={"fn_amp": amp_wrapper, "inputs": inputs, "kwinputs": kwinputs},
num_threads=torch.get_num_threads(),
)
m = t.timeit(repeats)
return t, m
def time_fwd(func, *args, **kwargs):
time_f = benchmark_forward(func, *args, **kwargs)
return time_f[1].mean * 1e6
def decode_attention_sglang(
q,
kv_data,
batch_size,
kv_len,
head_num_q,
head_num_kv,
head_dim,
num_kv_splits,
warmup=10,
):
k_buffer = kv_data[0].view(-1, head_num_kv, head_dim)
v_buffer = kv_data[1].view(-1, head_num_kv, head_dim)
o = torch.empty_like(q)
total_tokens = batch_size * kv_len
req_to_token = torch.arange(0, total_tokens).to(0).int().view(batch_size, kv_len)
b_req_idx = torch.arange(0, batch_size).to(0).int()
b_seq_len = torch.full((batch_size,), kv_len, dtype=torch.int32, device="cuda")
max_len_in_batch = kv_len
sm_scale = 1.0 / (head_dim**0.5)
attn_logits = torch.empty(
(batch_size, head_num_q, num_kv_splits, head_dim + 1),
dtype=torch.float32,
device="cuda",
)
for _ in range(warmup):
decode_attention_fwd(
q,
k_buffer,
v_buffer,
o,
req_to_token,
b_req_idx,
b_seq_len,
attn_logits,
num_kv_splits,
sm_scale,
)
f = time_fwd(
decode_attention_fwd,
q,
k_buffer,
v_buffer,
o,
req_to_token,
b_req_idx,
b_seq_len,
attn_logits,
num_kv_splits,
sm_scale,
)
return f, o
def decode_attention_flashinfer(dtype, head_num_q, head_num_kv):
workspace_buffer = torch.empty(128 * 1024 * 1024, dtype=torch.int8, device="cuda")
use_tensor_cores = should_use_tensor_core(
kv_cache_dtype=dtype,
num_attention_heads=head_num_q,
num_kv_heads=head_num_kv,
)
flashinfer_decode_wrapper = BatchDecodeWithPagedKVCacheWrapper(
workspace_buffer, "NHD", use_tensor_cores=use_tensor_cores
)
class FlashinferAttention(torch.autograd.Function):
@staticmethod
def forward(
ctx,
q,
kv_data,
batch_size,
kv_len,
head_num_q,
head_num_kv,
head_dim,
dtype,
warmup=10,
):
total_tokens = batch_size * kv_len
kv_indptr = torch.arange(0, batch_size + 1).to(0).int() * kv_len
kv_indices = torch.arange(0, total_tokens).to(0).int()
kv_last_page_len = torch.full(
(batch_size,), 1, dtype=torch.int32, device="cuda"
)
flashinfer_decode_wrapper.end_forward()
flashinfer_decode_wrapper.begin_forward(
kv_indptr,
kv_indices,
kv_last_page_len,
head_num_q,
head_num_kv,
head_dim,
1,
pos_encoding_mode="NONE",
data_type=dtype,
)
for _ in range(warmup):
o = flashinfer_decode_wrapper.forward(
q.contiguous().view(-1, head_num_q, head_dim), kv_data
)
f = time_fwd(
flashinfer_decode_wrapper.forward,
q.contiguous().view(-1, head_num_q, head_dim),
kv_data,
)
return f, o
return FlashinferAttention
def convert_to_cudnn_type(torch_type):
if torch_type == torch.float16:
return cudnn.data_type.HALF
elif torch_type == torch.bfloat16:
return cudnn.data_type.BFLOAT16
elif torch_type == torch.float32:
return cudnn.data_type.FLOAT
elif torch_type == torch.int32:
return cudnn.data_type.INT32
elif torch_type == torch.int64:
return cudnn.data_type.INT64
else:
raise ValueError("Unsupported tensor data type.")
def decode_attention_cudnn(
q, kv_data, batch_size, kv_len, head_num_q, head_num_kv, head_dim, dtype, warmup=10
):
# Prepare data: continuous q,k,v
dims_q = (batch_size, head_num_q, 1, head_dim)
strides_q = (head_num_q * head_dim, head_dim, head_num_q * head_dim, 1)
q_gpu = q.as_strided(dims_q, strides_q)
o_gpu = (
torch.empty(batch_size * head_num_q * head_dim)
.half()
.cuda()
.as_strided(dims_q, strides_q)
)
dims_kv = (batch_size, head_num_kv, kv_len, head_dim)
strides_kv = (
kv_len * head_num_kv * head_dim,
head_dim,
head_num_kv * head_dim,
1,
)
k_gpu = kv_data[0].as_strided(dims_kv, strides_kv)
v_gpu = kv_data[1].as_strided(dims_kv, strides_kv)
seq_len_q_gpu = torch.full((batch_size, 1, 1, 1), 1, device="cuda")
seq_len_kv_gpu = torch.full((batch_size, 1, 1, 1), kv_len, device="cuda")
attn_scale = 1.0 / (head_dim**0.5)
# Prepare data: paged k,v
block_size = 1
blocks_per_batch = math.ceil(kv_len / block_size)
# [num_blocks, head_num_kv, block_size, head_dim], num_blocks = batch_size * blocks_per_batch
container_k_gpu = torch.cat(k_gpu.chunk(blocks_per_batch, dim=2), dim=0)
container_v_gpu = torch.cat(v_gpu.chunk(blocks_per_batch, dim=2), dim=0)
page_table_k_gpu = (
torch.linspace(
0,
batch_size * blocks_per_batch - 1,
batch_size * blocks_per_batch,
device="cuda",
dtype=torch.int32,
)
.reshape(blocks_per_batch, 1, batch_size, 1)
.transpose(0, 2)
)
page_table_v_gpu = page_table_k_gpu.clone()
graph = cudnn.pygraph(
io_data_type=convert_to_cudnn_type(dtype),
intermediate_data_type=cudnn.data_type.FLOAT,
compute_data_type=cudnn.data_type.FLOAT,
)
q = graph.tensor_like(q_gpu)
container_k = graph.tensor_like(container_k_gpu)
container_v = graph.tensor_like(container_v_gpu)
page_table_k = graph.tensor_like(page_table_k_gpu)
page_table_v = graph.tensor_like(page_table_v_gpu)
seq_len_q = graph.tensor_like(seq_len_q_gpu)
seq_len_kv = graph.tensor_like(seq_len_kv_gpu)
o, _ = graph.sdpa(
name="sdpa",
q=q,
k=container_k, # Container K: non contiguous container with K blocks
v=container_v, # Container V: non contiguous container with V blocks
is_inference=True,
attn_scale=attn_scale,
use_causal_mask=False,
use_padding_mask=True,
seq_len_q=seq_len_q,
seq_len_kv=seq_len_kv,
paged_attention_k_table=page_table_k, # Page Table K: Tensor containing offsets to the container with K blocks
paged_attention_v_table=page_table_v, # Page Table V: Tensor containing offsets to the container with V blocks
paged_attention_max_seq_len_kv=kv_len, # The maximum sequence length for K caches (this is optional, but recommended)
)
o.set_output(True).set_dim(dims_q).set_stride(strides_q)
graph.validate()
graph.build_operation_graph()
graph.create_execution_plans([cudnn.heur_mode.A])
graph.check_support()
graph.build_plans()
workspace = torch.empty(
graph.get_workspace_size(), device="cuda", dtype=torch.uint8
)
variant_pack = {
q: q_gpu,
container_k: container_k_gpu,
container_v: container_v_gpu,
page_table_k: page_table_k_gpu,
page_table_v: page_table_v_gpu,
seq_len_q: seq_len_q_gpu,
seq_len_kv: seq_len_kv_gpu,
o: o_gpu,
}
for _ in range(warmup):
graph.execute(variant_pack, workspace)
f = time_fwd(
graph.execute,
variant_pack,
workspace,
)
return f, o_gpu.squeeze(dim=2)
def calculate_diff():
dtype = torch.float16
batch_size = 64
kv_len = 4096
head_num_q = 64
head_num_kv = 8
head_dim = 128
q = torch.randn(batch_size, head_num_q, head_dim, dtype=dtype, device="cuda")
kv_data = (
torch.randn(
batch_size * kv_len, head_num_kv, head_dim, dtype=dtype, device="cuda"
),
torch.randn(
batch_size * kv_len, head_num_kv, head_dim, dtype=dtype, device="cuda"
),
)
_, output_sglang = decode_attention_sglang(
q,
kv_data,
batch_size,
kv_len,
head_num_q,
head_num_kv,
head_dim,
num_kv_splits=8,
)
attn_flashinfer = decode_attention_flashinfer(dtype, head_num_q, head_num_kv).apply
_, output_flashinfer = attn_flashinfer(
q, kv_data, batch_size, kv_len, head_num_q, head_num_kv, head_dim, dtype
)
_, output_cudnn = decode_attention_cudnn(
q, kv_data, batch_size, kv_len, head_num_q, head_num_kv, head_dim, dtype
)
print(f"SGLang output={output_sglang}")
print(f"FlashInfer output={output_flashinfer}")
print(f"cuDNN output={output_cudnn}")
if torch.allclose(output_sglang, output_flashinfer, atol=1e-2, rtol=1e-2):
print("✅ SGLang[Triton] and FlashInfer match")
else:
print("❌ SGLang[Triton] and FlashInfer differ")
if torch.allclose(output_sglang, output_cudnn, atol=1e-2, rtol=1e-2):
print("✅ SGLang[Triton] and cuDNN match")
else:
print("❌ SGLang[Triton] and cuDNN differ")
if __name__ == "__main__":
calculate_diff()
head_dim = 128
dtype = torch.float16
batch_size_range = [2**i for i in range(0, 8, 2)]
kv_len_range = [2**i for i in range(6, 13, 1)]
configs = list(itertools.product(batch_size_range, kv_len_range))
for head_num_q, head_num_kv in [[32, 32], [64, 8], [40, 8]]:
attn_flashinfer = decode_attention_flashinfer(
dtype, head_num_q, head_num_kv
).apply
for batch_size, kv_len in configs:
q = torch.randn(
batch_size, head_num_q, head_dim, dtype=dtype, device="cuda"
)
kv_data = (
torch.randn(
batch_size * kv_len,
head_num_kv,
head_dim,
dtype=dtype,
device="cuda",
),
torch.randn(
batch_size * kv_len,
head_num_kv,
head_dim,
dtype=dtype,
device="cuda",
),
)
us_cudnn, output_cudnn = decode_attention_cudnn(
q, kv_data, batch_size, kv_len, head_num_q, head_num_kv, head_dim, dtype
)
us_sglang, output_sglang = decode_attention_sglang(
q,
kv_data,
batch_size,
kv_len,
head_num_q,
head_num_kv,
head_dim,
num_kv_splits=8,
)
us_flashinfer, _ = attn_flashinfer(
q, kv_data, batch_size, kv_len, head_num_q, head_num_kv, head_dim, dtype
)
print(
head_num_q,
" ",
head_num_kv,
" ",
batch_size,
" ",
kv_len,
" ",
us_cudnn,
" ",
us_sglang,
" ",
us_flashinfer,
)
# ADAPTED FROM https://github.com/deepseek-ai/DeepEP/blob/main/tests/utils.py
import os
import sys
from typing import Optional
import numpy as np
import torch
import torch.distributed as dist
def init_dist(local_rank: int, num_local_ranks: int, args):
ip = args.master_addr
port = args.master_port
num_nodes = args.nnodes
node_rank = args.node_rank
assert (num_local_ranks < 8 and num_nodes == 1) or num_local_ranks == 8
dist.init_process_group(
backend="nccl",
init_method=f"tcp://{ip}:{port}",
world_size=num_nodes * num_local_ranks,
rank=node_rank * num_local_ranks + local_rank,
)
torch.set_default_dtype(torch.bfloat16)
torch.set_default_device("cuda")
torch.cuda.set_device(local_rank)
return (
dist.get_rank(),
dist.get_world_size(),
dist.new_group(list(range(num_local_ranks * num_nodes))),
)
def calc_diff(x: torch.Tensor, y: torch.Tensor):
x, y = x.double() + 1, y.double() + 1
denominator = (x * x + y * y).sum()
sim = 2 * (x * y).sum() / denominator
return (1 - sim).item()
def per_token_cast_to_fp8(x: torch.Tensor):
assert x.dim() == 2 and x.size(1) % 128 == 0
m, n = x.shape
x_view = x.view(m, -1, 128)
x_amax = x_view.abs().float().amax(dim=2).view(m, -1).clamp(1e-4)
return (x_view * (448.0 / x_amax.unsqueeze(2))).to(torch.float8_e4m3fn).view(
m, n
), (x_amax / 448.0).view(m, -1)
def per_token_cast_back(x_fp8: torch.Tensor, x_scales: torch.Tensor):
x_fp32 = x_fp8.to(torch.float32).view(x_fp8.size(0), -1, 128)
x_scales = x_scales.view(x_fp8.size(0), -1, 1)
return (x_fp32 * x_scales).view(x_fp8.shape).to(torch.bfloat16)
def inplace_unique(x: torch.Tensor, num_slots: int):
assert x.dim() == 2
mask = x < 0
x_padded = x.masked_fill(mask, num_slots)
bin_count = torch.zeros((x.size(0), num_slots + 1), dtype=x.dtype, device=x.device)
bin_count.scatter_add_(1, x_padded, torch.ones_like(x_padded))
bin_count = bin_count[:, :num_slots]
sorted_bin_count, sorted_bin_idx = torch.sort(bin_count, dim=-1, descending=True)
sorted_bin_idx.masked_fill_(sorted_bin_count == 0, -1)
sorted_bin_idx = torch.sort(sorted_bin_idx, descending=True, dim=-1).values
x[:, :].fill_(-1)
valid_len = min(num_slots, x.size(1))
x[:, :valid_len] = sorted_bin_idx[:, :valid_len]
def create_grouped_scores(
scores: torch.Tensor, group_idx: torch.Tensor, num_groups: int
):
num_tokens, num_experts = scores.shape
scores = scores.view(num_tokens, num_groups, -1)
mask = torch.zeros((num_tokens, num_groups), dtype=torch.bool, device=scores.device)
mask = mask.scatter_(1, group_idx, True).unsqueeze(-1).expand_as(scores)
return (scores * mask).view(num_tokens, num_experts)
def bench(fn, num_warmups: int = 20, num_tests: int = 30, post_fn=None):
# Flush L2 cache with 256 MB data
torch.cuda.synchronize()
cache = torch.empty(int(256e6 // 4), dtype=torch.int, device="cuda")
# Warmup
for _ in range(num_warmups):
fn()
# Flush L2
cache.zero_()
# Testing
start_events = [torch.cuda.Event(enable_timing=True) for _ in range(num_tests)]
end_events = [torch.cuda.Event(enable_timing=True) for _ in range(num_tests)]
for i in range(num_tests):
# Record
start_events[i].record()
fn()
end_events[i].record()
if post_fn is not None:
post_fn()
torch.cuda.synchronize()
times = np.array(
[s.elapsed_time(e) / 1e3 for s, e in zip(start_events, end_events)]
)[1:]
return np.average(times), np.min(times), np.max(times)
class empty_suppress:
def __enter__(self):
return self
def __exit__(self, *_):
pass
class suppress_stdout_stderr:
def __enter__(self):
self.outnull_file = open(os.devnull, "w")
self.errnull_file = open(os.devnull, "w")
self.old_stdout_fileno_undup = sys.stdout.fileno()
self.old_stderr_fileno_undup = sys.stderr.fileno()
self.old_stdout_fileno = os.dup(sys.stdout.fileno())
self.old_stderr_fileno = os.dup(sys.stderr.fileno())
self.old_stdout = sys.stdout
self.old_stderr = sys.stderr
os.dup2(self.outnull_file.fileno(), self.old_stdout_fileno_undup)
os.dup2(self.errnull_file.fileno(), self.old_stderr_fileno_undup)
sys.stdout = self.outnull_file
sys.stderr = self.errnull_file
return self
def __exit__(self, *_):
sys.stdout = self.old_stdout
sys.stderr = self.old_stderr
os.dup2(self.old_stdout_fileno, self.old_stdout_fileno_undup)
os.dup2(self.old_stderr_fileno, self.old_stderr_fileno_undup)
os.close(self.old_stdout_fileno)
os.close(self.old_stderr_fileno)
self.outnull_file.close()
self.errnull_file.close()
def bench_kineto(
fn,
kernel_names,
num_tests: int = 30,
suppress_kineto_output: bool = False,
trace_path: Optional[str] = None,
barrier_comm_profiling: bool = False,
):
# Profile
suppress = suppress_stdout_stderr if suppress_kineto_output else empty_suppress
with suppress():
schedule = torch.profiler.schedule(wait=0, warmup=1, active=1, repeat=1)
with torch.profiler.profile(
activities=[torch.profiler.ProfilerActivity.CUDA], schedule=schedule
) as prof:
for i in range(2):
# NOTES: use a large kernel and a barrier to eliminate the unbalanced CPU launch overhead
if barrier_comm_profiling:
lhs = torch.randn((8192, 8192), dtype=torch.float, device="cuda")
rhs = torch.randn((8192, 8192), dtype=torch.float, device="cuda")
lhs @ rhs
dist.all_reduce(torch.ones(1, dtype=torch.float, device="cuda"))
for _ in range(num_tests):
fn()
prof.step()
# Parse the profiling table
assert isinstance(kernel_names, str) or isinstance(kernel_names, tuple)
is_tupled = isinstance(kernel_names, tuple)
prof_lines = (
prof.key_averages()
.table(sort_by="cuda_time_total", max_name_column_width=100)
.split("\n")
)
kernel_names = (kernel_names,) if isinstance(kernel_names, str) else kernel_names
assert all([isinstance(name, str) for name in kernel_names])
for name in kernel_names:
assert (
sum([name in line for line in prof_lines]) == 1
), f"Errors of the kernel {name} in the profiling table"
# Save chrome traces
if trace_path is not None:
prof.export_chrome_trace(trace_path)
# Return average kernel times
units = {"ms": 1e3, "us": 1e6}
kernel_times = []
for name in kernel_names:
for line in prof_lines:
if name in line:
time_str = line.split()[-2]
for unit, scale in units.items():
if unit in time_str:
kernel_times.append(float(time_str.replace(unit, "")) / scale)
break
break
return tuple(kernel_times) if is_tupled else kernel_times[0]
def hash_tensor(t: torch.Tensor):
return t.view(torch.int64).sum().item()
# MODIFIED FROM https://github.com/deepseek-ai/DeepEP/blob/main/tests/test_internode.py
"""
Example usage:
python tuning_deepep.py --nnodes 4 --node-rank $MY_NODE_RANK --master-addr 1.2.3.4
Then check `deepep_tuned.json`
"""
import argparse
import json
import time
from copy import deepcopy
from pathlib import Path
# noinspection PyUnresolvedReferences
import deep_ep
import torch
import torch.distributed as dist
from deepep_utils import (
bench,
calc_diff,
create_grouped_scores,
init_dist,
inplace_unique,
per_token_cast_back,
per_token_cast_to_fp8,
)
def test_main(
num_sms: int,
local_rank: int,
num_local_ranks: int,
num_ranks: int,
num_nodes: int,
rank: int,
buffer: deep_ep.Buffer,
group: dist.ProcessGroup,
args,
):
# Settings
num_tokens, hidden, num_topk_groups, num_topk, num_experts = (
4096,
7168,
min(num_nodes, 4),
8,
(256 // num_ranks) * num_ranks,
)
assert num_experts % num_ranks == 0 and num_local_ranks == 8
if local_rank == 0:
print(
f"[config] num_tokens={num_tokens}, hidden={hidden}, num_topk_groups={num_topk_groups}, num_topk={num_topk}",
flush=True,
)
# Random data
x = torch.ones((num_tokens, hidden), dtype=torch.bfloat16, device="cuda") * rank
x_pure_rand = torch.randn((num_tokens, hidden), dtype=torch.bfloat16, device="cuda")
x_e4m3 = per_token_cast_to_fp8(x)
scores = (
torch.randn((num_tokens, num_experts), dtype=torch.float32, device="cuda").abs()
+ 1
)
group_scores = scores.view(num_tokens, num_nodes, -1).amax(dim=-1)
group_idx = torch.topk(
group_scores, k=num_topk_groups, dim=-1, sorted=False
).indices
masked_scores = create_grouped_scores(scores, group_idx, num_nodes)
topk_idx = torch.topk(masked_scores, num_topk, dim=-1, largest=True, sorted=False)[
1
]
topk_weights = (
torch.ones((num_tokens, num_topk), dtype=torch.float32, device="cuda") * rank
)
topk_weights_pure_rand = torch.randn(
(num_tokens, num_topk), dtype=torch.float32, device="cuda"
)
rank_idx = topk_idx // (num_experts // num_ranks)
rank_idx.masked_fill_(topk_idx == -1, -1)
inplace_unique(rank_idx, num_ranks)
rdma_rank_idx = rank_idx // num_local_ranks
rdma_rank_idx.masked_fill_(rank_idx == -1, -1)
inplace_unique(rdma_rank_idx, num_nodes)
# RDMA dispatch counts
rdma_idx = topk_idx // (num_experts // num_nodes)
rdma_idx.masked_fill_(topk_idx == -1, -1)
inplace_unique(rdma_idx, num_nodes)
num_rdma_token_sent = rdma_idx.ne(-1).sum().item()
# Expert meta
num_tokens_per_expert = torch.zeros((num_experts,), dtype=torch.int, device="cuda")
for i in range(num_experts):
num_tokens_per_expert[i] = (topk_idx == i).sum()
gbl_num_tokens_per_expert = num_tokens_per_expert.clone()
dist.all_reduce(gbl_num_tokens_per_expert, group=group)
# Rank layout meta
num_tokens_per_rank = torch.empty((num_ranks,), dtype=torch.int, device="cuda")
num_tokens_per_rdma_rank = torch.empty((num_nodes,), dtype=torch.int, device="cuda")
token_idx_in_rank = torch.full(
(num_ranks, num_tokens), -1, dtype=torch.long, device="cuda"
)
for i in range(num_ranks):
num_tokens_per_rank[i] = (rank_idx == i).sum()
token_sel = (rank_idx == i).max(dim=-1)[0]
count = token_sel.sum().item()
tokens = torch.sort(token_sel.to(torch.int), descending=True)[1]
tokens[:count] = torch.sort(tokens[:count])[0]
token_idx_in_rank[i][tokens[:count]] = torch.arange(
count, dtype=torch.long, device="cuda"
)
for i in range(num_nodes):
num_tokens_per_rdma_rank[i] = (rdma_rank_idx == i).sum()
token_idx_in_rank = token_idx_in_rank.T.contiguous().to(torch.int)
is_token_in_rank = token_idx_in_rank >= 0
gbl_num_tokens_per_rank = num_tokens_per_rank.clone()
dist.all_reduce(gbl_num_tokens_per_rank, group=group)
(
ref_num_tokens_per_rank,
ref_num_tokens_per_rdma_rank,
ref_num_tokens_per_expert,
ref_is_token_in_rank,
_,
) = buffer.get_dispatch_layout(topk_idx, num_experts)
assert torch.allclose(ref_num_tokens_per_rank, num_tokens_per_rank)
assert torch.allclose(ref_num_tokens_per_rdma_rank, num_tokens_per_rdma_rank)
assert torch.allclose(ref_num_tokens_per_expert, num_tokens_per_expert)
assert torch.allclose(ref_is_token_in_rank, is_token_in_rank)
t = bench(lambda: buffer.get_dispatch_layout(topk_idx, num_experts))[0]
if local_rank == 0:
print(f"[layout] Kernel performance: {t * 1000:.3f} ms", flush=True)
print("", flush=True)
group.barrier()
time.sleep(1)
# Config
rdma_buffer_size, nvl_buffer_size = 128, (720 if num_ranks in (144, 160) else 512)
config = deep_ep.Config(num_sms, 8, nvl_buffer_size, 16, rdma_buffer_size)
# Test dispatch
# noinspection PyShadowingNames
def check_data(check_x, recv_gbl_rank_prefix_sum):
assert torch.allclose(check_x.amin(dim=1), check_x.amax(dim=1))
check_start = 0
for i in range(num_ranks):
check_end = recv_gbl_rank_prefix_sum[i].item()
assert (check_x[check_start:check_end, :].int() - i).sum().item() == 0
check_start = check_end
for previous_mode in (False, True):
for async_mode in (False, True):
for current_x in (x_pure_rand, x, x_e4m3):
for with_topk in (False, True):
if local_rank == 0:
print(
f'[testing] Running with {"FP8" if isinstance(current_x, tuple) else "BF16"}, {"with" if with_topk else "without"} top-k (async={async_mode}, previous={previous_mode}) ...',
flush=True,
end="",
)
dispatch_args = {
"x": current_x,
"num_tokens_per_rank": num_tokens_per_rank,
"num_tokens_per_rdma_rank": num_tokens_per_rdma_rank,
"is_token_in_rank": is_token_in_rank,
"num_tokens_per_expert": num_tokens_per_expert,
"config": config,
"async_finish": async_mode,
}
if with_topk:
dispatch_args.update(
{
"topk_idx": topk_idx,
"topk_weights": (
topk_weights_pure_rand
if current_x is x_pure_rand
else topk_weights
),
}
)
if previous_mode:
dispatch_args.update({"previous_event": buffer.capture()})
(
recv_x,
recv_topk_idx,
recv_topk_weights,
recv_num_tokens_per_expert_list,
handle,
event,
) = buffer.dispatch(**dispatch_args)
event.current_stream_wait() if async_mode else ()
recv_x = (
per_token_cast_back(*recv_x)
if isinstance(recv_x, tuple)
else recv_x
)
# Checks
recv_gbl_rank_prefix_sum = handle[-4]
assert gbl_num_tokens_per_rank[rank].item() == recv_x.size(
0
), f"{gbl_num_tokens_per_rank[rank].item()} != {recv_x.size(0)}"
assert (
gbl_num_tokens_per_expert.view(num_ranks, -1)[rank].tolist()
== recv_num_tokens_per_expert_list
)
if current_x is not x_pure_rand:
check_data(recv_x, recv_gbl_rank_prefix_sum)
if with_topk:
# Check `topk_idx`
assert (
recv_topk_idx.eq(-1)
| (
(recv_topk_idx >= 0)
& (recv_topk_idx < (num_experts // num_ranks))
)
).sum().item() == recv_topk_idx.numel()
for i, count in enumerate(recv_num_tokens_per_expert_list):
assert recv_topk_idx.eq(i).sum().item() == count
# Check `topk_weights`
if current_x is not x_pure_rand:
recv_topk_weights[recv_topk_idx.eq(-1)] = (
recv_topk_weights.amax(dim=1, keepdim=True).expand_as(
recv_topk_weights
)[recv_topk_idx.eq(-1)]
)
check_data(recv_topk_weights, recv_gbl_rank_prefix_sum)
# Test cached dispatch (must without top-k staffs)
if not with_topk:
dispatch_args = {
"x": current_x,
"handle": handle,
"config": config,
"async_finish": async_mode,
}
if previous_mode:
dispatch_args.update({"previous_event": buffer.capture()})
recv_x, _, _, _, _, event = buffer.dispatch(**dispatch_args)
event.current_stream_wait() if async_mode else ()
recv_x = (
per_token_cast_back(*recv_x)
if isinstance(recv_x, tuple)
else recv_x
)
if current_x is not x_pure_rand:
check_data(recv_x, recv_gbl_rank_prefix_sum)
# Test combine
combine_args = {
"x": recv_x,
"handle": handle,
"config": config,
"async_finish": async_mode,
}
if with_topk:
combine_args.update({"topk_weights": recv_topk_weights})
if previous_mode:
combine_args.update({"previous_event": buffer.capture()})
combined_x, combined_topk_weights, event = buffer.combine(
**combine_args
)
event.current_stream_wait() if async_mode else ()
check_x = combined_x.float() / is_token_in_rank.sum(
dim=1
).unsqueeze(1)
ref_x = x_pure_rand if current_x is x_pure_rand else x
assert calc_diff(check_x, ref_x) < 5e-6
if with_topk:
check_topk_weights = (
combined_topk_weights
if (current_x is x_pure_rand)
else (
combined_topk_weights
/ is_token_in_rank.sum(dim=1).unsqueeze(1)
)
)
ref_topk_weights = (
topk_weights_pure_rand
if current_x is x_pure_rand
else topk_weights
)
assert calc_diff(check_topk_weights, ref_topk_weights) < 1e-9
# For later tuning
dispatch_bf16_rdma_send_bytes = num_rdma_token_sent * hidden * 2
dispatch_bf16_nvl_recv_bytes = recv_x.numel() * 2
combine_bf16_nvl_send_bytes = dispatch_bf16_nvl_recv_bytes
combine_bf16_rdma_recv_bytes = dispatch_bf16_rdma_send_bytes
if local_rank == 0:
print(" passed", flush=True)
if local_rank == 0:
print("", flush=True)
output_data = {}
# Tune dispatch performance
best_dispatch_results = None
fp8_factor = (1 + 4 / 128) / 2
for current_x in (x_e4m3, x):
best_time, best_results = 1e10, None
rdma_send_bytes = (
(dispatch_bf16_rdma_send_bytes * fp8_factor)
if isinstance(current_x, tuple)
else dispatch_bf16_rdma_send_bytes
)
nvl_recv_bytes = (
(dispatch_bf16_nvl_recv_bytes * fp8_factor)
if isinstance(current_x, tuple)
else dispatch_bf16_nvl_recv_bytes
)
for nvl_chunk_size in range(4, 33, 4):
for rdma_chunk_size in range(4, 33, 4):
config_kwargs = {
"num_sms": num_sms,
"num_max_nvl_chunked_send_tokens": nvl_chunk_size,
"num_max_nvl_chunked_recv_tokens": nvl_buffer_size,
"num_max_rdma_chunked_send_tokens": rdma_chunk_size,
"num_max_rdma_chunked_recv_tokens": rdma_buffer_size,
}
config = deep_ep.Config(**config_kwargs)
tune_args = {"x": current_x, "handle": handle, "config": config}
t = bench(lambda: buffer.dispatch(**tune_args))[0]
if t < best_time:
best_time, best_results = t, (
num_sms,
nvl_chunk_size,
rdma_chunk_size,
config_kwargs,
)
if local_rank == 0:
print(
f"[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}, RDMA chunk {rdma_chunk_size}: {rdma_send_bytes / 1e9 / t:.2f} GB/s (RDMA), {nvl_recv_bytes / 1e9 / t:.2f} GB/s (NVL) ",
flush=True,
)
if local_rank == 0:
print(
f'[tuning] Best dispatch ({"FP8" if isinstance(current_x, tuple) else "BF16"}): SMs {best_results[0]}, NVL chunk {best_results[1]}, RDMA chunk {best_results[2]}: {rdma_send_bytes / 1e9 / best_time:.2f} GB/s (RDMA), {nvl_recv_bytes / 1e9 / best_time:.2f} GB/s (NVL)',
flush=True,
)
print("", flush=True)
is_fp8 = isinstance(current_x, tuple)
if is_fp8:
output_data["normal_dispatch"] = deepcopy(best_results[3])
if isinstance(current_x, tuple):
# Gather FP8 the best config from rank 0
best_dispatch_results = torch.tensor(
[best_results[0], best_results[1], best_results[2]],
dtype=torch.int32,
device="cuda",
)
all_best_fp8_results_list = [
torch.zeros_like(best_dispatch_results)
for _ in range(torch.distributed.get_world_size())
]
dist.all_gather(
all_best_fp8_results_list, best_dispatch_results, group=group
)
best_dispatch_results = all_best_fp8_results_list[0].tolist()
dispatch_config = deep_ep.Config(
best_dispatch_results[0],
best_dispatch_results[1],
nvl_buffer_size,
best_dispatch_results[2],
rdma_buffer_size,
)
dispatch_args = {
"x": x,
"num_tokens_per_rank": num_tokens_per_rank,
"num_tokens_per_rdma_rank": num_tokens_per_rdma_rank,
"is_token_in_rank": is_token_in_rank,
"num_tokens_per_expert": num_tokens_per_expert,
"config": dispatch_config if dispatch_config is not None else config,
}
recv_x, _, _, _, handle, _ = buffer.dispatch(**dispatch_args)
# Tune combine performance
best_time, best_results = 1e10, None
for nvl_chunk_size in range(1, 5, 1):
for rdma_chunk_size in range(8, 33, 4):
config_kwargs = {
"num_sms": num_sms,
"num_max_nvl_chunked_send_tokens": nvl_chunk_size,
"num_max_nvl_chunked_recv_tokens": nvl_buffer_size,
"num_max_rdma_chunked_send_tokens": rdma_chunk_size,
"num_max_rdma_chunked_recv_tokens": rdma_buffer_size,
}
config = deep_ep.Config(**config_kwargs)
tune_args = {"x": recv_x, "handle": handle, "config": config}
t = bench(lambda: buffer.combine(**tune_args))[0]
if local_rank == 0:
print(
f"[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}, RDMA chunk {rdma_chunk_size}: {combine_bf16_rdma_recv_bytes / 1e9 / t:.2f} GB/s (RDMA), {combine_bf16_nvl_send_bytes / 1e9 / t:.2f} GB/s (NVL) ",
flush=True,
)
if t < best_time:
best_time, best_results = t, (
num_sms,
nvl_chunk_size,
rdma_chunk_size,
config_kwargs,
)
if local_rank == 0:
print(
f"[tuning] Best combine: SMs {best_results[0]}, NVL chunk {best_results[1]}, RDMA chunk {best_results[2]}: {combine_bf16_rdma_recv_bytes / 1e9 / best_time:.2f} GB/s (RDMA), {combine_bf16_nvl_send_bytes / 1e9 / best_time:.2f} GB/s (NVL)",
flush=True,
)
print("", flush=True)
output_data["normal_combine"] = deepcopy(best_results[3])
if rank == 0 and local_rank == 0:
_write_output(args, output_data)
def _write_output(args, output_data):
text = json.dumps(output_data, indent=4)
output_path = args.output_path
print(f"Write to {output_path} with {text}")
Path(output_path).write_text(text)
# noinspection PyUnboundLocalVariable
def test_loop(local_rank: int, num_local_ranks: int, args):
num_nodes = args.nnodes
rank, num_ranks, group = init_dist(local_rank, num_local_ranks, args)
num_sms = args.num_sms
num_qps_per_rank = num_sms // 2
buffer = deep_ep.Buffer(
group,
int(1e9),
int(1e9),
low_latency_mode=False,
num_qps_per_rank=num_qps_per_rank,
)
assert num_local_ranks == 8 and num_ranks > 8
torch.manual_seed(rank)
for i in (num_sms,):
test_main(
i,
local_rank,
num_local_ranks,
num_ranks,
num_nodes,
rank,
buffer,
group,
args,
)
if local_rank == 0:
print("", flush=True)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--num-sms", type=int, default=24)
parser.add_argument("--output-path", type=str, default="deepep_tuned.json")
parser.add_argument("--nnodes", type=int, default=1)
parser.add_argument("--node-rank", type=int, default=0)
parser.add_argument("--master-addr", type=str, default="127.0.0.1")
parser.add_argument("--master-port", type=int, default=8361)
args = parser.parse_args()
print(f"Start system with {args=}")
num_processes = 8
torch.multiprocessing.spawn(
test_loop, args=(num_processes, args), nprocs=num_processes
)
## DeepSeek kernels benchmark
### Prerequisites
- You should install [DeepGemm](https://github.com/deepseek-ai/DeepGEMM) from source before run `benchmark_deepgemm_fp8_gemm.py` and `benchmark_deepgemm_fp8_group_gemm.py`.
### Benchmark
- `benchmark_deepgemm_fp8_gemm.py`
```bash
python benchmark_deepgemm_fp8_gemm.py --run_correctness --tp_size 1
```
- `benchmark_deepgemm_fp8_group_gemm.py`
```bash
python benchmark_deepgemm_fp8_group_gemm.py --run_correctness --tp_size 1
```
- You can use the `--run_correctness` parameter to verify all kernels results's correctness.
- You can use the `--tp_size` parameter to benchmark all FP8 w8a8 block-wise matrix multiplications involved in DeepSeek V3/R1 under the current tensor parallelism (TP) setting. This benchmark compares DeepSeek's open-source [DeepGemm](https://github.com/deepseek-ai/DeepGEMM) implementation with SGLang's and VLLM Triton implementation.
from typing import Tuple
import deep_gemm
import tilelang
import tilelang.language as T
import torch
import triton
from deep_gemm import ceil_div, get_col_major_tma_aligned_tensor
from vllm.model_executor.layers.quantization.utils.fp8_utils import (
w8a8_block_fp8_matmul as vllm_w8a8_block_fp8_matmul,
)
from sglang.srt.layers.quantization.fp8_kernel import (
w8a8_block_fp8_matmul_deepgemm as w8a8_block_fp8_matmul,
)
# Adapted from https://github.com/tile-ai/tilelang/blob/a8cfdce92795cb861c9033573534653ee040b5ed/examples/deepseek_deepgemm/example_deepgemm_fp8_2xAcc.py#L1
def tl_gemm(
M,
N,
K,
in_dtype,
out_dtype,
accum_dtype,
):
assert in_dtype in [
"e4m3_float8",
], "Currently only e4m3_float8 is supported"
assert out_dtype in [
"bfloat16",
"float16",
], "Currently only bfloat16 and float16 are supported"
TILE_SIZE = (128, 128, 128)
block_M = TILE_SIZE[0]
block_N = TILE_SIZE[1]
block_K = TILE_SIZE[2]
A_shape = (M, K)
Scales_A_shape = (M, T.ceildiv(K, block_K))
B_shape = (N, K)
Scales_B_shape = (T.ceildiv(N, block_N), T.ceildiv(K, block_K))
A_shared_shape = (block_M, block_K)
B_shared_shape = (block_N, block_K)
C_shared_shape = (block_M, block_N)
@T.prim_func
def main(
A: T.Buffer(A_shape, in_dtype),
scales_a: T.Buffer(Scales_A_shape, "float32"),
B: T.Buffer(B_shape, in_dtype),
scales_b: T.Buffer(Scales_B_shape, "float32"),
C: T.Buffer((M, N), out_dtype),
):
with T.Kernel(T.ceildiv(N, block_N), T.ceildiv(M, block_M), threads=128) as (
bx,
by,
):
A_shared = T.alloc_shared(A_shared_shape, in_dtype)
B_shared = T.alloc_shared(B_shared_shape, in_dtype)
C_shared = T.alloc_shared(C_shared_shape, out_dtype)
Scale_C_shared = T.alloc_shared((block_M), "float32")
C_local = T.alloc_fragment(C_shared_shape, accum_dtype)
C_local_accum = T.alloc_fragment(C_shared_shape, accum_dtype)
# Improve L2 Cache
T.use_swizzle(panel_size=10)
T.clear(C_local)
T.clear(C_local_accum)
K_iters = T.ceildiv(K, block_K)
for k in T.Pipelined(K_iters, num_stages=4):
# Load A into shared memory
T.copy(A[by * block_M, k * block_K], A_shared)
# Load B into shared memory
T.copy(B[bx * block_N, k * block_K], B_shared)
# Load scale into shared memory
Scale_B = scales_b[bx, k]
for i in T.Parallel(block_M):
Scale_C_shared[i] = scales_a[by * block_M + i, k] * Scale_B
T.gemm(A_shared, B_shared, C_local, transpose_B=True)
# Promote to enable 2xAcc
for i, j in T.Parallel(block_M, block_N):
C_local_accum[i, j] += C_local[i, j] * Scale_C_shared[i]
T.clear(C_local)
# TMA store
T.copy(C_local_accum, C_shared)
T.copy(C_shared, C[by * block_M, bx * block_N])
return main
def per_token_cast_to_fp8(x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
assert x.dim() == 2 and x.size(1) % 128 == 0
m, n = x.shape
x_view = x.view(m, -1, 128)
x_amax = x_view.abs().float().amax(dim=2).view(m, -1).clamp(1e-4)
return (x_view * (448.0 / x_amax.unsqueeze(2))).to(torch.float8_e4m3fn).view(
m, n
), (x_amax / 448.0).view(m, -1)
def per_block_cast_to_fp8(x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
assert x.dim() == 2
m, n = x.shape
x_padded = torch.zeros(
(ceil_div(m, 128) * 128, ceil_div(n, 128) * 128), dtype=x.dtype, device=x.device
)
x_padded[:m, :n] = x
x_view = x_padded.view(-1, 128, x_padded.size(1) // 128, 128)
x_amax = x_view.abs().float().amax(dim=(1, 3), keepdim=True).clamp(1e-4)
x_scaled = (x_view * (448.0 / x_amax)).to(torch.float8_e4m3fn)
return x_scaled.view_as(x_padded)[:m, :n].contiguous(), (x_amax / 448.0).view(
x_view.size(0), x_view.size(2)
)
def fp8_gemm_deepgemm(
x_fp8: torch.Tensor,
x_scale: torch.Tensor,
y_fp8: torch.Tensor,
y_scale: torch.Tensor,
m: int,
n: int,
k: int,
):
"""DeepGEMM implementation of FP8 GEMM"""
out = torch.empty((m, n), device="cuda", dtype=torch.bfloat16)
# Run DeepGEMM kernel
deep_gemm.gemm_fp8_fp8_bf16_nt((x_fp8, x_scale), (y_fp8, y_scale), out)
return out
def fp8_gemm_sglang(
x_fp8: torch.Tensor,
x_scale: torch.Tensor,
y_fp8: torch.Tensor,
y_scale: torch.Tensor,
m: int,
n: int,
k: int,
):
"""SGLang implementation of FP8 GEMM"""
block_size = [128, 128] # Matches the block size in per_block_cast_to_fp8
# Run SGLang kernel
out = w8a8_block_fp8_matmul(
x_fp8, y_fp8, x_scale, y_scale, block_size, torch.bfloat16
)
return out
def fp8_gemm_vllm(
x_fp8: torch.Tensor,
x_scale: torch.Tensor,
y_fp8: torch.Tensor,
y_scale: torch.Tensor,
m: int,
n: int,
k: int,
):
"""vLLM implementation of FP8 GEMM"""
block_size = [128, 128] # Matches the block size in per_block_cast_to_fp8
# Run vLLM kernel
out = vllm_w8a8_block_fp8_matmul(
x_fp8, y_fp8, x_scale, y_scale, block_size, torch.bfloat16
)
return out
def calculate_diff(m: int, n: int, k: int):
x = torch.randn((m, k), device="cuda", dtype=torch.bfloat16)
y = torch.randn((n, k), device="cuda", dtype=torch.bfloat16)
x_fp8, x_scale = per_token_cast_to_fp8(x.clone())
y_fp8, y_scale = per_block_cast_to_fp8(y.clone())
x_scale_col_major = get_col_major_tma_aligned_tensor(x_scale.clone())
out_deepgemm = fp8_gemm_deepgemm(
x_fp8.clone(),
x_scale_col_major.clone(),
y_fp8.clone(),
y_scale.clone(),
m,
n,
k,
)
out_sglang = fp8_gemm_sglang(
x_fp8.clone(), x_scale.clone(), y_fp8.clone(), y_scale.clone(), m, n, k
)
tilelang_func = tl_gemm(m, n, k, "e4m3_float8", "bfloat16", "float32")
tilelang_kernel = tilelang.compile(tilelang_func, out_idx=[-1])
out_tilelang = tilelang_kernel(
x_fp8.clone(), x_scale.clone(), y_fp8.clone(), y_scale.clone()
)
diff_sglang_deepgemm = torch.abs(out_deepgemm - out_sglang).mean().item()
diff_tilelang_deepgemm = torch.abs(out_deepgemm - out_tilelang).mean().item()
diff_tilelang_sglang = torch.abs(out_tilelang - out_sglang).mean().item()
print(f"Shape m={m}, n={n}, k={k}:")
print(f"DeepGEMM output: {out_deepgemm[0, 0:5]}")
print(f"SGLang output: {out_sglang[0, 0:5]}")
print(f"TileLang output: {out_tilelang[0, 0:5]}")
print(f"Mean absolute difference (SGLang-DeepGEMM): {diff_sglang_deepgemm}")
print(f"Mean absolute difference (TileLang-DeepGEMM): {diff_tilelang_deepgemm}")
print(f"Mean absolute difference (TileLang-SGLang): {diff_tilelang_sglang}")
sglang_deepgemm_match = torch.allclose(
out_deepgemm, out_sglang, atol=1e-2, rtol=1e-2
)
tilelang_deepgemm_match = torch.allclose(
out_deepgemm, out_tilelang, atol=1e-2, rtol=1e-2
)
tilelang_sglang_match = torch.allclose(
out_tilelang, out_sglang, atol=1e-2, rtol=1e-2
)
if sglang_deepgemm_match and tilelang_deepgemm_match and tilelang_sglang_match:
print("✅ All implementations match\n")
else:
print("❌ Some implementations differ:")
print(f" - SGLang vs DeepGEMM: {'✅' if sglang_deepgemm_match else '❌'}")
print(f" - TileLang vs DeepGEMM: {'✅' if tilelang_deepgemm_match else '❌'}")
print(f" - TileLang vs SGLang: {'✅' if tilelang_sglang_match else '❌'}\n")
def get_weight_shapes(tp_size):
# cannot TP
total = [
(512 + 64, 7168),
((128 + 64) * 128, 7168),
(128 * (128 + 128), 512),
(7168, 16384),
(7168, 18432),
]
# N can TP
n_tp = [
(18432 * 2, 7168),
((128 + 64) * 128, 7168),
(128 * (128 + 128), 512),
(24576, 1536),
(4096, 7168),
]
# K can TP
k_tp = [(7168, 18432), (7168, 16384), (7168, 2048)]
weight_shapes = []
for t in total:
weight_shapes.append(t)
for n_t in n_tp:
new_t = (n_t[0] // tp_size, n_t[1])
weight_shapes.append(new_t)
for k_t in k_tp:
new_t = (k_t[0], k_t[1] // tp_size)
weight_shapes.append(new_t)
return weight_shapes
def create_benchmark_configs(tp_size):
configs = []
weight_shapes = get_weight_shapes(tp_size)
batch_sizes = [8, 16, 32, 64, 128, 256, 1024, 2048, 4096]
for n, k in weight_shapes:
for m in batch_sizes:
configs.append((m, n, k, tp_size))
return configs
def get_benchmark(tp_size):
all_configs = create_benchmark_configs(tp_size)
@triton.testing.perf_report(
triton.testing.Benchmark(
x_names=["m", "n", "k", "tp_size"],
x_vals=[list(config) for config in all_configs],
line_arg="provider",
line_vals=["deepgemm", "sglang", "tilelang"],
line_names=["DeepGEMM", "SGLang", "TileLang"],
styles=[("blue", "-"), ("red", "-"), ("green", "-")],
ylabel="ms",
plot_name=f"fp8-gemm-performance-comparison-tp{tp_size}",
args={},
)
)
def benchmark(m, n, k, tp_size, provider):
print(f"Shape (m={m}, n={n}, k={k}, tp={tp_size}), Provider: {provider}")
x = torch.randn((m, k), device="cuda", dtype=torch.bfloat16)
y = torch.randn((n, k), device="cuda", dtype=torch.bfloat16)
# Preprocess data before benchmarking
x_fp8, x_scale = per_token_cast_to_fp8(x)
y_fp8, y_scale = per_block_cast_to_fp8(y)
x_scale_col_major = get_col_major_tma_aligned_tensor(x_scale.clone())
quantiles = [0.5, 0.2, 0.8]
if provider == "deepgemm":
ms, min_ms, max_ms = triton.testing.do_bench(
lambda: fp8_gemm_deepgemm(
x_fp8.clone(),
x_scale_col_major.clone(),
y_fp8.clone(),
y_scale.clone(),
m,
n,
k,
),
quantiles=quantiles,
)
elif provider == "sglang":
ms, min_ms, max_ms = triton.testing.do_bench(
lambda: fp8_gemm_sglang(
x_fp8.clone(),
x_scale.clone(),
y_fp8.clone(),
y_scale.clone(),
m,
n,
k,
),
quantiles=quantiles,
)
else: # tilelang
tilelang_func = tl_gemm(m, n, k, "e4m3_float8", "bfloat16", "float32")
tilelang_kernel = tilelang.compile(tilelang_func, out_idx=[-1])
ms, min_ms, max_ms = triton.testing.do_bench(
lambda: tilelang_kernel(
x_fp8.clone(),
x_scale.clone(),
y_fp8.clone(),
y_scale.clone(),
),
quantiles=quantiles,
)
# Calculate TFLOPS
flops = 2 * m * n * k # multiply-adds
tflops = flops / (ms * 1e-3) / 1e12
# Print shape-specific results with TFLOPS
print(f"Time: {ms*1000:.2f} ms, TFLOPS: {tflops:.2f}")
return ms * 1000, max_ms * 1000, min_ms * 1000 # convert to ms
return benchmark
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--save_path",
type=str,
default="./configs/benchmark_ops/fp8_gemm/",
help="Path to save fp8 gemm benchmark results",
)
parser.add_argument(
"--run_correctness",
action="store_true",
default=True,
help="Whether to run correctness test",
)
parser.add_argument(
"--tp_size",
type=int,
default=1,
help="Tensor parallelism size to benchmark (default: 1)",
)
args = parser.parse_args()
# Set random seed for reproducibility
torch.manual_seed(0)
torch.cuda.manual_seed(0)
# Enable TF32, adapted from https://github.com/deepseek-ai/DeepGEMM/blob/main/tests/test_core.py#L148
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
# Run correctness tests on a few examples
if args.run_correctness:
print("Running correctness tests...")
calculate_diff(64, 512, 7168) # Small test
calculate_diff(64, 7168, 16384) # Medium test
calculate_diff(64, 18432, 7168) # Large test
# Get the benchmark function with the specified tp_size
benchmark = get_benchmark(args.tp_size)
print(f"Running performance benchmark for TP size = {args.tp_size}...")
benchmark.run(print_data=True, save_path=args.save_path)
from typing import Tuple
import deep_gemm
import torch
import triton
import triton.language as tl
from deep_gemm import calc_diff, get_col_major_tma_aligned_tensor
# Import shared functionality from the regular GEMM benchmark
from sglang.benchmark.kernels.deepseek.benchmark_deepgemm_fp8_gemm import (
per_block_cast_to_fp8,
per_token_cast_to_fp8,
)
def construct_grouped_and_flat_fp8(
x: torch.Tensor, y: torch.Tensor, num_groups: int, is_masked: bool
) -> Tuple[
Tuple[torch.Tensor, torch.Tensor], # grouped x_fp8
Tuple[torch.Tensor, torch.Tensor], # grouped y_fp8
Tuple[torch.Tensor, torch.Tensor], # flat x_fp8
Tuple[torch.Tensor, torch.Tensor], # flat y_fp8
torch.Tensor, # output
torch.Tensor, # reference output
]:
# Verify input shapes
m, k = x.shape
n, k_y = y.shape
assert k == k_y, f"Incompatible shapes: x({m}, {k}), y({n}, {k_y})"
assert m % num_groups == 0, f"m({m}) must be divisible by num_groups({num_groups})"
assert m % 4 == 0, f"TMA alignment error: {m}"
# Reshape inputs for grouped processing
m_per_group = m // num_groups
x_grouped = x.view(num_groups, m_per_group, k)
y_grouped = y.unsqueeze(0).expand(num_groups, n, k)
# Initialize output tensors
out = torch.empty((num_groups, m_per_group, n), device="cuda", dtype=torch.bfloat16)
ref_out = torch.einsum("gmk,gnk->gmn", x_grouped, y_grouped)
# Quantize grouped tensors
x_fp8_grouped = (
torch.empty_like(x_grouped, dtype=torch.float8_e4m3fn),
torch.empty(
(num_groups, m_per_group, k // 128), device="cuda", dtype=torch.float
),
)
y_fp8_grouped = (
torch.empty_like(y_grouped, dtype=torch.float8_e4m3fn),
torch.empty(
(num_groups, (n + 127) // 128, k // 128), device="cuda", dtype=torch.float
),
)
for i in range(num_groups):
x_fp8_grouped[0][i], x_fp8_grouped[1][i] = per_token_cast_to_fp8(x_grouped[i])
y_fp8_grouped[0][i], y_fp8_grouped[1][i] = per_block_cast_to_fp8(y_grouped[i])
# Quantize flat tensors
x_fp8_flat = per_token_cast_to_fp8(x)
y_fp8_flat = per_block_cast_to_fp8(y)
# For non-masked input, merge the group and M dims in output
if not is_masked:
x_fp8_grouped = (
x_fp8_grouped[0].view(-1, k),
per_token_cast_to_fp8(x_grouped.view(-1, k))[1],
)
out, ref_out = out.view(-1, n), ref_out.view(-1, n)
# Transpose earlier for testing
x_fp8_grouped = (
x_fp8_grouped[0],
get_col_major_tma_aligned_tensor(x_fp8_grouped[1]),
)
x_fp8_flat = (x_fp8_flat[0], get_col_major_tma_aligned_tensor(x_fp8_flat[1]))
return x_fp8_grouped, y_fp8_grouped, x_fp8_flat, y_fp8_flat, out, ref_out
# Since we don't have a group gemm kernel in SGLang/vLLM, we implemented a
# custom kernel based on the Triton tutorial.
# https://triton-lang.org/main/getting-started/tutorials/03-matrix-multiplication.html
@triton.jit
def fp8_gemm_group_triton_kernel(
# Pointers to matrices
a_ptr,
b_ptr,
c_ptr,
# Pointers to scaling factors
a_scale_ptr,
b_scale_ptr,
# Matrix dimensions
M,
N,
K,
# The stride variables represent how much to increase the ptr by when moving by 1
# element in a particular dimension.
stride_am,
stride_ak,
stride_bk,
stride_bn,
stride_cm,
stride_cn,
# Strides for scaling factors
stride_a_scale_m,
stride_a_scale_k,
stride_b_scale_n,
stride_b_scale_k,
# Meta-parameters
BLOCK_SIZE_M: tl.constexpr,
BLOCK_SIZE_N: tl.constexpr,
BLOCK_SIZE_K: tl.constexpr,
GROUP_SIZE_M: tl.constexpr,
):
"""Kernel for computing the matmul C = A x B with FP8 inputs and scaling factors.
A has shape (M, K), B has shape (K, N) and C has shape (M, N)
Note: Block sizes must be multiples of 32 for optimal TMA performance.
"""
# Map program ids to the block of C it should compute
pid_group = tl.program_id(axis=0) # Group ID
pid_n = tl.program_id(axis=1) # N dimension ID
# Compute the M block ID within this group
group_size_m = min(M - pid_group * GROUP_SIZE_M, GROUP_SIZE_M)
pid_m_within_group = tl.program_id(axis=2) % group_size_m
pid_m = pid_group * GROUP_SIZE_M + pid_m_within_group
# Create pointers for the first blocks of A and B
offs_am = (pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)) % M
offs_bn = (pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)) % N
offs_k = tl.arange(0, BLOCK_SIZE_K)
a_ptrs = a_ptr + (offs_am[:, None] * stride_am + offs_k[None, :] * stride_ak)
b_ptrs = b_ptr + (offs_k[:, None] * stride_bk + offs_bn[None, :] * stride_bn)
# Initialize accumulator
accumulator = tl.zeros((BLOCK_SIZE_M, BLOCK_SIZE_N), dtype=tl.float32)
# Main loop
for k_block in range(0, tl.cdiv(K, BLOCK_SIZE_K)):
k_offset = k_block * BLOCK_SIZE_K
# Load the next block of A and B, with masks
a = tl.load(a_ptrs, mask=offs_k[None, :] < K - k_offset, other=0.0)
b = tl.load(b_ptrs, mask=offs_k[:, None] < K - k_offset, other=0.0)
# Calculate indices for scaling factors for this K block
a_scale_ptrs = a_scale_ptr + (
offs_am * stride_a_scale_m + k_block * stride_a_scale_k
)
b_scale_ptrs = b_scale_ptr + (
pid_n * stride_b_scale_n + k_block * stride_b_scale_k
)
# Perform matrix multiplication in FP8
res = tl.dot(a, b)
# Load scaling factors for the current block
a_scale = tl.load(a_scale_ptrs)[:, None] # [BLOCK_SIZE_M, 1]
b_scale = tl.load(b_scale_ptrs)
# Apply scaling factors to the accumulated result
accumulator += res * a_scale * b_scale
# Advance pointers
a_ptrs += BLOCK_SIZE_K * stride_ak
b_ptrs += BLOCK_SIZE_K * stride_bk
# Convert to bfloat16 for output
c = accumulator.to(tl.bfloat16)
# Write back the result
offs_cm = pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)
offs_cn = pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)
c_ptrs = c_ptr + stride_cm * offs_cm[:, None] + stride_cn * offs_cn[None, :]
c_mask = (offs_cm[:, None] < M) & (offs_cn[None, :] < N)
tl.store(c_ptrs, c, mask=c_mask)
def fp8_gemm_group_triton(a_tuple, b_tuple, c, num_groups):
"""
Perform matrix multiplication with FP8 inputs and proper scaling.
Args:
a_tuple: Tuple of (quantized_tensor, scale_factors) for input A
b_tuple: Tuple of (quantized_tensor, scale_factors) for input B
c: Output tensor in BF16 format
num_groups: Number of groups for grouped GEMM
Returns:
Result tensor in BF16 format
"""
# Unpack the tuples
a, a_scale = a_tuple
b, b_scale = b_tuple
M, K = a.shape
_, N = b.shape
# Configure block sizes - must be multiples of 32 for TMA alignment
BLOCK_SIZE_M = 128
BLOCK_SIZE_N = 128
BLOCK_SIZE_K = 128
# Calculate grid dimensions
num_pid_m = triton.cdiv(M, BLOCK_SIZE_M)
num_pid_n = triton.cdiv(N, BLOCK_SIZE_N)
num_groups_grid = triton.cdiv(num_pid_m, num_groups)
# 3D grid launch - (group, n_blocks, m_blocks_per_group)
grid = (num_groups_grid, num_pid_n, min(num_groups, num_pid_m))
fp8_gemm_group_triton_kernel[grid](
a,
b,
c,
a_scale,
b_scale,
M,
N,
K,
a.stride(0),
a.stride(1),
b.stride(0),
b.stride(1),
c.stride(0),
c.stride(1),
a_scale.stride(0),
1, # Stride in the K dimension may be 1
b_scale.stride(0),
1 if b_scale.dim() > 1 else 0,
BLOCK_SIZE_M=BLOCK_SIZE_M,
BLOCK_SIZE_N=BLOCK_SIZE_N,
BLOCK_SIZE_K=BLOCK_SIZE_K,
GROUP_SIZE_M=num_groups,
)
return c
def fp8_gemm_group_deepgemm(x_fp8_grouped, y_fp8_grouped, out, m_indices):
deep_gemm.m_grouped_gemm_fp8_fp8_bf16_nt_contiguous(
x_fp8_grouped,
y_fp8_grouped,
out,
m_indices,
)
return out
def calculate_diff(m: int, n: int, k: int, num_groups: int):
print(f"Shape (m={m}, n={n}, k={k}")
x = torch.randn((m, k), device="cuda", dtype=torch.bfloat16)
y = torch.randn((n, k), device="cuda", dtype=torch.bfloat16)
x_fp8_grouped, y_fp8_grouped, x_fp8_flat, y_fp8_flat, out, out_torch = (
construct_grouped_and_flat_fp8(x, y, num_groups, is_masked=False)
)
m_per_group = m // num_groups
out_deepgemm = out.clone()
m_indices = torch.arange(0, num_groups, device="cuda", dtype=torch.int)
m_indices = (
m_indices.unsqueeze(-1).expand(num_groups, m_per_group).contiguous().view(-1)
)
fp8_gemm_group_deepgemm(
x_fp8_grouped,
y_fp8_grouped,
out_deepgemm,
m_indices,
)
torch.cuda.synchronize()
# Prepare inputs for Triton
a, a_scale = x_fp8_flat
b, b_scale = y_fp8_flat
b = b.T.contiguous()
# Ensure scales are in the right format and contiguous
a_scale, b_scale = a_scale.contiguous(), b_scale.contiguous()
M, _ = a.shape
_, N = b.shape
c = torch.empty((M, N), device=a.device, dtype=torch.bfloat16)
out_triton = fp8_gemm_group_triton((a, a_scale), (b, b_scale), c, num_groups)
torch.cuda.synchronize()
diff_torch_deepgemm = torch.abs(out_torch - out_deepgemm).mean().item()
diff_torch_triton = torch.abs(out_torch - out_triton).mean().item()
diff_deepgemm_triton = torch.abs(out_deepgemm - out_triton).mean().item()
print(f"Shape m={m}, n={n}, k={k}:")
print(f"Torch output: {out_torch[0, 0:5]}")
print(f"DeepGEMM output: {out_deepgemm[0, 0:5]}")
print(f"Triton output: {out_triton[0, 0:5]}")
print(f"Mean absolute difference (Torch-DeepGEMM): {diff_torch_deepgemm}")
print(f"Mean absolute difference (Torch-Triton): {diff_torch_triton}")
print(f"Mean absolute difference (DeepGEMM-Triton): {diff_deepgemm_triton}")
deepgemm_torch_diff = calc_diff(out_deepgemm, out_torch)
triton_torch_diff = calc_diff(out_triton, out_torch)
deepgemm_triton_diff = calc_diff(out_deepgemm, out_triton)
DIFF_THRESHOLD = 0.001
all_match = (
deepgemm_torch_diff < DIFF_THRESHOLD
and triton_torch_diff < DIFF_THRESHOLD
and deepgemm_triton_diff < DIFF_THRESHOLD
)
if all_match:
print("✅ All implementations match\n")
else:
print("❌ Some implementations differ:")
print(
f" - Torch vs DeepGEMM: {'✅' if deepgemm_torch_diff < DIFF_THRESHOLD else '❌'}"
f" - Torch vs Triton: {'✅' if triton_torch_diff < DIFF_THRESHOLD else '❌'}"
f" - DeepGEMM vs Triton: {'✅' if deepgemm_triton_diff < DIFF_THRESHOLD else '❌'}"
)
def get_weight_shapes(tp_size):
# cannot TP
total = [
(512 + 64, 7168),
((128 + 64) * 128, 7168),
(128 * (128 + 128), 512),
(7168, 16384),
(7168, 18432),
]
# N can TP
n_tp = [
(18432 * 2, 7168),
((128 + 64) * 128, 7168),
(128 * (128 + 128), 512),
(24576, 1536),
(4096, 7168),
]
# K can TP
k_tp = [(7168, 18432), (7168, 16384), (7168, 2048)]
weight_shapes = []
for t in total:
weight_shapes.append(t)
for n_t in n_tp:
new_t = (n_t[0] // tp_size, n_t[1])
weight_shapes.append(new_t)
for k_t in k_tp:
new_t = (k_t[0], k_t[1] // tp_size)
weight_shapes.append(new_t)
return weight_shapes
def create_benchmark_configs(tp_size):
configs = []
weight_shapes = get_weight_shapes(tp_size)
batch_sizes = [2048, 4096]
group_sizes = [4, 8]
for n, k in weight_shapes:
for m in batch_sizes:
for num_groups in group_sizes:
configs.append((m, n, k, num_groups, tp_size))
return configs
def get_benchmark(tp_size):
all_configs = create_benchmark_configs(tp_size)
@triton.testing.perf_report(
triton.testing.Benchmark(
x_names=["m", "n", "k", "num_groups", "tp_size"],
x_vals=[config for config in all_configs],
line_arg="provider",
line_vals=["deepgemm", "triton"],
line_names=["DeepGEMM", "Triton"],
styles=[("blue", "-"), ("red", "-")],
ylabel="ms",
plot_name=f"fp8-group-gemm-performance-comparison-tp{tp_size}",
args={},
)
)
def benchmark(m, n, k, num_groups, tp_size, provider):
print(
f"Shape (m={m}, n={n}, k={k}, tp={tp_size}, num_groups={num_groups}, Provider: {provider}"
)
x = torch.randn((m, k), device="cuda", dtype=torch.bfloat16)
y = torch.randn((n, k), device="cuda", dtype=torch.bfloat16)
x_fp8_grouped, y_fp8_grouped, x_fp8_flat, y_fp8_flat, out, out_torch = (
construct_grouped_and_flat_fp8(x, y, num_groups, is_masked=False)
)
m_per_group = m // num_groups
m_indices = torch.arange(0, num_groups, device="cuda", dtype=torch.int)
m_indices = (
m_indices.unsqueeze(-1)
.expand(num_groups, m_per_group)
.contiguous()
.view(-1)
)
quantiles = [0.5, 0.2, 0.8]
if provider == "deepgemm":
ms, min_ms, max_ms = triton.testing.do_bench(
lambda: fp8_gemm_group_deepgemm(
x_fp8_grouped,
y_fp8_grouped,
out,
m_indices,
),
quantiles=quantiles,
)
elif provider == "triton":
# Prepare inputs for Triton
# We did it outside of the lambda function to make it fair comparison like deepgemm
a, a_scale = x_fp8_flat
b, b_scale = y_fp8_flat
b = b.T.contiguous()
# Ensure scales are in the right format and contiguous
a_scale, b_scale = a_scale.contiguous(), b_scale.contiguous()
M, _ = a.shape
_, N = b.shape
c = torch.empty((M, N), device=a.device, dtype=torch.bfloat16)
ms, min_ms, max_ms = triton.testing.do_bench(
lambda: fp8_gemm_group_triton(
(a, a_scale),
(b, b_scale),
c,
num_groups,
),
quantiles=quantiles,
)
# Calculate TFLOPS
flops = 2 * m * n * k # multiply-adds
tflops = flops / (ms * 1e-3) / 1e12
print(f"Time: {ms*1000:.2f} ms, TFLOPS: {tflops:.2f}")
return ms * 1000, max_ms * 1000, min_ms * 1000 # convert to ms
return benchmark
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--save_path",
type=str,
default="./configs/benchmark_ops/fp8_group_gemm/",
help="Path to save deepgemm fp8 group gemm benchmark results",
)
parser.add_argument(
"--run_correctness",
action="store_true",
help="Whether to run correctness test",
)
parser.add_argument(
"--tp_size",
type=int,
default=1,
help="Tensor parallelism size to benchmark (default: 1)",
)
args = parser.parse_args()
# Set random seed for reproducibility
torch.manual_seed(0)
torch.cuda.manual_seed(0)
# Enable TF32, adapted from https://github.com/deepseek-ai/DeepGEMM/blob/main/tests/test_core.py#L148
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
# Run correctness tests on a few examples
if args.run_correctness:
print("Running correctness tests...")
calculate_diff(8192, 7168, 4096, 4)
calculate_diff(8192, 2048, 7168, 4)
calculate_diff(4096, 7168, 4096, 8)
calculate_diff(4096, 2048, 7168, 8)
calculate_diff(4096, 576, 7168, 8)
# Get the benchmark function with the specified tp_size
benchmark = get_benchmark(args.tp_size)
print(f"Running performance benchmark for TP size = {args.tp_size}...")
benchmark.run(print_data=True, save_path=args.save_path)
## Benchmark FBGEMM Grouped GEMM
Benchmark FBGEMM Grouped GEMM in both Triton and CUDA version and SGLang Triton Grouped GEMM, it will be used to compare the bandwidth of different implementations.
### Requirements
```shell
pip install fbgemm-gpu-genai
```
### Usage
```bash
python3 benchmark/fbgemm/benchmark_fbgemm_grouped_gemm.py --model Qwen/Qwen2-57B-A14B-Instruct --tp-size 4 --use-fp8-w8a8
```
For example, in H200, the Qwen2-57B-A14B-Instruct TP4 fp8w8a8 grouped gemm bandwidth result is as follows:
```shell
grouped-gemm-performance:
batch_size FBGEMM Triton Grouped GEMM FP8 FBGEMM CUTLASS F8F8BF16 Rowwise SGLang Grouped GEMM FP8
0 256.0 3704.841339 3042.626402 2254.725030
1 512.0 3691.426346 3029.065684 2269.504543
2 1024.0 3653.938629 2258.471467 2358.319020
3 2048.0 3596.644313 2271.611904 2476.895397
4 4096.0 3468.496435 2231.283986 2179.473910
```
The theoretical peak bandwidth of H200 is 4.8 TB/s. Taking batch_size 256 as an example, the bandwidth of FBGEMM Triton Grouped GEMM FP8 is 3704.841339 GB/s, the bandwidth of FBGEMM CUTLASS F8F8BF16 Rowwise is 3042.626402 GB/s, and the bandwidth of SGLang Grouped GEMM FP8 is 2254.725030 GB/s. Therefore, FBGEMM Triton Grouped GEMM FP8 achieves 77.9% of H200's theoretical peak bandwidth, FBGEMM CUTLASS F8F8BF16 Rowwise achieves 63.4% of H200's theoretical peak bandwidth, and SGLang Grouped GEMM FP8 achieves 46.9% of H200's theoretical peak bandwidth.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment