chats.py 8.31 KB
Newer Older
Timothy J. Baek's avatar
Timothy J. Baek committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pydantic import BaseModel
from typing import List, Union, Optional
from peewee import *
from playhouse.shortcuts import model_to_dict

import json
import uuid
import time

from apps.web.internal.db import DB

####################
# Chat DB Schema
####################


class Chat(Model):
    id = CharField(unique=True)
Timothy J. Baek's avatar
Timothy J. Baek committed
19
    user_id = CharField()
Timothy J. Baek's avatar
Timothy J. Baek committed
20
21
22
    title = CharField()
    chat = TextField()  # Save Chat JSON as Text
    timestamp = DateField()
23
    share_id = CharField(null=True, unique=True)
Timothy J. Baek's avatar
Timothy J. Baek committed
24
    archived = BooleanField(default=False)
Timothy J. Baek's avatar
Timothy J. Baek committed
25
26
27
28
29
30
31
32
33

    class Meta:
        database = DB


class ChatModel(BaseModel):
    id: str
    user_id: str
    title: str
Timothy J. Baek's avatar
Timothy J. Baek committed
34
    chat: str
Timothy J. Baek's avatar
Timothy J. Baek committed
35
    timestamp: int  # timestamp in epoch
36
    share_id: Optional[str] = None
Timothy J. Baek's avatar
Timothy J. Baek committed
37
    archived: bool = False
Timothy J. Baek's avatar
Timothy J. Baek committed
38
39
40
41
42
43
44
45
46
47
48


####################
# Forms
####################


class ChatForm(BaseModel):
    chat: dict


Timothy J. Baek's avatar
Timothy J. Baek committed
49
50
51
52
class ChatTitleForm(BaseModel):
    title: str


53
class ChatResponse(BaseModel):
Timothy J. Baek's avatar
Timothy J. Baek committed
54
    id: str
55
56
57
58
    user_id: str
    title: str
    chat: dict
    timestamp: int  # timestamp in epoch
59
    share_id: Optional[str] = None  # id of the chat to be shared
Timothy J. Baek's avatar
Timothy J. Baek committed
60
61
62
63
64
65
66
67
68
69
70
71


class ChatTitleIdResponse(BaseModel):
    id: str
    title: str


class ChatTable:
    def __init__(self, db):
        self.db = db
        db.create_tables([Chat])

Timothy J. Baek's avatar
Timothy J. Baek committed
72
    def insert_new_chat(self, user_id: str, form_data: ChatForm) -> Optional[ChatModel]:
Timothy J. Baek's avatar
Timothy J. Baek committed
73
74
75
76
77
        id = str(uuid.uuid4())
        chat = ChatModel(
            **{
                "id": id,
                "user_id": user_id,
78
79
80
                "title": (
                    form_data.chat["title"] if "title" in form_data.chat else "New Chat"
                ),
Timothy J. Baek's avatar
Timothy J. Baek committed
81
                "chat": json.dumps(form_data.chat),
Timothy J. Baek's avatar
Timothy J. Baek committed
82
                "timestamp": int(time.time()),
Timothy J. Baek's avatar
Timothy J. Baek committed
83
84
            }
        )
Timothy J. Baek's avatar
Timothy J. Baek committed
85
86
87
88
89
90

        result = Chat.create(**chat.model_dump())
        return chat if result else None

    def update_chat_by_id(self, id: str, chat: dict) -> Optional[ChatModel]:
        try:
91
92
93
94
95
            query = Chat.update(
                chat=json.dumps(chat),
                title=chat["title"] if "title" in chat else "New Chat",
                timestamp=int(time.time()),
            ).where(Chat.id == id)
Timothy J. Baek's avatar
Timothy J. Baek committed
96
97
98
99
100
101
102
            query.execute()

            chat = Chat.get(Chat.id == id)
            return ChatModel(**model_to_dict(chat))
        except:
            return None

Timothy J. Baek's avatar
Timothy J. Baek committed
103
    def insert_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
104
105
106
107
108
109
110
111
112
        # Get the existing chat to share
        chat = Chat.get(Chat.id == chat_id)
        # Check if the chat is already shared
        if chat.share_id:
            return self.get_chat_by_id_and_user_id(chat.share_id, "shared")
        # Create a new chat with the same data, but with a new ID
        shared_chat = ChatModel(
            **{
                "id": str(uuid.uuid4()),
Timothy J. Baek's avatar
Timothy J. Baek committed
113
                "user_id": f"shared-{chat_id}",
114
115
116
117
118
119
120
121
122
123
124
125
126
                "title": chat.title,
                "chat": chat.chat,
                "timestamp": int(time.time()),
            }
        )
        shared_result = Chat.create(**shared_chat.model_dump())
        # Update the original chat with the share_id
        result = (
            Chat.update(share_id=shared_chat.id).where(Chat.id == chat_id).execute()
        )

        return shared_chat if (shared_result and result) else None

Timothy J. Baek's avatar
Timothy J. Baek committed
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
    def update_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
        try:
            print("update_shared_chat_by_id")
            chat = Chat.get(Chat.id == chat_id)
            print(chat)

            query = Chat.update(
                title=chat.title,
                chat=chat.chat,
            ).where(Chat.id == chat.share_id)

            query.execute()

            chat = Chat.get(Chat.id == chat.share_id)
            return ChatModel(**model_to_dict(chat))
        except:
            return None

Timothy J. Baek's avatar
Timothy J. Baek committed
145
146
147
148
149
150
151
152
153
    def delete_shared_chat_by_chat_id(self, chat_id: str) -> bool:
        try:
            query = Chat.delete().where(Chat.user_id == f"shared-{chat_id}")
            query.execute()  # Remove the rows, return number of rows removed.

            return True
        except:
            return False

154
    def update_chat_share_id_by_id(
Timothy J. Baek's avatar
Timothy J. Baek committed
155
        self, id: str, share_id: Optional[str]
156
157
158
159
160
161
162
163
164
165
166
167
    ) -> Optional[ChatModel]:
        try:
            query = Chat.update(
                share_id=share_id,
            ).where(Chat.id == id)
            query.execute()

            chat = Chat.get(Chat.id == id)
            return ChatModel(**model_to_dict(chat))
        except:
            return None

Timothy J. Baek's avatar
Timothy J. Baek committed
168
169
170
171
172
173
174
175
176
177
178
179
180
181
    def toggle_chat_archive_by_id(self, id: str) -> Optional[ChatModel]:
        try:
            chat = self.get_chat_by_id(id)
            query = Chat.update(
                archived=(not chat.archived),
            ).where(Chat.id == id)

            query.execute()

            chat = Chat.get(Chat.id == id)
            return ChatModel(**model_to_dict(chat))
        except:
            return None

Timothy J. Baek's avatar
Timothy J. Baek committed
182
183
184
    def get_chat_lists_by_user_id(
        self, user_id: str, skip: int = 0, limit: int = 50
    ) -> List[ChatModel]:
Timothy J. Baek's avatar
Timothy J. Baek committed
185
        return [
Timothy J. Baek's avatar
Timothy J. Baek committed
186
187
            ChatModel(**model_to_dict(chat))
            for chat in Chat.select()
Timothy J. Baek's avatar
Timothy J. Baek committed
188
            .where(Chat.archived == False)
Timothy J. Baek's avatar
Timothy J. Baek committed
189
190
            .where(Chat.user_id == user_id)
            .order_by(Chat.timestamp.desc())
191
192
            # .limit(limit)
            # .offset(skip)
Timothy J. Baek's avatar
Timothy J. Baek committed
193
194
        ]

Timothy J. Baek's avatar
Timothy J. Baek committed
195
196
197
198
199
200
    def get_chat_lists_by_chat_ids(
        self, chat_ids: List[str], skip: int = 0, limit: int = 50
    ) -> List[ChatModel]:
        return [
            ChatModel(**model_to_dict(chat))
            for chat in Chat.select()
Timothy J. Baek's avatar
Timothy J. Baek committed
201
            .where(Chat.archived == False)
Timothy J. Baek's avatar
Timothy J. Baek committed
202
203
204
205
            .where(Chat.id.in_(chat_ids))
            .order_by(Chat.timestamp.desc())
        ]

206
207
208
    def get_all_chats(self) -> List[ChatModel]:
        return [
            ChatModel(**model_to_dict(chat))
Timothy J. Baek's avatar
Timothy J. Baek committed
209
210
211
            for chat in Chat.select()
            .where(Chat.archived == False)
            .order_by(Chat.timestamp.desc())
212
213
        ]

Timothy J. Baek's avatar
Timothy J. Baek committed
214
215
    def get_all_chats_by_user_id(self, user_id: str) -> List[ChatModel]:
        return [
Timothy J. Baek's avatar
Timothy J. Baek committed
216
217
            ChatModel(**model_to_dict(chat))
            for chat in Chat.select()
Timothy J. Baek's avatar
Timothy J. Baek committed
218
            .where(Chat.archived == False)
Timothy J. Baek's avatar
Timothy J. Baek committed
219
220
            .where(Chat.user_id == user_id)
            .order_by(Chat.timestamp.desc())
Timothy J. Baek's avatar
Timothy J. Baek committed
221
222
        ]

Timothy J. Baek's avatar
Timothy J. Baek committed
223
224
225
226
227
228
229
    def get_chat_by_id(self, id: str) -> Optional[ChatModel]:
        try:
            chat = Chat.get(Chat.id == id)
            return ChatModel(**model_to_dict(chat))
        except:
            return None

230
231
232
233
234
235
236
237
238
239
240
241
    def get_chat_by_share_id(self, id: str) -> Optional[ChatModel]:
        try:
            chat = Chat.get(Chat.share_id == id)

            if chat:
                chat = Chat.get(Chat.id == id)
                return ChatModel(**model_to_dict(chat))
            else:
                return None
        except:
            return None

Timothy J. Baek's avatar
Timothy J. Baek committed
242
    def get_chat_by_id_and_user_id(self, id: str, user_id: str) -> Optional[ChatModel]:
Timothy J. Baek's avatar
Timothy J. Baek committed
243
244
245
246
247
248
249
250
251
252
253
254
        try:
            chat = Chat.get(Chat.id == id, Chat.user_id == user_id)
            return ChatModel(**model_to_dict(chat))
        except:
            return None

    def get_chats(self, skip: int = 0, limit: int = 50) -> List[ChatModel]:
        return [
            ChatModel(**model_to_dict(chat))
            for chat in Chat.select().limit(limit).offset(skip)
        ]

255
256
    def delete_chat_by_id_and_user_id(self, id: str, user_id: str) -> bool:
        try:
Timothy J. Baek's avatar
Timothy J. Baek committed
257
            query = Chat.delete().where((Chat.id == id) & (Chat.user_id == user_id))
258
259
            query.execute()  # Remove the rows, return number of rows removed.

Timothy J. Baek's avatar
Timothy J. Baek committed
260
            return True and self.delete_shared_chat_by_chat_id(id)
261
262
263
        except:
            return False

264
265
    def delete_chats_by_user_id(self, user_id: str) -> bool:
        try:
266
267
268

            self.delete_shared_chats_by_user_id(user_id)

269
270
271
            query = Chat.delete().where(Chat.user_id == user_id)
            query.execute()  # Remove the rows, return number of rows removed.

272
            return True
Timothy J. Baek's avatar
Timothy J. Baek committed
273
274
275
276
277
278
279
280
281
282
283
284
285
        except:
            return False

    def delete_shared_chats_by_user_id(self, user_id: str) -> bool:
        try:
            shared_chat_ids = [
                f"shared-{chat.id}"
                for chat in Chat.select().where(Chat.user_id == user_id)
            ]

            query = Chat.delete().where(Chat.user_id << shared_chat_ids)
            query.execute()  # Remove the rows, return number of rows removed.

286
287
288
289
            return True
        except:
            return False

Timothy J. Baek's avatar
Timothy J. Baek committed
290
291

Chats = ChatTable(DB)