import sys
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, Response, Depends, HTTPException, status
from fastapi.routing import APIRoute
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import StreamingResponse

import logging
import json
import time
import os

import requests

from pydantic import BaseModel, ConfigDict
from typing import Optional, List

from apps.web.models.models import Models
from utils.utils import get_verified_user, get_current_user, get_admin_user
from config import SRC_LOG_LEVELS
from constants import MESSAGES

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["LITELLM"])


from config import (
    ENABLE_LITELLM,
    ENABLE_MODEL_FILTER,
    MODEL_FILTER_LIST,
    DATA_DIR,
    LITELLM_PROXY_PORT,
    LITELLM_PROXY_HOST,
)

import warnings

warnings.simplefilter("ignore")

from litellm.utils import get_llm_provider

import asyncio
import subprocess
import yaml


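# On startup, launch the litellm proxy as a background task so the FastAPI app
# can begin serving without waiting for the proxy to come up.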
@asynccontextmanager
async def lifespan(app: FastAPI):
    log.info("startup_event")
    # TODO: Check config.yaml file and create one
    asyncio.create_task(start_litellm_background())
    yield


app = FastAPI(lifespan=lifespan)

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


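# Note: despite the _DIR suffix, this path points at the config *file* itself.
# The file is expected to follow litellm's proxy config layout; an illustrative
# sketch (the model names are placeholders, not an exhaustive schema):
#
#   model_list:
#     - model_name: my-model
#       litellm_params:
#         model: openai/gpt-3.5-turbo
#   litellm_settings: {}
#   general_settings: {}
#   router_settings: {}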
LITELLM_CONFIG_DIR = f"{DATA_DIR}/litellm/config.yaml"

with open(LITELLM_CONFIG_DIR, "r") as file:
    litellm_config = yaml.safe_load(file)


app.state.ENABLE_MODEL_FILTER = ENABLE_MODEL_FILTER.value
app.state.MODEL_FILTER_LIST = MODEL_FILTER_LIST.value
app.state.MODEL_CONFIG = [
    model.to_form() for model in Models.get_all_models_by_source("litellm")
]

app.state.ENABLE = ENABLE_LITELLM
app.state.CONFIG = litellm_config

# Global variable to store the subprocess reference
background_process = None

CONFLICT_ENV_VARS = [
    # Uvicorn uses PORT, so LiteLLM might use it as well
    "PORT",
    # LiteLLM uses DATABASE_URL for Prisma connections
    "DATABASE_URL",
]


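# Run `command` as a subprocess (used for the litellm proxy below) and mirror
# its output into the application logger.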
async def run_background_process(command):
    global background_process
    log.info("run_background_process")

    try:
        # Log the command to be executed
        log.info(f"Executing command: {command}")
        # Filter environment variables known to conflict with litellm
        env = {k: v for k, v in os.environ.items() if k not in CONFLICT_ENV_VARS}
        # Execute the command and create a subprocess
        process = await asyncio.create_subprocess_exec(
            *command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
        )
        background_process = process
        log.info("Subprocess started successfully.")

        # Capture STDERR for debugging; .read() blocks until the stderr stream closes
        stderr_output = await process.stderr.read()
        stderr_text = stderr_output.decode().strip()
        if stderr_text:
            log.info(f"Subprocess STDERR: {stderr_text}")

        # Stream STDOUT to the logger line by line
        async for line in process.stdout:
            log.info(line.decode().strip())

        # Wait for the process to finish
        returncode = await process.wait()
        log.info(f"Subprocess exited with return code {returncode}")
    except Exception as e:
        log.error(f"Failed to start subprocess: {e}")
        raise  # Re-raise so the failure propagates to the caller


async def start_litellm_background():
    log.info("start_litellm_background")
    # Command to run in the background
    command = [
        "litellm",
        "--port",
        str(LITELLM_PROXY_PORT),
        "--host",
        LITELLM_PROXY_HOST,
        "--telemetry",
        "False",
        "--config",
        LITELLM_CONFIG_DIR,
    ]

    await run_background_process(command)


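# Terminate the proxy subprocess and wait for it to exit, so a subsequent
# restart cannot race against a process that is still shutting down.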
async def shutdown_litellm_background():
    log.info("shutdown_litellm_background")
    global background_process
    if background_process:
        background_process.terminate()
        await background_process.wait()  # Ensure the process has terminated
        log.info("Subprocess terminated")
        background_process = None


@app.get("/")
async def get_status():
    return {"status": True}


async def restart_litellm():
    """
    Endpoint to restart the litellm background service.
    """
    log.info("Requested restart of litellm service.")
    try:
        # Shut down the existing process if it is running
        await shutdown_litellm_background()
        log.info("litellm service shutdown complete.")

        # Restart the background service
        asyncio.create_task(start_litellm_background())
        log.info("litellm service restart complete.")

        return {
            "status": "success",
            "message": "litellm service restarted successfully.",
        }
    except Exception as e:
        log.info(f"Error restarting litellm service: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        )


@app.get("/restart")
async def restart_litellm_handler(user=Depends(get_admin_user)):
    return await restart_litellm()


@app.get("/config")
async def get_config(user=Depends(get_admin_user)):
    return app.state.CONFIG


class LiteLLMConfigForm(BaseModel):
    general_settings: Optional[dict] = None
    litellm_settings: Optional[dict] = None
    model_list: Optional[List[dict]] = None
    router_settings: Optional[dict] = None

    model_config = ConfigDict(protected_namespaces=())


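# Persist a new proxy config to disk, then restart litellm so it takes effect.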
@app.post("/config/update")
async def update_config(form_data: LiteLLMConfigForm, user=Depends(get_admin_user)):
    app.state.CONFIG = form_data.model_dump(exclude_none=True)

    with open(LITELLM_CONFIG_DIR, "w") as file:
        yaml.dump(app.state.CONFIG, file)

    await restart_litellm()
    return app.state.CONFIG


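# List the models served by the litellm proxy, applying the model filter for
# users with the "user" role and attaching any stored per-model metadata.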
@app.get("/models")
@app.get("/v1/models")
async def get_models(user=Depends(get_current_user)):
    if app.state.ENABLE:
        while not background_process:
            await asyncio.sleep(0.1)

        url = f"http://localhost:{LITELLM_PROXY_PORT}/v1"
        r = None
        try:
            r = requests.request(method="GET", url=f"{url}/models")
            r.raise_for_status()

            data = r.json()

            if app.state.ENABLE_MODEL_FILTER:
                if user and user.role == "user":
                    data["data"] = list(
                        filter(
                            lambda model: model["id"] in app.state.MODEL_FILTER_LIST,
                            data["data"],
                        )
                    )

            for model in data["data"]:
                add_custom_info_to_model(model)
            return data
        except Exception as e:
            log.exception(e)
            error_detail = "Open WebUI: Server Connection Error"
            if r is not None:
                try:
                    res = r.json()
                    if "error" in res:
                        error_detail = f"External: {res['error']}"
                except:
                    error_detail = f"External: {e}"

            return {
                "data": [
                    {
                        "id": model["model_name"],
                        "object": "model",
                        "created": int(time.time()),
                        "owned_by": "openai",
                        "custom_info": next(
                            (
                                item
                                for item in app.state.MODEL_CONFIG
                                if item.id == model["model_name"]
                            ),
                            None,
                        ),
                    }
                    for model in app.state.CONFIG["model_list"]
                ],
                "object": "list",
            }
    else:
        return {
            "data": [],
            "object": "list",
        }


def add_custom_info_to_model(model: dict):
    model["custom_info"] = next(
        (item for item in app.state.MODEL_CONFIG if item.id == model["id"]), None
    )


@app.get("/model/info")
async def get_model_list(user=Depends(get_admin_user)):
    return {"data": app.state.CONFIG["model_list"]}


class AddLiteLLMModelForm(BaseModel):
    model_name: str
    litellm_params: dict

    model_config = ConfigDict(protected_namespaces=())


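# Validate the model before persisting it: get_llm_provider() raises if
# litellm cannot infer a provider from the model name.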
@app.post("/model/new")
async def add_model_to_config(
    form_data: AddLiteLLMModelForm, user=Depends(get_admin_user)
):
    try:
        get_llm_provider(model=form_data.model_name)
        app.state.CONFIG["model_list"].append(form_data.model_dump())

        with open(LITELLM_CONFIG_DIR, "w") as file:
            yaml.dump(app.state.CONFIG, file)

        await restart_litellm()

        return {"message": MESSAGES.MODEL_ADDED(form_data.model_name)}
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        )


class DeleteLiteLLMModelForm(BaseModel):
    id: str


@app.post("/model/delete")
async def delete_model_from_config(
    form_data: DeleteLiteLLMModelForm, user=Depends(get_admin_user)
):
    app.state.CONFIG["model_list"] = [
        model
        for model in app.state.CONFIG["model_list"]
        if model["model_name"] != form_data.id
    ]

    with open(LITELLM_CONFIG_DIR, "w") as file:
        yaml.dump(app.state.CONFIG, file)

    await restart_litellm()

    return {"message": MESSAGES.MODEL_DELETED(form_data.id)}


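# Catch-all: forward any remaining request to the local litellm proxy,
# streaming the response back when the proxy replies with SSE.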
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(path: str, request: Request, user=Depends(get_verified_user)):
    body = await request.body()

    url = f"http://localhost:{LITELLM_PROXY_PORT}"

    target_url = f"{url}/{path}"

    headers = {}
    # headers["Authorization"] = f"Bearer {key}"
    headers["Content-Type"] = "application/json"

    r = None

    try:
        r = requests.request(
            method=request.method,
            url=target_url,
            data=body,
            headers=headers,
            stream=True,
        )

        r.raise_for_status()

        # Check if response is SSE
        if "text/event-stream" in r.headers.get("Content-Type", ""):
            return StreamingResponse(
                r.iter_content(chunk_size=8192),
                status_code=r.status_code,
                headers=dict(r.headers),
            )
        else:
            response_data = r.json()
            return response_data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    error_detail = f"External: {res['error']['message'] if 'message' in res['error'] else res['error']}"
            except:
                error_detail = f"External: {e}"

        raise HTTPException(
            # `bool(r)` is False for 4xx/5xx responses, so test against None explicitly
            status_code=r.status_code if r is not None else 500, detail=error_detail
        )