Commit 24b257f1 authored by sunzhq2

init

parent 920b3c0f
import os
import sys
import time
import grpc
import random
import pathlib
import argparse
from tqdm import tqdm
import numpy as np
import prettytable as pt
import csv
import concurrent.futures
import subprocess
import threading
from functools import partial
from typing import List
FILE_DIR = pathlib.Path(__file__).parent.absolute()
INFER_ROOT_DIR = FILE_DIR.parents[1]
CUR_DIR = pathlib.Path.cwd().absolute()
sys.path.insert(0, str(INFER_ROOT_DIR))
from llm_perf.server import server_pb2, server_pb2_grpc
from llm_perf.server.pb import deserialize_value, serialize_value
def gen_stream_request(
stub: server_pb2_grpc.InferenceStub,
index: int,
prompt: str,
min_new_tokens: int,
max_new_tokens: int,
top_p: float,
top_k: int,
get_input_logits: int
):
req = server_pb2.InferenceRequest(
req_id=str(index) + "_" + str(int(time.time())),
inputs={
"input_messages": serialize_value(prompt),
"min_new_tokens": serialize_value(min_new_tokens),
"max_new_tokens": serialize_value(max_new_tokens),
"top_p": serialize_value(top_p),
"top_k": serialize_value(top_k),
"get_input_logits": serialize_value(get_input_logits),
},
)
for res in stub.StreamingInference(req, wait_for_ready=False):
yield res
start_index = 0
start_condition = threading.Condition()
finish_num = 0
finish_num_lock = threading.Lock()
finish_index = 0
finish_condition = threading.Condition()
def thread_func(
local_rank: int,
world_size: int,
prompt_list: List[str],
max_new_tokens: int,
save_logits: bool,
host: str,
port: int
):
global start_index
global finish_num
global finish_index
prompt = prompt_list[local_rank]
result_list = []
with grpc.insecure_channel(f"{host}:{port}") as channel:
# wait for start
start_condition.acquire()
while local_rank != start_index:
start_condition.wait()
stub = server_pb2_grpc.InferenceStub(channel)
infer_timeline = tqdm(total=max_new_tokens)
infer_timeline.set_description(f"infer_{local_rank} (max_new_tokens={max_new_tokens})")
infer_timeline.set_postfix({"max_new_tokens": max_new_tokens})
for res in gen_stream_request(
stub,
index=local_rank,
prompt=prompt,
min_new_tokens=1,
max_new_tokens=max_new_tokens,
top_p=0.0,
top_k=1,
get_input_logits=1 if save_logits else 0,
):
# make sure that each request in one decode batch is at a different stage (different kv length)
if local_rank == start_index:
start_index += 1
start_condition.notify_all()
start_condition.release()
res = {k: deserialize_value(v) for k, v in res.outputs.items()}
result_list.append(res)
if res["choice"]["message"] != "":
infer_timeline.update(1)
# update finish_num
finish_num_lock.acquire()
finish_num += 1
finish_num_lock.release()
finish_condition.acquire()
if finish_num == world_size:
finish_condition.notify_all()
while finish_num != world_size or local_rank != finish_index:
finish_condition.wait()
finish_index += 1
finish_condition.notify_all()
finish_condition.release()
return result_list
def test_infer(
prompt_list: List[str],
batch_size: int,
max_new_tokens: int,
save_logits: bool,
workspace: pathlib.Path,
host: str,
port: int
):
collect_results = []
start_time = time.perf_counter_ns()
with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
# create tasks
to_do = []
for i in range(batch_size):
future = executor.submit(thread_func, i, batch_size, prompt_list, max_new_tokens, save_logits, host, port)
to_do.append(future)
# wait for all tasks to complete
for future in concurrent.futures.as_completed(to_do):
collect_results.append(future.result())
duration_s = round((time.perf_counter_ns() - start_time) / 1e9, 3)
print(f"\nquery duration: {duration_s}s\n")
valid_res_list = []
logits_list = []
for result_list in collect_results:
if save_logits:
valid_res_list.append(result_list[:-1])
logits_file = INFER_ROOT_DIR.joinpath(result_list[-1]["choice"]["logits_dump"])
logits = np.load(logits_file)
logits_list.append(logits)
logits_file.unlink()
else:
valid_res_list.append(result_list)
prompt_tokens_list = [valid_res[-1]["usage"]["prompt_tokens"] for valid_res in valid_res_list]
completion_tokens_list = [valid_res[-1]["usage"]["completion_tokens"] for valid_res in valid_res_list]
output_messages_list = [valid_res[-1]["choice"]["message"] for valid_res in valid_res_list]
response_match = [output_messages_list[0] == output_messages_i for output_messages_i in output_messages_list]
logits_diff = []
tokens_diff = []
tokens_fork_index = []
if save_logits:
logits = logits_list[0]
for i in range(batch_size):
cur_logits_diff = []
cur_token_diff = []
# logits shape: [1, completion_tokens, vocab_size]
for j in range(min(completion_tokens_list[0], completion_tokens_list[i])):
cur_logits_diff.append(round(abs(logits[0, j, :] - logits_list[i][0, j, :]).mean(), 3))
cur_token_diff.append(abs(np.argmax(logits[0, j, :]) - np.argmax(logits_list[i][0, j, :])))
logits_diff.append(cur_logits_diff)
tokens_diff.append(cur_token_diff)
tokens_fork_index.append(np.flatnonzero(np.array(cur_token_diff) > 0))
output_txt = workspace.joinpath("output.txt")
with open(output_txt, "w") as f:
print_func = partial(print, file=f)
print_func("-" * 150)
print_func(f"* prompts (batch_size={batch_size})")
for i in range(batch_size):
print_func(f"{i}: {promp_list[i]}")
print_func("")
print_func(f"* response (batch_size={batch_size})")
for i in range(batch_size):
print_func(f"{i}: {output_messages_list[i]}")
print_func("")
print_func(f"* response match: {response_match}")
if save_logits:
print_func("")
print_func(f"* logits mean diff: {[round(sum(diff) / len(diff), 3) for diff in logits_diff]}")
print_func("")
print_func(f"* token diverge index: {[None if len(diff) == 0 else diff[0] for diff in tokens_fork_index]}")
print_func("-" * 150)
print(output_txt.read_text())
subprocess.run(f"rm -rf {workspace}/batch_*", shell=True)
for index in range(batch_size):
batch_workspace = workspace.joinpath(f"batch_{index}")
batch_workspace.mkdir()
prompt_txt = batch_workspace.joinpath("prompt.txt")
with open(prompt_txt, "w") as f:
f.write(prompt_list[index])
response_txt = batch_workspace.joinpath("response.txt")
with open(response_txt, "w") as f:
f.write(output_messages_list[index])
output_txt = batch_workspace.joinpath("output.txt")
latency_csv = batch_workspace.joinpath("latency.csv")
if save_logits:
logits_npy = batch_workspace.joinpath(f"logits.npy")
np.save(logits_npy, logits_list[index])
prompt_tokens = prompt_tokens_list[index]
completion_tokens = completion_tokens_list[index]
output_messages = output_messages_list[index]
valid_res = valid_res_list[index]
model_time_list = [res["choice"]["model_time"] for res in valid_res]
post_process_time_list = [res["choice"]["post_process_time"] for res in valid_res]
wait_time_list = [res["choice"]["wait_time"] for res in valid_res]
first_token_latency = model_time_list[0]
first_token_latency = round(first_token_latency, 3)
per_token_latency = sum(model_time_list[1:]) / (completion_tokens - 1) if completion_tokens > 1 else 0
per_token_latency = round(per_token_latency, 3)
summary_tb = pt.PrettyTable()
summary_tb.field_names = ["Metric", "Value"]
summary_tb.add_row(["Min new tokens", 1])
summary_tb.add_row(["Max new tokens", f"{max_new_tokens}"])
summary_tb.add_row(["Prompt tokens", f"{prompt_tokens}"])
summary_tb.add_row(["Completion tokens", f"{completion_tokens}"])
summary_tb.add_row(["First token latency", f"{first_token_latency} ms"])
summary_tb.add_row(["Per token latency (Avg)", f"{per_token_latency} ms"])
if save_logits:
summary_tb.add_row(["Output logits shape", f"{logits_list[index].shape}"])
if len(tokens_fork_index[index]) > 0:
summary_tb.add_row(["Token diverge index", f"{tokens_fork_index[index][0]}"])
else:
summary_tb.add_row(["Token diverge index", "None"])
file_tb = pt.PrettyTable()
file_tb.field_names = ["File", "Path"]
file_tb.add_row(["Prompt text", str(prompt_txt)])
file_tb.add_row(["Response text", str(response_txt)])
file_tb.add_row(["Output text", str(output_txt)])
file_tb.add_row(["Latency csv", str(latency_csv)])
if save_logits:
file_tb.add_row(["Logits npy", str(logits_npy)])
with open(output_txt, "w") as file:
print("-" * 150, file=file)
print(f"* prompt: ({prompt_tokens} tokens)", file=file)
print(prompt_list[index], file=file)
print("", file=file)
print(f"* response: ({completion_tokens} tokens)", file=file)
print(output_messages, file=file)
print("-" * 150, file=file)
print(summary_tb, file=file)
if save_logits:
print(f"logits_diff: {logits_diff[index]}", file=file)
print(f"token_diff: {tokens_diff[index]}", file=file)
print(file_tb, file=file)
with open(latency_csv, "w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow([
"mode", "past_kv", "q_len", "token_generated",
"wait_time (ms)", "model_time (ms)", "post_process_time (ms)",
"message"])
for i, res in enumerate(valid_res):
if i == 0:
csv_writer.writerow([
"prefill",
i,
prompt_tokens,
res["usage"]["completion_tokens"],
round(res["choice"]["wait_time"], 3),
round(res["choice"]["model_time"], 3),
round(res["choice"]["post_process_time"], 3),
res["choice"]["message"]
])
else:
csv_writer.writerow([
"decode",
prompt_tokens + i - 1,
1,
res["usage"]["completion_tokens"],
round(res["choice"]["wait_time"], 3),
round(res["choice"]["model_time"], 3),
round(res["choice"]["post_process_time"], 3),
res["choice"]["message"]
])
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--prompt", type=str, nargs='+', default=["How are you?"])
parser.add_argument("--prompt_tokens", type=int)
parser.add_argument("--batch_size", type=int, default=1)
parser.add_argument("--max_new_tokens", type=int, default=100)
parser.add_argument("--save_logits", action="store_true")
parser.add_argument("--workspace", type=str, default=None)
parser.add_argument("--host", type=str, default="127.0.0.1")
parser.add_argument("--port", type=int, default=51000)
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
prompt = args.prompt
prompt_tokens = args.prompt_tokens
batch_size = args.batch_size
max_new_tokens = args.max_new_tokens
save_logits = args.save_logits
workspace = args.workspace
host = args.host
port = args.port
# create prompt list
candidate_words = [
"you", "who", "that", "info", "debug", "error", "correct",
"true", "false", "token", "model", "batch", "launch", "server",
"setup"
]
prompt_list = []
if prompt_tokens is not None:
for _ in range(batch_size):
prompt_list.append(" ".join([random.choice(candidate_words) for _ in range(prompt_tokens)]))
else:
if len(prompt) == 1:
assert batch_size >= 1 and batch_size <= 32
prompt_list = prompt * batch_size
else:
batch_size = len(prompt)
prompt_list = prompt
if workspace is not None:
workspace = pathlib.Path(args.workspace).absolute()
else:
workspace = INFER_ROOT_DIR.joinpath(
"llm_perf", "reports", "single_query")
if not workspace.exists():
workspace.mkdir(parents=True, exist_ok=True)
test_infer(prompt_list, batch_size, max_new_tokens, save_logits, workspace, host, port)
import sys
import json
import pathlib
import asyncio
import importlib
from typing import Any, AsyncIterable, Dict, Iterable
from transformers import AutoTokenizer, PreTrainedTokenizer
from llm_perf.core.generation import GenerateConfig, GenerateRequest, GenerateResult
from llm_perf.core.scheduler import CoreScheduler
from llm_perf.utils.logger import logger
class LLMPerfEndpoint:
def __init__(self, xpu_cfg) -> None:
super().__init__()
self.xpu_cfg = xpu_cfg
hardware_type = xpu_cfg["hardware_type"]
model_config = xpu_cfg["model_config"]
# load tokenizer
try:
tokenizer_config = model_config["tokenizer"]
tokenizer_path = tokenizer_config["path"]
self.tokenizer = AutoTokenizer.from_pretrained(
pretrained_model_name_or_path=tokenizer_path,
local_files_only=True,
trust_remote_code=True
)
self.support_chn = tokenizer_config.get("support_chn", False)
self.apply_chat_template = tokenizer_config.get("apply_chat_template", False)
except Exception as e:
logger.error(f"load tokenizer error: {e}")
sys.exit(-1)
logger.info(f"load tokenizer: {tokenizer_path}")
logger.info("*"*50)
logger.info(f"bos_token_id: {self.tokenizer.bos_token_id}")
logger.info(f"eos_token_id: {self.tokenizer.eos_token_id}")
logger.info(f"unk_token_id: {self.tokenizer.unk_token_id}")
logger.info(f"pad_token_id: {self.tokenizer.pad_token_id}")
logger.info("*"*50)
xpu_cfg["bos_token_id"] = self.tokenizer.bos_token_id
xpu_cfg["eos_token_id"] = self.tokenizer.eos_token_id
xpu_cfg["unk_token_id"] = self.tokenizer.unk_token_id
xpu_cfg["pad_token_id"] = self.tokenizer.pad_token_id
# import setup according to hardware_type
setup = importlib.import_module(
".setup", package=f"llm_perf.backends.{hardware_type}"
)
logger.info(f"import setup: {setup}")
# setup scheduler
self.scheduler : CoreScheduler = setup.setup_scheduler(xpu_cfg)
self.scheduler.start()
self.warmup(xpu_cfg["max_batch_size"])
def __del__(self):
if hasattr(self, "scheduler") and self.scheduler is not None:
self.scheduler.stop()
def warmup(self, max_batch_size):
if self.support_chn:
prompt = "7年前,我的年龄是我的儿子的6倍,我的儿子今年12岁,我今年多少岁?"
else:
prompt = "7 years ago, I was 6 times older than my son. My son is 12 years old now. How old am I now?"
generate_config = {
"min_new_tokens": 1,
"max_new_tokens": 512,
"top_k": 1,
"get_input_logits": 0
}
logger.info(f"warmup prompt: {prompt}, config: {generate_config}")
async def _stream_warmup():
message = ""
async for result in self.streaming_inference(prompt, generate_config):
message = result["choice"]["message"]
result["choice"]["message"] = message
return result
async def _multiple_warmup():
tasks = []
for _ in range(max_batch_size):
tasks.append(_stream_warmup())
res = await asyncio.gather(*tasks)
return res
single_result = asyncio.run(_stream_warmup())
message = single_result["choice"]["message"]
logger.info(f"single warmup response: {message}\n")
multiple_result = asyncio.run(_multiple_warmup())
for i, result in enumerate(multiple_result):
message = result["choice"]["message"]
logger.info(f"multiple warmup reponse {i}: {message}\n")
async def prepare_request(
self, prompt: str, generate_config: Dict[str, Any]
) -> GenerateRequest:
if not self.apply_chat_template:
input_ids = self.tokenizer.encode(prompt)
else:
input_ids = self.tokenizer.apply_chat_template(
[
{"role": "user", "content": prompt}
],
add_generation_prompt=True
)
# create generate config
config = GenerateConfig(
min_new_tokens=generate_config.get("min_new_tokens", 0),
max_new_tokens=generate_config.get("max_new_tokens", 0),
top_k=generate_config.get("top_k", 0),
top_p=generate_config.get("top_p", 1.0),
temperature=generate_config.get("temperature", 1.0),
presence_penalty=generate_config.get("presence_penalty", 1.0),
eos_token_id=self.tokenizer.eos_token_id,
pad_token_id=self.tokenizer.pad_token_id,
get_input_logits=bool(generate_config.get("get_input_logits", 0)),
)
req = GenerateRequest(input_ids, config)
return req
async def streaming_inference(
self,
prompt: str,
generate_config: Dict[str, Any]
) -> AsyncIterable[Dict[str, Any]]:
try:
# create GenerateRequest object
req = await self.prepare_request(prompt, generate_config)
prompt_tokens = len(req.input_ids)
completion_tokens = 0
tokens_buffer = []
async for gen_res in self.scheduler.generate(req):
result = gen_res["result"]
if result is not None:
completion_tokens += 1
infer_outputs = {
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
},
"choice": {
"message": ""
}
}
if result is not None:
tokens_buffer.append(result.token_id)
text = self.tokenizer.decode(tokens_buffer, skip_special_tokens=True, clean_up_tokenization_spaces=True)
infer_outputs["choice"].update(
{
"message": text,
"wait_time": result.wait_time,
"model_time": result.model_time,
"post_process_time": result.post_process_time
}
)
if req.generate_config.get_input_logits:
infer_outputs["choice"].update(
{
"perplexity": gen_res["perplexity"],
"logits_dump": gen_res["dump_file"]
}
)
logger.debug(f"steam inference result: {infer_outputs}")
yield infer_outputs
except Exception as e:
logger.error(f"stream inference error: {e}")
raise e
import os
import sys
import json
import pathlib
import argparse
import asyncio
from concurrent import futures
from typing import Any, AsyncIterable, Dict, Iterable, List
import grpc
import signal
# ${prj_root}/byte_infer_perf
CUR_DIR = pathlib.Path.cwd().absolute()
BYTE_MLPERF_ROOT = str(pathlib.Path(__file__).absolute().parents[2])
os.chdir(BYTE_MLPERF_ROOT)
sys.path.insert(0, BYTE_MLPERF_ROOT)
from llm_perf.server import server_pb2, server_pb2_grpc
from llm_perf.server.pb import deserialize_value, serialize_value
from llm_perf.server.endpoint import LLMPerfEndpoint
from llm_perf.utils.logger import logger, setup_logger
class TestServer(server_pb2_grpc.InferenceServicer):
def __init__(self, generator: LLMPerfEndpoint) -> None:
super().__init__()
self.generator = generator
async def StreamingInference(
self,
request: server_pb2.InferenceRequest,
context: grpc.ServicerContext
) -> AsyncIterable[server_pb2.InferenceResponse]:
logger.debug(f"StreamingInference request id {request.req_id}")
req = {k: deserialize_value(v) for k, v in request.inputs.items()}
prompt = req["input_messages"]
generate_config = {
"min_new_tokens": req["min_new_tokens"],
"max_new_tokens": req["max_new_tokens"],
"top_p": req["top_p"],
"top_k": req["top_k"],
"get_input_logits": req["get_input_logits"],
}
# Generating
async for result in self.generator.streaming_inference(
prompt=prompt, generate_config=generate_config
):
yield server_pb2.InferenceResponse(
req_id=request.req_id,
outputs={k: serialize_value(v) for k, v in result.items()},
)
async def serve(port, generator: LLMPerfEndpoint) -> None:
server = grpc.aio.server(
migration_thread_pool=futures.ThreadPoolExecutor(
max_workers=min(os.cpu_count(), 100),
thread_name_prefix="server"
)
)
server_pb2_grpc.add_InferenceServicer_to_server(
TestServer(generator), server
)
server.add_insecure_port(f"[::]:{port}")
await server.start()
logger.info(f"GRPC Server start at {port}")
fifo_name = "./server_fifo"
with open(fifo_name, "w") as fifo_fd:
fifo_fd.write("Server Ready")
fifo_fd.flush()
await server.wait_for_termination()
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--model_config", type=str, required=True)
parser.add_argument("--hardware_type", type=str, default="GPU")
parser.add_argument("--tp_size", type=int, default=1)
parser.add_argument("--max_batch_size", type=int, default=8)
parser.add_argument("--port", type=int, default=51000)
parser.add_argument("--log_level", type=str, default="info")
return parser.parse_args()
def main():
args = parse_args()
setup_logger(args.log_level)
# create xpu config
xpu_cfg = {}
xpu_cfg["hardware_type"] = args.hardware_type
xpu_cfg["tp_size"] = args.tp_size
xpu_cfg["max_batch_size"] = args.max_batch_size
model_config_path = CUR_DIR / args.model_config
if not model_config_path.exists():
logger.error(f"model_config_path not exist")
sys.exit(-1)
with open(model_config_path, 'r') as file:
model_config = json.load(file)
xpu_cfg["model_config"] = model_config
generator = LLMPerfEndpoint(xpu_cfg)
asyncio.run(serve(args.port, generator))
if __name__ == "__main__":
main()
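The server above signals readiness by writing "Server Ready" into `./server_fifo`. The reader side of that handshake is not part of this commit; a minimal launcher-side sketch (assumptions: the launcher creates the FIFO before starting the server, and the server script path and config path shown are hypothetical placeholders) could look like this:
```
import os
import subprocess

fifo_name = "./server_fifo"
if not os.path.exists(fifo_name):
    os.mkfifo(fifo_name)  # must be a FIFO before the server opens it for writing

# hypothetical paths; replace with the actual server script and model config
server_proc = subprocess.Popen(
    ["python3", "server_main.py", "--model_config", "path/to/model_config.json"]
)

# open()/read() block until the server writes "Server Ready" and closes the FIFO
with open(fifo_name, "r") as fifo_fd:
    print(fifo_fd.read())
```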
from typing import Any, Generator, Iterable, List
from llm_perf.server import server_pb2, server_pb2_grpc
def deserialize_value(value: server_pb2.Value) -> Any:
kind = value.WhichOneof("kind")
if kind == "float_":
return value.float_
elif kind == "int64_":
return value.int64_
elif kind == "bytes_":
return value.bytes_
elif kind == "string_":
return value.string_
elif kind == "float_list":
return [v for v in value.float_list.values]
elif kind == "int64_list":
return [v for v in value.int64_list.values]
elif kind == "bytes_list":
return [v for v in value.bytes_list.values]
elif kind == "string_list":
return [v for v in value.string_list.values]
elif kind == "struct_":
return {k: deserialize_value(v) for k, v in value.struct_.fields.items()}
else:
raise TypeError(f"Invalid type {type(value)}")
def serialize_value(value: Any) -> server_pb2.Value:
if isinstance(value, float):
return server_pb2.Value(float_=value)
elif isinstance(value, int):
return server_pb2.Value(int64_=value)
elif isinstance(value, bytes):
return server_pb2.Value(bytes_=value)
elif isinstance(value, str):
return server_pb2.Value(string_=value)
elif isinstance(value, list):
if isinstance(value[0], float):
return server_pb2.Value(float_list=server_pb2.FloatList(values=value))
elif isinstance(value[0], int):
return server_pb2.Value(int64_list=server_pb2.Int64List(values=value))
elif isinstance(value[0], bytes):
return server_pb2.Value(bytes_list=server_pb2.BytesList(values=value))
elif isinstance(value[0], str):
return server_pb2.Value(string_list=server_pb2.StringList(values=value))
elif isinstance(value, dict):
return server_pb2.Value(
struct_=server_pb2.Struct(
fields={k: serialize_value(v) for k, v in value.items()}
)
)
else:
raise TypeError(f"Invalid type {type(value)}")
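A minimal round-trip sketch for `serialize_value` / `deserialize_value` above, using the same request fields the benchmark client sends (assumes the generated `server_pb2` module below is importable):
```
from llm_perf.server.pb import serialize_value, deserialize_value

payload = {
    "input_messages": "How are you?",
    "min_new_tokens": 1,
    "max_new_tokens": 100,
    "top_p": 0.8,
    "top_k": 1,
    "get_input_logits": 0,
}

encoded = {k: serialize_value(v) for k, v in payload.items()}   # dict of server_pb2.Value
decoded = {k: deserialize_value(v) for k, v in encoded.items()}

assert decoded["max_new_tokens"] == 100
assert abs(decoded["top_p"] - 0.8) < 1e-6  # float_ is a 32-bit proto float, so compare approximately
```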
syntax = "proto3";
package llm_perf;
// Containers to hold repeated fundamental values.
message FloatList {
repeated double values = 1;
}
message Int64List {
repeated int64 values = 1;
}
message BytesList {
repeated bytes values = 1;
}
message StringList {
repeated string values = 1;
}
message Struct {
// Unordered map of dynamically typed values.
map<string, Value> fields = 1;
}
// Container for non-sequential data.
message Value {
oneof kind {
FloatList float_list = 1;
Int64List int64_list = 2;
BytesList bytes_list = 3;
float float_ = 4;
int64 int64_ = 5;
bytes bytes_ = 6;
StringList string_list = 7;
string string_ = 8;
Struct struct_ = 9;
}
}
message InferenceRequest {
string req_id = 1;
map<string, Value> inputs = 2;
}
message InferenceResponse {
string req_id = 1;
map<string, Value> outputs = 2;
}
service Inference {
// 1 request -> 1 response
rpc CompleteInference(InferenceRequest) returns (InferenceResponse) {}
// 1 request -> m response
rpc StreamingInference(InferenceRequest) returns (stream InferenceResponse) {}
}
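The checked-in `server_pb2.py` / `server_pb2_grpc.py` below were generated from this proto file. If the proto changes, the modules can presumably be regenerated with the `grpcio-tools` package, for example:
```
# run from the directory containing server.proto (requires grpcio-tools)
from grpc_tools import protoc

protoc.main([
    "grpc_tools.protoc",
    "-I.",
    "--python_out=.",
    "--grpc_python_out=.",
    "server.proto",
])
```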
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: server.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x0cserver.proto\x12\x08llm_perf"\x1b\n\tFloatList\x12\x0e\n\x06values\x18\x01 \x03(\x01"\x1b\n\tInt64List\x12\x0e\n\x06values\x18\x01 \x03(\x03"\x1b\n\tBytesList\x12\x0e\n\x06values\x18\x01 \x03(\x0c"\x1c\n\nStringList\x12\x0e\n\x06values\x18\x01 \x03(\t"v\n\x06Struct\x12,\n\x06\x66ields\x18\x01 \x03(\x0b\x32\x1c.llm_perf.Struct.FieldsEntry\x1a>\n\x0b\x46ieldsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1e\n\x05value\x18\x02 \x01(\x0b\x32\x0f.llm_perf.Value:\x02\x38\x01"\xab\x02\n\x05Value\x12)\n\nfloat_list\x18\x01 \x01(\x0b\x32\x13.llm_perf.FloatListH\x00\x12)\n\nint64_list\x18\x02 \x01(\x0b\x32\x13.llm_perf.Int64ListH\x00\x12)\n\nbytes_list\x18\x03 \x01(\x0b\x32\x13.llm_perf.BytesListH\x00\x12\x10\n\x06\x66loat_\x18\x04 \x01(\x02H\x00\x12\x10\n\x06int64_\x18\x05 \x01(\x03H\x00\x12\x10\n\x06\x62ytes_\x18\x06 \x01(\x0cH\x00\x12+\n\x0bstring_list\x18\x07 \x01(\x0b\x32\x14.llm_perf.StringListH\x00\x12\x11\n\x07string_\x18\x08 \x01(\tH\x00\x12#\n\x07struct_\x18\t \x01(\x0b\x32\x10.llm_perf.StructH\x00\x42\x06\n\x04kind"\x9a\x01\n\x10InferenceRequest\x12\x0e\n\x06req_id\x18\x01 \x01(\t\x12\x36\n\x06inputs\x18\x02 \x03(\x0b\x32&.llm_perf.InferenceRequest.InputsEntry\x1a>\n\x0bInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1e\n\x05value\x18\x02 \x01(\x0b\x32\x0f.llm_perf.Value:\x02\x38\x01"\x9f\x01\n\x11InferenceResponse\x12\x0e\n\x06req_id\x18\x01 \x01(\t\x12\x39\n\x07outputs\x18\x02 \x03(\x0b\x32(.llm_perf.InferenceResponse.OutputsEntry\x1a?\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1e\n\x05value\x18\x02 \x01(\x0b\x32\x0f.llm_perf.Value:\x02\x38\x01\x32\xae\x01\n\tInference\x12N\n\x11\x43ompleteInference\x12\x1a.llm_perf.InferenceRequest\x1a\x1b.llm_perf.InferenceResponse"\x00\x12Q\n\x12StreamingInference\x12\x1a.llm_perf.InferenceRequest\x1a\x1b.llm_perf.InferenceResponse"\x00\x30\x01\x62\x06proto3'
)
_FLOATLIST = DESCRIPTOR.message_types_by_name["FloatList"]
_INT64LIST = DESCRIPTOR.message_types_by_name["Int64List"]
_BYTESLIST = DESCRIPTOR.message_types_by_name["BytesList"]
_STRINGLIST = DESCRIPTOR.message_types_by_name["StringList"]
_STRUCT = DESCRIPTOR.message_types_by_name["Struct"]
_STRUCT_FIELDSENTRY = _STRUCT.nested_types_by_name["FieldsEntry"]
_VALUE = DESCRIPTOR.message_types_by_name["Value"]
_INFERENCEREQUEST = DESCRIPTOR.message_types_by_name["InferenceRequest"]
_INFERENCEREQUEST_INPUTSENTRY = _INFERENCEREQUEST.nested_types_by_name["InputsEntry"]
_INFERENCERESPONSE = DESCRIPTOR.message_types_by_name["InferenceResponse"]
_INFERENCERESPONSE_OUTPUTSENTRY = _INFERENCERESPONSE.nested_types_by_name[
"OutputsEntry"
]
FloatList = _reflection.GeneratedProtocolMessageType(
"FloatList",
(_message.Message,),
{
"DESCRIPTOR": _FLOATLIST,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.FloatList)
},
)
_sym_db.RegisterMessage(FloatList)
Int64List = _reflection.GeneratedProtocolMessageType(
"Int64List",
(_message.Message,),
{
"DESCRIPTOR": _INT64LIST,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.Int64List)
},
)
_sym_db.RegisterMessage(Int64List)
BytesList = _reflection.GeneratedProtocolMessageType(
"BytesList",
(_message.Message,),
{
"DESCRIPTOR": _BYTESLIST,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.BytesList)
},
)
_sym_db.RegisterMessage(BytesList)
StringList = _reflection.GeneratedProtocolMessageType(
"StringList",
(_message.Message,),
{
"DESCRIPTOR": _STRINGLIST,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.StringList)
},
)
_sym_db.RegisterMessage(StringList)
Struct = _reflection.GeneratedProtocolMessageType(
"Struct",
(_message.Message,),
{
"FieldsEntry": _reflection.GeneratedProtocolMessageType(
"FieldsEntry",
(_message.Message,),
{
"DESCRIPTOR": _STRUCT_FIELDSENTRY,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.Struct.FieldsEntry)
},
),
"DESCRIPTOR": _STRUCT,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.Struct)
},
)
_sym_db.RegisterMessage(Struct)
_sym_db.RegisterMessage(Struct.FieldsEntry)
Value = _reflection.GeneratedProtocolMessageType(
"Value",
(_message.Message,),
{
"DESCRIPTOR": _VALUE,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.Value)
},
)
_sym_db.RegisterMessage(Value)
InferenceRequest = _reflection.GeneratedProtocolMessageType(
"InferenceRequest",
(_message.Message,),
{
"InputsEntry": _reflection.GeneratedProtocolMessageType(
"InputsEntry",
(_message.Message,),
{
"DESCRIPTOR": _INFERENCEREQUEST_INPUTSENTRY,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.InferenceRequest.InputsEntry)
},
),
"DESCRIPTOR": _INFERENCEREQUEST,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.InferenceRequest)
},
)
_sym_db.RegisterMessage(InferenceRequest)
_sym_db.RegisterMessage(InferenceRequest.InputsEntry)
InferenceResponse = _reflection.GeneratedProtocolMessageType(
"InferenceResponse",
(_message.Message,),
{
"OutputsEntry": _reflection.GeneratedProtocolMessageType(
"OutputsEntry",
(_message.Message,),
{
"DESCRIPTOR": _INFERENCERESPONSE_OUTPUTSENTRY,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.InferenceResponse.OutputsEntry)
},
),
"DESCRIPTOR": _INFERENCERESPONSE,
"__module__": "server_pb2"
# @@protoc_insertion_point(class_scope:llm_perf.InferenceResponse)
},
)
_sym_db.RegisterMessage(InferenceResponse)
_sym_db.RegisterMessage(InferenceResponse.OutputsEntry)
_INFERENCE = DESCRIPTOR.services_by_name["Inference"]
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_STRUCT_FIELDSENTRY._options = None
_STRUCT_FIELDSENTRY._serialized_options = b"8\001"
_INFERENCEREQUEST_INPUTSENTRY._options = None
_INFERENCEREQUEST_INPUTSENTRY._serialized_options = b"8\001"
_INFERENCERESPONSE_OUTPUTSENTRY._options = None
_INFERENCERESPONSE_OUTPUTSENTRY._serialized_options = b"8\001"
_FLOATLIST._serialized_start = 26
_FLOATLIST._serialized_end = 53
_INT64LIST._serialized_start = 55
_INT64LIST._serialized_end = 82
_BYTESLIST._serialized_start = 84
_BYTESLIST._serialized_end = 111
_STRINGLIST._serialized_start = 113
_STRINGLIST._serialized_end = 141
_STRUCT._serialized_start = 143
_STRUCT._serialized_end = 261
_STRUCT_FIELDSENTRY._serialized_start = 199
_STRUCT_FIELDSENTRY._serialized_end = 261
_VALUE._serialized_start = 264
_VALUE._serialized_end = 563
_INFERENCEREQUEST._serialized_start = 566
_INFERENCEREQUEST._serialized_end = 720
_INFERENCEREQUEST_INPUTSENTRY._serialized_start = 658
_INFERENCEREQUEST_INPUTSENTRY._serialized_end = 720
_INFERENCERESPONSE._serialized_start = 723
_INFERENCERESPONSE._serialized_end = 882
_INFERENCERESPONSE_OUTPUTSENTRY._serialized_start = 819
_INFERENCERESPONSE_OUTPUTSENTRY._serialized_end = 882
_INFERENCE._serialized_start = 885
_INFERENCE._serialized_end = 1059
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import server_pb2 as server__pb2
class InferenceStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.CompleteInference = channel.unary_unary(
"/llm_perf.Inference/CompleteInference",
request_serializer=server__pb2.InferenceRequest.SerializeToString,
response_deserializer=server__pb2.InferenceResponse.FromString,
)
self.StreamingInference = channel.unary_stream(
"/llm_perf.Inference/StreamingInference",
request_serializer=server__pb2.InferenceRequest.SerializeToString,
response_deserializer=server__pb2.InferenceResponse.FromString,
)
class InferenceServicer(object):
"""Missing associated documentation comment in .proto file."""
def CompleteInference(self, request, context):
"""1 request -> 1 response"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def StreamingInference(self, request, context):
"""1 request -> m response"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def add_InferenceServicer_to_server(servicer, server):
rpc_method_handlers = {
"CompleteInference": grpc.unary_unary_rpc_method_handler(
servicer.CompleteInference,
request_deserializer=server__pb2.InferenceRequest.FromString,
response_serializer=server__pb2.InferenceResponse.SerializeToString,
),
"StreamingInference": grpc.unary_stream_rpc_method_handler(
servicer.StreamingInference,
request_deserializer=server__pb2.InferenceRequest.FromString,
response_serializer=server__pb2.InferenceResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
"llm_perf.Inference", rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class Inference(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def CompleteInference(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/llm_perf.Inference/CompleteInference",
server__pb2.InferenceRequest.SerializeToString,
server__pb2.InferenceResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def StreamingInference(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_stream(
request,
target,
"/llm_perf.Inference/StreamingInference",
server__pb2.InferenceRequest.SerializeToString,
server__pb2.InferenceResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
import os
import torch
import torch.distributed as dist
def check_dist():
mp_size = int(os.environ.get("WORLD_SIZE", "1"))
local_rank = int(os.environ.get("LOCAL_RANK", "0"))
buffer = torch.zeros([1], dtype=torch.int32).cuda()
if local_rank == 0:
buffer = buffer + 1
print(f"rank={local_rank}, before, {buffer}")
dist.broadcast(buffer, 0)
print(f"rank={local_rank}, after, {buffer}")
import logging
import sys
logger = logging.getLogger("llm_perf")
def setup_logger(loglevel: str):
fmt = logging.Formatter(
fmt="%(asctime)s.%(msecs)03d %(filename)s:%(lineno)d [%(levelname)s]: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(loglevel.upper())
logger.propagate = False
import gc
import os
import psutil
import torch
from llm_perf.utils.logger import logger
def check_memory_usage(tag):
# dist config
mp_size = int(os.environ.get("WORLD_SIZE", "1"))
local_rank = int(os.environ.get("LOCAL_RANK", "0"))
# Python doesn't run garbage collection in real time, so collect explicitly to get accurate RAM reports
gc.collect()
vm_stats = psutil.virtual_memory()
used_GB = round((vm_stats.total - vm_stats.available) / (1024**3), 2)
dev_mem_reserved = 0
dev_mem_allocated = 0
if torch.cuda.is_available():
dev_mem_reserved = round(torch.cuda.memory_reserved() / (1024**3), 2)
dev_mem_allocated = round(torch.cuda.memory_allocated() / (1024**3), 2)
else:
pass
msg = f"<<{tag}>> CPU VM State: Used = {used_GB} GB, Percent = {vm_stats.percent}% | "\
f"DEV MEM State(Rank{local_rank}): Used = {dev_mem_allocated} GB, Reserved = {dev_mem_reserved} GB"
if local_rank == 0:
print(msg)
# logger.info(msg)
import json
import os
import shutil
import subprocess
import sys
import threading
import time
from enum import Enum
from queue import Queue
from typing import Any, Dict, List
import matplotlib.pyplot as plt
import numpy as np
import torch
from llm_perf.utils.logger import logger
# Time To First Token
__TTFT_AVG__ = "First Token Latency(AVG)"
__TTFT_P90__ = "First Token Latency(P90)"
__CONTEXT_WAIT_AVG__ = "Context Wait Time(AVG)"
__CONTEXT_WAIT_P90__ = "Context Wait Time(P90)"
__CONTEXT_MODEL_AVG__ = "Context Model Time(AVG)"
__CONTEXT_MODEL_P90__ = "Context Model Time(P90)"
# Time Per Output Token
__TPOT_AVG__ = "Per Token Latency(AVG)"
__TPOT_P90__ = "Per Token Latency(P90)"
__DECODE_WAIT_AVG__ = "Decode Wait Time(AVG)"
__DECODE_WAIT_P90__ = "Decode Wait Time(P90)"
__DECODE_MODEL_AVG__ = "Decode Model Time(AVG)"
__DECODE_MODEL_P90__ = "Decode Model Time(P90)"
class ReportType(Enum):
ACCURACY = 0
PERFORMANCE = 1
def get_cpu_name():
command = "lscpu | grep 'Model name' | awk -F: '{print $2}'"
cpu_name = subprocess.check_output(command, shell=True)
return cpu_name.decode().strip()
def calc_perplexity(input_logits: torch.FloatTensor, labels: torch.LongTensor) -> float:
# Shift so that tokens < n predict n
shift_logits = input_logits
shift_labels = labels[..., 1:].contiguous()
# Flatten the tokens
loss_fct = torch.nn.CrossEntropyLoss()
loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
perplexity = torch.exp(loss)
logger.debug(f"Loss: {loss}, PPL: {perplexity}")
return perplexity.tolist()
class Reporter:
def __init__(
self,
task: str,
backend: str,
tp_size: int,
batch_size: int,
input_tokens: int,
min_new_tokens: int,
max_new_tokens: int,
test_perf: bool,
test_accuracy: bool,
version: str="",
) -> None:
self._running: bool = False
self.cond: threading.Condition = threading.Condition()
self.accuracy_datas: List[Dict[str, Any]] = []
self.performance_datas: List[Dict[str, Any]] = []
self._is_performance: bool = False
self.logits_diff: List[Dict[str, Any]] = []
self.token_diff: List[Dict[str, Any]] = []
self.max_token_diff_num = 16
self.test_perf = test_perf
self.test_accuracy = test_accuracy
self.backend = backend
self.task: str = task
# these configs will be updated later via update_meta
self.tp_size = tp_size
self.batch_size = batch_size
self.input_tokens = input_tokens
self.version = version
# result template
self.result: Dict[str, Any] = {
"Model": self.task,
"Backend": self.backend,
"Host Info": get_cpu_name(),
"Version": self.version,
"Execution Date": time.strftime("%Y-%m-%d %H:%M:%S"),
"Min New Tokens": min_new_tokens,
"Max New Tokens": max_new_tokens,
"Accuracy": {"PPL": [], "Token Diff": {}, "Logits Diff": {}},
"Performance": [
{
"TP Size": self.tp_size,
"Batch Size": self.batch_size,
"Input Tokens": self.input_tokens,
}
],
}
def update_meta(self, tp_size: int, batch_size: int, input_tokens: int):
# update config
self.tp_size = tp_size
self.batch_size = batch_size
self.input_tokens = input_tokens
self.start_time = time.perf_counter_ns()
self.request = 0
self.performance_datas.clear()
logger.info(
f"Update reporter meta: TP={self.tp_size}, BS={self.batch_size}, Inputs={self.input_tokens}"
)
def start(self):
self._worker = threading.Thread(target=self.worker)
self._running = True
self._worker.start()
self.start_time = time.perf_counter_ns()
self.request = 0
def stop(self):
with self.cond:
self._running = False
self.cond.notify()
self._worker.join()
def submit(self, data: Dict[str, Any], report_type: ReportType):
with self.cond:
if report_type == ReportType.ACCURACY:
self.accuracy_datas.append(data)
elif report_type == ReportType.PERFORMANCE:
self._is_performance = True
self.performance_datas.append(data)
self.request += 1
self.last_submit_time = time.perf_counter_ns()
self.cond.notify()
def worker(self):
with self.cond:
while self._running:
self.cond.wait()
if self._running:
self.calc()
def _calc_performance(self):
# Calc avg/p99/sum of data, return result
completion_tokens = 0
time_since_start = (self.last_submit_time - self.start_time) / 1e9
ttfts = []
tpots = []
context_wait_time = []
context_model_time = []
decode_wait_time = []
decode_model_time = []
for data in self.performance_datas:
completion_tokens += data["completion_tokens"]
ttfts.append(data["first_token_latency"])
tpots.append(data["per_token_latency"])
context_wait_time.append(data["context_wait_time"])
context_model_time.append(data["context_model_time"])
decode_wait_time.append(data["decode_wait_time"])
decode_model_time.append(data["decode_model_time"])
# context
cur_ttft_avg = np.mean(ttfts)
cur_ttft_p90 = np.percentile(ttfts, 90)
cur_context_wait_avg = np.mean(context_wait_time)
cur_context_wait_p90 = np.percentile(context_wait_time, 90)
cur_context_model_avg = np.mean(context_model_time)
cur_context_model_p90 = np.percentile(context_model_time, 90)
# decode
cur_tpot_avg = np.mean(tpots)
cur_tpot_p90 = np.percentile(tpots, 90)
cur_decode_wait_avg = np.mean(decode_wait_time)
cur_decode_wait_p90 = np.percentile(decode_wait_time, 90)
cur_decode_model_avg = np.mean(decode_model_time)
cur_decode_model_p90 = np.percentile(decode_model_time, 90)
performance = None
for perf in self.result["Performance"]:
if (
perf["TP Size"] == self.tp_size
and perf["Batch Size"] == self.batch_size
and perf["Input Tokens"] == self.input_tokens
):
performance = perf
if performance is None:
performance = {
"TP Size": self.tp_size,
"Batch Size": self.batch_size,
"Input Tokens": self.input_tokens,
}
self.result["Performance"].append(performance)
performance["client"] = {
__TTFT_AVG__: cur_ttft_avg,
__TTFT_P90__: cur_ttft_p90,
__TPOT_AVG__: cur_tpot_avg,
__TPOT_P90__: cur_tpot_p90,
}
performance["server"] = {
__CONTEXT_WAIT_AVG__ : cur_context_wait_avg,
__CONTEXT_WAIT_P90__ : cur_context_wait_p90,
__CONTEXT_MODEL_AVG__ : cur_context_model_avg,
__CONTEXT_MODEL_P90__ : cur_context_model_p90,
__DECODE_WAIT_AVG__ : cur_decode_wait_avg,
__DECODE_WAIT_P90__ : cur_decode_wait_p90,
__DECODE_MODEL_AVG__ : cur_decode_model_avg,
__DECODE_MODEL_P90__ : cur_decode_model_p90,
}
logger.debug(
f"TTFT(AVG)={cur_ttft_avg}, TTFT(P90)={cur_ttft_p90}, TPOT(AVG)={cur_tpot_avg}, TPOT(P90)={cur_tpot_p90}"
)
performance["Token Throughput"] = completion_tokens / time_since_start
performance["Request Number"] = self.request
performance["QPS"] = self.request / time_since_start
logger.info(
f"Request Number={performance['Request Number']}, Token Throughput={performance['Token Throughput']}, QPS={performance['QPS']}"
)
def _calc_accuracy(self):
accuracy = self.result["Accuracy"]
perplexity_list = []
dump_files = []
for i, data in enumerate(self.accuracy_datas):
perplexity_list.append(data["perplexity"])
if data["logits_dump"] != "":
dump_files.append(data["logits_dump"])
# 1. PPL
accuracy["PPL"] = [
sum(prompt_ppl) / len(prompt_ppl) for prompt_ppl in perplexity_list
]
logger.debug(f"PPL={accuracy['PPL']}")
# Diff Prepare
diff_index = -1
# prepare backend's logits numpy data file
for i in range(len(dump_files)):
diff_index += 1
dump_file = dump_files[i]
# if the file does not exist, it may have already been moved to the reports dir
if not os.path.exists(dump_file):
continue
logits_dump_path = (
"llm_perf/reports/" + self.backend + "/" + self.task + "/logits"
)
os.makedirs(logits_dump_path, exist_ok=True)
shutil.move(dump_file, f"{logits_dump_path}/{i}.npy")
logger.info(f"move {dump_file} to {logits_dump_path}/{i}.npy")
# 2. Logits Diff: First token diff
def calc_logits_diff(diff_index: int):
# 2.1 Get base logits
base_file = f"llm_perf/reports/base/{self.task}/logits/{diff_index}.npy"
base_logits = np.load(base_file).astype(np.float32)
# 2.2 Get Backend logits
backend_file = (
f"llm_perf/reports/{self.backend}/{self.task}/logits/{diff_index}.npy"
)
backend_logits = np.load(backend_file).astype(np.float32)
# check shape
if base_logits.shape != backend_logits.shape:
logger.warn(
f"base and {self.backend} logits shape mismatch! Make sure generate config is the same. \nGPU: {base_logits.shape}, {self.backend}: {backend_logits.shape}"
)
# Only care about first token
base_logits = base_logits[:, 0:1, :]
backend_logits = backend_logits[:, 0:1, :]
# 2.3 Calc Diff
diff = base_logits - backend_logits
max_difference = np.max(np.abs(diff))
mse = np.mean(np.square(diff))
mae = np.mean(np.abs(diff))
cos_similarity = np.dot(base_logits.flatten(), backend_logits.flatten()) / (
np.linalg.norm(base_logits.flatten())
* np.linalg.norm(backend_logits.flatten())
)
logger.info(
f"Logits Diff: Prompt Index={diff_index}, Max Difference={max_difference}, Mean Squared Error={mse}, Mean Absolute Error={mae}, Cosine Similarity={cos_similarity}"
)
last_logits_diff: Dict[str, Any] = {}
last_logits_diff["Max Difference"] = max_difference
last_logits_diff["Mean Squared Error"] = mse
last_logits_diff["Mean Absolute Error"] = mae
last_logits_diff["Cosine Similarity"] = cos_similarity
last_logits_diff["Diff Data"] = diff.flatten()
return last_logits_diff
if diff_index >= 0:
_diff = calc_logits_diff(diff_index)
self.logits_diff.append(_diff)
result_logits_diff = accuracy["Logits Diff"]
if len(self.logits_diff) != 0:
result_logits_diff["Max Difference"] = np.max(
[l["Max Difference"] for l in self.logits_diff]
).tolist()
result_logits_diff["Mean Squared Error"] = np.mean(
[l["Mean Squared Error"] for l in self.logits_diff]
).tolist()
result_logits_diff["Mean Absolute Error"] = np.mean(
[l["Mean Absolute Error"] for l in self.logits_diff]
).tolist()
result_logits_diff["Cosine Similarity"] = np.mean(
[l["Cosine Similarity"] for l in self.logits_diff]
).tolist()
# 3. Token Diff
def calc_token_diff(diff_index: int):
# 3.1 Get GPU base logits
base_file = f"llm_perf/reports/base/{self.task}/logits/{diff_index}.npy"
base_logits = np.load(base_file).astype(np.float32)
# 3.2 Get Backend logits
backend_file = (
f"llm_perf/reports/{self.backend}/{self.task}/logits/{diff_index}.npy"
)
backend_logits = np.load(backend_file).astype(np.float32)
# check shape
if base_logits.shape != backend_logits.shape:
logger.warn(
f"GPU and {self.backend} logits shape mismatch! Make sure generate config is the same. \nGPU: {base_logits.shape}, {self.backend}: {backend_logits.shape}"
)
return -1
# Only care about max prob token (greedy search)
base_logits = np.amax(base_logits, axis=2, keepdims=True)
backend_logits = np.amax(backend_logits, axis=2, keepdims=True)
# 3.3 Calc Diff
diff = np.abs(base_logits - backend_logits)
max_difference = np.max(diff)
logger.info(
f"Token Diff: Prompt Index={diff_index}, Max Difference={max_difference}"
)
last_token_diff: Dict[str, Any] = {}
last_token_diff["Logits Index"] = diff_index
last_token_diff["Max Difference"] = max_difference
last_token_diff["Diff Data"] = diff.flatten()
return last_token_diff
if diff_index >= 0 and len(self.token_diff) < self.max_token_diff_num:
_diff = calc_token_diff(diff_index)
if _diff == -1:
pass
else:
self.token_diff.append(_diff)
result_token_diff = accuracy["Token Diff"]
if len(self.token_diff) != 0:
result_token_diff["Max Difference"] = np.max(
[l["Max Difference"] for l in self.token_diff]
).tolist()
result_token_diff["Prompt Num"] = len(self.token_diff)
def calc(self):
if self.test_accuracy and self.accuracy_datas and not self._is_performance:
self._calc_accuracy()
elif self.test_perf and self.performance_datas and self._is_performance:
self._calc_performance()
def summary(self):
logger.info(f"summary...{self.result}")
output_report_path = f"llm_perf/reports/{self.backend}/{self.task}"
os.makedirs(output_report_path, exist_ok=True)
if self.test_accuracy:
# Save accuracy logits diff plt result
logits_diff_png_path = f"{output_report_path}/logits_diff.png"
logits_diff = np.concatenate(
[l["Diff Data"] for l in self.logits_diff], axis=0
)
plt.hist(logits_diff, bins=150, alpha=0.75)
plt.title("Logits Difference")
plt.xlabel("Difference")
plt.ylabel("Frequency")
plt.grid(True)
plt.savefig(logits_diff_png_path, dpi=300)
self.result["Accuracy"]["Logits Diff"]["Png"] = logits_diff_png_path
plt.clf()
plt.cla()
# Save token diff plt result
token_diff_png_path = f"{output_report_path}/token_diff.png"
plt.figure(figsize=(10, 6))
for diff in self.token_diff:
plt.plot(diff["Diff Data"], label=f"Prompt {diff['Logits Index']}")
plt.legend()
plt.title("Token Difference")
plt.xlabel("Token")
plt.ylabel("Difference")
plt.grid(True)
plt.savefig(token_diff_png_path, dpi=300)
self.result["Accuracy"]["Token Diff"]["Png"] = logits_diff_png_path
if not self.test_perf:
self.result.pop("Min New Tokens", None)
self.result.pop("Max New Tokens", None)
self.result.pop("Performance", None)
# Save Result
with open(f"{output_report_path}/result.json", "w") as file:
json.dump(self.result, file, indent=4)
logger.info(f"Summary result to {output_report_path}/result.json")
{
"model": "chatglm2-6b",
"test_accuracy": true,
"min_tp_size": 1,
"accuracy_config": {
"dataset": "llm_perf/datasets/merged_52_test.csv",
"min_new_tokens": 1,
"max_new_tokens": 512
},
"test_perf": true,
"perf_config": {
"tp_sizes": [1, 2, 4, 8],
"batch_sizes": [1, 4, 8, 16, 24, 32],
"input_tokens": [1024, 2048],
"output_tokens": 200,
"perf_time": 100
}
}
{
"model": "falcon-180b",
"test_accuracy": false,
"min_tp_size": 8,
"accuracy_config": {
"dataset": "llm_perf/datasets/merged_52_test.csv",
"min_new_tokens": 1,
"max_new_tokens": 512
},
"test_perf": true,
"perf_config": {
"tp_sizes": [1, 2, 4, 8],
"batch_sizes": [1, 4, 8, 16, 24, 32],
"input_tokens": [1024, 2048],
"output_tokens": 200,
"perf_time": 100
}
}
{
"model": "llama3-70b",
"test_accuracy": true,
"min_tp_size": 8,
"accuracy_config": {
"dataset": "llm_perf/datasets/merged_52_test.csv",
"min_new_tokens": 1,
"max_new_tokens": 512
},
"test_perf": true,
"perf_config": {
"tp_sizes": [1, 2, 4, 8],
"batch_sizes": [1, 4, 8, 16, 24, 32],
"input_tokens": [1024, 2048],
"output_tokens": 200,
"perf_time": 100
}
}
{
"model": "mixtral-8x22b",
"test_accuracy": false,
"min_tp_size": 8,
"accuracy_config": {
"dataset": "llm_perf/datasets/merged_52_test.csv",
"min_new_tokens": 1,
"max_new_tokens": 512
},
"test_perf": true,
"perf_config": {
"tp_sizes": [1, 2, 4, 8],
"batch_sizes": [1, 4, 8, 16, 24, 32],
"input_tokens": [1024, 2048],
"output_tokens": 200,
"perf_time": 100
}
}
# ByteMicroPerf
## Introduction
ByteMicroPerf is part of ByteMLPerf. It is mainly used to evaluate the performance of frequently used computation and communication operators from mainstream deep learning models on emerging heterogeneous hardware. Its main characteristics are as follows:
- Easy and quick access for diverse heterogeneous hardware
- Evaluation process fitting realistic business scenarios
- Coverage of frequent operators across multiple categories
## Quickstart
### Prepare running environment
```
git clone https://github.com/bytedance/ByteMLPerf.git
cd ByteMLPerf/byte_micro_perf
```
### Prepare hardware configuration (optional)
Please follow the existing style in the `ByteMLPerf/vendor_zoo` directory to create a new hardware config file for your own heterogeneous hardware. This helps the framework evaluate operator performance on new hardware more precisely.
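For reference, the only field the GPU micro-perf backend in this commit actually reads from such a file is the theoretical memory bandwidth (see `backends/backend.py` later in this commit). A minimal illustrative config could therefore be written as below; real files under `ByteMLPerf/vendor_zoo` contain many more fields, so treat this purely as a sketch:
```
import json

# keys follow what backends/backend.py reads:
# 内存参数 (memory parameters) -> 内存 (memory) -> 内存带宽(GB/s) (memory bandwidth in GB/s)
vendor_cfg = {
    "内存参数": {
        "内存": {
            "内存带宽(GB/s)": 2039,   # example value; use your hardware's spec
        }
    }
}

with open("my_vendor_config.json", "w") as f:
    json.dump(vendor_cfg, f, ensure_ascii=False, indent=2)
```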
### An example
```
python3 launch.py --task exp --hardware_type GPU
```
#### Usage
```
--task: operator name. Please create a workload file for new operators by following the existing style in byte_micro_perf/workloads.
--hardware_type: hardware category name. Please derive a Backend class for your heterogeneous hardware in byte_micro_perf/backends.
```
### Expected Output
For different types of operators (compute-bound / memory-bound), we adopt several metrics to comprehensively evaluate operator performance. The metrics are explained as follows:
| Metric | Description |
| -------- | ------- |
| Memory Size(MB) | the rough sum of read/write bytes |
| Kernel bandwidth(GB/s) | the bandwidth achieved by this kernel for the given input size |
| Bandwidth Utilization(%) | the ratio of achieved bandwidth to theoretical bandwidth |
| Avg latency(us) | the average of kernel latencies |
Example:
```
{
"Operator": "EXP",
"Backend": "GPU",
"Host Info": "Intel(R) Xeon(R) Platinum 8336C CPU @ 2.30GHz",
"Device Info": "NVIDIA A800-SXM4-80GB",
"Performance": [
{
"Dtype": "float32",
"Tensor Shapes": [
[
256,
8192
]
],
"Read IO Size(MB)": 8.0,
"Write IO Size(MB)": 8.0,
"Memory Size(MB)": 16.0,
"Kernel bandwidth(GB/s)": 1790.52,
"Bandwidth Utilization(%)": 87.81,
"Avg latency(us)": 9.37,
"QPS": 27321.24
}
]
}
```
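As a rough sanity check on how these metrics relate in the example above (assumptions: `Memory Size(MB)` is in MiB, `GB/s` means 10^9 bytes per second, and the theoretical HBM bandwidth of an A800-SXM4-80GB is taken as roughly 2039 GB/s):
```
memory_size_bytes = 16.0 * 1024 ** 2      # "Memory Size(MB)": 16.0 (read + write)
avg_latency_s = 9.37e-6                   # "Avg latency(us)": 9.37

kernel_bw = memory_size_bytes / avg_latency_s / 1e9
print(round(kernel_bw, 1))                # ~1790.5 -> "Kernel bandwidth(GB/s)"

theoretical_bw = 2039.0                   # assumed A800-SXM4-80GB spec, GB/s
print(round(kernel_bw / theoretical_bw * 100, 2))   # ~87.81 -> "Bandwidth Utilization(%)"
```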
## Trouble Shooting
For more details, you can visit our official website: [bytemlperf.ai](https://bytemlperf.ai/). Please let us know if you need any help or have additional questions or issues!
# Copyright 2023 ByteDance and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import logging
from datetime import timedelta
from typing import Any, Dict, List
import torch
import torch.distributed as dist
import torch.distributed.distributed_c10d as dist_c10d
from backends import module_store
from backends.backend import Backend
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("PerfEngine")
class BackendGPU(Backend):
def __init__(self, workload_dict, vendor_path):
super().__init__(workload_dict, vendor_path)
def get_device_name(self):
return torch.cuda.get_device_name(0)
def get_torch_device_name(self):
return "cuda"
def get_device_properties(self):
return torch.cuda.get_device_properties(0)
def get_device_count(self):
return torch.cuda.device_count()
def set_device(self, device_index : int):
torch.cuda.set_device(device_index)
def get_device(self):
return torch.cuda.current_device()
def device_synchronize(self):
torch.cuda.synchronize()
def empty_cache(self):
torch.cuda.empty_cache()
def get_dist_module(self):
return dist
def initialize_ccl(self, rank, world_size):
# check device_count
device_count = self.get_device_count()
if world_size > device_count:
world_size = device_count
if rank >= world_size:
return False
self.set_device(rank)
# set envs and internal vars
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "49373"
os.environ["LOCAL_RANK"] = str(rank)
os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)
# init process group
self.get_dist_module().init_process_group(
backend="nccl",
world_size=world_size,
rank=rank,
timeout=timedelta(seconds=1800)
)
return True
-i https://download.pytorch.org/whl/cu121
torch==2.3.1
# Copyright 2023 ByteDance and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import json
import math
import random
import logging
import traceback
from abc import ABC, abstractmethod
from datetime import timedelta
from typing import Any, Dict, List, final
import torch
import torch.distributed as dist
import torch.distributed.distributed_c10d as dist_c10d
from backends import module_store
from backends.utils import dump_communication_ops_report, dump_computation_ops_report
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("PerfEngine")
default_op_registry = module_store.op_registry.copy()
default_op_compute_size_registry = module_store.op_compute_size_funcs.copy()
default_op_create_tensors_registry = module_store.op_create_tensors_funcs.copy()
class Backend(ABC):
def __init__(self, workload_dict: Dict[str, Any], vendor_path: str):
self.op_name = workload_dict["operator"]
self.iterations = workload_dict["iterations"]
self.op = None
# communication params
self.world_size = None
self.rank = None
# hardware info
self.device_name = self.get_device_name()
self.memory_limit = int(
self.get_device_properties().total_memory / (1024**3)
)
if vendor_path is not None and os.path.exists(vendor_path) and (vendor_path).endswith(".json"):
with open(vendor_path, "r") as f:
self.hw_info_dict = json.load(f)
# if the vendor path does not exist, please set this param manually
# (vendor JSON keys below: 内存参数 = memory parameters, 内存 = memory, 内存带宽(GB/s) = memory bandwidth in GB/s)
self.bandwidth_limit = self.hw_info_dict["内存参数"]["内存"]["内存带宽(GB/s)"]
"""
op
"""
def get_op_instance(self):
if self.op_name in default_op_registry:
self.op = default_op_registry[self.op_name]
else:
raise NotImplementedError
def get_op_compute_size_func(self):
if self.op_name in default_op_compute_size_registry:
return default_op_compute_size_registry[self.op_name]
else:
raise NotImplementedError
def get_op_create_tensors_func(self):
if self.op_name in default_op_create_tensors_registry:
return default_op_create_tensors_registry[self.op_name]
else:
raise NotImplementedError
"""
device management related
"""
# torch.get_device_name()
def get_device_name(self):
raise NotImplementedError
# "cuda"
def get_torch_device_name(self):
raise NotImplementedError
def get_device_properties(self):
raise NotImplementedError
def get_device_count(self):
raise NotImplementedError
def set_device(self, device_index : int):
raise NotImplementedError
def get_device(self):
raise NotImplementedError
def device_synchronize(self):
raise NotImplementedError
def empty_cache(self):
raise NotImplementedError
"""
ccl related
"""
def get_dist_module(self):
raise NotImplementedError
def initialize_ccl(self, rank, world_size):
raise NotImplementedError
def destroy_process_group(self):
dist = self.get_dist_module()
if dist.is_initialized():
dist.destroy_process_group()
def barrier(self):
dist = self.get_dist_module()
if dist.is_initialized():
dist.barrier()
def _run_operation(self, operation, inputs):
result = operation(*inputs)
return result
def build_tensor(self, input_shapes, torch_dtype):
# get funcs
compute_size_func = self.get_op_compute_size_func()
create_tensors_func = self.get_op_create_tensors_func()
_, tensor_size, _, _ = compute_size_func(input_shapes, torch_dtype)
# avoid cache reuse: assume the cache size is 1 GiB and use at most 90% of device memory
assume_cache_size = 1 * 1024**3
assume_avail_bytes = self.memory_limit * 0.9 * 1024**3
if self.op_name in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast", "p2p", "device2host", "host2device"]:
if tensor_size > assume_avail_bytes:
return []
else:
max_data_cnt = 1
else:
if tensor_size > assume_avail_bytes:
return []
elif 2 * tensor_size > assume_avail_bytes:
max_data_cnt = 1
elif tensor_size > assume_cache_size:
max_data_cnt = 2
else:
max_data_cnt = min(math.floor(assume_avail_bytes / tensor_size), self.iterations)
# create tensor_list for each op
tensor_list = [
create_tensors_func(input_shapes, torch_dtype, self.get_torch_device_name()) for _ in range(max_data_cnt)
]
return tensor_list
def perf(self, input_shapes: List[List[int]], dtype):
error = ""
# create necessary tensors
torch_dtype = getattr(torch, dtype)
tensor_list = self.build_tensor(input_shapes, torch_dtype)
if len(tensor_list) > 0:
try:
warm_iterations = 5
test_iterations = 5
max_total_duration = 10.
prefer_iterations = self.iterations
# warmup
self.device_synchronize()
self.barrier()
for _ in range(warm_iterations):
self._run_operation(self.op, random.choice(tensor_list))
# test perf
self.device_synchronize()
self.barrier()
start_time = time.perf_counter_ns()
for i in range(test_iterations):
self._run_operation(self.op, random.choice(tensor_list))
self.device_synchronize()
self.barrier()
end_time = time.perf_counter_ns()
avg_op_duration = (end_time - start_time) / 1e9 / test_iterations
if avg_op_duration > max_total_duration:
prefer_iterations = 2
else:
prefer_iterations = min(math.ceil(max_total_duration / avg_op_duration), self.iterations)
# perf
self.device_synchronize()
self.barrier()
start_time = time.perf_counter_ns()
for i in range(prefer_iterations):
self._run_operation(self.op, tensor_list[i % len(tensor_list)])
self.device_synchronize()
self.barrier()
end_time = time.perf_counter_ns()
# time in us
total_exec_time = (end_time - start_time) / 1e3
latency = round(total_exec_time / prefer_iterations, 2)
except Exception as e:
traceback.print_exc()
latency = 0
error = "RUN_OP_ERROR"
else:
latency = 0
error = "OOM"
# clean tensors and device memory
del tensor_list
self.empty_cache()
# create report for communication ops and computation ops
if self.op_name in [
"allreduce", "allgather", "reducescatter", "alltoall", "broadcast", "p2p",
"device2host", "host2device"
]:
report = dump_communication_ops_report(
self.op_name, torch_dtype, input_shapes,
self.get_op_compute_size_func(),
self.world_size,
None,
latency,
error
)
else:
report = dump_computation_ops_report(
self.op_name, torch_dtype, input_shapes,
self.get_op_compute_size_func(),
None,
latency,
error
)
return report
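A rough sketch of driving a backend such as `BackendGPU` directly, bypassing `launch.py` (assumptions: a CUDA device is available, and the import path and op name below are guesses, since the commit shows neither the file layout nor the `module_store` registry keys):
```
# hypothetical import path; the commit shows the class but not its file location
from backends.GPU.backend_gpu import BackendGPU

workload = {"operator": "exp", "iterations": 100}   # op name must exist in module_store.op_registry
backend = BackendGPU(workload, vendor_path=None)    # without a vendor JSON, bandwidth_limit is simply not set

backend.get_op_instance()                           # resolve the callable op from the default registry
report = backend.perf([[256, 8192]], "float32")     # same shape/dtype as the README example
print(report)
```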