db.py 2.8 KB
Newer Older
1
import os
2
import logging
3
import json
4
from contextlib import contextmanager
Timothy J. Baek's avatar
Timothy J. Baek committed
5
6
7
8

from peewee_migrate import Router
from apps.webui.internal.wrappers import register_connection

9
10
from typing import Optional, Any
from typing_extensions import Self
Timothy J. Baek's avatar
Timothy J. Baek committed
11

12
13
from sqlalchemy import create_engine, types, Dialect
from sqlalchemy.ext.declarative import declarative_base
14
from sqlalchemy.orm import sessionmaker, scoped_session
15
from sqlalchemy.sql.type_api import _T
16

17
from config import SRC_LOG_LEVELS, DATA_DIR, DATABASE_URL, BACKEND_DIR
18

19
20
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["DB"])
21

Timothy J. Baek's avatar
Timothy J. Baek committed
22

23
24
25
26
27
28
29
30
31
32
33
34
35
36
class JSONField(types.TypeDecorator):
    """Column type that stores JSON-serialisable Python values as text.

    Values are encoded with ``json.dumps`` when bound to a statement and
    decoded with ``json.loads`` when read back from a result row.
    """

    # Underlying column type used for storage.
    impl = types.Text
    # Marks this type as safe to take part in SQLAlchemy's statement cache key.
    cache_ok = True

    def process_bind_param(self, value: Optional[_T], dialect: Dialect) -> Any:
        """Encode *value* as JSON text for binding.

        ``None`` is not special-cased: it is encoded like any other value and
        therefore binds as the text ``"null"``.
        """
        encoded = json.dumps(value)
        return encoded

    def process_result_value(self, value: Optional[_T], dialect: Dialect) -> Any:
        """Decode JSON text read from the database; ``None`` stays ``None``."""
        if value is None:
            return None
        return json.loads(value)

    def copy(self, **kw: Any) -> Self:
        """Return a new ``JSONField`` built from the impl's ``length``.

        The keyword arguments in *kw* are accepted but not used.
        """
        length = self.impl.length
        return JSONField(length)

37
38
39
40
41
42
43
    def db_value(self, value):
        """Serialise *value* to a JSON string with ``json.dumps``.

        NOTE(review): the name matches peewee's field hook; presumably kept
        for compatibility with the earlier peewee models — confirm.
        """
        serialized = json.dumps(value)
        return serialized

    def python_value(self, value):
        """Parse a JSON string with ``json.loads``; ``None`` is returned as ``None``."""
        if value is None:
            return None
        return json.loads(value)

Timothy J. Baek's avatar
Timothy J. Baek committed
44

45
46
47
48
# One-time carry-over of the legacy database file: when DATA_DIR still holds
# "ollama.db", move it to "webui.db" and log that the migration happened.
# Nothing is done when the legacy file is absent.
# NOTE(review): on POSIX os.rename silently replaces an existing destination
# file — confirm that ollama.db and webui.db can never both be present.
if os.path.exists(f"{DATA_DIR}/ollama.db"):
    os.rename(f"{DATA_DIR}/ollama.db", f"{DATA_DIR}/webui.db")
    log.info("Database migrated from Ollama-WebUI successfully.")

Timothy J. Baek's avatar
Timothy J. Baek committed
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

# Workaround to handle the peewee migration
# This is required to ensure the peewee migration is handled before the alembic migration
def handle_peewee_migration():
    """Run the peewee migrations and make sure the peewee connection is closed.

    Opens a connection with ``register_connection(DATABASE_URL)``, runs every
    migration found under ``apps/webui/internal/migrations`` through a
    ``peewee_migrate.Router``, and closes the connection afterwards.

    Raises:
        Exception: any error raised while connecting or migrating is logged
            and re-raised unchanged.
        AssertionError: if the connection still reports itself open after
            the cleanup attempt.
    """
    # Bind the name up front: if register_connection() itself fails, the
    # cleanup below must not hit an unbound local and mask the real error.
    db = None
    try:
        db = register_connection(DATABASE_URL)
        migrate_dir = BACKEND_DIR / "apps" / "webui" / "internal" / "migrations"
        router = Router(db, logger=log, migrate_dir=migrate_dir)
        router.run()

    except Exception as e:
        log.error(f"Failed to initialize the database connection: {e}")
        raise

    finally:
        # Cleanup only applies when a connection object was actually obtained.
        if db is not None:
            if not db.is_closed():
                db.close()

            # Sanity check that the connection has really been released.
            assert db.is_closed(), "Database connection is still open."


# Executed at import time, ahead of the SQLAlchemy engine setup further down.
handle_peewee_migration()


82
83
84
85
86
87
88
SQLALCHEMY_DATABASE_URL = DATABASE_URL
# Choose the engine options from the URL scheme. A plain substring search would
# also match a non-SQLite URL that merely contains "sqlite" in its database
# name, user or password, and would then hand it a SQLite-only argument.
if SQLALCHEMY_DATABASE_URL.startswith("sqlite"):
    # check_same_thread=False lets the SQLite connection be used from threads
    # other than the one that created it.
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
    )
else:
    # pool_pre_ping=True makes the pool test a connection before handing it out.
    engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
Timothy J. Baek's avatar
Timothy J. Baek committed
89
90


91
92
93
# Session factory bound to the module-level engine; autocommit, autoflush and
# expire-on-commit are all switched off.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
    expire_on_commit=False,
)

# Declarative base class for the ORM models.
Base = declarative_base()

# scoped_session registry built on top of the SessionLocal factory.
Session = scoped_session(SessionLocal)
Timothy J. Baek's avatar
Timothy J. Baek committed
96
97
98


# Dependency
Timothy J. Baek's avatar
fix  
Timothy J. Baek committed
99
def get_session():
    """Yield a fresh ``SessionLocal`` session and close it afterwards.

    The ``finally`` clause closes the session whether the consumer finishes
    normally or raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
Timothy J. Baek's avatar
fix  
Timothy J. Baek committed
105
106
107


# Context-manager form of get_session, usable as ``with get_db() as db:``.
get_db = contextmanager(get_session)