"src/targets/vscode:/vscode.git/clone" did not exist on "4b4d43db7e4096085fd6f6b8c5e3d2242642d8a7"
s3.py 2.66 KB
Newer Older
赵小蒙's avatar
赵小蒙 committed
1
2
3
4
# from app.common import s3
import boto3
from botocore.client import Config

赵小蒙's avatar
赵小蒙 committed
5
from app.common import s3_buckets, s3_clusters, get_cluster_name, s3_users
赵小蒙's avatar
赵小蒙 committed
6
7
import re
import random
赵小蒙's avatar
赵小蒙 committed
8
from typing import List, Union
赵小蒙's avatar
赵小蒙 committed
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

__re_s3_path = re.compile("^s3a?://([^/]+)(?:/(.*))?$")
def get_s3_config(path: Union[str, List[str]], outside=False):
    paths = [path] if type(path) == str else path
    bucket_config = None
    for p in paths:
        bc = __get_s3_bucket_config(p)
        if bucket_config in [bc, None]:
            bucket_config = bc
            continue
        raise Exception(f"{paths} have different s3 config, cannot read together.")
    if not bucket_config:
        raise Exception("path is empty.")
    return __get_s3_config(bucket_config, outside, prefer_ip=True)

def __get_s3_config(
    bucket_config: tuple,
    outside: bool,
    prefer_ip=False,
    prefer_auto=False,
):
    cluster, user = bucket_config
    cluster_config = s3_clusters[cluster]

    if outside:
        endpoint_key = "outside"
    elif prefer_auto and "auto" in cluster_config:
        endpoint_key = "auto"
    elif cluster_config.get("cluster") == get_cluster_name():
        endpoint_key = "inside"
    else:
        endpoint_key = "outside"

    if prefer_ip and f"{endpoint_key}_ips" in cluster_config:
        endpoint_key = f"{endpoint_key}_ips"

    endpoints = cluster_config[endpoint_key]
    endpoint = random.choice(endpoints)
    return {"endpoint": endpoint, **s3_users[user]}

def split_s3_path(path: str):
    "split bucket and key from path"
    m = __re_s3_path.match(path)
    if m is None:
        return "", ""
    return m.group(1), (m.group(2) or "")

def __get_s3_bucket_config(path: str):
    bucket = split_s3_path(path)[0] if path else ""
    bucket_config = s3_buckets.get(bucket)
    if not bucket_config:
        bucket_config = s3_buckets.get("[default]")
        assert bucket_config is not None
    return bucket_config

def get_s3_client(path: Union[str, List[str]], outside=False):
    s3_config = get_s3_config(path, outside)
    try:
        return boto3.client(
            "s3",
            aws_access_key_id=s3_config["ak"],
            aws_secret_access_key=s3_config["sk"],
            endpoint_url=s3_config["endpoint"],
            config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8, "mode": "standard"}),
        )
    except:
        # older boto3 do not support retries.mode param.
        return boto3.client(
            "s3",
            aws_access_key_id=s3_config["ak"],
            aws_secret_access_key=s3_config["sk"],
            endpoint_url=s3_config["endpoint"],
            config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8}),
        )