async_io.py 2.4 KB
Newer Older
PengGao's avatar
PengGao committed
1
2
3
import asyncio
import io
from pathlib import Path
PengGao's avatar
PengGao committed
4
5
6
7
from typing import Union

import aiofiles
from PIL import Image
PengGao's avatar
PengGao committed
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from loguru import logger


async def load_image_async(path: Union[str, Path]) -> Image.Image:
    """Load an image file without blocking the event loop and return it as RGB.

    The file bytes are read through ``aiofiles``; decoding and the RGB
    conversion run in a worker thread via ``asyncio.to_thread``.

    Args:
        path: Location of the image file.

    Returns:
        The decoded image converted to mode ``"RGB"``.

    Raises:
        Exception: Any failure while reading or decoding is logged and
            re-raised unchanged.
    """
    try:
        async with aiofiles.open(path, "rb") as source:
            raw = await source.read()

        def _decode():
            # Decode from an in-memory buffer so no file handle is held open.
            buffer = io.BytesIO(raw)
            return Image.open(buffer).convert("RGB")

        return await asyncio.to_thread(_decode)
    except Exception as err:
        logger.error(f"Failed to load image from {path}: {err}")
        raise


async def save_video_async(video_path: Union[str, Path], video_data: bytes):
    """Write video bytes to ``video_path``, creating parent directories first.

    Args:
        video_path: Destination file; missing parent directories are created.
        video_data: Bytes to write to the file.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        video_path = Path(video_path)
        # mkdir is a blocking filesystem call; run it in a worker thread so
        # the event loop is not stalled while directories are created.
        await asyncio.to_thread(video_path.parent.mkdir, parents=True, exist_ok=True)

        async with aiofiles.open(video_path, "wb") as f:
            await f.write(video_data)

        logger.info(f"Video saved to {video_path}")
    except Exception as e:
        logger.error(f"Failed to save video to {video_path}: {e}")
        raise


async def read_text_async(path: Union[str, Path], encoding: str = "utf-8") -> str:
    """Return the entire text content of ``path`` decoded with ``encoding``.

    Args:
        path: File to read.
        encoding: Text encoding passed to ``aiofiles.open``.

    Returns:
        The file content as a string.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with aiofiles.open(path, "r", encoding=encoding) as reader:
            text = await reader.read()
    except Exception as err:
        logger.error(f"Failed to read text from {path}: {err}")
        raise
    return text


async def write_text_async(path: Union[str, Path], content: str, encoding: str = "utf-8"):
    """Write ``content`` to ``path`` as text, creating parent directories first.

    Args:
        path: Destination file; missing parent directories are created.
        content: Text to write.
        encoding: Text encoding passed to ``aiofiles.open``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        path = Path(path)
        # mkdir is a blocking filesystem call; run it in a worker thread so
        # the event loop is not stalled while directories are created.
        await asyncio.to_thread(path.parent.mkdir, parents=True, exist_ok=True)

        async with aiofiles.open(path, "w", encoding=encoding) as f:
            await f.write(content)

        logger.info(f"Text written to {path}")
    except Exception as e:
        logger.error(f"Failed to write text to {path}: {e}")
        raise


async def exists_async(path: Union[str, Path]) -> bool:
    """Report whether ``path`` exists, performing the check in a worker thread.

    Args:
        path: Filesystem location to test.

    Returns:
        ``True`` if ``Path(path).exists()`` is true, otherwise ``False``.
    """

    def _probe() -> bool:
        target = Path(path)
        return target.exists()

    found = await asyncio.to_thread(_probe)
    return found


async def read_bytes_async(path: Union[str, Path]) -> bytes:
    """Return the entire binary content of ``path``.

    Args:
        path: File to read.

    Returns:
        The file content as bytes.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with aiofiles.open(path, "rb") as reader:
            payload = await reader.read()
    except Exception as err:
        logger.error(f"Failed to read bytes from {path}: {err}")
        raise
    return payload


async def write_bytes_async(path: Union[str, Path], data: bytes):
    """Write ``data`` to ``path`` in binary mode, creating parent directories first.

    Args:
        path: Destination file; missing parent directories are created.
        data: Bytes to write.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        path = Path(path)
        # mkdir is a blocking filesystem call; run it in a worker thread so
        # the event loop is not stalled while directories are created.
        await asyncio.to_thread(path.parent.mkdir, parents=True, exist_ok=True)

        async with aiofiles.open(path, "wb") as f:
            await f.write(data)

        logger.debug(f"Bytes written to {path}")
    except Exception as e:
        logger.error(f"Failed to write bytes to {path}: {e}")
        raise