__init__.py 9.35 KB
Newer Older
LiangLiu's avatar
LiangLiu committed
1
2
import io
import json
LiangLiu's avatar
LiangLiu committed
3
import os
LiangLiu's avatar
LiangLiu committed
4
5
6
7
8
9
10
11
12

import torch
from PIL import Image

from lightx2v.deploy.common.utils import class_try_catch_async


class BaseDataManager:
    def __init__(self):
LiangLiu's avatar
LiangLiu committed
13
14
15
16
        self.template_images_dir = None
        self.template_audios_dir = None
        self.template_videos_dir = None
        self.template_tasks_dir = None
17
18
        self.podcast_temp_session_dir = None
        self.podcast_output_dir = None
LiangLiu's avatar
LiangLiu committed
19
20
21
22
23
24
25

    async def init(self):
        pass

    async def close(self):
        pass

LiangLiu's avatar
LiangLiu committed
26
27
28
29
30
31
    def fmt_path(self, base, filename, abs_path=None):
        """Resolve the path to use for a file.

        Args:
            base: Directory joined with ``filename`` when no ``abs_path`` is given.
            filename: File name joined onto ``base``.
            abs_path: If truthy, returned unchanged and the other arguments
                are ignored.

        Returns:
            ``abs_path`` when it is truthy, otherwise ``os.path.join(base, filename)``.
        """
        if not abs_path:
            return os.path.join(base, filename)
        return abs_path

LiangLiu's avatar
LiangLiu committed
32
33
34
35
36
37
38
39
40
41
    def to_device(self, data, device):
        if isinstance(data, dict):
            return {key: self.to_device(value, device) for key, value in data.items()}
        elif isinstance(data, list):
            return [self.to_device(item, device) for item in data]
        elif isinstance(data, torch.Tensor):
            return data.to(device)
        else:
            return data

LiangLiu's avatar
LiangLiu committed
42
    async def save_bytes(self, bytes_data, filename, abs_path=None):
LiangLiu's avatar
LiangLiu committed
43
44
        raise NotImplementedError

LiangLiu's avatar
LiangLiu committed
45
    async def load_bytes(self, filename, abs_path=None):
LiangLiu's avatar
LiangLiu committed
46
47
        raise NotImplementedError

LiangLiu's avatar
LiangLiu committed
48
    async def delete_bytes(self, filename, abs_path=None):
LiangLiu's avatar
LiangLiu committed
49
50
        raise NotImplementedError

LiangLiu's avatar
LiangLiu committed
51
52
53
    async def presign_url(self, filename, abs_path=None):
        """Return a pre-signed URL for a stored file.

        The base implementation ignores both arguments and always returns
        ``None``.
        NOTE(review): presumably overridden by backends that can issue URLs —
        confirm against the subclasses.
        """
        return None

LiangLiu's avatar
LiangLiu committed
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
    async def recurrent_save(self, data, prefix):
        if isinstance(data, dict):
            return {k: await self.recurrent_save(v, f"{prefix}-{k}") for k, v in data.items()}
        elif isinstance(data, list):
            return [await self.recurrent_save(v, f"{prefix}-{idx}") for idx, v in enumerate(data)]
        elif isinstance(data, torch.Tensor):
            save_path = prefix + ".pt"
            await self.save_tensor(data, save_path)
            return save_path
        elif isinstance(data, Image.Image):
            save_path = prefix + ".png"
            await self.save_image(data, save_path)
            return save_path
        else:
            return data

    async def recurrent_load(self, data, device, prefix):
        if isinstance(data, dict):
            return {k: await self.recurrent_load(v, device, f"{prefix}-{k}") for k, v in data.items()}
        elif isinstance(data, list):
            return [await self.recurrent_load(v, device, f"{prefix}-{idx}") for idx, v in enumerate(data)]
        elif isinstance(data, str) and data == prefix + ".pt":
            return await self.load_tensor(data, device)
        elif isinstance(data, str) and data == prefix + ".png":
            return await self.load_image(data)
        else:
            return data

    async def recurrent_delete(self, data, prefix):
        if isinstance(data, dict):
            return {k: await self.recurrent_delete(v, f"{prefix}-{k}") for k, v in data.items()}
        elif isinstance(data, list):
            return [await self.recurrent_delete(v, f"{prefix}-{idx}") for idx, v in enumerate(data)]
        elif isinstance(data, str) and data == prefix + ".pt":
            await self.delete_bytes(data)
        elif isinstance(data, str) and data == prefix + ".png":
            await self.delete_bytes(data)

    @class_try_catch_async
    async def save_object(self, data, filename):
        """Save a nested object as JSON, writing tensors/images to side files.

        ``recurrent_save`` first replaces tensors and images with file names
        (using ``filename`` as the prefix). The result is then dumped as UTF-8
        JSON with ``ensure_ascii=False`` and stored via ``save_bytes`` under
        ``filename``.
        """
        serializable = await self.recurrent_save(data, filename)
        payload = json.dumps(serializable, ensure_ascii=False).encode("utf-8")
        await self.save_bytes(payload, filename)

    @class_try_catch_async
    async def load_object(self, filename, device):
        """Load an object written by ``save_object``.

        Reads ``filename`` via ``load_bytes``, parses it as UTF-8 JSON, then
        lets ``recurrent_load`` replace tensor/image file names (prefix
        ``filename``) with the loaded values, tensors being moved to ``device``.
        """
        raw = await self.load_bytes(filename)
        skeleton = json.loads(raw.decode("utf-8"))
        return await self.recurrent_load(skeleton, device, filename)

    @class_try_catch_async
    async def delete_object(self, filename):
        """Delete an object written by ``save_object`` together with its side files.

        The JSON stored under ``filename`` is loaded first so that
        ``recurrent_delete`` can remove the tensor/image files it references;
        the JSON file itself is deleted last.
        """
        raw = await self.load_bytes(filename)
        skeleton = json.loads(raw.decode("utf-8"))
        await self.recurrent_delete(skeleton, filename)
        await self.delete_bytes(filename)

    @class_try_catch_async
    async def save_tensor(self, data: torch.Tensor, filename):
        """Serialize ``data`` with ``torch.save`` and store it under ``filename``.

        The tensor is moved to CPU before serialization; the resulting bytes
        are handed to ``save_bytes``.
        """
        with io.BytesIO() as stream:
            torch.save(data.to("cpu"), stream)
            payload = stream.getvalue()
        await self.save_bytes(payload, filename)

    @class_try_catch_async
    async def load_tensor(self, filename, device):
        """Load a tensor stored under ``filename`` and move it to ``device``.

        Args:
            filename: Name passed to ``load_bytes`` to fetch the serialized tensor.
            device: Target passed to ``Tensor.to`` after deserialization.

        Returns:
            The deserialized tensor on ``device``.
        """
        bytes_data = await self.load_bytes(filename)
        # NOTE(security): torch.load unpickles its input, so only blobs from
        # trusted storage should be loaded here.
        # Deserialize onto CPU first so a blob saved from another device still
        # loads on this host; the tensor is moved to `device` right after.
        tensor = torch.load(io.BytesIO(bytes_data), map_location="cpu")
        return tensor.to(device)

    @class_try_catch_async
    async def save_image(self, data: Image.Image, filename):
        """Encode ``data`` as PNG and store the bytes under ``filename`` via ``save_bytes``."""
        with io.BytesIO() as stream:
            data.save(stream, format="PNG")
            payload = stream.getvalue()
        await self.save_bytes(payload, filename)

    @class_try_catch_async
    async def load_image(self, filename):
        """Load the image stored under ``filename`` and return it converted to RGB."""
        raw = await self.load_bytes(filename)
        return Image.open(io.BytesIO(raw)).convert("RGB")

    def get_delete_func(self, type):
        maps = {
            "TENSOR": self.delete_bytes,
            "IMAGE": self.delete_bytes,
            "OBJECT": self.delete_object,
            "VIDEO": self.delete_bytes,
        }
        return maps[type]

LiangLiu's avatar
LiangLiu committed
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    def get_template_dir(self, template_type):
        if template_type == "audios":
            return self.template_audios_dir
        elif template_type == "images":
            return self.template_images_dir
        elif template_type == "videos":
            return self.template_videos_dir
        elif template_type == "tasks":
            return self.template_tasks_dir
        else:
            raise ValueError(f"Invalid template type: {template_type}")

    @class_try_catch_async
    async def list_template_files(self, template_type):
        """List the files in the directory configured for ``template_type``.

        Returns an empty list when that directory is ``None``; otherwise
        returns ``self.list_files(base_dir=...)``.
        NOTE(review): ``list_files`` is not defined in this base class as
        shown — presumably supplied by subclasses; confirm.
        """
        base_dir = self.get_template_dir(template_type)
        return [] if base_dir is None else await self.list_files(base_dir=base_dir)

    @class_try_catch_async
    async def load_template_file(self, template_type, filename):
        """Load the bytes of ``filename`` from the ``template_type`` directory.

        Returns ``None`` when that directory is ``None``; otherwise the result
        of ``load_bytes`` called with the joined path as ``abs_path``.
        """
        base_dir = self.get_template_dir(template_type)
        if base_dir is not None:
            return await self.load_bytes(None, abs_path=os.path.join(base_dir, filename))
        return None

    @class_try_catch_async
    async def template_file_exists(self, template_type, filename):
        """Check whether ``filename`` exists in the ``template_type`` directory.

        Returns ``None`` when that directory is ``None``; otherwise the result
        of ``self.file_exists`` called with the joined path as ``abs_path``.
        NOTE(review): ``file_exists`` is not defined in this base class as
        shown — presumably supplied by subclasses; confirm.
        """
        base_dir = self.get_template_dir(template_type)
        if base_dir is not None:
            return await self.file_exists(None, abs_path=os.path.join(base_dir, filename))
        return None

    @class_try_catch_async
    async def delete_template_file(self, template_type, filename):
        """Delete ``filename`` from the ``template_type`` directory.

        Returns ``None`` when that directory is ``None``; otherwise the result
        of ``delete_bytes`` called with the joined path as ``abs_path``.
        """
        base_dir = self.get_template_dir(template_type)
        if base_dir is not None:
            return await self.delete_bytes(None, abs_path=os.path.join(base_dir, filename))
        return None

    @class_try_catch_async
    async def save_template_file(self, template_type, filename, bytes_data):
        """Store ``bytes_data`` as ``filename`` in the ``template_type`` directory.

        Args:
            template_type: Template kind accepted by ``get_template_dir``.
            filename: File name joined onto the template directory.
            bytes_data: Payload handed to ``save_bytes``.

        Returns:
            ``None`` when the template directory is ``None``; otherwise the
            result of ``save_bytes`` called with the joined path as ``abs_path``.
        """
        template_dir = self.get_template_dir(template_type)
        if template_dir is None:
            return None
        abs_path = os.path.join(template_dir, filename)
        return await self.save_bytes(bytes_data, None, abs_path=abs_path)
LiangLiu's avatar
LiangLiu committed
195
196
197
198
199
200
201
202

    @class_try_catch_async
    async def presign_template_url(self, template_type, filename):
        """Return a pre-signed URL for ``filename`` in the ``template_type`` directory.

        Returns ``None`` when that directory is ``None``; otherwise the result
        of ``presign_url`` called with the joined path as ``abs_path``.
        """
        base_dir = self.get_template_dir(template_type)
        if base_dir is not None:
            return await self.presign_url(None, abs_path=os.path.join(base_dir, filename))
        return None

203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
    @class_try_catch_async
    async def list_podcast_temp_session_files(self, session_id):
        """List the files under ``podcast_temp_session_dir/session_id``.

        NOTE(review): ``list_files`` is not defined in this base class as
        shown — presumably supplied by subclasses; confirm.
        """
        target_dir = os.path.join(self.podcast_temp_session_dir, session_id)
        listing = await self.list_files(base_dir=target_dir)
        return listing

    @class_try_catch_async
    async def save_podcast_temp_session_file(self, session_id, filename, bytes_data):
        """Store ``bytes_data`` at ``podcast_temp_session_dir/session_id/filename``.

        The result of ``save_bytes`` is not returned.
        """
        await self.save_bytes(
            bytes_data,
            None,
            abs_path=os.path.join(self.podcast_temp_session_dir, session_id, filename),
        )

    @class_try_catch_async
    async def load_podcast_temp_session_file(self, session_id, filename):
        """Load the bytes stored at ``podcast_temp_session_dir/session_id/filename``."""
        return await self.load_bytes(
            None,
            abs_path=os.path.join(self.podcast_temp_session_dir, session_id, filename),
        )

    @class_try_catch_async
    async def delete_podcast_temp_session_file(self, session_id, filename):
        """Delete the file at ``podcast_temp_session_dir/session_id/filename``."""
        return await self.delete_bytes(
            None,
            abs_path=os.path.join(self.podcast_temp_session_dir, session_id, filename),
        )

    @class_try_catch_async
    async def save_podcast_output_file(self, filename, bytes_data):
        """Store ``bytes_data`` at ``podcast_output_dir/filename``.

        The result of ``save_bytes`` is not returned.
        """
        await self.save_bytes(
            bytes_data,
            None,
            abs_path=os.path.join(self.podcast_output_dir, filename),
        )

    @class_try_catch_async
    async def load_podcast_output_file(self, filename):
        """Load the bytes stored at ``podcast_output_dir/filename``."""
        return await self.load_bytes(
            None,
            abs_path=os.path.join(self.podcast_output_dir, filename),
        )

    @class_try_catch_async
    async def delete_podcast_output_file(self, filename):
        """Delete the file at ``podcast_output_dir/filename``."""
        return await self.delete_bytes(
            None,
            abs_path=os.path.join(self.podcast_output_dir, filename),
        )

    @class_try_catch_async
    async def presign_podcast_output_url(self, filename):
        """Return a pre-signed URL for the file at ``podcast_output_dir/filename``."""
        return await self.presign_url(
            None,
            abs_path=os.path.join(self.podcast_output_dir, filename),
        )

LiangLiu's avatar
LiangLiu committed
243
244
245
246
247
248

# Import the concrete data manager implementations so they are re-exported
# from this package alongside BaseDataManager.
# NOTE(review): these imports sit below the class definition, presumably
# because the submodules import BaseDataManager from this package — confirm
# before moving them to the top of the file.
from .local_data_manager import LocalDataManager  # noqa
from .s3_data_manager import S3DataManager  # noqa

# Public names of this package.
__all__ = ["BaseDataManager", "LocalDataManager", "S3DataManager"]