S3ReaderWriter.py 5.19 KB
Newer Older
kernel.h@qq.com's avatar
kernel.h@qq.com committed
1
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
赵小蒙's avatar
赵小蒙 committed
2
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path
liukaiwen's avatar
liukaiwen committed
3
4
5
import boto3
from loguru import logger
from botocore.config import Config
kernel.h@qq.com's avatar
kernel.h@qq.com committed
6
7


liukaiwen's avatar
liukaiwen committed
8
class S3ReaderWriter(AbsReaderWriter):
    """Read and write objects in S3 (or an S3-compatible store) through boto3.

    Every public method accepts either an absolute ``s3://bucket/key`` URI,
    which is used verbatim, or a path relative to ``parent_path``, which is
    joined onto it with ``join_path``.
    """

    def __init__(
        self,
        ak: str,
        sk: str,
        endpoint_url: str,
        addressing_style: str = "auto",
        parent_path: str = "",
    ):
        """Create the boto3 client and remember the base path.

        Args:
            ak: Access key ID.
            sk: Secret access key.
            endpoint_url: Endpoint URL passed to ``boto3.client``.
            addressing_style: Value for botocore's ``s3.addressing_style``
                config option.
            parent_path: Prefix that relative paths are joined onto.
        """
        self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
        self.path = parent_path

    def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
        """Build a boto3 S3 client for the given credentials and endpoint.

        Retries use botocore's "standard" retry mode with at most 5 attempts.
        """
        return boto3.client(
            service_name="s3",
            aws_access_key_id=ak,
            aws_secret_access_key=sk,
            endpoint_url=endpoint_url,
            config=Config(
                s3={"addressing_style": addressing_style},
                retries={"max_attempts": 5, "mode": "standard"},
            ),
        )

    def _resolve_path(self, path: str) -> str:
        """Return *path* as-is if it is an ``s3://`` URI, else join it onto ``self.path``."""
        if path.startswith("s3://"):
            return path
        return join_path(self.path, path)

    def read(self, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
        """Download a whole object.

        Args:
            s3_relative_path: ``s3://`` URI, or a path relative to ``self.path``.
            mode: ``AbsReaderWriter.MODE_TXT`` to decode the body with
                *encoding*, or ``AbsReaderWriter.MODE_BIN`` for raw bytes.
            encoding: Text encoding used in text mode.

        Returns:
            ``str`` in text mode, ``bytes`` in binary mode.

        Raises:
            ValueError: If *mode* is neither text nor binary. This is checked
                before any request is sent.
        """
        # Validate first so a bad mode does not cost a full download.
        if mode not in (AbsReaderWriter.MODE_TXT, AbsReaderWriter.MODE_BIN):
            raise ValueError("Invalid mode. Use 'text' or 'binary'.")
        bucket_name, key = parse_bucket_key(self._resolve_path(s3_relative_path))
        res = self.client.get_object(Bucket=bucket_name, Key=key)
        body = res["Body"].read()
        if mode == AbsReaderWriter.MODE_TXT:
            return body.decode(encoding)  # Decode bytes to text
        return body

    def write(self, content, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
        """Upload *content* as one object with a single ``put_object`` call.

        Args:
            content: ``str`` in text mode (encoded with *encoding*), or the
                body passed to ``put_object`` unchanged in binary mode.
            s3_relative_path: ``s3://`` URI, or a path relative to ``self.path``.
            mode: ``AbsReaderWriter.MODE_TXT`` or ``AbsReaderWriter.MODE_BIN``.
            encoding: Text encoding used in text mode.

        Raises:
            ValueError: If *mode* is neither text nor binary.
        """
        s3_path = self._resolve_path(s3_relative_path)
        if mode == AbsReaderWriter.MODE_TXT:
            body = content.encode(encoding)  # Encode text data as bytes
        elif mode == AbsReaderWriter.MODE_BIN:
            body = content
        else:
            raise ValueError("Invalid mode. Use 'text' or 'binary'.")
        bucket_name, key = parse_bucket_key(s3_path)
        self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
        # Log message reads: "content written to <s3_path>".
        logger.info(f"内容已写入 {s3_path} ")

    def read_offset(self, path: str, offset=0, limit=None) -> bytes:
        """Read a byte range of an object using an HTTP ``Range`` request.

        Args:
            path: ``s3://`` URI, or a path relative to ``self.path``.
            offset: Zero-based index of the first byte to read. Must be >= 0.
            limit: Maximum number of bytes to read, or ``None`` to read through
                the end of the object. ``0`` returns ``b""`` without a request.

        Returns:
            The requested bytes.

        Raises:
            ValueError: If *offset* or *limit* is negative.
        """
        if offset < 0:
            raise ValueError(f"offset must be >= 0, got {offset}")
        if limit is not None and limit < 0:
            raise ValueError(f"limit must be >= 0 or None, got {limit}")
        if limit == 0:
            # An empty range cannot be expressed as an HTTP Range header.
            return b""

        bucket_name, key = parse_bucket_key(self._resolve_path(path))
        if limit is None:
            range_header = f"bytes={offset}-"
        else:
            # The end position of an HTTP byte range is inclusive, hence the -1.
            range_header = f"bytes={offset}-{offset + limit - 1}"
        res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
        return res["Body"].read()
liukaiwen's avatar
liukaiwen committed
76

liukaiwen's avatar
liukaiwen committed
77
78

if __name__ == "__main__":
    # Manual smoke tests that talk to a real S3 endpoint. Flip the flags below
    # to choose which one runs; neither is part of an automated test suite.
    import json
    import os

    RUN_ROUND_TRIP_DEMO = False
    RUN_RANGE_READ_CHECK = True

    if RUN_ROUND_TRIP_DEMO:
        # Config the connection info
        ak = ""
        sk = ""
        endpoint_url = ""
        addressing_style = "auto"
        bucket_name = ""
        # Create an S3ReaderWriter object. The calls below all use absolute
        # s3:// URIs, so parent_path only matters for relative paths.
        s3_reader_writer = S3ReaderWriter(
            ak, sk, endpoint_url, addressing_style, f"s3://{bucket_name}/"
        )
        test_object = f"s3://{bucket_name}/ebook/test/test.json"

        # Write text data to S3
        text_data = "This is some text data"
        s3_reader_writer.write(
            text_data,
            s3_relative_path=test_object,
            mode=AbsReaderWriter.MODE_TXT,
        )

        # Read text data from S3
        text_data_read = s3_reader_writer.read(
            s3_relative_path=test_object, mode=AbsReaderWriter.MODE_TXT
        )
        logger.info(f"Read text data from S3: {text_data_read}")

        # Write binary data to S3 (previously wrote text_data by mistake)
        binary_data = b"This is some binary data"
        s3_reader_writer.write(
            binary_data,
            s3_relative_path=test_object,
            mode=AbsReaderWriter.MODE_BIN,
        )

        # Read binary data from S3
        binary_data_read = s3_reader_writer.read(
            s3_relative_path=test_object, mode=AbsReaderWriter.MODE_BIN
        )
        logger.info(f"Read binary data from S3: {binary_data_read}")

        # Range-read the first 10 bytes from S3
        binary_data_read = s3_reader_writer.read_offset(
            path=test_object, offset=0, limit=10
        )
        logger.info(f"Read binary data from S3: {binary_data_read}")

    if RUN_RANGE_READ_CHECK:
        # Connection info and object location come from the environment.
        ak = os.getenv("AK", "")
        sk = os.getenv("SK", "")
        endpoint_url = os.getenv("ENDPOINT", "")
        bucket = os.getenv("S3_BUCKET", "")
        prefix = os.getenv("S3_PREFIX", "")
        key_basename = os.getenv("S3_KEY_BASENAME", "")
        s3_reader_writer = S3ReaderWriter(
            ak, sk, endpoint_url, "auto", f"s3://{bucket}/{prefix}"
        )

        # Whole-object read through a relative key; the expected head/tail
        # bytes assume a specific JSONL test object (asserts vanish under -O).
        content_bin = s3_reader_writer.read_offset(key_basename)
        assert content_bin[:10] == b'{"track_id'
        assert content_bin[-10:] == b'r":null}}\n'

        # Partial read: 426 bytes starting at byte 424.
        content_bin = s3_reader_writer.read_offset(key_basename, offset=424, limit=426)
        jso = json.dumps(content_bin.decode("utf-8"))
        print(jso)