""" Copyright 2023-2024 SGLang Team Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ """ The entry point of inference server. SRT = SGLang Runtime. """ import asyncio import dataclasses import json import logging import multiprocessing as mp import os import sys import threading import time from http import HTTPStatus from typing import Dict, Optional # Fix a bug of Python threading setattr(threading, "_register_atexit", lambda *args, **kwargs: None) import aiohttp import psutil import requests import uvicorn import uvloop from fastapi import FastAPI, Request from fastapi.responses import JSONResponse, Response, StreamingResponse from sglang.lang.backend.runtime_endpoint import RuntimeEndpoint from sglang.srt.constrained import disable_cache from sglang.srt.hf_transformers_utils import get_tokenizer from sglang.srt.managers.controller.manager_multi import ( start_controller_process as start_controller_process_multi, ) from sglang.srt.managers.controller.manager_single import launch_tp_servers from sglang.srt.managers.controller.manager_single import ( start_controller_process as start_controller_process_single, ) from sglang.srt.managers.detokenizer_manager import start_detokenizer_process from sglang.srt.managers.io_struct import GenerateReqInput from sglang.srt.managers.tokenizer_manager import TokenizerManager from sglang.srt.openai_api.adapter import ( load_chat_template_for_openai_api, v1_chat_completions, v1_completions, ) from sglang.srt.openai_api.protocol import ModelCard, ModelList from sglang.srt.server_args import PortArgs, ServerArgs from sglang.srt.utils import ( API_KEY_HEADER_NAME, APIKeyValidatorMiddleware, allocate_init_ports, assert_pkg_version, enable_show_time_cost, maybe_set_triton_cache_manager, set_ulimit, ) from sglang.utils import get_exception_traceback logger = logging.getLogger(__name__) asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) app = FastAPI() tokenizer_manager = None @app.get("/health") async def health() -> Response: """Health check.""" return Response(status_code=200) @app.get("/get_model_info") async def get_model_info(): result = { "model_path": tokenizer_manager.model_path, } return result @app.get("/get_server_args") async def get_server_args(): return dataclasses.asdict(tokenizer_manager.server_args) @app.get("/flush_cache") async def flush_cache(): tokenizer_manager.flush_cache() return Response( content="Cache flushed.\nPlease check backend logs for more details. " "(When there are running or waiting requests, the operation will not be performed.)\n", status_code=200, ) async def generate_request(obj: GenerateReqInput, request: Request): """Handle a generate request.""" if obj.stream: async def stream_results(): try: async for out in tokenizer_manager.generate_request(obj, request): yield f"data: {json.dumps(out, ensure_ascii=False)}\n\n" except ValueError as e: out = {"error": {"message": str(e)}} yield f"data: {json.dumps(out, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse( stream_results(), media_type="text/event-stream", background=tokenizer_manager.create_abort_task(obj), ) else: try: ret = await tokenizer_manager.generate_request(obj, request).__anext__() return ret except ValueError as e: return JSONResponse( {"error": {"message": str(e)}}, status_code=HTTPStatus.BAD_REQUEST ) app.post("/generate")(generate_request) app.put("/generate")(generate_request) @app.post("/v1/completions") async def openai_v1_completions(raw_request: Request): return await v1_completions(tokenizer_manager, raw_request) @app.post("/v1/chat/completions") async def openai_v1_chat_completions(raw_request: Request): return await v1_chat_completions(tokenizer_manager, raw_request) @app.get("/v1/models") def available_models(): """Show available models.""" model_names = [tokenizer_manager.model_path] model_cards = [] for model_name in model_names: model_cards.append(ModelCard(id=model_name, root=model_name)) return ModelList(data=model_cards) def _set_torch_compile_config(): # The following configurations are for torch compile optimizations import torch._dynamo.config import torch._inductor.config torch._inductor.config.coordinate_descent_tuning = True torch._inductor.config.triton.unique_kernel_names = True torch._inductor.config.fx_graph_cache = True # Experimental feature to reduce compilation times, will be on by default in future # FIXME: tmp workaround torch._dynamo.config.accumulated_cache_size_limit = 256 def set_envs_and_config(server_args: ServerArgs): # Set global environments os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" os.environ["NCCL_CUMEM_ENABLE"] = "0" os.environ["NCCL_NVLS_ENABLE"] = "0" os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1" # Set ulimit set_ulimit() # Enable show time cost for debugging if server_args.show_time_cost: enable_show_time_cost() # Disable disk cache if server_args.disable_disk_cache: disable_cache() # Fix triton bugs if server_args.tp_size * server_args.dp_size > 1: # FIXME: remove this after https://github.com/triton-lang/triton/pull/4295 is used as a dependency. maybe_set_triton_cache_manager() # Set torch compile config if server_args.enable_torch_compile: _set_torch_compile_config() # Set global chat template if server_args.chat_template: # TODO: replace this with huggingface transformers template load_chat_template_for_openai_api(server_args.chat_template) def launch_server( server_args: ServerArgs, model_overide_args: Optional[dict] = None, pipe_finish_writer: Optional[mp.connection.Connection] = None, ): server_args.check_server_args() """Launch an HTTP server.""" global tokenizer_manager logging.basicConfig( level=getattr(logging, server_args.log_level.upper()), format="%(message)s", ) if not server_args.disable_flashinfer: assert_pkg_version( "flashinfer", "0.1.1", "Please uninstall the old version and " "reinstall the latest version by following the instructions " "at https://docs.flashinfer.ai/installation.html.", ) set_envs_and_config(server_args) # Allocate ports server_args.port, server_args.additional_ports = allocate_init_ports( server_args.port, server_args.additional_ports, server_args.dp_size, ) ports = server_args.additional_ports port_args = PortArgs( tokenizer_port=ports[0], controller_port=ports[1], detokenizer_port=ports[2], nccl_ports=ports[3:], ) logger.info(f"{server_args=}") # Handle multi-node tensor parallelism if server_args.nnodes > 1: if server_args.node_rank != 0: tp_size_local = server_args.tp_size // server_args.nnodes gpu_ids = [ i for _ in range(server_args.nnodes) for i in range(tp_size_local) ] tp_rank_range = list( range( server_args.node_rank * tp_size_local, (server_args.node_rank + 1) * tp_size_local, ) ) procs = launch_tp_servers( gpu_ids, tp_rank_range, server_args, ports[3], model_overide_args, ) while True: pass # Launch processes tokenizer_manager = TokenizerManager(server_args, port_args, model_overide_args) pipe_controller_reader, pipe_controller_writer = mp.Pipe(duplex=False) pipe_detoken_reader, pipe_detoken_writer = mp.Pipe(duplex=False) if server_args.dp_size == 1: start_process = start_controller_process_single else: start_process = start_controller_process_multi proc_controller = mp.Process( target=start_process, args=(server_args, port_args, pipe_controller_writer, model_overide_args), ) proc_controller.start() proc_detoken = mp.Process( target=start_detokenizer_process, args=( server_args, port_args, pipe_detoken_writer, ), ) proc_detoken.start() # Wait for the model to finish loading controller_init_state = pipe_controller_reader.recv() detoken_init_state = pipe_detoken_reader.recv() if controller_init_state != "init ok" or detoken_init_state != "init ok": proc_controller.kill() proc_detoken.kill() print( f"Initialization failed. controller_init_state: {controller_init_state}", flush=True, ) print( f"Initialization failed. detoken_init_state: {detoken_init_state}", flush=True, ) sys.exit(1) assert proc_controller.is_alive() and proc_detoken.is_alive() if server_args.api_key and server_args.api_key != "": app.add_middleware(APIKeyValidatorMiddleware, api_key=server_args.api_key) # Send a warmup request t = threading.Thread( target=_wait_and_warmup, args=(server_args, pipe_finish_writer) ) t.start() # Listen for requests try: uvicorn.run( app, host=server_args.host, port=server_args.port, log_level=server_args.log_level_http or server_args.log_level, timeout_keep_alive=5, loop="uvloop", ) finally: t.join() def _wait_and_warmup(server_args, pipe_finish_writer): headers = {} url = server_args.url() if server_args.api_key: headers[API_KEY_HEADER_NAME] = server_args.api_key # Wait until the server is launched for _ in range(120): time.sleep(0.5) try: requests.get(url + "/get_model_info", timeout=5, headers=headers) break except requests.exceptions.RequestException: pass # Send a warmup request try: for _ in range(server_args.dp_size): res = requests.post( url + "/generate", json={ "text": "The capital city of France is", "sampling_params": { "temperature": 0, "max_new_tokens": 8, }, }, headers=headers, timeout=600, ) assert res.status_code == 200 except Exception as e: if pipe_finish_writer is not None: pipe_finish_writer.send(get_exception_traceback()) print(f"Initialization failed. warmup error: {e}", flush=True) raise e logger.info("The server is fired up and ready to roll!") if pipe_finish_writer is not None: pipe_finish_writer.send("init ok") class Runtime: """ A wrapper for the server. This is used for launching the server in a python program without using the commond line interface. """ def __init__( self, log_level: str = "error", model_overide_args: Optional[dict] = None, *args, **kwargs, ): """See the arguments in server_args.py::ServerArgs""" self.server_args = ServerArgs(*args, log_level=log_level, **kwargs) # Pre-allocate ports self.server_args.port, self.server_args.additional_ports = allocate_init_ports( self.server_args.port, self.server_args.additional_ports, self.server_args.dp_size, ) self.url = self.server_args.url() self.generate_url = ( f"http://{self.server_args.host}:{self.server_args.port}/generate" ) self.pid = None pipe_reader, pipe_writer = mp.Pipe(duplex=False) proc = mp.Process( target=launch_server, args=(self.server_args, model_overide_args, pipe_writer), ) proc.start() pipe_writer.close() self.pid = proc.pid try: init_state = pipe_reader.recv() except EOFError: init_state = "" if init_state != "init ok": self.shutdown() raise RuntimeError( "Initialization failed. Please see the error messages above." ) self.endpoint = RuntimeEndpoint(self.url) def shutdown(self): if self.pid is not None: try: parent = psutil.Process(self.pid) except psutil.NoSuchProcess: return children = parent.children(recursive=True) for child in children: child.kill() psutil.wait_procs(children, timeout=5) parent.kill() parent.wait(timeout=5) self.pid = None def get_tokenizer(self): return get_tokenizer( self.server_args.tokenizer_path, tokenizer_mode=self.server_args.tokenizer_mode, trust_remote_code=self.server_args.trust_remote_code, ) async def add_request( self, prompt: str, sampling_params: Dict, ): json_data = { "text": prompt, "sampling_params": sampling_params, "stream": True, } pos = 0 timeout = aiohttp.ClientTimeout(total=3 * 3600) async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session: async with session.post(self.generate_url, json=json_data) as response: async for chunk, _ in response.content.iter_chunks(): chunk = chunk.decode("utf-8") if chunk and chunk.startswith("data:"): if chunk == "data: [DONE]\n\n": break data = json.loads(chunk[5:].strip("\n")) cur = data["text"][pos:] if cur: yield cur pos += len(cur) def __del__(self): self.shutdown()