# main.py - Socket.IO server: tracks connected users and the models they are
# using, and relays chat events to individual client sessions.
import socketio
import asyncio

from apps.webui.models.users import Users
from utils.utils import decode_token

# NOTE(review): per the python-socketio documentation, an empty
# ``cors_allowed_origins`` list disables the library's own CORS handling;
# presumably CORS is enforced by the hosting application instead - confirm.
sio = socketio.AsyncServer(cors_allowed_origins=[], async_mode="asgi")
# ASGI application exposing the Socket.IO endpoint under /ws/socket.io.
app = socketio.ASGIApp(sio, socketio_path="/ws/socket.io")

# In-memory state shared by the event handlers below (module-level dicts).
# session ID -> ID of the user that authenticated on that session
SESSION_POOL = {}
# user ID -> list of session IDs currently registered for that user
USER_POOL = {}
# model ID -> dict holding at least "sids" (session IDs that reported using
# the model) and "callback" (the pending asyncio expiry task)
USAGE_POOL = {}
# Timeout duration in seconds
TIMEOUT_DURATION = 3


@sio.event
async def connect(sid, environ, auth):
    """Register a freshly connected session if it carries a valid token.

    Args:
        sid: Session ID of the connecting client.
        environ: Connection environment passed by the server; not used here.
        auth: Optional mapping sent by the client; only its ``"token"`` entry
            is read.

    When the token decodes to a payload with an ``"id"`` that resolves to a
    user, the session is recorded in ``SESSION_POOL`` and ``USER_POOL`` and
    the current user count and models-in-use list are emitted. In every other
    case the handler does nothing.
    """
    # No credentials supplied: nothing to register.
    if not auth or "token" not in auth:
        return

    payload = decode_token(auth["token"])

    user = None
    if payload is not None and "id" in payload:
        user = Users.get_user_by_id(payload["id"])
    if not user:
        return

    SESSION_POOL[sid] = user.id
    # Creates the per-user session list on first use, then records this sid.
    USER_POOL.setdefault(user.id, []).append(sid)

    print(f"user {user.name}({user.id}) connected with session ID {sid}")

    await sio.emit("user-count", {"count": len(USER_POOL)})
    await sio.emit("usage", {"models": get_models_in_use()})


@sio.on("user-join")
async def user_join(sid, data):
    """Handle a ``user-join`` event by registering the sender's session.

    Args:
        sid: Session ID of the client that sent the event.
        data: Event payload; expected to be a mapping whose optional
            ``"auth"`` entry contains a ``"token"``.

    If the token decodes to a payload with an ``"id"`` that resolves to a
    user, the session is recorded in ``SESSION_POOL`` / ``USER_POOL`` and the
    updated user count is emitted. Any other payload is ignored.
    """
    print("user-join", sid, data)

    # Tolerate a missing or non-mapping payload instead of raising.
    auth = data.get("auth") if isinstance(data, dict) else None
    if not auth or "token" not in auth:
        return

    payload = decode_token(auth["token"])

    # Initialise explicitly: an undecodable token or a payload without "id"
    # must fall through to "no user" rather than hit an unbound local.
    user = None
    if payload is not None and "id" in payload:
        user = Users.get_user_by_id(payload["id"])
    if not user:
        return

    SESSION_POOL[sid] = user.id

    # The connect handler may already have registered this sid. Keeping a
    # single copy matters because disconnect removes one occurrence only; a
    # duplicate would leave the user counted after the last session closes.
    sessions = USER_POOL.setdefault(user.id, [])
    if sid not in sessions:
        sessions.append(sid)

    print(f"user {user.name}({user.id}) connected with session ID {sid}")

    await sio.emit("user-count", {"count": len(USER_POOL)})


@sio.on("user-count")
async def user_count(sid):
    """Emit the number of users currently present in ``USER_POOL``.

    Args:
        sid: Session ID of the requesting client; not used, because the count
            is emitted without a specific recipient.
    """
    payload = {"count": len(USER_POOL)}
    await sio.emit("user-count", payload)

def get_models_in_use():
    """Return the IDs of all models that currently have a usage entry.

    Returns:
        A new list containing the keys of ``USAGE_POOL`` in the dict's
        iteration order.
    """
    # Iterating a dict yields its keys, i.e. the model IDs.
    return list(USAGE_POOL)


@sio.on("usage")
async def usage(sid, data):
    """Record that a session is using a model and (re)start its expiry timer.

    Args:
        sid: Session ID of the client reporting usage.
        data: Event payload; must contain the key ``"model"`` (the model ID).

    Raises:
        KeyError: If ``data`` has no ``"model"`` entry.

    The session is added to the model's ``"sids"`` list, a task is scheduled
    to remove it again after ``TIMEOUT_DURATION`` seconds, and the list of
    models in use is emitted.
    """
    model_id = data["model"]

    entry = USAGE_POOL.setdefault(model_id, {"sids": []})
    # One expiry task per session. A single shared task per model would let a
    # ping from one session cancel another session's pending removal, leaving
    # that session in "sids" forever and the model permanently "in use".
    callbacks = entry.setdefault("callbacks", {})

    # Cancel only this session's previous timer, if there is one.
    previous = callbacks.get(sid)
    if previous is not None:
        previous.cancel()

    # Store the new usage data (each session listed at most once).
    if sid not in entry["sids"]:
        entry["sids"].append(sid)

    # Schedule a task to remove the usage data after TIMEOUT_DURATION
    task = asyncio.create_task(remove_after_timeout(sid, model_id))
    callbacks[sid] = task
    # Kept for backward compatibility: the most recently scheduled task.
    entry["callback"] = task

    def _forget_finished(done_task):
        # Drop the bookkeeping reference once the task is over, unless a newer
        # task for the same session has already replaced it.
        if callbacks.get(sid) is done_task:
            del callbacks[sid]

    task.add_done_callback(_forget_finished)

    # Broadcast the usage data to all clients
    await sio.emit("usage", {"models": get_models_in_use()})


async def remove_after_timeout(sid, model_id):
    """Drop a session from a model's usage entry once the timeout elapses.

    Args:
        sid: Session ID whose usage should expire.
        model_id: Key of the ``USAGE_POOL`` entry to update.

    Sleeps for ``TIMEOUT_DURATION`` seconds, then removes ``sid`` from the
    entry's ``"sids"`` list, deletes the entry when no session is left, and
    emits the updated list of models in use. Does nothing after the sleep if
    the entry no longer exists. Cancellation is absorbed silently.
    """
    try:
        await asyncio.sleep(TIMEOUT_DURATION)

        entry = USAGE_POOL.get(model_id)
        if entry is None:
            return

        print(entry["sids"])
        # Filter instead of list.remove(): a sid that is already absent must
        # not raise inside this fire-and-forget task, and any duplicate
        # occurrences are dropped in the same pass.
        remaining = [active for active in entry["sids"] if active != sid]

        if remaining:
            entry["sids"] = remaining
        else:
            del USAGE_POOL[model_id]

        # Broadcast the usage data to all clients
        await sio.emit("usage", {"models": get_models_in_use()})
    except asyncio.CancelledError:
        # Task was cancelled due to new 'usage' event
        pass


@sio.event
async def disconnect(sid):
    """Forget a disconnected session and emit the updated user count.

    Args:
        sid: Session ID of the client that disconnected.

    Sessions that were never registered in ``SESSION_POOL`` are only logged.
    """
    if sid not in SESSION_POOL:
        print(f"Unknown session ID {sid} disconnected")
        return

    user_id = SESSION_POOL.pop(sid)

    # Remove every occurrence of the sid and tolerate a missing entry: a sid
    # registered twice must not leave a phantom user behind, and inconsistent
    # pools must not make the disconnect handler raise.
    remaining = [active for active in USER_POOL.get(user_id, []) if active != sid]
    if remaining:
        USER_POOL[user_id] = remaining
    else:
        USER_POOL.pop(user_id, None)

    await sio.emit("user-count", {"count": len(USER_POOL)})


def get_event_emitter(request_info):
    """Build a coroutine function that sends ``chat-events`` to one session.

    Args:
        request_info: Mapping read on every emit; must provide ``"chat_id"``,
            ``"message_id"`` and ``"session_id"``.

    Returns:
        An async callable taking ``event_data`` and emitting it, together with
        the chat and message IDs, to the session named by ``"session_id"``.
    """

    async def __event_emitter__(event_data):
        # Looked up at call time, so later changes to request_info are seen.
        payload = {
            "chat_id": request_info["chat_id"],
            "message_id": request_info["message_id"],
            "data": event_data,
        }
        recipient = request_info["session_id"]
        await sio.emit("chat-events", payload, to=recipient)

    return __event_emitter__


def get_event_call(request_info):
    """Build a coroutine function that calls ``chat-events`` on one session.

    Args:
        request_info: Mapping read on every call; must provide ``"chat_id"``,
            ``"message_id"`` and ``"session_id"``.

    Returns:
        An async callable taking ``event_data``; it awaits ``sio.call`` for
        the session named by ``"session_id"`` and returns that call's result.
    """

    async def __event_call__(event_data):
        # Looked up at call time, so later changes to request_info are seen.
        payload = {
            "chat_id": request_info["chat_id"],
            "message_id": request_info["message_id"],
            "data": event_data,
        }
        recipient = request_info["session_id"]
        return await sio.call("chat-events", payload, to=recipient)

    return __event_call__