# wrappers.py
from contextvars import ContextVar

3
from peewee import PostgresqlDatabase, InterfaceError as PeeWeeInterfaceError, _ConnectionState
4
from playhouse.db_url import register_database
5
from playhouse.pool import PooledPostgresqlDatabase
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from playhouse.shortcuts import ReconnectMixin
from psycopg2 import OperationalError
from psycopg2.errors import InterfaceError


# Template for the dict stored in the ``db_state`` context variable: every
# key starts out as None.
db_state_default = dict.fromkeys(("closed", "conn", "ctx", "transactions"))
# The default is a copy, so writes made through the variable never touch the
# ``db_state_default`` template itself.
# NOTE: ContextVar.get() hands back this same default dict object in every
# context that has not called set(), so such contexts share one dict.
db_state = ContextVar("db_state", default=dict(db_state_default))


class PeeweeConnectionState(_ConnectionState):
    """Connection state whose attributes live in the ``db_state`` context variable.

    Attribute writes and fallback attribute reads are redirected to the dict
    currently held by ``db_state``, so the values seen depend on the active
    ``contextvars`` context rather than on this instance.
    """

    def __init__(self, **kwargs):
        # Bypass the overridden __setattr__ so that ``_state`` is stored on
        # the instance itself; every later assignment goes to the context dict.
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        """Store ``value`` under ``name`` in the current context's state dict."""
        self._state.get()[name] = value

    def __getattr__(self, name):
        """Return ``name`` from the current context's state dict.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: if ``name`` is not a key of the state dict.
        """
        if name == "_state":
            # ``_state`` is missing from the instance (e.g. an object built
            # without __init__); looking it up below would recurse forever.
            raise AttributeError(name)
        try:
            return self._state.get()[name]
        except KeyError:
            # The attribute protocol expects AttributeError; a KeyError would
            # break hasattr() and getattr(obj, name, default).
            raise AttributeError(name) from None


class CustomReconnectMixin(ReconnectMixin):
    """``ReconnectMixin`` with extra reconnect error patterns.

    ``reconnect_errors`` keeps every entry inherited from ``ReconnectMixin``
    and appends entries for psycopg2 and peewee exceptions.
    """

    # NOTE(review): each entry looks like an (exception class, message
    # fragment) pair, matching the inherited defaults -- confirm against
    # playhouse.shortcuts.ReconnectMixin.
    reconnect_errors = (
        # default ReconnectMixin exceptions
        *ReconnectMixin.reconnect_errors,
        # psycopg2
        (OperationalError, 'termin'),
        (InterfaceError, 'closed'),
        # peewee
        (PeeWeeInterfaceError, 'closed'),
    )


class ReconnectingPostgresqlDatabase(CustomReconnectMixin, PostgresqlDatabase):
    """``PostgresqlDatabase`` combined with ``CustomReconnectMixin``."""


class ReconnectingPooledPostgresqlDatabase(CustomReconnectMixin, PooledPostgresqlDatabase):
    """``PooledPostgresqlDatabase`` combined with ``CustomReconnectMixin``."""


def register_peewee_databases():
    """Register the reconnecting database classes via ``register_database``.

    ``ReconnectingPostgresqlDatabase`` is registered under ``postgres`` and
    ``postgresql``; ``ReconnectingPooledPostgresqlDatabase`` under
    ``postgres+pool`` and ``postgresql+pool`` (presumably database URL
    schemes -- confirm against ``playhouse.db_url``).
    """
    registrations = (
        (ReconnectingPostgresqlDatabase, ('postgres', 'postgresql')),
        (ReconnectingPooledPostgresqlDatabase, ('postgres+pool', 'postgresql+pool')),
    )
    for database_class, names in registrations:
        register_database(database_class, *names)