from fastapi import FastAPI, Request, Depends, HTTPException, status, Response
from fastapi.routing import APIRoute
from fastapi.middleware.cors import CORSMiddleware

import logging
from fastapi.responses import JSONResponse

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import StreamingResponse
import json
import requests

from utils.utils import get_verified_user, get_current_user
from config import SRC_LOG_LEVELS, ENV
from constants import ERROR_MESSAGES

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["LITELLM"])

from config import (
    MODEL_FILTER_ENABLED,
    MODEL_FILTER_LIST,
)


import asyncio
import subprocess


app = FastAPI()

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Global variable to store the subprocess reference
background_process = None


async def run_background_process(command):
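    """Run the given command as a subprocess, store a reference to it, and stream its output until it exits."""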
    global background_process
    print("run_background_process")

    try:
        # Log the command to be executed
        print(f"Executing command: {command}")
        # Execute the command and create a subprocess
        process = await asyncio.create_subprocess_exec(
            *command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        background_process = process
        print("Subprocess started successfully.")

        # Capture STDERR for debugging purposes
        stderr_output = await process.stderr.read()
        stderr_text = stderr_output.decode().strip()
        if stderr_text:
            print(f"Subprocess STDERR: {stderr_text}")

        # Print output line by line
        async for line in process.stdout:
            print(line.decode().strip())

        # Wait for the process to finish
        returncode = await process.wait()
        print(f"Subprocess exited with return code {returncode}")
    except Exception as e:
        log.error(f"Failed to start subprocess: {e}")
        raise  # Optionally re-raise the exception if you want it to propagate


async def start_litellm_background():
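    """Start the LiteLLM proxy as a background subprocess."""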
    print("start_litellm_background")
    # Command to run in the background
    command = "litellm --telemetry False --config ./data/litellm/config.yaml"

    await run_background_process(command)


async def shutdown_litellm_background():
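    """Terminate the LiteLLM subprocess if one is running."""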
    print("shutdown_litellm_background")
    global background_process
    if background_process:
        background_process.terminate()
        await background_process.wait()  # Ensure the process has terminated
        print("Subprocess terminated")


@app.on_event("startup")
async def startup_event():
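    """Launch the LiteLLM proxy in the background when the app starts."""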

    print("startup_event")
    # TODO: Check config.yaml file and create one
    asyncio.create_task(start_litellm_background())


app.state.MODEL_FILTER_ENABLED = MODEL_FILTER_ENABLED
app.state.MODEL_FILTER_LIST = MODEL_FILTER_LIST


@app.get("/")
async def get_status():
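    """Return a simple liveness status."""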
    return {"status": True}


@app.get("/models")
@app.get("/v1/models")
async def get_models(user=Depends(get_current_user)):
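    """Fetch the model list from the local LiteLLM server, applying the
    model filter for users with the 'user' role when filtering is enabled."""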
    url = "http://localhost:4000/v1"
    r = None
    try:
        r = requests.request(method="GET", url=f"{url}/models")
        r.raise_for_status()

        data = r.json()

        if app.state.MODEL_FILTER_ENABLED:
            if user and user.role == "user":
                data["data"] = list(
                    filter(
                        lambda model: model["id"] in app.state.MODEL_FILTER_LIST,
                        data["data"],
                    )
                )

        return data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    error_detail = f"External: {res['error']}"
            except Exception:
                error_detail = f"External: {e}"

        raise HTTPException(
            status_code=r.status_code if r is not None else 500,
            detail=error_detail,
        )


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(path: str, request: Request, user=Depends(get_verified_user)):
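    """Forward any other request to the local LiteLLM server, streaming
    event-stream responses back to the client."""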
    body = await request.body()

    url = "http://localhost:4000"

    target_url = f"{url}/{path}"

    headers = {}
    # headers["Authorization"] = f"Bearer {key}"
    headers["Content-Type"] = "application/json"

    r = None

    try:
        r = requests.request(
            method=request.method,
            url=target_url,
            data=body,
            headers=headers,
            stream=True,
        )

        r.raise_for_status()

        # Check if response is SSE
        if "text/event-stream" in r.headers.get("Content-Type", ""):
            return StreamingResponse(
                r.iter_content(chunk_size=8192),
                status_code=r.status_code,
                headers=dict(r.headers),
            )
        else:
            response_data = r.json()
            return response_data
    except Exception as e:
        log.exception(e)
        error_detail = "Open WebUI: Server Connection Error"
        if r is not None:
            try:
                res = r.json()
                if "error" in res:
                    error_detail = f"External: {res['error']['message'] if 'message' in res['error'] else res['error']}"
            except Exception:
                error_detail = f"External: {e}"

        raise HTTPException(
            status_code=r.status_code if r is not None else 500, detail=error_detail
        )


# class ModifyModelsResponseMiddleware(BaseHTTPMiddleware):
#     async def dispatch(
#         self, request: Request, call_next: RequestResponseEndpoint
#     ) -> Response:

#         response = await call_next(request)
#         user = request.state.user

#         if "/models" in request.url.path:
#             if isinstance(response, StreamingResponse):
#                 # Read the content of the streaming response
#                 body = b""
#                 async for chunk in response.body_iterator:
#                     body += chunk

#                 data = json.loads(body.decode("utf-8"))

#                 if app.state.MODEL_FILTER_ENABLED:
#                     if user and user.role == "user":
#                         data["data"] = list(
#                             filter(
#                                 lambda model: model["id"]
#                                 in app.state.MODEL_FILTER_LIST,
#                                 data["data"],
#                             )
#                         )

#                 # Modified Flag
#                 data["modified"] = True
#                 return JSONResponse(content=data)

#         return response


# app.add_middleware(ModifyModelsResponseMiddleware)