S3ReaderWriter.py 2.75 KB
Newer Older
kernel.h@qq.com's avatar
kernel.h@qq.com committed
1
2


liukaiwen's avatar
liukaiwen committed
3
4
5
6
7
8
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key
import boto3
from loguru import logger
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
kernel.h@qq.com's avatar
kernel.h@qq.com committed
9
10


liukaiwen's avatar
liukaiwen committed
11
12
13
class S3ReaderWriter(AbsReaderWriter):
    """Read and write whole S3 objects through a boto3 client.

    Every ``s3_path`` is split into a bucket name and an object key by
    ``parse_bucket_key``; connection settings are extracted from the profile
    by ``parse_aws_param``.
    """

    def __init__(self, s3_profile):
        """Build the S3 client used by all subsequent reads and writes.

        Args:
            s3_profile: Connection settings, passed unchanged to
                ``parse_aws_param``.
        """
        self.client = self._get_client(s3_profile)

    def _get_client(self, s3_profile):
        """Create a boto3 S3 client from *s3_profile*.

        Args:
            s3_profile: Connection settings understood by ``parse_aws_param``,
                which yields the access key, secret key, endpoint URL and
                addressing style.

        Returns:
            A boto3 S3 client configured with the profile's addressing style
            and botocore's ``standard`` retry mode (``max_attempts`` of 5).
        """
        ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
        s3_client = boto3.client(
            service_name="s3",
            aws_access_key_id=ak,
            aws_secret_access_key=sk,
            endpoint_url=end_point,
            config=Config(s3={"addressing_style": addressing_style},
                          retries={'max_attempts': 5, 'mode': 'standard'}),
        )

        return s3_client

    @staticmethod
    def _check_mode(mode):
        """Raise ``ValueError`` unless *mode* is ``'text'`` or ``'binary'``."""
        if mode not in ('text', 'binary'):
            raise ValueError("Invalid mode. Use 'text' or 'binary'.")

    def read(self, s3_path, mode="text", encoding="utf-8"):
        """Fetch the object at *s3_path* and return its full content.

        Args:
            s3_path: Location of the object, parsed by ``parse_bucket_key``.
            mode: ``'text'`` to return a decoded ``str``, ``'binary'`` to
                return the raw ``bytes``.
            encoding: Codec used to decode the body in text mode.

        Returns:
            The object body as ``str`` (text mode) or ``bytes`` (binary mode).

        Raises:
            ValueError: If *mode* is neither ``'text'`` nor ``'binary'``.
        """
        # Reject a bad mode before talking to S3, so the caller does not pay
        # for a download whose result would be discarded anyway.
        self._check_mode(mode)
        bucket_name, bucket_key = parse_bucket_key(s3_path)
        res = self.client.get_object(Bucket=bucket_name, Key=bucket_key)
        body = res["Body"].read()
        if mode == 'text':
            return body.decode(encoding)  # Decode bytes to text
        return body

    def write(self, data, s3_path, mode="text", encoding="utf-8"):
        """Upload *data* to *s3_path*, replacing the object's content.

        Args:
            data: ``str`` in text mode; in binary mode the value is handed to
                ``put_object`` as-is.
            s3_path: Destination of the object, parsed by ``parse_bucket_key``.
            mode: ``'text'`` to encode *data* first, ``'binary'`` to send it
                unchanged.
            encoding: Codec used to encode *data* in text mode.

        Raises:
            ValueError: If *mode* is neither ``'text'`` nor ``'binary'``.
        """
        self._check_mode(mode)
        if mode == 'text':
            body = data.encode(encoding)  # Encode text data as bytes
        else:
            body = data
        bucket_name, bucket_key = parse_bucket_key(s3_path)
        self.client.put_object(Body=body, Bucket=bucket_name, Key=bucket_key)
        logger.info(f"内容已写入 {s3_path} ")
liukaiwen's avatar
liukaiwen committed
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67


if __name__ == "__main__":
    # Manual smoke test: round-trips one text object and one binary object.
    # The credentials, endpoint and bucket name below are placeholders and
    # must be filled in before running.

    # Config the connection info
    profile = {
        'ak': '',
        'sk': '',
        'endpoint': ''
    }
    # Create an S3ReaderWriter object
    s3_reader_writer = S3ReaderWriter(profile)

    # Write text data to S3
    text_data = "This is some text data"
    s3_reader_writer.write(data=text_data, s3_path="s3://bucket_name/ebook/test/test.json", mode='text')

    # Read text data from S3
    text_data_read = s3_reader_writer.read(s3_path="s3://bucket_name/ebook/test/test.json", mode='text')
    logger.info(f"Read text data from S3: {text_data_read}")

    # Write binary data to S3 (bytes payload, so the binary path is exercised)
    binary_data = b"This is some binary data"
    s3_reader_writer.write(data=binary_data, s3_path="s3://bucket_name/ebook/test/test2.json", mode='binary')

    # Read binary data from S3
    binary_data_read = s3_reader_writer.read(s3_path="s3://bucket_name/ebook/test/test2.json", mode='binary')
    logger.info(f"Read binary data from S3: {binary_data_read}")