s3.py 2.77 KB
Newer Older
赵小蒙's avatar
赵小蒙 committed
1
2
3
4
5
6
# from app.common import s3
import boto3
from botocore.client import Config

import re
import random
赵小蒙's avatar
赵小蒙 committed
7
from typing import List, Union
8
9
10
11
try:
    from app.common import s3_buckets, s3_clusters, get_cluster_name, s3_users
except ImportError:
    from magic_pdf.spark import s3_buckets, s3_clusters, get_cluster_name, s3_users
赵小蒙's avatar
赵小蒙 committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

# Matches "s3://bucket" or "s3a://bucket", optionally followed by "/key".
# Group 1 captures the bucket name (everything up to the first "/");
# group 2 captures the key and is None when the path has no "/key" part.
__re_s3_path = re.compile("^s3a?://([^/]+)(?:/(.*))?$")
def get_s3_config(path: Union[str, List[str]], outside=False):
    """Resolve the S3 connection config shared by one or more paths.

    Args:
        path: A single S3 path, or a list of S3 paths that must all map to
            the same bucket config.
        outside: Forwarded to ``__get_s3_config``; when true the cluster's
            "outside" endpoints are selected.

    Returns:
        The dict produced by ``__get_s3_config`` for the common bucket
        config, requested with ``prefer_ip=True``.

    Raises:
        Exception: If the paths resolve to different bucket configs, or if
            no bucket config could be determined (e.g. an empty list).
    """
    # isinstance (not an exact type comparison) so that str subclasses are
    # treated as one path instead of being iterated character by character.
    paths = [path] if isinstance(path, str) else path
    bucket_config = None
    for p in paths:
        bc = __get_s3_bucket_config(p)
        # Every path after the first must agree with the config seen so far.
        if bucket_config not in (bc, None):
            raise Exception(f"{paths} have different s3 config, cannot read together.")
        bucket_config = bc
    if not bucket_config:
        raise Exception("path is empty.")
    return __get_s3_config(bucket_config, outside, prefer_ip=True)

def __get_s3_config(
    bucket_config: tuple,
    outside: bool,
    prefer_ip=False,
    prefer_auto=False,
):
    """Build the connection dict for a (cluster, user) bucket config.

    The endpoint list is picked from the cluster's entry in ``s3_clusters``:
    "outside" when ``outside`` is true; otherwise "auto" when ``prefer_auto``
    is set and the cluster defines it; otherwise "inside" when the cluster's
    "cluster" value equals ``get_cluster_name()``; otherwise "outside".
    With ``prefer_ip`` the "<key>_ips" variant is used when present.

    Returns:
        A dict with a randomly chosen "endpoint" merged with the user's
        entry from ``s3_users``.
    """
    cluster_name, user_name = bucket_config
    cluster_info = s3_clusters[cluster_name]

    # The current cluster name is only looked up when neither "outside" nor
    # "auto" has already been decided.
    if not outside and prefer_auto and "auto" in cluster_info:
        key = "auto"
    elif not outside and cluster_info.get("cluster") == get_cluster_name():
        key = "inside"
    else:
        key = "outside"

    ip_key = f"{key}_ips"
    if prefer_ip and ip_key in cluster_info:
        key = ip_key

    chosen = random.choice(cluster_info[key])
    return {"endpoint": chosen, **s3_users[user_name]}

def split_s3_path(path: str):
    """Split an S3 path into its bucket and key.

    Returns:
        A ``(bucket, key)`` tuple. The key is "" when the path has no key
        part; both values are "" when the path does not match the S3
        path pattern.
    """
    matched = __re_s3_path.match(path)
    if matched is not None:
        bucket, key = matched.groups()
        # The key group is None when the path stops at the bucket name.
        return bucket, key or ""
    return "", ""

def __get_s3_bucket_config(path: str):
    """Look up the bucket config for a path in ``s3_buckets``.

    The bucket name is taken from ``split_s3_path``; an empty path is
    looked up as the bucket "". When no (truthy) entry exists for that
    bucket, the "[default]" entry is returned instead, and it is asserted
    to be present.
    """
    bucket_name = split_s3_path(path)[0] if path else ""
    found = s3_buckets.get(bucket_name)
    if found:
        return found
    fallback = s3_buckets.get("[default]")
    assert fallback is not None
    return fallback

def get_s3_client(path: Union[str, List[str]], outside=False):
    """Create a boto3 S3 client configured for the given path(s).

    Args:
        path: A single S3 path or a list of S3 paths; forwarded to
            ``get_s3_config`` to resolve credentials and endpoint.
        outside: Forwarded to ``get_s3_config``.

    Returns:
        A boto3 "s3" client using path-style addressing and up to 8 retry
        attempts, with the "standard" retry mode when the installed
        botocore supports it.

    Raises:
        Exception: Propagated from ``get_s3_config`` when the config cannot
            be resolved, or from boto3 when the client cannot be created.
    """
    s3_config = get_s3_config(path, outside)
    addressing = {"addressing_style": "path"}
    try:
        client_config = Config(s3=addressing, retries={"max_attempts": 8, "mode": "standard"})
    except Exception:
        # Older boto3/botocore releases do not support the retries "mode"
        # option and reject it when the Config is built; fall back to
        # max_attempts only. Only the Config construction is guarded so that
        # unrelated failures (and KeyboardInterrupt/SystemExit) are not
        # swallowed and retried.
        client_config = Config(s3=addressing, retries={"max_attempts": 8})
    return boto3.client(
        "s3",
        aws_access_key_id=s3_config["ak"],
        aws_secret_access_key=s3_config["sk"],
        endpoint_url=s3_config["endpoint"],
        config=client_config,
    )