Unverified Commit 2e6e62e1 authored by Lianmin Zheng's avatar Lianmin Zheng Committed by GitHub
Browse files

Increase the number of thread limitation for tp worker managers. (#567)

parent a385ee27
...@@ -250,9 +250,14 @@ def main(args: argparse.Namespace): ...@@ -250,9 +250,14 @@ def main(args: argparse.Namespace):
np.random.seed(args.seed) np.random.seed(args.seed)
api_url = f"http://{args.host}:{args.port}/generate" api_url = f"http://{args.host}:{args.port}/generate"
tokenizer = AutoTokenizer.from_pretrained( if args.tokenizer.endswith(".json") or args.tokenizer.endswith(".model"):
args.tokenizer, trust_remote_code=args.trust_remote_code from sglang.srt.hf_transformers_utils import get_tokenizer
)
tokenizer = get_tokenizer(args.tokenizer)
else:
tokenizer = AutoTokenizer.from_pretrained(
args.tokenizer, trust_remote_code=args.trust_remote_code
)
if args.dataset: if args.dataset:
input_requests = sample_requests(args.dataset, args.num_prompts, tokenizer) input_requests = sample_requests(args.dataset, args.num_prompts, tokenizer)
...@@ -272,7 +277,7 @@ def main(args: argparse.Namespace): ...@@ -272,7 +277,7 @@ def main(args: argparse.Namespace):
for i in range(args.num_prompts): for i in range(args.num_prompts):
prompt = tokenizer.decode( prompt = tokenizer.decode(
[ [
(offsets[i] + i + j) % tokenizer.vocab_size (offsets[i] + i + j) % (tokenizer.vocab_size - 129) + 128
for j in range(input_lens[i]) for j in range(input_lens[i])
] ]
) )
......
...@@ -8,6 +8,7 @@ if __name__ == "__main__": ...@@ -8,6 +8,7 @@ if __name__ == "__main__":
parser.add_argument("--host", type=str, default="http://127.0.0.1") parser.add_argument("--host", type=str, default="http://127.0.0.1")
parser.add_argument("--port", type=int, default=None) parser.add_argument("--port", type=int, default=None)
parser.add_argument("--backend", type=str, default="srt") parser.add_argument("--backend", type=str, default="srt")
parser.add_argument("--batch-size", type=int, default=1)
parser.add_argument("--max-tokens", type=int, default=256) parser.add_argument("--max-tokens", type=int, default=256)
args = parser.parse_args() args = parser.parse_args()
...@@ -33,7 +34,7 @@ if __name__ == "__main__": ...@@ -33,7 +34,7 @@ if __name__ == "__main__":
response = requests.post( response = requests.post(
url + "/generate", url + "/generate",
json={ json={
"text": prompt, "text": [prompt] * args.batch_size,
"sampling_params": { "sampling_params": {
"temperature": 0, "temperature": 0,
"max_new_tokens": max_new_tokens, "max_new_tokens": max_new_tokens,
...@@ -90,5 +91,5 @@ if __name__ == "__main__": ...@@ -90,5 +91,5 @@ if __name__ == "__main__":
ret = response.json() ret = response.json()
print(ret) print(ret)
speed = max_new_tokens / latency speed = args.batch_size * max_new_tokens / latency
print(f"latency: {latency:.2f} s, speed: {speed:.2f} token/s") print(f"latency: {latency:.2f} s, speed: {speed:.2f} token/s")
...@@ -48,24 +48,45 @@ def gen_prompt(train_df, subject, k=-1): ...@@ -48,24 +48,45 @@ def gen_prompt(train_df, subject, k=-1):
return prompt return prompt
def evaluate(args, subject, dev_df, test_df): def main(args):
prompts = [] subjects = sorted(
[
f.split("_test.csv")[0]
for f in os.listdir(os.path.join(args.data_dir, "test"))
if "_test.csv" in f
]
)
# Build prompts
arguments = []
labels = [] labels = []
num_questions = []
for subject in subjects[: args.nsub]:
dev_df = pd.read_csv(
os.path.join(args.data_dir, "dev", subject + "_dev.csv"), header=None
)[: args.ntrain]
test_df = pd.read_csv(
os.path.join(args.data_dir, "test", subject + "_test.csv"), header=None
)
num_questions.append(test_df.shape[0])
k = args.ntrain k = args.ntrain
few_shot_examples = gen_prompt(dev_df, subject, k)
while len(tokenizer.encode(few_shot_examples)) > 1536:
k -= 1
few_shot_examples = gen_prompt(dev_df, subject, k) few_shot_examples = gen_prompt(dev_df, subject, k)
while len(tokenizer.encode(few_shot_examples)) > 1536:
k -= 1
few_shot_examples = gen_prompt(dev_df, subject, k)
for i in range(test_df.shape[0]): for i in range(test_df.shape[0]):
prompt_end = format_example(test_df, i, include_answer=False) prompt_end = format_example(test_df, i, include_answer=False)
prompts.append(prompt_end)
label = test_df.iloc[i, test_df.shape[1] - 1] arguments.append({
labels.append(label) "examples": few_shot_examples,
"question": prompt_end,
})
arguments = [{"question": p} for p in prompts] label = test_df.iloc[i, test_df.shape[1] - 1]
labels.append(label)
##################################### #####################################
######### SGL Program Begin ######### ######### SGL Program Begin #########
...@@ -93,62 +114,33 @@ def evaluate(args, subject, dev_df, test_df): ...@@ -93,62 +114,33 @@ def evaluate(args, subject, dev_df, test_df):
# Select backend # Select backend
backend = select_sglang_backend(args) backend = select_sglang_backend(args)
# Run
tic = time.time() tic = time.time()
states = few_shot_mmlu.bind(examples=few_shot_examples).run_batch( states = few_shot_mmlu.run_batch(
arguments, arguments,
temperature=0, temperature=0,
max_new_tokens=1, max_new_tokens=1,
backend=backend, backend=backend,
num_threads=args.parallel, num_threads=args.parallel,
progress_bar=True,
) )
preds = [ preds = [
s["answer"].strip()[0] if len(s["answer"].strip()) > 0 else "" for s in states s["answer"].strip()[0] if len(s["answer"].strip()) > 0 else "" for s in states
] ]
latency = time.time() - tic latency = time.time() - tic
# Compute accuracy
cors = [pred == label for pred, label in zip(preds, labels)] cors = [pred == label for pred, label in zip(preds, labels)]
acc = np.mean(cors)
cors = np.array(cors)
print(
"Average accuracy {:.3f}, latency {:.2f}, #q: {} - {}".format(
acc, latency, len(prompts), subject
)
)
return cors, acc, latency
def main(args):
subjects = sorted(
[
f.split("_test.csv")[0]
for f in os.listdir(os.path.join(args.data_dir, "test"))
if "_test.csv" in f
]
)
all_cors = []
all_latencies = []
num_requests = 0
for subject in tqdm(subjects[: args.nsub]):
dev_df = pd.read_csv(
os.path.join(args.data_dir, "dev", subject + "_dev.csv"), header=None
)[: args.ntrain]
test_df = pd.read_csv(
os.path.join(args.data_dir, "test", subject + "_test.csv"), header=None
)
cors, acc, latency = evaluate(args, subject, dev_df, test_df)
all_cors.append(cors)
all_latencies.append(latency)
num_requests += len(test_df)
total_latency = np.sum(all_latencies) pt = 0
print("Total latency: {:.3f}".format(total_latency)) for subject, num_qs in zip(subjects[: args.nsub], num_questions):
print(f"subject: {subject}, #q:{num_qs}, acc: {np.mean(cors[pt: pt + num_qs]):.3f}")
pt += num_qs
assert pt == len(cors)
weighted_acc = np.mean(cors)
weighted_acc = np.mean(np.concatenate(all_cors)) # Print results
print("Total latency: {:.3f}".format(latency))
print("Average accuracy: {:.3f}".format(weighted_acc)) print("Average accuracy: {:.3f}".format(weighted_acc))
# Write results # Write results
...@@ -157,9 +149,9 @@ def main(args): ...@@ -157,9 +149,9 @@ def main(args):
"task": "mmlu", "task": "mmlu",
"backend": args.backend, "backend": args.backend,
"num_gpus": 1, "num_gpus": 1,
"latency": round(total_latency, 3), "latency": round(latency, 3),
"accuracy": round(weighted_acc, 3), "accuracy": round(weighted_acc, 3),
"num_requests": num_requests, "num_requests": len(arguments),
"other": { "other": {
"nsub": args.nsub, "nsub": args.nsub,
"parallel": args.parallel, "parallel": args.parallel,
......
import transformers import argparse
import code import code
#name = "meta-llama/Llama-2-7b-chat-hf" from sglang.srt.hf_transformers_utils import get_tokenizer
name = "meta-llama/Meta-Llama-3-8B-Instruct"
t = transformers.AutoTokenizer.from_pretrained(name)
code.interact(local=locals()) if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--name", type=str, default="meta-llama/Meta-Llama-3-8B-Instruct")
args = parser.parse_args()
t = get_tokenizer(args.name)
code.interact(local=locals())
\ No newline at end of file
...@@ -8,7 +8,8 @@ class FSMCache(BaseCache): ...@@ -8,7 +8,8 @@ class FSMCache(BaseCache):
def __init__(self, tokenizer_path, tokenizer_args_dict, enable=True): def __init__(self, tokenizer_path, tokenizer_args_dict, enable=True):
super().__init__(enable=enable) super().__init__(enable=enable)
if tokenizer_path.endswith(".json"): if tokenizer_path.endswith(".json") or tokenizer_path.endswith(".model"):
# Do not support TiktokenTokenizer or SentencePieceTokenizer
return return
from importlib.metadata import version from importlib.metadata import version
......
...@@ -88,6 +88,9 @@ def get_tokenizer( ...@@ -88,6 +88,9 @@ def get_tokenizer(
if tokenizer_name.endswith(".json"): if tokenizer_name.endswith(".json"):
return TiktokenTokenizer(tokenizer_name) return TiktokenTokenizer(tokenizer_name)
if tokenizer_name.endswith(".model"):
return SentencePieceTokenizer(tokenizer_name)
"""Gets a tokenizer for the given model name via Huggingface.""" """Gets a tokenizer for the given model name via Huggingface."""
if is_multimodal_model(tokenizer_name): if is_multimodal_model(tokenizer_name):
processor = get_processor( processor = get_processor(
...@@ -179,6 +182,7 @@ def get_processor( ...@@ -179,6 +182,7 @@ def get_processor(
class TiktokenTokenizer: class TiktokenTokenizer:
def __init__(self, tokenizer_path): def __init__(self, tokenizer_path):
import tiktoken import tiktoken
from jinja2 import Template
PAT_STR_B = r"""(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\r\n\p{L}\p{N}]?\p{L}+|\p{N}| ?[^\s\p{L}\p{N}]+[\r\n]*|\s*[\r\n]+|\s+(?!\S)|\s+""" PAT_STR_B = r"""(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\r\n\p{L}\p{N}]?\p{L}+|\p{N}| ?[^\s\p{L}\p{N}]+[\r\n]*|\s*[\r\n]+|\s+(?!\S)|\s+"""
...@@ -216,6 +220,7 @@ class TiktokenTokenizer: ...@@ -216,6 +220,7 @@ class TiktokenTokenizer:
tokenizer = tiktoken.Encoding(**kwargs) tokenizer = tiktoken.Encoding(**kwargs)
tokenizer._default_allowed_special = default_allowed_special or set() tokenizer._default_allowed_special = default_allowed_special or set()
tokenizer._default_allowed_special |= {"<|separator|>"}
def encode_patched( def encode_patched(
self, self,
...@@ -241,6 +246,9 @@ class TiktokenTokenizer: ...@@ -241,6 +246,9 @@ class TiktokenTokenizer:
self.tokenizer = tokenizer self.tokenizer = tokenizer
self.eos_token_id = tokenizer._special_tokens["<|eos|>"] self.eos_token_id = tokenizer._special_tokens["<|eos|>"]
self.vocab_size = tokenizer.n_vocab self.vocab_size = tokenizer.n_vocab
self.chat_template = Template(
"{% for message in messages %}{% if message['role'] == 'user' %}{{ 'Human: ' + message['content'].strip() + '<|separator|>\n\n' }}{% elif message['role'] == 'system' %}{{ 'System: ' + message['content'].strip() + '<|separator|>\n\n' }}{% elif message['role'] == 'assistant' %}{{ 'Assistant: ' + message['content'] + '<|separator|>\n\n' }}{% endif %}{% endfor %}{% if add_generation_prompt %}{{ 'Assistant:' }}{% endif %}"
)
def encode(self, x, add_special_tokens=False): def encode(self, x, add_special_tokens=False):
return self.tokenizer.encode(x) return self.tokenizer.encode(x)
...@@ -255,7 +263,39 @@ class TiktokenTokenizer: ...@@ -255,7 +263,39 @@ class TiktokenTokenizer:
batch = [[x] for x in batch] batch = [[x] for x in batch]
return self.tokenizer.decode_batch(batch) return self.tokenizer.decode_batch(batch)
def convert_ids_to_tokens(self, index): def apply_chat_template(self, messages, tokenize, add_generation_prompt):
return self.tokenizer.decode_single_token_bytes(index).decode( ret = self.chat_template.render(messages=messages, add_generation_prompt=add_generation_prompt)
"utf-8", errors="ignore" return self.encode(ret) if tokenize else ret
class SentencePieceTokenizer:
def __init__(self, tokenizer_path):
import sentencepiece as spm
from jinja2 import Template
tokenizer = spm.SentencePieceProcessor(model_file=tokenizer_path)
# Convert to HF interface
self.tokenizer = tokenizer
self.eos_token_id = tokenizer.eos_id()
self.vocab_size = tokenizer.vocab_size()
self.chat_template = Template(
"{% for message in messages %}{% if message['role'] == 'user' %}{{ 'Human: ' + message['content'].strip() + '<|separator|>\n\n' }}{% elif message['role'] == 'system' %}{{ 'System: ' + message['content'].strip() + '<|separator|>\n\n' }}{% elif message['role'] == 'assistant' %}{{ 'Assistant: ' + message['content'] + '<|separator|>\n\n' }}{% endif %}{% endfor %}{% if add_generation_prompt %}{{ 'Assistant:' }}{% endif %}"
) )
def encode(self, x, add_special_tokens=False):
return self.tokenizer.encode(x)
def decode(self, x):
return self.tokenizer.decode(x)
def batch_decode(
self, batch, skip_special_tokens=True, spaces_between_special_tokens=False
):
if isinstance(batch[0], int):
batch = [[x] for x in batch]
return self.tokenizer.decode(batch)
def apply_chat_template(self, messages, tokenize, add_generation_prompt):
ret = self.chat_template.render(messages=messages, add_generation_prompt=add_generation_prompt)
return self.encode(ret) if tokenize else ret
\ No newline at end of file
...@@ -317,19 +317,38 @@ def get_default_config( ...@@ -317,19 +317,38 @@ def get_default_config(
topk: int, topk: int,
dtype: Optional[str], dtype: Optional[str],
) -> Dict[str, int]: ) -> Dict[str, int]:
config = { if dtype == "float8":
'BLOCK_SIZE_M': 64,
'BLOCK_SIZE_N': 64,
'BLOCK_SIZE_K': 32,
'GROUP_SIZE_M': 8
}
if M <= E:
config = { config = {
'BLOCK_SIZE_M': 16, 'BLOCK_SIZE_M': 128,
'BLOCK_SIZE_N': 32, 'BLOCK_SIZE_N': 256,
'BLOCK_SIZE_K': 64, 'BLOCK_SIZE_K': 128,
'GROUP_SIZE_M': 1 'GROUP_SIZE_M': 32,
"num_warps": 8,
"num_stages": 4
} }
if M <= E:
config = {
'BLOCK_SIZE_M': 64,
'BLOCK_SIZE_N': 128,
'BLOCK_SIZE_K': 128,
'GROUP_SIZE_M': 1,
"num_warps": 4,
"num_stages": 4
}
else:
config = {
'BLOCK_SIZE_M': 64,
'BLOCK_SIZE_N': 64,
'BLOCK_SIZE_K': 32,
'GROUP_SIZE_M': 8
}
if M <= E:
config = {
'BLOCK_SIZE_M': 16,
'BLOCK_SIZE_N': 32,
'BLOCK_SIZE_K': 64,
'GROUP_SIZE_M': 1
}
return config return config
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
import asyncio import asyncio
import logging import logging
import time from concurrent.futures import ThreadPoolExecutor
import uvloop import uvloop
import zmq import zmq
...@@ -91,6 +91,7 @@ def start_controller_process( ...@@ -91,6 +91,7 @@ def start_controller_process(
pipe_writer.send("init ok") pipe_writer.send("init ok")
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=256))
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.create_task(controller.loop_for_recv_requests()) loop.create_task(controller.loop_for_recv_requests())
try: try:
...@@ -98,4 +99,4 @@ def start_controller_process( ...@@ -98,4 +99,4 @@ def start_controller_process(
except Exception: except Exception:
logger.error("Exception in ControllerSingle:\n" + get_exception_traceback()) logger.error("Exception in ControllerSingle:\n" + get_exception_traceback())
finally: finally:
kill_parent_process() kill_parent_process()
\ No newline at end of file
...@@ -100,7 +100,7 @@ class ModelTpServer: ...@@ -100,7 +100,7 @@ class ModelTpServer:
self.max_prefill_tokens = ( self.max_prefill_tokens = (
max( max(
self.model_config.context_len, self.model_config.context_len,
min(self.max_total_num_tokens // 6, 65536), min(self.max_total_num_tokens // 6, 32768),
) )
if server_args.max_prefill_tokens is None if server_args.max_prefill_tokens is None
else server_args.max_prefill_tokens else server_args.max_prefill_tokens
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment