Commit 615e9cbf authored by huteng.ht
Browse files

init commit for opensource


Signed-off-by: huteng.ht <huteng.ht@bytedance.com>
parents
/*
* Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "include/fastcrypto.h"
#include "include/io_helper.h"
// Destructor: hands the staging buffer (if any) back through free_buffer(),
// which picks the release call that matches how the buffer was allocated.
IOHelper::~IOHelper()
{
free_buffer();
}
// Allocate the host staging buffer (pin_mem) of this helper.
//
// file_path    : file whose size is used when buffer_size is not positive
// buffer_size  : requested size in bytes; a value <= 0 means "size of file_path"
// use_pinmem   : true  -> page-locked host memory from cudaMallocHost
//                false -> anonymous private mmap, advised towards huge pages
// use_sfcs_sdk : forwarded to get_file_size() when the file size is needed
//
// A buffer that is already held is released first. When the resulting size is
// zero no buffer is allocated at all. Throws std::runtime_error if the
// allocation fails; in that case the helper is left without a buffer
// (pin_mem == NULL, buffer_size_ == 0) so that it can be destroyed safely.
void IOHelper::init_buffer(string file_path, int64_t buffer_size, bool use_pinmem, bool use_sfcs_sdk)
{
    if (buffer_size <= 0)
    {
        buffer_size = get_file_size(file_path.c_str(), use_sfcs_sdk);
    }

    if (buffer_size_ > 0)
    {
        free_buffer();
        // Forget the released buffer right away: if the allocation below fails
        // the destructor must not try to release it a second time.
        pin_mem = NULL;
        buffer_size_ = 0;
    }

    // Nothing to stage (e.g. an empty file): mmap() rejects a zero length, so
    // simply stay without a buffer.
    if (buffer_size == 0)
    {
        return;
    }

    // Always record the mode of the buffer being created; free_buffer() relies
    // on it to choose between cudaFreeHost and munmap.
    use_pinmem_ = use_pinmem;
    if (use_pinmem)
    {
        cudaError_t err = cudaMallocHost(&pin_mem, buffer_size, cudaHostAllocMapped);
        if (err != cudaSuccess)
        {
            pin_mem = NULL;
            throw std::runtime_error(std::string("failed to allocate pinned host buffer: ") +
                                     cudaGetErrorString(err));
        }
    }
    else
    {
        // fd is -1 for anonymous mappings, as portable code is expected to pass.
        void *mapped = mmap(NULL, buffer_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
        if (mapped == MAP_FAILED)
        {
            pin_mem = NULL;
            throw std::runtime_error("failed to mmap host buffer");
        }
        pin_mem = (char *)mapped;
        // Huge pages are only a hint; a failure here is deliberately ignored.
        madvise(pin_mem, buffer_size, MADV_HUGEPAGE);
    }
    buffer_size_ = buffer_size;
}
// Release the staging buffer, if one is held, with the call that matches its
// allocation (cudaFreeHost for pinned memory, munmap otherwise).
//
// The pointer and the recorded size are cleared afterwards, so calling this
// again (for instance from the destructor after an earlier explicit release)
// is a harmless no-op instead of a double free.
void IOHelper::free_buffer()
{
    if (pin_mem == NULL)
    {
        return;
    }

    // Return codes are ignored on purpose: this runs from the destructor,
    // where there is no sensible way to report a failure.
    if (use_pinmem_)
    {
        cudaFreeHost(pin_mem);
    }
    else
    {
        munmap(pin_mem, buffer_size_);
    }

    pin_mem = NULL;
    buffer_size_ = 0;
}
void read_unaligned_part(std::string file_path, torch::Tensor res_tensor, int64_t *offset, int64_t device_id,
size_t *total_size, bool use_sfcs_sdk, bool use_direct_io, size_t *read_unaligned_size,
CipherInfo cipher_info)
{
// cpu align only read head part, while gpu align read both head and tail part
if (device_id < 0)
{
// head is aligned
if ((*offset & (BUF_ALIGN_SIZE - 1)) == 0)
{
return;
}
*read_unaligned_size = min(BUF_ALIGN_SIZE - (*offset & (BUF_ALIGN_SIZE - 1)), *total_size);
if ((uint64_t)res_tensor.data_ptr() % BUF_ALIGN_SIZE != *offset % BUF_ALIGN_SIZE)
{
throw std::runtime_error("data ptr does not satisfy the align purpose");
}
read_file(file_path, (char *)res_tensor.data_ptr(), NULL, 1, *read_unaligned_size, *offset, use_sfcs_sdk,
use_direct_io, cipher_info);
*total_size -= *read_unaligned_size;
*offset += *read_unaligned_size;
}
else
{
size_t end_offset = *offset + *total_size;
// both head and tail are aligned
if ((*offset & (BUF_ALIGN_SIZE - 1)) == 0 && ((end_offset) & (BUF_ALIGN_SIZE - 1)) == 0)
{
return;
}
char tmp_buf_head[BUF_ALIGN_SIZE] = {};
char tmp_buf_tail[BUF_ALIGN_SIZE] = {};
cudaSetDevice(device_id);
// read head unaligned
if ((*offset & (BUF_ALIGN_SIZE - 1)) != 0)
{
size_t read_head_size = min(BUF_ALIGN_SIZE - (*offset & (BUF_ALIGN_SIZE - 1)), *total_size);
read_file(file_path, tmp_buf_head, (char *)res_tensor.data_ptr(), 1, read_head_size, *offset, use_sfcs_sdk,
use_direct_io, cipher_info);
*read_unaligned_size = read_head_size;
*offset += read_head_size;
*total_size -= read_head_size;
}
// read tail unaligned
if (*total_size > 0 && (end_offset & (BUF_ALIGN_SIZE - 1)) != 0)
{
size_t tail_offset = end_offset - (end_offset & (BUF_ALIGN_SIZE - 1));
size_t tensor_offset = tail_offset - *offset + *read_unaligned_size;
read_file(file_path, tmp_buf_tail, (char *)res_tensor.data_ptr() + tensor_offset, 1,
end_offset - tail_offset, tail_offset, use_sfcs_sdk, use_direct_io, cipher_info);
*total_size -= end_offset - tail_offset;
}
cudaDeviceSynchronize();
}
}
void IOHelper::load_file_to_tensor(std::string file_path, torch::Tensor res_tensor, torch::Tensor sample_tensor,
int64_t offset, int64_t device_id, int64_t num_thread, bool use_pinmem,
bool use_sfcs_sdk, bool use_direct_io, bool use_cipher,
pybind11::array_t<char> key_arr, pybind11::array_t<char> iv_arr)
{
size_t file_size = get_file_size(file_path.c_str(), use_sfcs_sdk);
size_t total_size = file_size - offset;
size_t read_unaligned_size = 0;
// set cipher
CipherInfo cipher_info(use_cipher, key_arr, iv_arr);
if (device_id < 0)
{
read_unaligned_part(file_path, res_tensor, &offset, device_id, &total_size, use_sfcs_sdk, use_direct_io,
&read_unaligned_size, cipher_info);
read_file(file_path, (char *)res_tensor.data_ptr() + read_unaligned_size, NULL, num_thread, total_size, offset,
use_sfcs_sdk, use_direct_io, cipher_info);
}
else
{
read_unaligned_part(file_path, res_tensor, &offset, device_id, &total_size, use_sfcs_sdk, use_direct_io,
&read_unaligned_size, cipher_info);
// change use_pinmem attribute may introduce ambiguity
if (buffer_size_ > 0 && use_pinmem != use_pinmem_)
{
throw std::runtime_error("use_pinmem attribute of an exising IOHelper should not be changed");
}
// TODO: HPA might be slow
// only use pin_mem as buffer for copying data to device memory
// the lifecycle of the pin_mem is the same as helper
if (pin_mem == NULL || total_size > buffer_size_)
{
init_buffer(file_path, total_size, use_pinmem, use_sfcs_sdk);
}
cudaSetDevice(device_id);
read_file(file_path, pin_mem, (char *)res_tensor.data_ptr() + read_unaligned_size, num_thread, total_size,
offset, use_sfcs_sdk, use_direct_io, CipherInfo());
cudaDeviceSynchronize();
if (cipher_info.use_cipher && total_size > 0)
{
if (offset % CTR_BLOCK_SIZE != 0 || total_size % CTR_BLOCK_SIZE != 0)
{
throw std::runtime_error("cannot decrypt because gpu read is not aligned");
}
unsigned char iv[CTR_BLOCK_SIZE];
for (size_t i = 0; i < CTR_BLOCK_SIZE; i++)
{
iv[i] = cipher_info.iv[i];
}
ctr128_inc_by(iv, CTR_BLOCK_SIZE, offset / CTR_BLOCK_SIZE);
unsigned char *iv_gpu;
cudaMalloc((void **)&iv_gpu, CTR_BLOCK_SIZE);
cudaMemcpy(iv_gpu, iv, CTR_BLOCK_SIZE, cudaMemcpyHostToDevice);
unsigned char *ct = reinterpret_cast<unsigned char *>(res_tensor.data_ptr()) + read_unaligned_size;
int cipher_ret = ctr_decrypt_gpu(cipher_info.key, iv_gpu, ct, total_size, ct);
if (!cipher_ret)
{
throw std::runtime_error("Cipher Exception: gpu decrypt fail");
}
cudaDeviceSynchronize();
cudaFree(iv_gpu);
}
}
}
libfastcrypto.so.0.1
\ No newline at end of file
/*
* Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "include/load_utils.h"

#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>
void read_file_thread_fread(int thread_id, string file_path, char *addr, char *dev_mem, size_t block_size,
size_t total_size, size_t global_offset, bool use_direct_io)
{
size_t offset = thread_id * block_size;
size_t read_size = block_size;
int fd;
if (offset + read_size >= total_size)
{
read_size = (total_size > offset) ? total_size - offset : 0;
}
// TODO: use_direct_io if sfcs file detected
if (use_direct_io)
{
fd = open(file_path.c_str(), O_RDONLY | O_DIRECT);
}
else
{
fd = open(file_path.c_str(), O_RDONLY);
}
FILE *fp = fdopen(fd, "rb");
fseek(fp, global_offset + offset, SEEK_SET);
fread(addr + offset, 1, read_size, fp);
fclose(fp);
if (dev_mem != NULL)
cudaMemcpyAsync(dev_mem + offset, addr + offset, read_size, cudaMemcpyHostToDevice);
}
void read_file(string file_path, char *addr, char *dev_mem, int num_thread, size_t total_size, size_t global_offset,
bool use_sfcs_sdk, bool use_direct_io, CipherInfo cipher_info)
{
if (total_size == 0)
{
return;
}
vector<thread> threads(num_thread);
size_t block_size = (size_t)ceil((double)total_size / num_thread);
// align the block_size;
block_size = (block_size + BUF_ALIGN_SIZE - 1) / BUF_ALIGN_SIZE * BUF_ALIGN_SIZE;
// re-caculate the real needed thread num;
num_thread = (total_size + block_size - 1) / block_size;
if (use_sfcs_sdk)
{
SFCSFile sfcs_file(file_path, cipher_info);
sfcs_file.read_file_parallel(addr, dev_mem, num_thread, total_size, global_offset);
}
else
{
for (int thread_id = 0; thread_id < num_thread; thread_id++)
{
threads[thread_id] = std::thread(read_file_thread_fread, thread_id, file_path, addr, dev_mem, block_size,
total_size, global_offset, use_direct_io);
}
for (int thread_id = 0; thread_id < num_thread; thread_id++)
{
threads[thread_id].join();
}
}
}
// Return the size in bytes of file_name.
//
// use_sfcs_sdk : true asks SFCSFile for the size, false uses stat() on the
//                local file system
//
// Throws std::runtime_error when stat() fails (for example because the file
// does not exist) instead of returning the content of an unfilled struct.
size_t get_file_size(const char *file_name, bool use_sfcs_sdk)
{
    if (use_sfcs_sdk)
    {
        SFCSFile sfcs_file(file_name);
        return sfcs_file.get_file_size();
    }

    struct stat st;
    if (stat(file_name, &st) != 0)
    {
        throw std::runtime_error(std::string("Failed to stat file ") + file_name + ": " + std::strerror(errno));
    }
    return st.st_size;
}
/*
* Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "include/io_helper.h"
#include "include/sfcs.h"
// Python bindings of the extension module.
//
// Exposes two classes:
//   IOHelper  - default constructor and load_file_to_tensor
//   SFCSFile  - path-only and path+cipher constructors plus the size / read /
//               write / delete operations
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
{
    py::class_<IOHelper> io_helper_binding(m, "IOHelper");
    io_helper_binding.def(py::init<>());
    io_helper_binding.def("load_file_to_tensor", &IOHelper::load_file_to_tensor);

    py::class_<SFCSFile> sfcs_file_binding(m, "SFCSFile");
    // constructors: (path) and (path, use_cipher, key, iv)
    sfcs_file_binding.def(py::init<std::string>());
    sfcs_file_binding.def(py::init<std::string, bool, pybind11::array_t<char>, pybind11::array_t<char>>());
    // file operations
    sfcs_file_binding.def("get_file_size", &SFCSFile::get_file_size);
    sfcs_file_binding.def("read_file_to_array", &SFCSFile::read_file_to_array);
    sfcs_file_binding.def("write_file_from_array", &SFCSFile::write_file_from_array);
    sfcs_file_binding.def("delete_file", &SFCSFile::delete_file);
}
\ No newline at end of file
/*
* Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "include/sfcs.h"
#include "include/fastcrypto.h"
// Construct a handle for `path` and connect to the file system.
//
// A builder is created, configured with SFCS_NAME_NODE and SFCS_USER_NAME and
// used to open the connection that is stored in `fs`.
// Throws std::runtime_error when the builder cannot be created or the
// connection fails; both failures are logged with cfsGetLastError() first.
SFCSFile::SFCSFile(std::string path)
{
file_path = path;
// construct builder
struct cfsBuilder *bld = cfsNewBuilder();
if (bld == NULL)
{
logError("Failed to construct bld", cfsGetLastError());
throw std::runtime_error("SFCS Exception: construct bld");
}
cfsBuilderSetNameNode(bld, SFCS_NAME_NODE);
cfsBuilderSetUserName(bld, SFCS_USER_NAME);
// connect to cfs
fs = cfsBuilderConnect(bld, NULL);
if (fs == NULL)
{
logError("Failed to connect to cfs", cfsGetLastError());
cfsFreeBuilder(bld);
throw std::runtime_error("SFCS Exception: connect to cfs");
}
// NOTE(review): the builder is only freed on the failure path above; confirm
// whether a successful cfsBuilderConnect takes ownership of `bld` or whether it
// has to be released here as well.
}
// Construct a connected handle (see the path-only constructor) and attach an
// already built CipherInfo to it.
SFCSFile::SFCSFile(std::string file_path, CipherInfo cipher_info) : SFCSFile(file_path)
{
this->cipher_info = cipher_info;
}
// Construct a connected handle (see the path-only constructor) and build its
// CipherInfo from the given flag, key array and iv array.
// NOTE(review): CipherInfo keeps raw pointers into the buffers of key_arr and
// iv_arr -- confirm that the caller keeps those arrays alive for as long as
// this object is used.
SFCSFile::SFCSFile(std::string file_path, bool use_cipher, pybind11::array_t<char> key_arr,
pybind11::array_t<char> iv_arr)
: SFCSFile(file_path)
{
this->cipher_info = CipherInfo(use_cipher, key_arr, iv_arr);
}
// Destructor: closes the connection that the constructor stored in `fs`.
SFCSFile::~SFCSFile()
{
cfsDisconnect(fs);
}
// Return the size in bytes of file_path as reported by cfsGetPathInfo.
//
// Throws std::runtime_error when the path information cannot be obtained.
// The connection is intentionally left open in that case: it belongs to this
// object and is closed exactly once, by the destructor.
size_t SFCSFile::get_file_size()
{
    // get path info
    cfsFileInfo *file_info = cfsGetPathInfo(fs, file_path.c_str());
    if (file_info == NULL)
    {
        logError("Failed to get path info of relative path", file_path, cfsGetLastError());
        throw std::runtime_error("SFCS Exception: get path info");
    }

    size_t size = file_info->mSize;
    cfsFreeFileInfo(file_info, 1);
    return size;
}
// Read up to `length` bytes starting at `offset` of file_path into addr and,
// when the cipher is enabled, decrypt them in place.
//
// Returns the number of bytes actually read, which is smaller than `length`
// when the end of the file is reached first.
// Throws std::runtime_error when the file cannot be opened, positioned or read,
// or when decryption fails. The file handle is closed on every path.
size_t SFCSFile::read_file(char *addr, size_t length, size_t offset)
{
    cfsFile file = cfsOpenFile(fs, file_path.c_str(), O_RDONLY, 0, 0, 0);
    if (file == NULL)
    {
        logError("Failed to open file", file_path, cfsGetLastError());
        throw std::runtime_error("SFCS Exception: open file");
    }

    if (cfsSeek(fs, file, offset) == -1)
    {
        logError("Failed to seek file", file_path, cfsGetLastError());
        cfsCloseFile(fs, file);
        throw std::runtime_error("SFCS Exception: seek file");
    }

    char *dst = addr;
    size_t remaining = length;
    while (remaining > 0)
    {
        // The result is an int32_t, so a single call cannot report more than
        // INT32_MAX bytes; never ask for more than that at once.
        size_t request = remaining < (size_t)INT32_MAX ? remaining : (size_t)INT32_MAX;
        int32_t ret = cfsRead(fs, file, dst, request);
        // EOF
        if (ret == 0)
            break;
        if (ret < 0)
        {
            logError("Failed to read file", file_path, cfsGetLastError());
            cfsCloseFile(fs, file);
            throw std::runtime_error("SFCS Exception: read file");
        }
        remaining -= ret;
        dst += ret;
    }
    cfsCloseFile(fs, file);

    size_t read_bytes = length - remaining;
    // Decrypt if use_cipher is true
    if (cipher_info.use_cipher)
    {
        CtrDecrypter dec(cipher_info.key, cipher_info.iv, offset);
        unsigned char *ct = reinterpret_cast<unsigned char *>(addr);
        int cipher_ret = dec.decrypt_update(ct, read_bytes, ct);
        if (!cipher_ret)
        {
            throw std::runtime_error("Cipher Exception: decrypt fail");
        }
    }
    return read_bytes;
}
void SFCSFile::read_file_thread(int thread_id, char *addr, char *dev_mem, size_t block_size, size_t total_size,
size_t global_offset)
{
size_t offset = thread_id * block_size;
size_t read_size = block_size;
if (offset + read_size >= total_size)
{
read_size = (total_size > offset) ? total_size - offset : 0;
}
read_file(addr + offset, read_size, global_offset + offset);
if (dev_mem != NULL)
cudaMemcpyAsync(dev_mem + offset, addr + offset, read_size, cudaMemcpyHostToDevice);
}
size_t SFCSFile::read_file_parallel(char *addr, char *dev_mem, int num_thread, size_t total_size, size_t global_offset)
{
vector<thread> threads(num_thread);
if (total_size == 0)
{
return total_size;
}
size_t block_size = (size_t)ceil((double)total_size / num_thread);
// align the block_size;
block_size = (block_size + BUF_ALIGN_SIZE - 1) / BUF_ALIGN_SIZE * BUF_ALIGN_SIZE;
// re-caculate the real needed thread num;
num_thread = (total_size + block_size - 1) / block_size;
for (int thread_id = 0; thread_id < num_thread; thread_id++)
{
threads[thread_id] = std::thread(&SFCSFile::read_file_thread, this, thread_id, addr, dev_mem, block_size,
total_size, global_offset);
}
for (int thread_id = 0; thread_id < num_thread; thread_id++)
{
threads[thread_id].join();
}
return total_size;
}
// Read `length` bytes starting at `offset` of the file into the buffer of
// `arr`, using up to num_thread threads.
//
// Returns the value of read_file_parallel().
// Throws std::runtime_error when `length` does not fit into the array, which
// would otherwise write past the end of its buffer.
size_t SFCSFile::read_file_to_array(pybind11::array_t<char> arr, size_t length, size_t offset, int num_thread)
{
    pybind11::buffer_info buf_info = arr.request();
    if (buf_info.size < 0 || length > (size_t)buf_info.size)
    {
        throw std::runtime_error("SFCS Exception: read length exceeds array size");
    }
    char *addr = static_cast<char *>(buf_info.ptr);
    return read_file_parallel(addr, NULL, num_thread, length, offset);
}
// Write `length` bytes from addr to file_path.
//
// When the cipher is enabled the data is encrypted first. The encryption is
// done in place, i.e. the caller's buffer holds the ciphertext afterwards.
//
// Returns the number of bytes written, which is smaller than `length` if a
// write call reports that nothing more could be written.
// Throws std::runtime_error when encryption fails or the file cannot be opened
// or written. The file handle is closed on every path after a successful open.
size_t SFCSFile::write_file(char *addr, size_t length)
{
    if (cipher_info.use_cipher)
    {
        CtrEncrypter enc(cipher_info.key, cipher_info.iv, 0);
        unsigned char *pt = reinterpret_cast<unsigned char *>(addr);
        int cipher_ret = enc.encrypt_update(pt, length, pt);
        if (!cipher_ret)
        {
            throw std::runtime_error("Cipher Exception: encrypt fail");
        }
    }

    cfsFile file = cfsOpenFile(fs, file_path.c_str(), O_WRONLY | O_ASYNC, 0, 0, 0);
    if (file == NULL)
    {
        logError("Failed to open file", file_path, cfsGetLastError());
        throw std::runtime_error("SFCS Exception: open file");
    }

    char *src = addr;
    size_t remaining = length;
    while (remaining > 0)
    {
        // The result is an int32_t, so a single call cannot report more than
        // INT32_MAX bytes; never hand over more than that at once.
        size_t request = remaining < (size_t)INT32_MAX ? remaining : (size_t)INT32_MAX;
        int32_t ret = cfsWrite(fs, file, src, request);
        // no progress: stop and report the partial count
        if (ret == 0)
            break;
        if (ret < 0)
        {
            logError("Failed to write file", file_path, cfsGetLastError());
            cfsCloseFile(fs, file);
            throw std::runtime_error("SFCS Exception: write file");
        }
        remaining -= ret;
        src += ret;
    }
    cfsCloseFile(fs, file);
    return length - remaining;
}
// Write the first `length` bytes of the buffer of `arr` to the file.
//
// Returns the value of write_file().
// Throws std::runtime_error when `length` is larger than the array, which
// would otherwise read past the end of its buffer.
size_t SFCSFile::write_file_from_array(pybind11::array_t<char> arr, size_t length)
{
    pybind11::buffer_info buf_info = arr.request();
    if (buf_info.size < 0 || length > (size_t)buf_info.size)
    {
        throw std::runtime_error("SFCS Exception: write length exceeds array size");
    }
    char *addr = static_cast<char *>(buf_info.ptr);
    return write_file(addr, length);
}
// Delete file_path through cfsDelete.
//
// The last argument (1) is passed to cfsDelete as is -- presumably the
// "recursive" switch, TODO confirm against the SDK.
// Throws std::runtime_error (after logging the SDK error) when cfsDelete
// returns -1.
void SFCSFile::delete_file()
{
    const int status = cfsDelete(fs, file_path.c_str(), 1);
    if (status != -1)
    {
        return;
    }

    logError("Failed to delete file", file_path, cfsGetLastError());
    throw std::runtime_error("SFCS Exception: delete file");
}
// Build the cipher settings from a flag and the key / iv arrays.
//
// With use_cipher == false the arrays are not looked at and key / iv are set to
// NULL, so the object never carries unset pointers when it is copied around.
// With use_cipher == true both arrays must hold exactly CTR_BLOCK_SIZE
// elements; otherwise std::runtime_error is thrown.
//
// key and iv point directly into the buffers of the arrays (nothing is copied).
// NOTE(review): the arrays therefore have to outlive every use of this object
// -- confirm this holds for all callers.
CipherInfo::CipherInfo(bool use_cipher, pybind11::array_t<char> key_arr, pybind11::array_t<char> iv_arr)
{
    this->use_cipher = use_cipher;
    key = NULL;
    iv = NULL;
    if (!use_cipher)
    {
        return;
    }

    pybind11::buffer_info key_info = key_arr.request();
    if ((size_t)key_info.size != CTR_BLOCK_SIZE)
    {
        throw std::runtime_error("Cipher Exception: key length invalid");
    }
    key = reinterpret_cast<unsigned char *>(key_info.ptr);

    pybind11::buffer_info iv_info = iv_arr.request();
    if ((size_t)iv_info.size != CTR_BLOCK_SIZE)
    {
        throw std::runtime_error("Cipher Exception: iv length invalid");
    }
    iv = reinterpret_cast<unsigned char *>(iv_info.ptr);
}
\ No newline at end of file
'''
Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
from typing import Optional
import torch
from loguru import logger
from veturboio.ops.cipher import CipherInfo
try:
import veturboio_ext
IOHelper = veturboio_ext.IOHelper
except ImportError:
IOHelper = None
logger.warning("veturboio_ext not found, fallback to pure python implementation")
def load_file_to_tensor(
    file_path: str,
    total_tensor: torch.Tensor,
    sample_tensor: torch.Tensor,
    offset: int,
    helper: IOHelper,
    device_id: Optional[int] = -1,
    num_thread: Optional[int] = 32,
    use_pinmem: Optional[bool] = False,
    use_sfcs_sdk: Optional[bool] = False,
    use_direct_io: Optional[bool] = False,
    cipher_info: CipherInfo = CipherInfo(False),
) -> torch.Tensor:
    """Forward a load request to the native ``IOHelper.load_file_to_tensor``.

    All arguments are passed on positionally and in the order the native method
    is called with; ``cipher_info`` is unpacked into its ``use_cipher``, ``key``
    and ``iv`` attributes.

    Args:
        file_path: Path of the file to load.
        total_tensor: Tensor handed to the native method as destination.
        sample_tensor: Tensor handed to the native method unchanged.
        offset: Offset handed to the native method.
        helper: ``IOHelper`` instance that performs the load.
        device_id: Device selector handed to the native method (default -1).
        num_thread: Thread count handed to the native method (default 32).
        use_pinmem: Forwarded flag.
        use_sfcs_sdk: Forwarded flag.
        use_direct_io: Forwarded flag.
        cipher_info: Cipher settings; disabled by default.

    Returns:
        Whatever the native method returns.
    """
    native_load = helper.load_file_to_tensor
    io_options = (device_id, num_thread, use_pinmem, use_sfcs_sdk, use_direct_io)
    cipher_options = (cipher_info.use_cipher, cipher_info.key, cipher_info.iv)
    return native_load(file_path, total_tensor, sample_tensor, offset, *io_options, *cipher_options)
def init_io_helper() -> IOHelper:
    """Create a new ``IOHelper`` from the native extension.

    Raises:
        RuntimeError: If the ``veturboio_ext`` extension could not be imported,
            in which case ``IOHelper`` is ``None`` and nothing can be created.
    """
    if IOHelper is None:
        raise RuntimeError("veturboio_ext not found, IOHelper is not available")
    return IOHelper()
'''
Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import os
import shutil
import tempfile
import threading
import xml.dom.minidom
from datetime import datetime, timezone
from typing import Optional
import numpy as np
from loguru import logger
from veturboio.ops.cipher import CipherInfo, DataPipeClient
try:
import veturboio_ext
SFCSFile = veturboio_ext.SFCSFile
except ImportError:
SFCSFile = None
logger.warning("veturboio_ext not found, fallback to pure python implementation")
# Environment variables that init_sfcs_properties() requires to be set; a
# missing one makes it raise ValueError.
SFCS_REQ_ENV_LIST = [
    'SFCS_FSNAME',
    'SFCS_REGION',
    'SFCS_AUTHENTICATION_SERVICE_NAME',
    'SFCS_NS_ID',
    'SFCS_UFS_PATH',
    'SFCS_MULTI_NIC_WHITELIST',
    'SFCS_NETWORK_SEGMENT',
    'SFCS_LOG_SEVERITY',
]
# Environment variables that may be set; init_sfcs_conf() uses the static
# credentials path only when all three of them are present.
SFCS_OPT_ENV_LIST = [
    'SFCS_ACCESS_KEY',
    'SFCS_SECRET_KEY',
    'SFCS_NAMENODE_ENDPOINT_ADDRESS',
]
# Property name -> value mapping that generate_sfcs_conf_xml() renders into the
# configuration file. The entries below are fixed defaults; more entries are
# added at runtime by init_sfcs_properties() and CredentialsHelper.do_refresh().
SFCS_PROPERTIES = {
    'cfs.filesystem.fs-mode': 'ACC',
    'cfs.filesystem.task-id': 'sfcs',
    'cfs.filesystem.resolve.addr.by.dns': 'false',
    'cfs.metrics.emitters': 'metric_server;local_prometheus',
    'cfs.client.metadata-cache.enable': 'false',
    'rpc.client.channel.pool.size': '32',
    'dfs.default.replica': '2',
    'cfs.client.multi-nic.enabled': 'true',
    'fs.datanode.router.ignore-main-nic': 'true',
    'cfs.datanode.router.shuffle': 'true',
}
class CredentialsHelper:
    """Fetches SFCS credentials and keeps the configuration file up to date.

    ``run`` obtains credentials once through a ``DataPipeClient``, writes them
    into the configuration file and then starts a daemon thread that refreshes
    them whenever half of their validity period has passed.
    """

    # Seconds the background thread waits before retrying a failed refresh.
    REFRESH_RETRY_INTERVAL = 5

    def __init__(self):
        self.lock = threading.Lock()
        self.running = False
        # daemon thread will stop when parent thread exits
        self.thread = None
        self.client = None
        self.current_time = 0
        self.expired_time = 0
        self.ak = None
        self.sk = None
        self.st = None
        self.name_node_ip = None
        self.sfcs_conf_path = None
        self.stop_flag = None

    def run(self, sfcs_conf_path) -> None:
        """Fetch credentials once and start the background refresh thread.

        Calling it again while the helper is running only logs a message.

        Raises:
            RuntimeError: If the datapipe client has no session or the first
                refresh fails.
        """
        if not self.running:
            # re-check under the lock so that only one caller starts the thread
            with self.lock:
                if not self.running:
                    self.thread = threading.Thread(target=self.refresh_loop, daemon=True)
                    self.stop_flag = threading.Event()
                    self.client = DataPipeClient()
                    if not self.client.session:
                        raise RuntimeError('Datapipe client initialization failed in credentials helper')
                    self.sfcs_conf_path = sfcs_conf_path
                    if not self.do_refresh():
                        raise RuntimeError('Credentials helper do refresh failed')
                    self.thread.start()
                    self.running = True
                    logger.info('CredentialsHelper refresh thread strat')
                    return
        logger.info('CredentialsHelper thread is already running, do nothing')

    def stop(self):
        """Ask the refresh thread to exit; a no-op if ``run`` was never called."""
        # stop_flag only exists once run() has been called
        if self.stop_flag is not None:
            self.stop_flag.set()

    def is_valid_res(self, d: Optional[dict]) -> bool:
        """Return True if ``d`` carries every field that ``do_refresh`` reads."""
        if not d:
            return False
        for k in ['Cred', 'SfcsNameNodeAddress']:
            if k not in d:
                return False
        cred = d['Cred']
        # a missing / malformed credential section is invalid, not an error
        if not isinstance(cred, dict):
            return False
        for k in ['CurrentTime', 'ExpiredTime', 'AccessKeyId', 'SecretAccessKey', 'SessionToken']:
            if k not in cred:
                return False
        return True

    def refresh_loop(self) -> None:
        """Body of the refresh thread.

        Sleeps until the midpoint between ``current_time`` and ``expired_time``
        and then refreshes. A failed refresh is logged and retried after
        ``REFRESH_RETRY_INTERVAL`` seconds, so a temporary outage does not end
        the thread for good. Returns as soon as ``stop`` has been called.
        """
        while True:
            now = datetime.now(tz=timezone.utc).timestamp()
            ts_ref = (self.current_time + self.expired_time) / 2
            if now < ts_ref:
                if self.stop_flag.wait(ts_ref - now):
                    return
                continue

            try:
                refreshed = self.do_refresh()
            except Exception:
                logger.exception('Credentials helper do refresh raised an exception')
                refreshed = False
            if not refreshed:
                logger.error('Credentials helper do refresh failed')
                if self.stop_flag.wait(self.REFRESH_RETRY_INTERVAL):
                    return

    def do_refresh(self) -> bool:
        """Fetch new credentials and rewrite the configuration file.

        Returns:
            True when a valid response was received and written, False when the
            response failed ``is_valid_res``.
        """
        d = self.client.get_sfcs_ak_sk_st()
        if not self.is_valid_res(d):
            return False

        self.name_node_ip = d['SfcsNameNodeAddress']
        cred = d['Cred']
        self.current_time = datetime.fromisoformat(cred['CurrentTime']).timestamp()
        self.expired_time = datetime.fromisoformat(cred['ExpiredTime']).timestamp()
        self.ak = cred['AccessKeyId']
        self.sk = cred['SecretAccessKey']
        self.st = cred['SessionToken']
        # update SFCS_PROPERTIES and then write xml
        SFCS_PROPERTIES['cfs.access.key'] = self.ak
        SFCS_PROPERTIES['cfs.secret.key'] = self.sk
        SFCS_PROPERTIES['cfs.security.token'] = self.st
        SFCS_PROPERTIES['cfs.namenode.endpoint.address.' + os.getenv('SFCS_FSNAME')] = self.name_node_ip
        generate_sfcs_conf_xml(self.sfcs_conf_path)
        logger.info('Credentials are successfully refreshed!')
        return True


# module-wide helper instance used by init_sfcs_conf()
credentials_helper = CredentialsHelper()
def init_sfcs_properties():
    """Fill ``SFCS_PROPERTIES`` from the ``SFCS_*`` environment variables.

    Raises:
        ValueError: For the first variable of ``SFCS_REQ_ENV_LIST`` that is not
            set.
    """
    missing = next((env for env in SFCS_REQ_ENV_LIST if os.getenv(env) is None), None)
    if missing is not None:
        raise ValueError('environ ' + missing + ' not set')

    fs_name = os.getenv('SFCS_FSNAME')
    region = os.getenv('SFCS_REGION')

    SFCS_PROPERTIES.update(
        {
            # derived from the required variables
            'dfs.default.uri': 'cfs://' + fs_name + '.sfcs-' + region + '.ivolces.com',
            'dfs.authentication.service.name': os.getenv('SFCS_AUTHENTICATION_SERVICE_NAME'),
            'cfs.filesystem.ns-id': os.getenv('SFCS_NS_ID'),
            'cfs.filesystem.ufs-path': os.getenv('SFCS_UFS_PATH'),
            'cfs.metrics.server.host': 'metricserver.cfs-' + region + '.ivolces.com',
            'cfs.client.multi-nic.whitelist': os.getenv('SFCS_MULTI_NIC_WHITELIST'),
            'cfs.client.network.segment': os.getenv('SFCS_NETWORK_SEGMENT'),
            'dfs.client.log.severity': os.getenv('SFCS_LOG_SEVERITY'),
            # optional
            'cfs.filesystem.sync-interval': os.getenv('SFCS_SYNC_INTERVAL', "-1"),
            'cfs.access.key': os.getenv('SFCS_ACCESS_KEY'),
            'cfs.secret.key': os.getenv('SFCS_SECRET_KEY'),
            'cfs.namenode.endpoint.address.' + fs_name: os.getenv('SFCS_NAMENODE_ENDPOINT_ADDRESS'),
        }
    )
def generate_sfcs_conf_xml(sfcs_conf):
    """Render ``SFCS_PROPERTIES`` as an XML configuration file at ``sfcs_conf``.

    Every entry becomes a ``<property>`` element with ``<name>`` and ``<value>``
    children below a ``<configuration>`` root.

    The document is first written to a temporary file in the directory of
    ``sfcs_conf`` and then moved over the target with ``os.replace``. Keeping
    both on the same file system makes the swap a single rename, so a reader
    never sees a half-written file. The temporary file is removed if anything
    fails.

    Args:
        sfcs_conf: Path of the configuration file to create or replace.
    """
    doc = xml.dom.minidom.Document()
    configuration = doc.createElement('configuration')
    doc.appendChild(configuration)

    for key, val in SFCS_PROPERTIES.items():
        prop = doc.createElement('property')
        name = doc.createElement('name')
        name.appendChild(doc.createTextNode(key))
        value = doc.createElement('value')
        value.appendChild(doc.createTextNode(val))
        prop.appendChild(name)
        prop.appendChild(value)
        configuration.appendChild(prop)

    pi = doc.createProcessingInstruction('xml-stylesheet', 'type="text/xsl" href="configuration.xsl"')
    doc.insertBefore(pi, configuration)

    target_dir = os.path.dirname(os.path.abspath(sfcs_conf))
    # the file encoding must match the encoding declared in the XML header
    tmp_conf = tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', dir=target_dir, delete=False)
    try:
        with tmp_conf:
            doc.writexml(tmp_conf, indent='\t', addindent='\t', newl='\n', encoding="utf-8")
        os.replace(tmp_conf.name, sfcs_conf)
    except BaseException:
        # do not leave the temporary file (it holds credentials) behind
        try:
            os.unlink(tmp_conf.name)
        except OSError:
            pass
        raise
def init_sfcs_conf():
    """Make sure the configuration file named by ``LIBCFS_CONF`` exists.

    ``LIBCFS_CONF`` defaults to ``libcfs.xml`` in the current working directory
    when it is not set. Then one of three cases applies:

    1. the file already exists: nothing is generated;
    2. access key, secret key and name node address are all given in the
       environment: the file is generated from them;
    3. otherwise the credentials helper generates the file and keeps it
       refreshed.
    """
    if not os.getenv('LIBCFS_CONF'):
        default_conf = os.getcwd() + '/libcfs.xml'
        logger.warning('environ LIBCFS_CONF not set, set it to ' + default_conf)
        os.environ['LIBCFS_CONF'] = default_conf

    sfcs_conf = os.getenv('LIBCFS_CONF')

    # case 1: a xml file already exists, do nothing
    if os.path.exists(sfcs_conf):
        logger.warning('LIBCFS_CONF file exists')
        return

    init_sfcs_properties()

    static_credential_envs = ('SFCS_ACCESS_KEY', 'SFCS_SECRET_KEY', 'SFCS_NAMENODE_ENDPOINT_ADDRESS')
    if all(os.getenv(env) for env in static_credential_envs):
        # case 2: env SFCS_ACCESS_KEY, SFCS_SECRET_KEY and SFCS_NAMENODE_ENDPOINT_ADDRESS exist
        logger.warning('Use aksk and namenode_ip in env to generate sfcs config')
        generate_sfcs_conf_xml(sfcs_conf)
        return

    # case 3: use datapipe socket to get and refresh ak, sk, and st
    logger.warning('Use credentials helper to generate and update sfcs config')
    credentials_helper.run(sfcs_conf)
def sfcs_get_file_size(file_path: str) -> int:
    """Return the size of ``file_path`` as reported by ``SFCSFile.get_file_size``."""
    return SFCSFile(file_path).get_file_size()
def sfcs_read_file(
    file_path: str,
    arr: np.ndarray,
    length: int,
    offset: int,
    num_thread: Optional[int] = 1,
    cipher_info: CipherInfo = CipherInfo(False),
) -> int:
    """Read part of ``file_path`` into ``arr`` through ``SFCSFile``.

    Args:
        file_path: Path of the file to read.
        arr: Array that receives the data.
        length: Length handed to ``SFCSFile.read_file_to_array``.
        offset: Offset handed to ``SFCSFile.read_file_to_array``.
        num_thread: Thread count handed to the native read (default 1).
        cipher_info: Cipher settings used to construct the ``SFCSFile``;
            disabled by default.

    Returns:
        The value returned by ``SFCSFile.read_file_to_array``.
    """
    reader = SFCSFile(file_path, cipher_info.use_cipher, cipher_info.key, cipher_info.iv)
    result = reader.read_file_to_array(arr, length, offset, num_thread)
    return result
def sfcs_write_file(file_path: str, arr: np.ndarray, length: int, cipher_info: CipherInfo = CipherInfo(False)) -> int:
    """Write ``arr`` to ``file_path`` through ``SFCSFile``.

    Args:
        file_path: Path of the file to write.
        arr: Array holding the data.
        length: Length handed to ``SFCSFile.write_file_from_array``.
        cipher_info: Cipher settings used to construct the ``SFCSFile``;
            disabled by default.

    Returns:
        The value returned by ``SFCSFile.write_file_from_array``.
    """
    writer = SFCSFile(file_path, cipher_info.use_cipher, cipher_info.key, cipher_info.iv)
    result = writer.write_file_from_array(arr, length)
    return result
def sfcs_delete_file(file_path: str):
    """Delete ``file_path`` through ``SFCSFile.delete_file``."""
    SFCSFile(file_path).delete_file()
This diff is collapsed.
'''
Copyright (c) 2024 Beijing Volcano Engine Technology Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
from veturboio.saver.base_saver import BaseSaver, PosixSaver
from veturboio.saver.sfcs_client_saver import SfcsClientSaver
__all__ = ["BaseSaver", "PosixSaver", "SfcsClientSaver"]
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment