wrappers.py 2.12 KB
Newer Older
1
from contextvars import ContextVar
2
from peewee import *
3
4
from peewee import PostgresqlDatabase, InterfaceError as PeeWeeInterfaceError

5
import logging
6
from playhouse.db_url import connect, parse
perf3ct's avatar
format  
perf3ct committed
7
from playhouse.shortcuts import ReconnectMixin
8
9
10
11
12

from config import SRC_LOG_LEVELS

# Module-level logger; its level is taken from the "DB" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["DB"])
13
14
15
16

# Keys tracked for a connection state; every value starts out as None.
db_state_default = dict.fromkeys(("closed", "conn", "ctx", "transactions"))
# The ContextVar default is a copy taken once, here, so the dict returned by
# db_state.get() is a different object from db_state_default.
db_state = ContextVar("db_state", default=dict(db_state_default))

perf3ct's avatar
format  
perf3ct committed
17

18
class PeeweeConnectionState(object):
    """Attribute-style view over the dict held in the ``db_state`` ContextVar.

    Reading or assigning an attribute on an instance reads or writes the
    matching key of the dict returned by ``db_state.get()``; nothing except
    ``_state`` itself is stored on the instance.
    """

    def __init__(self, **kwargs):
        # Go through the parent's __setattr__ so that ``_state`` lands in the
        # instance __dict__ instead of being written into the state dict.
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        """Store ``value`` under key ``name`` in the current state dict."""
        self._state.get()[name] = value

    def __getattr__(self, name):
        """Return the value stored under key ``name`` in the current state dict.

        Raises:
            AttributeError: if ``name`` is not a key of the state dict, or if
                ``_state`` has not been set on this instance.
        """
        # __getattr__ only runs after normal lookup has failed. When the
        # missing name is ``_state`` itself (instance created without
        # __init__, e.g. by copy or pickle), evaluating ``self._state`` below
        # would re-enter this method without bound.
        if name == "_state":
            raise AttributeError(name)
        try:
            return self._state.get()[name]
        except KeyError:
            # hasattr() and getattr(obj, name, default) only treat
            # AttributeError as "attribute missing".
            raise AttributeError(name) from None
29

perf3ct's avatar
format  
perf3ct committed
30

31
32
33
class CustomReconnectMixin(ReconnectMixin):
    """ReconnectMixin with a project-specific ``reconnect_errors`` table.

    Each entry pairs an exception class with a fragment of the error message,
    in the form ReconnectMixin expects for its ``reconnect_errors`` attribute.
    """

    # NOTE(review): given the imports at the top of this file, InterfaceError
    # and PeeWeeInterfaceError both appear to name peewee's InterfaceError,
    # which would make the last entry a duplicate -- confirm before pruning.
    reconnect_errors = (
        (OperationalError, "termin"),
        (InterfaceError, "closed"),
        (PeeWeeInterfaceError, "closed"),
    )

perf3ct's avatar
format  
perf3ct committed
40

41
42
43
class ReconnectingPostgresqlDatabase(CustomReconnectMixin, PostgresqlDatabase):
    """PostgresqlDatabase combined with CustomReconnectMixin.

    Defines no members of its own; all behavior comes from the two bases.
    """

perf3ct's avatar
format  
perf3ct committed
44

45
46
47
def register_connection(db_url):
    """Build the Peewee database object for ``db_url``.

    The URL is first passed to ``connect()`` to find out which backend it
    names, then handled per backend:

    * PostgreSQL: the URL is parsed again and a
      ``ReconnectingPostgresqlDatabase`` is created from its components and
      connected immediately.
    * SQLite: the instance returned by ``connect()`` is kept, with
      ``autoconnect`` and ``reuse_if_open`` set to True on it.

    Args:
        db_url: Database URL accepted by ``playhouse.db_url``.

    Returns:
        The database instance for the detected backend.

    Raises:
        ValueError: if the URL resolves to neither a PostgreSQL nor a SQLite
            database.
    """
    db = connect(db_url)

    if isinstance(db, PostgresqlDatabase):
        # The instance from connect() is only used to detect the backend; it
        # is replaced by our reconnecting class, so nothing is configured on it.
        connection = parse(db_url)

        # parse() only includes the URL components that are present, so a URL
        # without credentials, host or port must not be indexed blindly.
        optional = {
            key: connection[key]
            for key in ("user", "password", "host", "port")
            if key in connection
        }
        db = ReconnectingPostgresqlDatabase(connection["database"], **optional)
        db.connect(reuse_if_open=True)
        # Logged after connect() so the message is not emitted on failure.
        log.info("Connected to PostgreSQL database")
    elif isinstance(db, SqliteDatabase):
        # Let Peewee open the connection on demand.
        db.autoconnect = True
        # NOTE(review): reuse_if_open is normally an argument to connect();
        # confirm something actually reads it as an instance attribute.
        db.reuse_if_open = True
        log.info("Connected to SQLite database")
    else:
        raise ValueError("Unsupported database connection")
    return db