utils.py 3.4 KB
Newer Older
1
2
3
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi import HTTPException, status, Depends
from apps.web.models.users import Users
liu.vaayne's avatar
liu.vaayne committed
4

5
6
from pydantic import BaseModel
from typing import Union, Optional
7
from constants import ERROR_MESSAGES
8
9
10
11
from passlib.context import CryptContext
from datetime import datetime, timedelta
import requests
import jwt
liu.vaayne's avatar
liu.vaayne committed
12
import uuid
Timothy J. Baek's avatar
Timothy J. Baek committed
13
import logging
14
15
import config

Timothy J. Baek's avatar
Timothy J. Baek committed
16
17
18
logging.getLogger("passlib").setLevel(logging.ERROR)


19
SESSION_SECRET = config.WEBUI_SECRET_KEY
20
21
22
23
24
25
ALGORITHM = "HS256"

##############
# Auth Utils
##############

Tim Farrell's avatar
Tim Farrell committed
26
bearer_security = HTTPBearer()
27
28
29
30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
Timothy J. Baek's avatar
Timothy J. Baek committed
31
32
33
    return (
        pwd_context.verify(plain_password, hashed_password) if hashed_password else None
    )
34
35
36
37
38
39


def get_password_hash(password):
    return pwd_context.hash(password)


Timothy J. Baek's avatar
Timothy J. Baek committed
40
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
41
42
43
44
45
46
    payload = data.copy()

    if expires_delta:
        expire = datetime.utcnow() + expires_delta
        payload.update({"exp": expire})

47
    encoded_jwt = jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)
48
49
50
51
52
    return encoded_jwt


def decode_token(token: str) -> Optional[dict]:
    try:
53
        decoded = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
54
55
56
57
58
59
        return decoded
    except Exception as e:
        return None


def extract_token_from_auth_header(auth_header: str):
Timothy J. Baek's avatar
Timothy J. Baek committed
60
    return auth_header[len("Bearer ") :]
61
62


liu.vaayne's avatar
liu.vaayne committed
63
64
65
66
67
def create_api_key():
    key = str(uuid.uuid4()).replace("-", "")
    return f"sk-{key}"


Timothy J. Baek's avatar
Timothy J. Baek committed
68
69
70
def get_http_authorization_cred(auth_header: str):
    try:
        scheme, credentials = auth_header.split(" ")
Timothy J. Baek's avatar
Timothy J. Baek committed
71
        return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
Timothy J. Baek's avatar
Timothy J. Baek committed
72
73
74
75
    except:
        raise ValueError(ERROR_MESSAGES.INVALID_TOKEN)


Timothy J. Baek's avatar
Timothy J. Baek committed
76
77
78
def get_current_user(
    auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
):
liu.vaayne's avatar
liu.vaayne committed
79
80
81
82
    # auth by api key
    if auth_token.credentials.startswith("sk-"):
        return get_current_user_by_api_key(auth_token.credentials)
    # auth by jwt token
83
    data = decode_token(auth_token.credentials)
84
85
    if data != None and "id" in data:
        user = Users.get_user_by_id(data["id"])
86
87
88
89
        if user is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=ERROR_MESSAGES.INVALID_TOKEN,
90
            )
91
        return user
92
93
94
95
96
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )
97

liu.vaayne's avatar
liu.vaayne committed
98
99
100
101
102
103
104
105
106
107
def get_current_user_by_api_key(api_key: str):
    from apps.web.models.auths import Auths

    user = Auths.authenticate_user_by_api_key(api_key)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.INVALID_TOKEN,
        )
    return user
108

Timothy J. Baek's avatar
Timothy J. Baek committed
109
def get_verified_user(user=Depends(get_current_user)):
110
111
112
113
114
    if user.role not in {"user", "admin"}:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )
Timothy J. Baek's avatar
Timothy J. Baek committed
115
    return user
116
117


Timothy J. Baek's avatar
Timothy J. Baek committed
118
def get_admin_user(user=Depends(get_current_user)):
119
120
121
122
123
    if user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )
Timothy J. Baek's avatar
Timothy J. Baek committed
124
    return user