"src/vscode:/vscode.git/clone" did not exist on "5fb7adb0ddf0e53f22f2f5787b9b6c251a0aa8e4"
utils.py 3.48 KB
Newer Older
1
2
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi import HTTPException, status, Depends
Timothy J. Baek's avatar
refac  
Timothy J. Baek committed
3

4
from apps.web.models.users import Users
liu.vaayne's avatar
liu.vaayne committed
5

6
7
from pydantic import BaseModel
from typing import Union, Optional
8
from constants import ERROR_MESSAGES
9
10
11
12
from passlib.context import CryptContext
from datetime import datetime, timedelta
import requests
import jwt
liu.vaayne's avatar
liu.vaayne committed
13
import uuid
Timothy J. Baek's avatar
Timothy J. Baek committed
14
import logging
15
16
import config

Timothy J. Baek's avatar
Timothy J. Baek committed
17
18
19
# Only let passlib log records at ERROR level or above through.
# NOTE(review): presumably this silences passlib's bcrypt warnings — confirm.
logging.getLogger("passlib").setLevel(logging.ERROR)


20
# Key used to sign (create_token) and verify (decode_token) JWTs; read from config.
SESSION_SECRET = config.WEBUI_SECRET_KEY
21
22
23
24
25
26
# JWT signing algorithm passed to jwt.encode / jwt.decode in this module.
ALGORITHM = "HS256"

##############
# Auth Utils
##############

Tim Farrell's avatar
Tim Farrell committed
27
# FastAPI HTTP bearer scheme; injected into get_current_user via Depends.
bearer_security = HTTPBearer()
28
29
30
31
# passlib context configured with the bcrypt scheme; used by verify_password
# and get_password_hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The password to check.
        hashed_password: The stored hash to verify against; may be empty
            or ``None``.

    Returns:
        The result of ``pwd_context.verify`` when a hash is supplied,
        otherwise ``None``.
    """
    # Without a stored hash there is nothing to verify against; skip the
    # verify call entirely and return a falsy value.
    if not hashed_password:
        return None
    return pwd_context.verify(plain_password, hashed_password)
35
36
37
38
39
40


def get_password_hash(password):
    """Hash ``password`` with the module-level ``pwd_context`` and return it."""
    hashed = pwd_context.hash(password)
    return hashed


Timothy J. Baek's avatar
Timothy J. Baek committed
41
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
    """Encode ``data`` as a JWT signed with ``SESSION_SECRET``.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object
            is not mutated.
        expires_delta: Optional token lifetime. When given, an ``exp`` claim
            equal to the current UTC time plus this delta is added. When
            ``None``, no ``exp`` claim is added.

    Returns:
        The value returned by ``jwt.encode``.
    """
    # Local import: the file's top-level datetime import does not bring in
    # ``timezone``.
    from datetime import timezone

    payload = data.copy()

    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently issue a token with no expiry at all.
    if expires_delta is not None:
        # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
        payload["exp"] = datetime.now(timezone.utc) + expires_delta

    return jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    """Decode a JWT using ``SESSION_SECRET`` and ``ALGORITHM``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims, or ``None`` if ``jwt.decode`` raises for any
        reason.
    """
    try:
        return jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
    except Exception:
        # Deliberately best-effort: any decoding failure is reported to the
        # caller as "no claims" rather than as an exception.
        return None


def extract_token_from_auth_header(auth_header: str) -> str:
    """Drop the leading ``"Bearer "`` scheme from an Authorization header value.

    The prefix is not validated: the first ``len("Bearer ")`` characters are
    removed whatever they are, so callers must only pass bearer headers.

    Args:
        auth_header: Header value expected to look like ``"Bearer <token>"``.

    Returns:
        Everything after the first seven characters of ``auth_header``.
    """
    prefix_length = len("Bearer ")
    return auth_header[prefix_length:]
62
63


liu.vaayne's avatar
liu.vaayne committed
64
65
66
67
68
def create_api_key():
    """Generate a new API key.

    Returns:
        A string made of the ``"sk-"`` prefix followed by 32 lowercase
        hexadecimal characters.
    """
    # Local import keeps this block self-contained. The key is a credential,
    # so it is drawn from the ``secrets`` module rather than from a UUID.
    import secrets

    # 16 random bytes -> 32 hex characters, the same length and alphabet as a
    # UUID4 with its dashes removed.
    return f"sk-{secrets.token_hex(16)}"


Timothy J. Baek's avatar
Timothy J. Baek committed
69
70
71
def get_http_authorization_cred(auth_header: str):
    """Parse an Authorization header value into ``HTTPAuthorizationCredentials``.

    Args:
        auth_header: Header value of the form ``"<scheme> <credentials>"``,
            with the two parts separated by exactly one space.

    Returns:
        An ``HTTPAuthorizationCredentials`` built from the two parts.

    Raises:
        ValueError: With ``ERROR_MESSAGES.INVALID_TOKEN`` when the header
            cannot be split into exactly two parts or the credentials object
            cannot be constructed.
    """
    try:
        scheme, credentials = auth_header.split(" ")
        return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
    except Exception as exc:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not swallowed; the cause is chained for debugging.
        raise ValueError(ERROR_MESSAGES.INVALID_TOKEN) from exc


Timothy J. Baek's avatar
Timothy J. Baek committed
77
78
79
def get_current_user(
    auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
):
    """Resolve the user for the bearer credentials on the current request.

    Credentials starting with ``"sk-"`` are treated as an API key and handed
    to ``get_current_user_by_api_key``. Anything else is decoded as a JWT
    whose ``id`` claim is used to look the user up.

    Args:
        auth_token: Bearer credentials injected by FastAPI.

    Returns:
        The user object returned by the ``Users`` model.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.UNAUTHORIZED`` when the token
            cannot be decoded or carries no ``id`` claim; 401 with
            ``ERROR_MESSAGES.INVALID_TOKEN`` when no user matches that id.
    """
    credentials = auth_token.credentials

    # auth by api key
    if credentials.startswith("sk-"):
        return get_current_user_by_api_key(credentials)

    # auth by jwt token
    data = decode_token(credentials)
    if data is None or "id" not in data:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    user = Users.get_user_by_id(data["id"])
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.INVALID_TOKEN,
        )

    # Record activity only once the user has been resolved successfully.
    Users.update_user_last_active_by_id(user.id)
    return user
100

Timothy J. Baek's avatar
Timothy J. Baek committed
101

liu.vaayne's avatar
liu.vaayne committed
102
def get_current_user_by_api_key(api_key: str):
    """Look up the user that owns ``api_key``.

    Args:
        api_key: The API key presented by the client.

    Returns:
        The user object returned by ``Users.get_user_by_api_key``.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.INVALID_TOKEN`` when no user
            is found for the key.
    """
    user = Users.get_user_by_api_key(api_key)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.INVALID_TOKEN,
        )

    # Record activity only once the key has been matched to a user.
    Users.update_user_last_active_by_id(user.id)
    return user
114

Timothy J. Baek's avatar
Timothy J. Baek committed
115

Timothy J. Baek's avatar
Timothy J. Baek committed
116
def get_verified_user(user=Depends(get_current_user)):
    """Dependency that admits only users whose role is ``"user"`` or ``"admin"``.

    Args:
        user: The authenticated user injected via ``get_current_user``.

    Returns:
        The same ``user`` object when its role is allowed.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED`` for any
            other role.
    """
    allowed_roles = {"user", "admin"}
    if user.role in allowed_roles:
        return user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
    )
123
124


Timothy J. Baek's avatar
Timothy J. Baek committed
125
def get_admin_user(user=Depends(get_current_user)):
    """Dependency that admits only users whose role is ``"admin"``.

    Args:
        user: The authenticated user injected via ``get_current_user``.

    Returns:
        The same ``user`` object when it is an admin.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED`` for any
            other role.
    """
    if user.role == "admin":
        return user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
    )