main.py 6.37 KB
Newer Older
Timothy J. Baek's avatar
Timothy J. Baek committed
1
from fastapi import FastAPI, Depends, HTTPException
2
3
from fastapi.routing import APIRoute
from fastapi.middleware.cors import CORSMiddleware
Timothy J. Baek's avatar
Timothy J. Baek committed
4

5
import logging
6
from fastapi import FastAPI, Request, Depends, status, Response
Timothy J. Baek's avatar
Timothy J. Baek committed
7
from fastapi.responses import JSONResponse
8
9
10
11

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import StreamingResponse
import json
Timothy J. Baek's avatar
Timothy J. Baek committed
12
import requests
13

14
from utils.utils import get_verified_user, get_current_user, get_admin_user
15
from config import SRC_LOG_LEVELS, ENV
Timothy J. Baek's avatar
Timothy J. Baek committed
16
from constants import ERROR_MESSAGES
17
18
19

# Module-level logger; its level comes from the "LITELLM" entry of SRC_LOG_LEVELS.
log = logging.getLogger(name=__name__)
log.setLevel(SRC_LOG_LEVELS["LITELLM"])
Timothy J. Baek's avatar
Timothy J. Baek committed
20

21
22
23
24
25
26
27

from config import (
    MODEL_FILTER_ENABLED,
    MODEL_FILTER_LIST,
)


28
29
import asyncio
import subprocess
Timothy J. Baek's avatar
Timothy J. Baek committed
30
31


32
app = FastAPI()

# CORS: every origin, method and header is accepted, and credentials are allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True lets any
# site issue credentialed cross-origin requests -- confirm this is intended.
origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Handle to the LiteLLM subprocess, assigned by run_background_process and
# terminated by shutdown_litellm_background. None until a subprocess is started.
background_process = None

Timothy J. Baek's avatar
Timothy J. Baek committed
48

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def run_background_process(command):
    """Run ``command`` as a subprocess and echo its output until it exits.

    The subprocess handle is stored in the module-level ``background_process``
    so that ``shutdown_litellm_background`` can terminate it later.

    stdout and stderr are drained concurrently. Reading one pipe to EOF before
    touching the other lets the unread pipe fill up. The child then blocks on
    write and never exits, and a long-running server would never have its
    output shown at all.

    Args:
        command: Command line to execute. It is split with ``shlex.split``, so
            quoted arguments containing spaces stay together. No shell is used.

    Returns:
        The subprocess's exit code.

    Raises:
        Exception: any error raised while starting or reading from the
            subprocess is logged and re-raised.
    """
    import shlex

    global background_process
    print("run_background_process")

    async def _echo_stream(stream, prefix="", skip_blank=False):
        # Print each line of ``stream`` as soon as it arrives, until EOF.
        async for raw_line in stream:
            text = raw_line.decode(errors="replace").strip()
            if skip_blank and not text:
                continue
            print(f"{prefix}{text}")

    try:
        # Log the command to be executed
        print(f"Executing command: {command}")
        process = await asyncio.create_subprocess_exec(
            *shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        background_process = process
        print("Subprocess started successfully.")

        # Drain both pipes at the same time so neither can fill and stall the child.
        await asyncio.gather(
            _echo_stream(process.stdout),
            _echo_stream(
                process.stderr, prefix="Subprocess STDERR: ", skip_blank=True
            ),
        )

        # Both pipes hit EOF; collect the exit status.
        returncode = await process.wait()
        print(f"Subprocess exited with return code {returncode}")
        return returncode
    except Exception as e:
        log.error(f"Failed to start subprocess: {e}")
        raise  # Propagate so the caller/task sees the failure
79
80
81


async def start_litellm_background():
    """Launch the LiteLLM proxy on port 14365 and wait until its process ends.

    The proxy is started with ``--telemetry False`` and the config file at
    ./data/litellm/config.yaml.
    """
    print("start_litellm_background")
    litellm_cmd = (
        "litellm --port 14365 --telemetry False --config ./data/litellm/config.yaml"
    )
    await run_background_process(litellm_cmd)
Timothy J. Baek's avatar
Timothy J. Baek committed
89
90


91
92
93
94
95
96
97
98
99
async def shutdown_litellm_background():
    """Terminate the LiteLLM subprocess, if one was started, and wait for it.

    Safe to call repeatedly. A process that has already exited is not
    signalled again, and the module-level handle is cleared afterwards. This
    means a second shutdown (e.g. a repeated /restart) is a no-op rather than
    an attempt to terminate a dead process.
    """
    print("shutdown_litellm_background")
    global background_process
    process = background_process
    if not process:
        return

    if process.returncode is None:
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            pass
    await process.wait()  # Ensure the process has terminated
    background_process = None
    print("Subprocess terminated")


Timothy J. Baek's avatar
Timothy J. Baek committed
100
@app.on_event("startup")
async def startup_event():
    """Launch the LiteLLM subprocess when the application starts.

    ``start_litellm_background`` runs for as long as the subprocess lives, so
    it is scheduled as a background task instead of being awaited. Awaiting it
    would block application startup.
    """
    print("startup_event")
    # TODO: Check config.yaml file and create one
    # The event loop keeps only weak references to tasks. Store the task on
    # app.state so it cannot be garbage-collected while LiteLLM is running.
    app.state.litellm_task = asyncio.create_task(start_litellm_background())
Timothy J. Baek's avatar
Timothy J. Baek committed
106
107


108
109
110
111
# Model-filter settings are kept on app.state; get_models reads them from there.
app.state.MODEL_FILTER_ENABLED, app.state.MODEL_FILTER_LIST = (
    MODEL_FILTER_ENABLED,
    MODEL_FILTER_LIST,
)


112
113
114
115
116
@app.get("/")
async def get_status():
    """Report a constant ``{"status": True}`` for the root path."""
    payload = {"status": True}
    return payload


117
118
119
120
121
122
123
124
125
126
127
128
@app.get("/restart")
async def restart_litellm(user=Depends(get_admin_user)):
    """
    Endpoint to restart the litellm background service.

    Requires an admin user (``get_admin_user`` dependency). Terminates the
    current LiteLLM subprocess, if any, then schedules a fresh one as a
    background task.

    Returns:
        A dict with ``status`` and ``message`` keys on success.

    Raises:
        HTTPException: 500 if shutting down or rescheduling the service fails.
    """
    log.info("Requested restart of litellm service.")
    try:
        # Shut down the existing process if it is running
        await shutdown_litellm_background()
        log.info("litellm service shutdown complete.")

        # start_litellm_background() runs for the lifetime of the subprocess.
        # Awaiting it would hang this request. Calling it bare only creates a
        # coroutine object that never runs. So schedule it as a task, and keep
        # a reference so the task is not garbage-collected.
        app.state.litellm_task = asyncio.create_task(start_litellm_background())

        log.info("litellm service restart complete.")

        return {
            "status": "success",
            "message": "litellm service restarted successfully.",
        }
    except Exception as e:
        log.error(f"Error restarting litellm service: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        )


Timothy J. Baek's avatar
Timothy J. Baek committed
143
144
145
@app.get("/models")
@app.get("/v1/models")
async def get_models(user=Depends(get_current_user)):
    """Return the model list reported by the local LiteLLM proxy.

    When ``app.state.MODEL_FILTER_ENABLED`` is set, callers whose role is
    ``"user"`` only see models whose ``id`` appears in
    ``app.state.MODEL_FILTER_LIST``.

    Raises:
        HTTPException: with the upstream status code when LiteLLM answered
            with an error status, or 500 for any other failure.
    """
    url = "http://localhost:14365/v1"

    r = None
    try:
        r = requests.request(method="GET", url=f"{url}/models")
        r.raise_for_status()

        data = r.json()

        if app.state.MODEL_FILTER_ENABLED:
            if user and user.role == "user":
                allowed = set(app.state.MODEL_FILTER_LIST)
                data["data"] = [
                    model for model in data["data"] if model["id"] in allowed
                ]

        return data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    error_detail = f"External: {res['error']}"
            except Exception:
                error_detail = f"External: {e}"

        # A requests.Response is falsy for 4xx/5xx (Response.__bool__ returns
        # Response.ok), so `if r` would be inverted. Forward the upstream status
        # only when LiteLLM actually reported an error; otherwise it is our own
        # failure and maps to 500.
        upstream_failed = r is not None and not r.ok
        raise HTTPException(
            status_code=r.status_code if upstream_failed else 500,
            detail=error_detail,
        )
179
180


Timothy J. Baek's avatar
Timothy J. Baek committed
181
182
183
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(path: str, request: Request, user=Depends(get_verified_user)):
    """Forward any other request to the local LiteLLM proxy.

    Behavior:
    - The raw request body is sent unchanged to
      ``http://localhost:14365/{path}`` with a JSON ``Content-Type``.
    - Responses whose ``Content-Type`` contains ``text/event-stream`` are
      streamed back chunk by chunk.
    - Anything else is returned as decoded JSON.

    Raises:
        HTTPException: with the upstream status code when LiteLLM answered
            with an error status, or 500 for any other failure.
    """
    body = await request.body()

    url = "http://localhost:14365"
    target_url = f"{url}/{path}"

    headers = {}
    # headers["Authorization"] = f"Bearer {key}"
    headers["Content-Type"] = "application/json"

    r = None
    try:
        r = requests.request(
            method=request.method,
            url=target_url,
            data=body,
            headers=headers,
            stream=True,
        )

        r.raise_for_status()

        # Check if response is SSE
        if "text/event-stream" in r.headers.get("Content-Type", ""):
            return StreamingResponse(
                r.iter_content(chunk_size=8192),
                status_code=r.status_code,
                headers=dict(r.headers),
            )
        else:
            response_data = r.json()
            return response_data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    err = res["error"]
                    # Prefer error.message when the error is an object. A
                    # plain-string error is used as-is; a substring test on it
                    # must not be mistaken for a key lookup.
                    if isinstance(err, dict) and "message" in err:
                        err = err["message"]
                    error_detail = f"External: {err}"
            except Exception:
                error_detail = f"External: {e}"

        # A requests.Response is falsy for 4xx/5xx (Response.__bool__ returns
        # Response.ok), so `if r` would be inverted. Forward the upstream status
        # only when LiteLLM actually reported an error; otherwise it is our own
        # failure and maps to 500.
        upstream_failed = r is not None and not r.ok
        raise HTTPException(
            status_code=r.status_code if upstream_failed else 500,
            detail=error_detail,
        )