S3ReaderWriter.py 4.37 KB
Newer Older
kernel.h@qq.com's avatar
kernel.h@qq.com committed
1
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
赵小蒙's avatar
赵小蒙 committed
2
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path
liukaiwen's avatar
liukaiwen committed
3
4
5
6
import boto3
from loguru import logger
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
liukaiwen's avatar
liukaiwen committed
7
8
9
10
import os

MODE_TXT = "text"
MODE_BIN = "binary"
kernel.h@qq.com's avatar
kernel.h@qq.com committed
11
12


liukaiwen's avatar
liukaiwen committed
13
class S3ReaderWriter(AbsReaderWriter):
    """Read and write objects on an S3-compatible store through a boto3 client.

    Every public method accepts either a full ``s3://`` URI, which is used
    unchanged, or a relative path, which is joined onto ``parent_path`` with
    ``join_path``.
    """

    def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str = 'auto', parent_path: str = ''):
        """Create the underlying S3 client and remember the base path.

        Args:
            ak: Access key id handed to boto3.
            sk: Secret access key handed to boto3.
            endpoint_url: Endpoint URL of the S3-compatible service.
            addressing_style: Forwarded to botocore's ``s3.addressing_style``
                configuration option.
            parent_path: Base path that relative paths are joined onto.
        """
        self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
        self.path = parent_path

    def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
        """Build a boto3 S3 client using the "standard" retry mode, max 5 attempts."""
        s3_client = boto3.client(
            service_name="s3",
            aws_access_key_id=ak,
            aws_secret_access_key=sk,
            endpoint_url=endpoint_url,
            config=Config(s3={"addressing_style": addressing_style},
                          retries={'max_attempts': 5, 'mode': 'standard'}),
        )
        return s3_client

    def _resolve_path(self, path: str) -> str:
        """Return ``path`` unchanged if it is an ``s3://`` URI, else join it onto ``self.path``."""
        if path.startswith("s3://"):
            return path
        return join_path(self.path, path)

    @staticmethod
    def _check_mode(mode) -> None:
        """Raise ``ValueError`` unless ``mode`` is ``MODE_TXT`` or ``MODE_BIN``."""
        if mode not in (MODE_TXT, MODE_BIN):
            raise ValueError("Invalid mode. Use 'text' or 'binary'.")

    def _fetch(self, path: str, mode, encoding: str, **get_object_kwargs):
        """Download an object (or a byte range of it) and decode it if requested.

        The mode is validated first so that an invalid mode fails before any
        request is sent instead of after the payload has been downloaded.
        Extra keyword arguments are forwarded to ``client.get_object``.
        """
        self._check_mode(mode)
        bucket_name, key = parse_bucket_key(self._resolve_path(path))
        res = self.client.get_object(Bucket=bucket_name, Key=key, **get_object_kwargs)
        body = res["Body"].read()
        if mode == MODE_TXT:
            return body.decode(encoding)  # Decode bytes to text
        return body

    def read(self, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
        """Read a whole object.

        Args:
            s3_relative_path: ``s3://`` URI, or a path relative to the parent path.
            mode: ``MODE_TXT`` to get a decoded ``str``, ``MODE_BIN`` to get raw bytes.
            encoding: Text encoding used when ``mode`` is ``MODE_TXT``.

        Returns:
            The object content as ``str`` (text mode) or ``bytes`` (binary mode).

        Raises:
            ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
        """
        return self._fetch(s3_relative_path, mode, encoding)

    def write(self, content, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
        """Upload ``content`` as an object and log the destination path.

        Args:
            content: Text to encode (``MODE_TXT``), or a ready-made body passed
                through to ``put_object`` unchanged (``MODE_BIN``).
            s3_relative_path: ``s3://`` URI, or a path relative to the parent path.
            mode: ``MODE_TXT`` or ``MODE_BIN``.
            encoding: Text encoding used when ``mode`` is ``MODE_TXT``.

        Raises:
            ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
        """
        self._check_mode(mode)
        s3_path = self._resolve_path(s3_relative_path)
        if mode == MODE_TXT:
            body = content.encode(encoding)  # Encode text data as bytes
        else:
            body = content
        bucket_name, key = parse_bucket_key(s3_path)
        self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
        # The log message reads "content has been written to <path>".
        logger.info(f"内容已写入 {s3_path} ")

    def read_jsonl(self, path: str, byte_start=0, byte_end=None, mode=MODE_TXT, encoding='utf-8'):
        """Read a byte range of an object using an HTTP ``Range`` request.

        Despite the name, no JSON parsing happens here: the selected bytes are
        returned as-is (binary mode) or decoded (text mode).

        Args:
            path: ``s3://`` URI, or a path relative to the parent path.
            byte_start: Offset of the first byte to fetch.
            byte_end: Offset of the last byte to fetch (HTTP ranges are
                inclusive), or ``None`` to read through the end of the object.
            mode: ``MODE_TXT`` to get a decoded ``str``, ``MODE_BIN`` to get raw bytes.
            encoding: Text encoding used when ``mode`` is ``MODE_TXT``.

        Returns:
            The requested range as ``str`` (text mode) or ``bytes`` (binary mode).

        Raises:
            ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
        """
        # Compare against None rather than relying on truthiness: byte_end=0 is
        # a legitimate end offset (just the first byte), not "no end given".
        if byte_end is not None:
            range_header = f'bytes={byte_start}-{byte_end}'
        else:
            range_header = f'bytes={byte_start}-'
        return self._fetch(path, mode, encoding, Range=range_header)

liukaiwen's avatar
liukaiwen committed
78
79
80

if __name__ == "__main__":
    # Manual smoke test against a real S3-compatible endpoint.
    # Fill in the connection info below before running.
    ak = ""
    sk = ""
    endpoint_url = ""
    addressing_style = "auto"
    bucket_name = ""

    # Create an S3ReaderWriter object. The parent path is an f-string so that
    # the configured bucket is interpolated instead of the literal text
    # "bucket_name".
    s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style, f"s3://{bucket_name}/")

    # Every step below targets the same object.
    test_path = f"s3://{bucket_name}/ebook/test/test.json"

    # Write text data to S3 (the payload parameter of write() is `content`).
    text_data = "This is some text data"
    s3_reader_writer.write(content=text_data, s3_relative_path=test_path, mode=MODE_TXT)

    # Read text data from S3
    text_data_read = s3_reader_writer.read(s3_relative_path=test_path, mode=MODE_TXT)
    logger.info(f"Read text data from S3: {text_data_read}")

    # Write binary data to S3 (pass the bytes payload, not the text one).
    binary_data = b"This is some binary data"
    s3_reader_writer.write(content=binary_data, s3_relative_path=test_path, mode=MODE_BIN)

    # Read binary data from S3
    binary_data_read = s3_reader_writer.read(s3_relative_path=test_path, mode=MODE_BIN)
    logger.info(f"Read binary data from S3: {binary_data_read}")

    # Range-read the first bytes of the object in binary mode
    binary_data_read = s3_reader_writer.read_jsonl(path=test_path,
                                                   byte_start=0, byte_end=10, mode=MODE_BIN)
    logger.info(f"Read binary data from S3: {binary_data_read}")