main.py 6.34 KB
Newer Older
Timothy J. Baek's avatar
Timothy J. Baek committed
1
from fastapi import FastAPI, Depends, HTTPException
2
3
from fastapi.routing import APIRoute
from fastapi.middleware.cors import CORSMiddleware
Timothy J. Baek's avatar
Timothy J. Baek committed
4

5
import logging
6
from fastapi import FastAPI, Request, Depends, status, Response
Timothy J. Baek's avatar
Timothy J. Baek committed
7
from fastapi.responses import JSONResponse
8
9
10
11

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import StreamingResponse
import json
Timothy J. Baek's avatar
Timothy J. Baek committed
12
import requests
13

14
from utils.utils import get_verified_user, get_current_user, get_admin_user
15
from config import SRC_LOG_LEVELS, ENV
Timothy J. Baek's avatar
Timothy J. Baek committed
16
from constants import ERROR_MESSAGES
17
18
19

# Module-level logger; its level comes from the "LITELLM" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["LITELLM"])
Timothy J. Baek's avatar
Timothy J. Baek committed
20

21
22
23
24
25
26
27

from config import (
    MODEL_FILTER_ENABLED,
    MODEL_FILTER_LIST,
)


28
29
import asyncio
import subprocess
Timothy J. Baek's avatar
Timothy J. Baek committed
30
31


32
# FastAPI application object that this module's middleware and route handlers attach to.
app = FastAPI()
Timothy J. Baek's avatar
Timothy J. Baek committed
33

34
# Origins handed to CORSMiddleware as allow_origins; the single "*" entry is the wildcard.
origins = ["*"]
Timothy J. Baek's avatar
Timothy J. Baek committed
35

36
37
38
39
40
41
42
# Register CORS handling on the app: origins come from `origins`, credentials
# are enabled, and every HTTP method and request header is allowed.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive — confirm this is intended for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
Timothy J. Baek's avatar
Timothy J. Baek committed
43

44

45
46
47
# Handle of the subprocess started by run_background_process(); None until a
# subprocess has been started. shutdown_litellm_background() uses it to
# terminate the process.
background_process = None

Timothy J. Baek's avatar
Timothy J. Baek committed
48

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def run_background_process(command):
    """Run ``command`` as a subprocess and relay its output until it exits.

    The started process is stored in the module-level ``background_process``
    so that it can be terminated later.

    Args:
        command: Command line as a single string. It is split on whitespace,
            so arguments containing spaces or shell quoting are not supported.

    Raises:
        Exception: Any error raised while starting or monitoring the
            subprocess is logged and re-raised.
    """
    global background_process
    print("run_background_process")

    async def _relay(stream, prefix="", skip_blank=False):
        # Forward one pipe to stdout line by line until the child closes it.
        async for raw_line in stream:
            text = raw_line.decode(errors="replace").strip()
            if text or not skip_blank:
                print(f"{prefix}{text}")

    try:
        # Log the command to be executed
        print(f"Executing command: {command}")
        # Execute the command and create a subprocess
        process = await asyncio.create_subprocess_exec(
            *command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        background_process = process
        print("Subprocess started successfully.")

        # Drain both pipes concurrently. Reading stderr to EOF before touching
        # stdout would show nothing until the child exits and would stall the
        # child once the stdout buffers fill up.
        await asyncio.gather(
            _relay(process.stdout),
            _relay(process.stderr, prefix="Subprocess STDERR: ", skip_blank=True),
        )

        # Wait for the process to finish
        returncode = await process.wait()
        print(f"Subprocess exited with return code {returncode}")
    except Exception as e:
        log.error(f"Failed to start subprocess: {e}")
        raise  # Propagate so the awaiting caller/task sees the failure
79
80
81


async def start_litellm_background():
    """Start litellm through ``run_background_process``.

    Awaits ``run_background_process`` with the fixed litellm command line
    (telemetry off, config read from ./data/litellm/config.yaml) and returns
    once that call returns.
    """
    print("start_litellm_background")
    # The command line is fixed; it is passed straight through as one string.
    await run_background_process(
        "litellm --telemetry False --config ./data/litellm/config.yaml"
    )
Timothy J. Baek's avatar
Timothy J. Baek committed
87
88


89
90
91
92
93
94
95
96
97
async def shutdown_litellm_background():
    """Terminate the tracked background subprocess, if there is one.

    Does nothing when no subprocess has been recorded in the module-level
    ``background_process``. Otherwise asks the process to terminate (unless it
    has already exited), waits for it, and clears the stored handle.
    """
    print("shutdown_litellm_background")
    global background_process
    process = background_process
    if process is None:
        return

    # Only signal a process that is still running: terminate() raises
    # ProcessLookupError once the process has already exited.
    if process.returncode is None:
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the check and the signal; nothing
            # is left to terminate.
            pass
    await process.wait()  # Ensure the process has terminated

    # Drop the stale handle, unless a newer process was registered while we
    # were waiting.
    if background_process is process:
        background_process = None
    print("Subprocess terminated")


Timothy J. Baek's avatar
Timothy J. Baek committed
98
@app.on_event("startup")
async def startup_event():
    """Start the litellm background service when the application starts.

    The service is scheduled as an asyncio task instead of being awaited,
    because ``start_litellm_background`` only returns once the subprocess has
    exited.
    """
    print("startup_event")
    # TODO: Check config.yaml file and create one
    # Keep a strong reference to the task: the event loop itself only holds
    # weak references, so an unreferenced task may be garbage-collected.
    app.state.litellm_task = asyncio.create_task(start_litellm_background())
Timothy J. Baek's avatar
Timothy J. Baek committed
104
105


106
107
108
109
# Copy the model-filter settings from config onto app.state, where the
# get_models handler reads them.
app.state.MODEL_FILTER_ENABLED = MODEL_FILTER_ENABLED
app.state.MODEL_FILTER_LIST = MODEL_FILTER_LIST


110
111
112
113
114
@app.get("/")
async def get_status():
    """Report that the app is up by returning a constant status payload."""
    status_payload = {"status": True}
    return status_payload


115
116
117
118
119
120
121
122
123
124
125
126
@app.get("/restart")
async def restart_litellm(user=Depends(get_admin_user)):
    """
    Endpoint to restart the litellm background service.

    Shuts down the currently tracked subprocess (if any) and schedules a new
    one. Access is gated by the ``get_admin_user`` dependency.

    Returns:
        A dict with ``status`` and ``message`` keys on success.

    Raises:
        HTTPException: With status 500 and the error text as detail if the
            shutdown or the scheduling of the new process fails.
    """
    log.info("Requested restart of litellm service.")
    try:
        # Shut down the existing process if it is running
        await shutdown_litellm_background()
        log.info("litellm service shutdown complete.")

        # Restart the background service. start_litellm_background() is a
        # coroutine function that only returns when the subprocess exits, so
        # it must be scheduled as a task: calling it bare never runs it, and
        # awaiting it would block this request. The reference is kept so the
        # task cannot be garbage-collected.
        app.state.litellm_task = asyncio.create_task(start_litellm_background())
        log.info("litellm service restart complete.")

        return {
            "status": "success",
            "message": "litellm service restarted successfully.",
        }
    except Exception as e:
        log.error(f"Error restarting litellm service: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        )


Timothy J. Baek's avatar
Timothy J. Baek committed
141
142
143
144
145
146
147
148
@app.get("/models")
@app.get("/v1/models")
async def get_models(user=Depends(get_current_user)):
    """Return the model list reported by the local litellm server.

    Fetches ``/v1/models`` from http://localhost:4000. When model filtering is
    enabled on ``app.state`` and the caller's role is "user", only models
    whose id is in ``app.state.MODEL_FILTER_LIST`` are kept.

    Raises:
        HTTPException: If the upstream request or the processing of its
            response fails. An upstream error status is passed through;
            every other failure is reported as 500.
    """
    url = "http://localhost:4000/v1"
    r = None
    try:
        # NOTE(review): requests is synchronous, so this call blocks the event
        # loop for the duration of the upstream request.
        r = requests.request(method="GET", url=f"{url}/models")
        r.raise_for_status()

        data = r.json()

        if app.state.MODEL_FILTER_ENABLED:
            if user and user.role == "user":
                data["data"] = [
                    model
                    for model in data["data"]
                    if model["id"] in app.state.MODEL_FILTER_LIST
                ]

        return data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    error_detail = f"External: {res['error']}"
            except Exception:
                error_detail = f"External: {e}"

        # A requests.Response is falsy for 4xx/5xx statuses, so test against
        # None explicitly. Only reuse the upstream status when it is an error
        # status; a failure after a successful response is reported as 500.
        status_code = r.status_code if r is not None and not r.ok else 500
        raise HTTPException(
            status_code=status_code,
            detail=error_detail,
        )
177
178


Timothy J. Baek's avatar
Timothy J. Baek committed
179
180
181
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(path: str, request: Request, user=Depends(get_verified_user)):
    """Forward a request to the local litellm server and relay its response.

    The incoming method and raw body are sent to
    ``http://localhost:4000/{path}`` with a JSON content type. A
    ``text/event-stream`` response is streamed back in 8192-byte chunks; any
    other response is parsed as JSON and returned.

    Raises:
        HTTPException: If the upstream request or the handling of its
            response fails. An upstream error status is passed through;
            every other failure is reported as 500.
    """
    body = await request.body()

    url = "http://localhost:4000"

    target_url = f"{url}/{path}"

    headers = {}
    # headers["Authorization"] = f"Bearer {key}"
    headers["Content-Type"] = "application/json"

    r = None

    try:
        # NOTE(review): requests is synchronous, so this call blocks the event
        # loop until the upstream response headers arrive.
        r = requests.request(
            method=request.method,
            url=target_url,
            data=body,
            headers=headers,
            stream=True,
        )

        r.raise_for_status()

        # Check if response is SSE
        if "text/event-stream" in r.headers.get("Content-Type", ""):
            return StreamingResponse(
                r.iter_content(chunk_size=8192),
                status_code=r.status_code,
                headers=dict(r.headers),
            )
        else:
            response_data = r.json()
            return response_data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    error = res["error"]
                    # Only look up "message" on a dict; on a plain string,
                    # `in` would be a substring test and indexing would fail.
                    if isinstance(error, dict) and "message" in error:
                        error = error["message"]
                    error_detail = f"External: {error}"
            except Exception:
                error_detail = f"External: {e}"

        # A requests.Response is falsy for 4xx/5xx statuses, so test against
        # None explicitly. Only reuse the upstream status when it is an error
        # status; a failure after a successful response is reported as 500.
        status_code = r.status_code if r is not None and not r.ok else 500
        raise HTTPException(status_code=status_code, detail=error_detail)