import random import threading import time import unittest from types import SimpleNamespace import requests import sglang as sgl from sglang.srt.hf_transformers_utils import get_tokenizer from sglang.srt.utils import kill_process_tree from sglang.test.few_shot_gsm8k import run_eval from sglang.test.test_utils import ( DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST, DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST, DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH, DEFAULT_URL_FOR_TEST, popen_launch_server, ) class TestEAGLEEngine(unittest.TestCase): def test_eagle_accuracy(self): prompt1 = "Today is a sunny day and I like" sampling_params1 = {"temperature": 0, "max_new_tokens": 8} # Get the reference output ref_engine = sgl.Engine(model_path=DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST) ref_output = ref_engine.generate(prompt1, sampling_params1)["text"] ref_engine.shutdown() # Test cases with different configurations configs = [ # Original config { "model_path": DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST, "speculative_draft_model_path": DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST, "speculative_algorithm": "EAGLE", "speculative_num_steps": 5, "speculative_eagle_topk": 8, "speculative_num_draft_tokens": 64, "mem_fraction_static": 0.7, }, # Config with CUDA graph disabled { "model_path": DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST, "speculative_draft_model_path": DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST, "speculative_algorithm": "EAGLE", "speculative_num_steps": 5, "speculative_eagle_topk": 8, "speculative_num_draft_tokens": 64, "mem_fraction_static": 0.7, "disable_cuda_graph": True, }, ] for config in configs: # Launch EAGLE engine engine = sgl.Engine(**config) # Case 1: Test the output of EAGLE engine is the same as normal engine out1 = engine.generate(prompt1, sampling_params1)["text"] print(f"{out1=}, {ref_output=}") self.assertEqual(out1, ref_output) # Case 2: Test the output of EAGLE engine does not contain unexpected EOS prompt2 = "[INST] <>\\nYou are a helpful assistant.\\n<>\\nToday is a sunny day and I like [/INST]" sampling_params2 = { "temperature": 0, "max_new_tokens": 1024, "skip_special_tokens": False, } tokenizer = get_tokenizer(DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST) out2 = engine.generate(prompt2, sampling_params2)["text"] print(f"{out2=}") tokens = tokenizer.encode(out2, truncation=False) assert tokenizer.eos_token_id not in tokens # Case 3: Batched prompts prompts = [ "Hello, my name is", "The president of the United States is", "The capital of France is", "The future of AI is", ] sampling_params3 = {"temperature": 0, "max_new_tokens": 30} outputs = engine.generate(prompts, sampling_params3) for prompt, output in zip(prompts, outputs): print("===============================") print(f"Prompt: {prompt}\nGenerated text: {output['text']}") # Shutdown the engine engine.shutdown() prompts = [ "[INST] <>\\nYou are a helpful assistant.\\n<>\\nToday is a sunny day and I like[/INST]" '[INST] <>\\nYou are a helpful assistant.\\n<>\\nWhat are the mental triggers in Jeff Walker\'s Product Launch Formula and "Launch" book?[/INST]', "[INST] <>\\nYou are a helpful assistant.\\n<>\\nSummarize Russell Brunson's Perfect Webinar Script...[/INST]", "[INST] <>\\nYou are a helpful assistant.\\n<>\\nwho are you?[/INST]", "[INST] <>\\nYou are a helpful assistant.\\n<>\\nwhere are you from?[/INST]", ] class TestEAGLEServer(unittest.TestCase): @classmethod def setUpClass(cls): cls.base_url = DEFAULT_URL_FOR_TEST cls.process = popen_launch_server( DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST, cls.base_url, timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH, other_args=[ "--speculative-algorithm", "EAGLE", "--speculative-draft-model-path", DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST, "--speculative-num-steps", "5", "--speculative-eagle-topk", "8", "--speculative-num-draft-tokens", "64", "--mem-fraction-static", "0.7", ], ) @classmethod def tearDownClass(cls): kill_process_tree(cls.process.pid) def send_request(self): time.sleep(random.uniform(0, 2)) for prompt in prompts: url = self.base_url + "/generate" data = { "text": prompt, "sampling_params": { "temperature": 0, "max_new_tokens": 1024, }, } response = requests.post(url, json=data) assert response.status_code == 200 def send_requests_abort(self): for prompt in prompts: try: time.sleep(random.uniform(0, 2)) url = self.base_url + "/generate" data = { "model": "base", "text": prompt, "sampling_params": { "temperature": 0, "max_new_tokens": 1024, }, } # set timeout = 1s,mock disconnected requests.post(url, json=data, timeout=1) except Exception as e: print(e) pass def test_request_abort(self): concurrency = 4 threads = [ threading.Thread(target=self.send_request) for _ in range(concurrency) ] + [ threading.Thread(target=self.send_requests_abort) for _ in range(concurrency) ] for worker in threads: worker.start() for p in threads: p.join() def test_gsm8k(self): args = SimpleNamespace( num_shots=5, data_path=None, num_questions=200, max_new_tokens=512, parallel=128, host="http://127.0.0.1", port=int(self.base_url.split(":")[-1]), ) metrics = run_eval(args) print(f"{metrics=}") self.assertGreater(metrics["accuracy"], 0.20) if __name__ == "__main__": unittest.main()