auths.py 4.45 KB
Newer Older
1
from pydantic import BaseModel
2
from typing import Optional
3
import uuid
4
import logging
5
from sqlalchemy import String, Column, Boolean
6

7
from apps.webui.models.users import UserModel, Users
Tim Farrell's avatar
Tim Farrell committed
8
from utils.utils import verify_password
9

10
from apps.webui.internal.db import Base, Session
11

12
from config import SRC_LOG_LEVELS
Timothy J. Baek's avatar
Timothy J. Baek committed
13

14
15
16
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])

17
18
19
20
21
####################
# DB MODEL
####################


22
23
class Auth(Base):
    __tablename__ = "auth"
Timothy J. Baek's avatar
Timothy J. Baek committed
24

25
26
27
28
    id = Column(String, primary_key=True)
    email = Column(String)
    password = Column(String)
    active = Column(Boolean)
Timothy J. Baek's avatar
Timothy J. Baek committed
29
30


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class AuthModel(BaseModel):
    id: str
    email: str
    password: str
    active: bool = True


####################
# Forms
####################


class Token(BaseModel):
    token: str
    token_type: str

Timothy J. Baek's avatar
fix  
Timothy J. Baek committed
47

liu.vaayne's avatar
liu.vaayne committed
48
49
class ApiKey(BaseModel):
    api_key: Optional[str] = None
50

Timothy J. Baek's avatar
fix  
Timothy J. Baek committed
51

52
53
54
55
56
class UserResponse(BaseModel):
    id: str
    email: str
    name: str
    role: str
Timothy J. Baek's avatar
Timothy J. Baek committed
57
    profile_image_url: str
58
59
60
61
62
63
64
65
66
67
68


class SigninResponse(Token, UserResponse):
    pass


class SigninForm(BaseModel):
    email: str
    password: str


69
70
71
72
class ProfileImageUrlForm(BaseModel):
    profile_image_url: str


73
74
75
76
77
class UpdateProfileForm(BaseModel):
    profile_image_url: str
    name: str


78
79
80
81
82
class UpdatePasswordForm(BaseModel):
    password: str
    new_password: str


83
84
85
86
class SignupForm(BaseModel):
    name: str
    email: str
    password: str
Timothy J. Baek's avatar
refac  
Timothy J. Baek committed
87
    profile_image_url: Optional[str] = "/user.png"
88
89


Timothy J. Baek's avatar
Timothy J. Baek committed
90
class AddUserForm(SignupForm):
91
    role: Optional[str] = "pending"
Timothy J. Baek's avatar
Timothy J. Baek committed
92
93


94
95
class AuthsTable:

Timothy J. Baek's avatar
Timothy J. Baek committed
96
    def insert_new_auth(
Danny Liu's avatar
Danny Liu committed
97
98
99
100
        self,
        email: str,
        password: str,
        name: str,
Timothy J. Baek's avatar
refac  
Timothy J. Baek committed
101
        profile_image_url: str = "/user.png",
Danny Liu's avatar
Danny Liu committed
102
        role: str = "pending",
103
        oauth_sub: Optional[str] = None,
Timothy J. Baek's avatar
Timothy J. Baek committed
104
    ) -> Optional[UserModel]:
105
        log.info("insert_new_auth")
106

107
        id = str(uuid.uuid4())
108

109
110
111
112
113
        auth = AuthModel(
            **{"id": id, "email": email, "password": password, "active": True}
        )
        result = Auth(**auth.model_dump())
        Session.add(result)
Timothy J. Baek's avatar
Timothy J. Baek committed
114

115
116
        user = Users.insert_new_user(
            id, name, email, profile_image_url, role, oauth_sub)
117

118
119
        Session.commit()
        Session.refresh(result)
120

121
122
123
124
        if result and user:
            return user
        else:
            return None
125

126
    def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
127
        log.info(f"authenticate_user: {email}")
128
129
130
131
132
133
        try:
            auth = Session.query(Auth).filter_by(email=email, active=True).first()
            if auth:
                if verify_password(password, auth.password):
                    user = Users.get_user_by_id(auth.id)
                    return user
Timothy J. Baek's avatar
Timothy J. Baek committed
134
135
                else:
                    return None
136
            else:
137
                return None
138
139
        except:
            return None
140

141
    def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
liu.vaayne's avatar
liu.vaayne committed
142
        log.info(f"authenticate_user_by_api_key: {api_key}")
143
144
145
        # if no api_key, return None
        if not api_key:
            return None
Timothy J. Baek's avatar
fix  
Timothy J. Baek committed
146

147
148
149
150
151
        try:
            user = Users.get_user_by_api_key(api_key)
            return user if user else None
        except:
            return False
Timothy J. Baek's avatar
fix  
Timothy J. Baek committed
152

153
    def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
154
        log.info(f"authenticate_user_by_trusted_header: {email}")
155
156
157
158
159
160
161
        try:
            auth = Session.query(Auth).filter(email=email, active=True).first()
            if auth:
                user = Users.get_user_by_id(auth.id)
                return user
        except:
            return None
liu.vaayne's avatar
liu.vaayne committed
162

163
    def update_user_password_by_id(self, id: str, new_password: str) -> bool:
164
165
166
167
168
169
170
        try:
            result = (
                Session.query(Auth).filter_by(id=id).update({"password": new_password})
            )
            return True if result == 1 else False
        except:
            return False
171
172

    def update_email_by_id(self, id: str, email: str) -> bool:
173
174
175
176
177
        try:
            result = Session.query(Auth).filter_by(id=id).update({"email": email})
            return True if result == 1 else False
        except:
            return False
178
179

    def delete_auth_by_id(self, id: str) -> bool:
180
181
182
        try:
            # Delete User
            result = Users.delete_user_by_id(id)
183

184
185
            if result:
                Session.query(Auth).filter_by(id=id).delete()
186

187
188
                return True
            else:
Timothy J. Baek's avatar
Timothy J. Baek committed
189
                return False
190
191
        except:
            return False
Timothy J. Baek's avatar
Timothy J. Baek committed
192

193

194
Auths = AuthsTable()