import gc import json import os import pathlib import subprocess from unittest.mock import MagicMock, patch import openai import pytest import torch from tensorizer import EncryptionParams from vllm import EngineArgs, LLMEngine, RequestOutput, SamplingParams from vllm.engine.arg_utils import EngineArgs # yapf: disable from vllm.model_executor.model_loader.tensorizer import (TensorizerConfig, TensorSerializer, is_vllm_tensorized, load_with_tensorizer, open_stream, serialize_vllm_model, tensorize_vllm_model) from ..conftest import VllmRunner from ..utils import RemoteOpenAIServer, models_path_prefix from .conftest import retry_until_skip from typing import List, Optional, Tuple from vllm.lora.request import LoRARequest # yapf conflicts with isort for this docstring prompts = [ "Hello, my name is", "The president of the United States is", "The capital of France is", "The future of AI is", ] # Create a sampling params object. sampling_params = SamplingParams(temperature=0.8, top_p=0.95, seed=0) model_ref = os.path.join(models_path_prefix, "facebook/opt-125m") tensorize_model_for_testing_script = os.path.join( os.path.dirname(__file__), "tensorize_vllm_model_for_testing.py") def is_curl_installed(): try: subprocess.check_call(['curl', '--version']) return True except (subprocess.CalledProcessError, FileNotFoundError): return False def get_torch_model(vllm_runner: VllmRunner): return vllm_runner \ .model \ .llm_engine \ .model_executor \ .driver_worker \ .model_runner \ .model def write_keyfile(keyfile_path: str): encryption_params = EncryptionParams.random() pathlib.Path(keyfile_path).parent.mkdir(parents=True, exist_ok=True) with open(keyfile_path, 'wb') as f: f.write(encryption_params.key) @patch('vllm.model_executor.model_loader.tensorizer.TensorizerAgent') def test_load_with_tensorizer(mock_agent, tensorizer_config): mock_linear_method = MagicMock() mock_agent_instance = mock_agent.return_value mock_agent_instance.deserialize.return_value = MagicMock() result = load_with_tensorizer(tensorizer_config, quant_method=mock_linear_method) mock_agent.assert_called_once_with(tensorizer_config, quant_method=mock_linear_method) mock_agent_instance.deserialize.assert_called_once() assert result == mock_agent_instance.deserialize.return_value @pytest.mark.skipif(not is_curl_installed(), reason="cURL is not installed") def test_can_deserialize_s3(vllm_runner): model_ref = os.path.join(models_path_prefix, "EleutherAI/pythia-1.4b") tensorized_path = f"{model_ref}/fp16/model.tensors" with vllm_runner(model_ref, load_format="tensorizer", model_loader_extra_config=TensorizerConfig( tensorizer_uri=tensorized_path, num_readers=1, s3_endpoint="object.ord1.coreweave.com", )) as loaded_hf_model: deserialized_outputs = loaded_hf_model.generate(prompts, sampling_params) # noqa: E501 assert deserialized_outputs @pytest.mark.skipif(not is_curl_installed(), reason="cURL is not installed") def test_deserialized_encrypted_vllm_model_has_same_outputs( vllm_runner, tmp_path): with vllm_runner(model_ref) as vllm_model: model_path = tmp_path / (model_ref + ".tensors") key_path = tmp_path / (model_ref + ".key") write_keyfile(key_path) outputs = vllm_model.generate(prompts, sampling_params) config_for_serializing = TensorizerConfig( tensorizer_uri=model_path, encryption_keyfile=key_path ) serialize_vllm_model(get_torch_model(vllm_model), config_for_serializing) config_for_deserializing = TensorizerConfig(tensorizer_uri=model_path, encryption_keyfile=key_path) with vllm_runner( model_ref, load_format="tensorizer", model_loader_extra_config=config_for_deserializing) as loaded_vllm_model: # noqa: E501 deserialized_outputs = loaded_vllm_model.generate(prompts, sampling_params) # noqa: E501 assert outputs == deserialized_outputs def test_deserialized_hf_model_has_same_outputs(hf_runner, vllm_runner, tmp_path): with hf_runner(model_ref) as hf_model: model_path = tmp_path / (model_ref + ".tensors") max_tokens = 50 outputs = hf_model.generate_greedy(prompts, max_tokens=max_tokens) with open_stream(model_path, "wb+") as stream: serializer = TensorSerializer(stream) serializer.write_module(hf_model.model) with vllm_runner(model_ref, load_format="tensorizer", model_loader_extra_config=TensorizerConfig( tensorizer_uri=model_path, num_readers=1, )) as loaded_hf_model: deserialized_outputs = loaded_hf_model.generate_greedy( prompts, max_tokens=max_tokens) assert outputs == deserialized_outputs def create_test_prompts( lora_path: str ) -> List[Tuple[str, SamplingParams, Optional[LoRARequest]]]: """Create a list of test prompts with their sampling parameters. 2 requests for base model, 4 requests for the LoRA. We define 2 different LoRA adapters (using the same model for demo purposes). Since we also set `max_loras=1`, the expectation is that the requests with the second LoRA adapter will be ran after all requests with the first adapter have finished. """ return [ ("A robot may not injure a human being", SamplingParams(temperature=0.0, logprobs=1, prompt_logprobs=1, max_tokens=128), None), ("To be or not to be,", SamplingParams(temperature=0.8, top_k=5, presence_penalty=0.2, max_tokens=128), None), ( "[user] Write a SQL query to answer the question based on the table schema.\n\n context: CREATE TABLE table_name_74 (icao VARCHAR, airport VARCHAR)\n\n question: Name the ICAO for lilongwe international airport [/user] [assistant]", # noqa: E501 SamplingParams(temperature=0.0, logprobs=1, prompt_logprobs=1, max_tokens=128, stop_token_ids=[32003]), LoRARequest("sql-lora", 1, lora_path)), ( "[user] Write a SQL query to answer the question based on the table schema.\n\n context: CREATE TABLE table_name_11 (nationality VARCHAR, elector VARCHAR)\n\n question: When Anchero Pantaleone was the elector what is under nationality? [/user] [assistant]", # noqa: E501 SamplingParams(n=3, best_of=3, use_beam_search=True, temperature=0, max_tokens=128, stop_token_ids=[32003]), LoRARequest("sql-lora", 1, lora_path)), ( "[user] Write a SQL query to answer the question based on the table schema.\n\n context: CREATE TABLE table_name_74 (icao VARCHAR, airport VARCHAR)\n\n question: Name the ICAO for lilongwe international airport [/user] [assistant]", # noqa: E501 SamplingParams(temperature=0.0, logprobs=1, prompt_logprobs=1, max_tokens=128, stop_token_ids=[32003]), LoRARequest("sql-lora2", 2, lora_path)), ( "[user] Write a SQL query to answer the question based on the table schema.\n\n context: CREATE TABLE table_name_11 (nationality VARCHAR, elector VARCHAR)\n\n question: When Anchero Pantaleone was the elector what is under nationality? [/user] [assistant]", # noqa: E501 SamplingParams(n=3, best_of=3, use_beam_search=True, temperature=0, max_tokens=128, stop_token_ids=[32003]), LoRARequest("sql-lora", 1, lora_path)), ] def process_requests(engine: LLMEngine, test_prompts: List[Tuple[str, SamplingParams, Optional[LoRARequest]]]): """Continuously process a list of prompts and handle the outputs.""" request_id = 0 while test_prompts or engine.has_unfinished_requests(): if test_prompts: prompt, sampling_params, lora_request = test_prompts.pop(0) engine.add_request(str(request_id), prompt, sampling_params, lora_request=lora_request) request_id += 1 request_outputs: List[RequestOutput] = engine.step() for request_output in request_outputs: if request_output.finished: print(request_output) def test_vllm_model_can_load_with_lora(vllm_runner, tmp_path): # from huggingface_hub import snapshot_download # from examples.multilora_inference import (create_test_prompts, # process_requests) model_ref = os.path.join(models_path_prefix, "meta-llama/Llama-2-7b-hf") # lora_path = snapshot_download(repo_id="yard1/llama-2-7b-sql-lora-test") lora_path = os.path.join(models_path_prefix, "yard1/llama-2-7b-sql-lora-test") test_prompts = create_test_prompts(lora_path) # Serialize model before deserializing and binding LoRA adapters with vllm_runner(model_ref, ) as vllm_model: model_path = tmp_path / (model_ref + ".tensors") serialize_vllm_model(get_torch_model(vllm_model), TensorizerConfig(tensorizer_uri=model_path)) with vllm_runner( model_ref, load_format="tensorizer", model_loader_extra_config=TensorizerConfig( tensorizer_uri=model_path, num_readers=1, ), enable_lora=True, max_loras=1, max_lora_rank=8, max_cpu_loras=2, max_num_seqs=50, max_model_len=1000, ) as loaded_vllm_model: process_requests(loaded_vllm_model.model.llm_engine, test_prompts) assert loaded_vllm_model def test_load_without_tensorizer_load_format(vllm_runner): model = None with pytest.raises(ValueError): model = vllm_runner( model_ref, model_loader_extra_config=TensorizerConfig(tensorizer_uri="test")) del model gc.collect() torch.cuda.empty_cache() @pytest.mark.skipif(not is_curl_installed(), reason="cURL is not installed") def test_openai_apiserver_with_tensorizer(vllm_runner, tmp_path): ## Serialize model with vllm_runner(model_ref, ) as vllm_model: model_path = tmp_path / (model_ref + ".tensors") serialize_vllm_model(get_torch_model(vllm_model), TensorizerConfig(tensorizer_uri=model_path)) model_loader_extra_config = { "tensorizer_uri": str(model_path), } ## Start OpenAI API server openai_args = [ "--dtype", "float16", "--load-format", "tensorizer", "--model-loader-extra-config", json.dumps(model_loader_extra_config), ] with RemoteOpenAIServer(model_ref, openai_args) as server: print("Server ready.") client = server.get_client() completion = client.completions.create(model=model_ref, prompt="Hello, my name is", max_tokens=5, temperature=0.0) assert completion.id is not None assert len(completion.choices) == 1 assert len(completion.choices[0].text) >= 5 assert completion.choices[0].finish_reason == "length" assert completion.usage == openai.types.CompletionUsage( completion_tokens=5, prompt_tokens=6, total_tokens=11) def test_raise_value_error_on_invalid_load_format(vllm_runner): model = None with pytest.raises(ValueError): model = vllm_runner( model_ref, load_format="safetensors", model_loader_extra_config=TensorizerConfig(tensorizer_uri="test")) del model gc.collect() torch.cuda.empty_cache() @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires 2 GPUs") def test_tensorizer_with_tp_path_without_template(vllm_runner): with pytest.raises(ValueError): model_ref = os.path.join(models_path_prefix, "EleutherAI/pythia-1.4b") tensorized_path = f"{model_ref}/fp16/model.tensors" vllm_runner( model_ref, load_format="tensorizer", model_loader_extra_config=TensorizerConfig( tensorizer_uri=tensorized_path, num_readers=1, s3_endpoint="object.ord1.coreweave.com", ), tensor_parallel_size=2, disable_custom_all_reduce=True, ) @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires 2 GPUs") def test_deserialized_encrypted_vllm_model_with_tp_has_same_outputs(vllm_runner, tmp_path): model_ref = os.path.join(models_path_prefix, "EleutherAI/pythia-1.4b") # record outputs from un-sharded un-tensorized model with vllm_runner( model_ref, disable_custom_all_reduce=True, enforce_eager=True, ) as base_model: outputs = base_model.generate(prompts, sampling_params) base_model.model.llm_engine.model_executor.shutdown() # load model with two shards and serialize with encryption model_path = str(tmp_path / (model_ref + "-%02d.tensors")) key_path = tmp_path / (model_ref + ".key") tensorizer_config = TensorizerConfig( tensorizer_uri=model_path, encryption_keyfile=key_path, ) tensorize_vllm_model( engine_args=EngineArgs( model=model_ref, tensor_parallel_size=2, disable_custom_all_reduce=True, enforce_eager=True, ), tensorizer_config=tensorizer_config, ) assert os.path.isfile(model_path % 0), "Serialization subprocess failed" assert os.path.isfile(model_path % 1), "Serialization subprocess failed" with vllm_runner( model_ref, tensor_parallel_size=2, load_format="tensorizer", disable_custom_all_reduce=True, enforce_eager=True, model_loader_extra_config=tensorizer_config) as loaded_vllm_model: deserialized_outputs = loaded_vllm_model.generate(prompts, sampling_params) assert outputs == deserialized_outputs @retry_until_skip(3) def test_vllm_tensorized_model_has_same_outputs(vllm_runner, tmp_path): gc.collect() torch.cuda.empty_cache() model_ref = os.path.join(models_path_prefix, "facebook/opt-125m") model_path = tmp_path / (model_ref + ".tensors") config = TensorizerConfig(tensorizer_uri=str(model_path)) with vllm_runner(model_ref) as vllm_model: outputs = vllm_model.generate(prompts, sampling_params) serialize_vllm_model(get_torch_model(vllm_model), config) assert is_vllm_tensorized(config) with vllm_runner(model_ref, load_format="tensorizer", model_loader_extra_config=config) as loaded_vllm_model: deserialized_outputs = loaded_vllm_model.generate(prompts, sampling_params) # noqa: E501 assert outputs == deserialized_outputs