// Copyright 2019-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions // are met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // * Neither the name of NVIDIA CORPORATION nor the names of its // contributors may be used to endorse or promote products derived // from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY // OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "filesystem.h" #ifdef _WIN32 // suppress the min and max definitions in Windef.h. #define NOMINMAX #include // _CRT_INTERNAL_NONSTDC_NAMES 1 before including Microsoft provided C Runtime // library to expose declarations without "_" prefix to match POSIX style. #define _CRT_INTERNAL_NONSTDC_NAMES 1 #include #include #else #include #include #endif #ifdef TRITON_ENABLE_GCS #include #endif // TRITON_ENABLE_GCS #ifdef TRITON_ENABLE_S3 #include #include #include #include #include #include #include #endif // TRITON_ENABLE_S3 #ifdef TRITON_ENABLE_AZURE_STORAGE #include #include #include #undef LOG_INFO #undef LOG_WARNING #endif // TRITON_ENABLE_AZURE_STORAGE #include #include #include #include #include #include #include #include #include #include #include "constants.h" #include "status.h" #include "triton/common/logging.h" #define TRITONJSON_STATUSTYPE triton::core::Status #define TRITONJSON_STATUSRETURN(M) \ return triton::core::Status(triton::core::Status::Code::INTERNAL, (M)) #define TRITONJSON_STATUSSUCCESS triton::core::Status::Success #include "triton/common/triton_json.h" #ifdef _WIN32 // in Windows doesn't define S_ISDIR macro #if !defined(S_ISDIR) && defined(S_IFMT) && defined(S_IFDIR) #define S_ISDIR(m) (((m)&S_IFMT) == S_IFDIR) #endif #define F_OK 0 #endif namespace triton { namespace core { namespace { // Check if a local path is a directory. We need to use this in LocalFileSystem // and LocalizedPath so have this common function. Status IsPathDirectory(const std::string& path, bool* is_dir) { *is_dir = false; struct stat st; if (stat(path.c_str(), &st) != 0) { return Status(Status::Code::INTERNAL, "failed to stat file " + path); } *is_dir = S_ISDIR(st.st_mode); return Status::Success; } } // namespace LocalizedPath::~LocalizedPath() { if (!local_path_.empty()) { bool is_dir = true; IsDirectory(local_path_, &is_dir); LOG_STATUS_ERROR( DeletePath(is_dir ? local_path_ : DirName(local_path_)), "failed to delete localized path"); } } namespace { class FileSystem { public: virtual Status FileExists(const std::string& path, bool* exists) = 0; virtual Status IsDirectory(const std::string& path, bool* is_dir) = 0; virtual Status FileModificationTime( const std::string& path, int64_t* mtime_ns) = 0; virtual Status GetDirectoryContents( const std::string& path, std::set* contents) = 0; virtual Status GetDirectorySubdirs( const std::string& path, std::set* subdirs) = 0; virtual Status GetDirectoryFiles( const std::string& path, std::set* files) = 0; virtual Status ReadTextFile( const std::string& path, std::string* contents) = 0; virtual Status LocalizePath( const std::string& path, std::shared_ptr* localized) = 0; virtual Status WriteTextFile( const std::string& path, const std::string& contents) = 0; virtual Status WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) = 0; virtual Status MakeDirectory( const std::string& dir, const bool recursive) = 0; virtual Status MakeTemporaryDirectory(std::string* temp_dir) = 0; virtual Status DeletePath(const std::string& path) = 0; }; class LocalFileSystem : public FileSystem { public: Status FileExists(const std::string& path, bool* exists) override; Status IsDirectory(const std::string& path, bool* is_dir) override; Status FileModificationTime( const std::string& path, int64_t* mtime_ns) override; Status GetDirectoryContents( const std::string& path, std::set* contents) override; Status GetDirectorySubdirs( const std::string& path, std::set* subdirs) override; Status GetDirectoryFiles( const std::string& path, std::set* files) override; Status ReadTextFile(const std::string& path, std::string* contents) override; Status LocalizePath( const std::string& path, std::shared_ptr* localized) override; Status WriteTextFile( const std::string& path, const std::string& contents) override; Status WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) override; Status MakeDirectory(const std::string& dir, const bool recursive) override; Status MakeTemporaryDirectory(std::string* temp_dir) override; Status DeletePath(const std::string& path) override; }; Status LocalFileSystem::FileExists(const std::string& path, bool* exists) { *exists = (access(path.c_str(), F_OK) == 0); return Status::Success; } Status LocalFileSystem::IsDirectory(const std::string& path, bool* is_dir) { return IsPathDirectory(path, is_dir); } Status LocalFileSystem::FileModificationTime( const std::string& path, int64_t* mtime_ns) { struct stat st; if (stat(path.c_str(), &st) != 0) { return Status(Status::Code::INTERNAL, "failed to stat file " + path); } #ifdef _WIN32 // In Windows, st_mtime is in time_t *mtime_ns = std::max(st.st_mtime, st.st_ctime); #else *mtime_ns = std::max(TIMESPEC_TO_NANOS(st.st_mtim), TIMESPEC_TO_NANOS(st.st_ctim)); #endif return Status::Success; } Status LocalFileSystem::GetDirectoryContents( const std::string& path, std::set* contents) { #ifdef _WIN32 WIN32_FIND_DATA entry; // Append "*" to obtain all files under 'path' HANDLE dir = FindFirstFile(JoinPath({path, "*"}).c_str(), &entry); if (dir == INVALID_HANDLE_VALUE) { return Status(Status::Code::INTERNAL, "failed to open directory " + path); } if ((strcmp(entry.cFileName, ".") != 0) && (strcmp(entry.cFileName, "..") != 0)) { contents->insert(entry.cFileName); } while (FindNextFile(dir, &entry)) { if ((strcmp(entry.cFileName, ".") != 0) && (strcmp(entry.cFileName, "..") != 0)) { contents->insert(entry.cFileName); } } FindClose(dir); #else DIR* dir = opendir(path.c_str()); if (dir == nullptr) { return Status(Status::Code::INTERNAL, "failed to open directory " + path); } struct dirent* entry; while ((entry = readdir(dir)) != nullptr) { std::string entryname = entry->d_name; if ((entryname != ".") && (entryname != "..")) { contents->insert(entryname); } } closedir(dir); #endif return Status::Success; } Status LocalFileSystem::GetDirectorySubdirs( const std::string& path, std::set* subdirs) { RETURN_IF_ERROR(GetDirectoryContents(path, subdirs)); // Erase non-directory entries... for (auto iter = subdirs->begin(); iter != subdirs->end();) { bool is_dir; RETURN_IF_ERROR(IsDirectory(JoinPath({path, *iter}), &is_dir)); if (!is_dir) { iter = subdirs->erase(iter); } else { ++iter; } } return Status::Success; } Status LocalFileSystem::GetDirectoryFiles( const std::string& path, std::set* files) { RETURN_IF_ERROR(GetDirectoryContents(path, files)); // Erase directory entries... for (auto iter = files->begin(); iter != files->end();) { bool is_dir; RETURN_IF_ERROR(IsDirectory(JoinPath({path, *iter}), &is_dir)); if (is_dir) { iter = files->erase(iter); } else { ++iter; } } return Status::Success; } Status LocalFileSystem::ReadTextFile(const std::string& path, std::string* contents) { std::ifstream in(path, std::ios::in | std::ios::binary); if (!in) { return Status( Status::Code::INTERNAL, "failed to open text file for read " + path + ": " + strerror(errno)); } in.seekg(0, std::ios::end); contents->resize(in.tellg()); in.seekg(0, std::ios::beg); in.read(&(*contents)[0], contents->size()); in.close(); return Status::Success; } Status LocalFileSystem::LocalizePath( const std::string& path, std::shared_ptr* localized) { // For local file system we don't actually need to download the // directory or file. We use it in place. localized->reset(new LocalizedPath(path)); return Status::Success; } Status LocalFileSystem::WriteTextFile( const std::string& path, const std::string& contents) { std::ofstream out(path, std::ios::out | std::ios::binary); if (!out) { return Status( Status::Code::INTERNAL, "failed to open text file for write " + path + ": " + strerror(errno)); } out.write(&contents[0], contents.size()); out.close(); return Status::Success; } Status LocalFileSystem::WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) { std::ofstream out(path, std::ios::out | std::ios::binary); if (!out) { return Status( Status::Code::INTERNAL, "failed to open binary file for write " + path + ": " + strerror(errno)); } out.write(contents, content_len); return Status::Success; } Status LocalFileSystem::MakeDirectory(const std::string& dir, const bool recursive) { #ifdef _WIN32 if (mkdir(dir.c_str()) == -1) #else if (mkdir(dir.c_str(), S_IRWXU) == -1) #endif { // Only allow the error due to parent directory does not exist // if 'recursive' is requested if ((errno == ENOENT) && (!dir.empty()) && recursive) { RETURN_IF_ERROR(MakeDirectory(DirName(dir), recursive)); // Retry the creation #ifdef _WIN32 if (mkdir(dir.c_str()) == -1) #else if (mkdir(dir.c_str(), S_IRWXU) == -1) #endif { return Status( Status::Code::INTERNAL, "Failed to create directory '" + dir + "', errno:" + strerror(errno)); } } else { return Status( Status::Code::INTERNAL, "Failed to create directory '" + dir + "', errno:" + strerror(errno)); } } return Status::Success; } Status LocalFileSystem::MakeTemporaryDirectory(std::string* temp_dir) { #ifdef _WIN32 char temp_path[MAX_PATH + 1]; size_t temp_path_length = GetTempPath(MAX_PATH + 1, temp_path); if (temp_path_length == 0) { return Status( Status::Code::INTERNAL, "Failed to get local directory for temporary files"); } // There is no single operation like 'mkdtemp' in Windows, thus generating // unique temporary directory is a process of getting temporary file name, // deleting the file (file creation is side effect fo getting name), creating // corresponding directory, so mutex is used to avoid possible race condition. // However, it doesn't prevent other process on creating temporary file and // thus the race condition may still happen. One possible solution is // to reserve a temporary directory for the process and generate temporary // model directories inside it. static std::mutex mtx; std::lock_guard lk(mtx); // Construct a std::string as filled 'temp_path' is not C string, // and so that we can reuse 'temp_path' to hold the temp file name. std::string temp_path_str(temp_path, temp_path_length); if (GetTempFileName(temp_path_str.c_str(), "folder", 0, temp_path) == 0) { return Status(Status::Code::INTERNAL, "Failed to create local temp folder"); } *temp_dir = temp_path; DeleteFile(temp_dir->c_str()); if (CreateDirectory(temp_dir->c_str(), NULL) == 0) { return Status( Status::Code::INTERNAL, "Failed to create local temp folder: " + *temp_dir); } #else std::string folder_template = "/tmp/folderXXXXXX"; char* res = mkdtemp(const_cast(folder_template.c_str())); if (res == nullptr) { return Status( Status::Code::INTERNAL, "Failed to create local temp folder: " + folder_template + ", errno:" + strerror(errno)); } *temp_dir = res; #endif return Status::Success; } Status LocalFileSystem::DeletePath(const std::string& path) { bool is_dir = false; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (is_dir) { std::set contents; RETURN_IF_ERROR(GetDirectoryContents(path, &contents)); for (const auto& content : contents) { RETURN_IF_ERROR(DeletePath(JoinPath({path, content}))); } rmdir(path.c_str()); } else { remove(path.c_str()); } return Status::Success; } #if defined(TRITON_ENABLE_GCS) || defined(TRITON_ENABLE_S3) || \ defined(TRITON_ENABLE_AZURE_STORAGE) // Helper function to take care of lack of trailing slashes std::string AppendSlash(const std::string& name) { if (name.empty() || (name.back() == '/')) { return name; } return (name + "/"); } #endif // TRITON_ENABLE_GCS || TRITON_ENABLE_S3 || TRITON_ENABLE_AZURE_STORAGE #ifdef TRITON_ENABLE_GCS namespace gcs = google::cloud::storage; struct GCSCredential { std::string path_; GCSCredential(); // from env var GCSCredential(triton::common::TritonJson::Value& cred_json); }; GCSCredential::GCSCredential() { const char* path = std::getenv("GOOGLE_APPLICATION_CREDENTIALS"); path_ = (path != nullptr ? std::string(path) : ""); } GCSCredential::GCSCredential(triton::common::TritonJson::Value& cred_json) { cred_json.AsString(&path_); } class GCSFileSystem : public FileSystem { public: GCSFileSystem(const GCSCredential& gs_cred); // unify with S3/azure interface GCSFileSystem(const std::string& path, const GCSCredential& gs_cred) : GCSFileSystem(gs_cred) { } Status CheckClient(); // unify with S3 interface Status CheckClient(const std::string& path) { return CheckClient(); } Status FileExists(const std::string& path, bool* exists) override; Status IsDirectory(const std::string& path, bool* is_dir) override; Status FileModificationTime( const std::string& path, int64_t* mtime_ns) override; Status GetDirectoryContents( const std::string& path, std::set* contents) override; Status GetDirectorySubdirs( const std::string& path, std::set* subdirs) override; Status GetDirectoryFiles( const std::string& path, std::set* files) override; Status ReadTextFile(const std::string& path, std::string* contents) override; Status LocalizePath( const std::string& path, std::shared_ptr* localized) override; Status WriteTextFile( const std::string& path, const std::string& contents) override; Status WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) override; Status MakeDirectory(const std::string& dir, const bool recursive) override; Status MakeTemporaryDirectory(std::string* temp_dir) override; Status DeletePath(const std::string& path) override; private: Status ParsePath( const std::string& path, std::string* bucket, std::string* object); Status MetaDataExists( const std::string path, bool* exists, google::cloud::StatusOr* metadata); google::cloud::StatusOr client_; }; GCSFileSystem::GCSFileSystem(const GCSCredential& gs_cred) { auto creds = gcs::oauth2::CreateServiceAccountCredentialsFromJsonFilePath( gs_cred.path_); if (creds) { client_ = gcs::Client(gcs::ClientOptions(*creds)); } } Status GCSFileSystem::CheckClient() { if (!client_) { return Status( Status::Code::INTERNAL, "Unable to create GCS client. Check account credentials."); } return Status::Success; } Status GCSFileSystem::ParsePath( const std::string& path, std::string* bucket, std::string* object) { // Get the bucket name and the object path. Return error if input is malformed int bucket_start = path.find("gs://") + strlen("gs://"); int bucket_end = path.find("/", bucket_start); // If there isn't a second slash, the address has only the bucket if (bucket_end > bucket_start) { *bucket = path.substr(bucket_start, bucket_end - bucket_start); *object = path.substr(bucket_end + 1); } else { *bucket = path.substr(bucket_start); *object = ""; } if (bucket->empty()) { return Status( Status::Code::INTERNAL, "No bucket name found in path: " + path); } return Status::Success; } Status GCSFileSystem::FileExists(const std::string& path, bool* exists) { *exists = false; std::string bucket, object; RETURN_IF_ERROR(ParsePath(path, &bucket, &object)); // Make a request for metadata and check the response google::cloud::StatusOr object_metadata = client_->GetObjectMetadata(bucket, object); if (object_metadata) { *exists = true; return Status::Success; } // GCS doesn't make objects for directories, so it could still be a directory bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); *exists = is_dir; return Status::Success; } Status GCSFileSystem::IsDirectory(const std::string& path, bool* is_dir) { *is_dir = false; std::string bucket, object_path; RETURN_IF_ERROR(ParsePath(path, &bucket, &object_path)); // Check if the bucket exists google::cloud::StatusOr bucket_metadata = client_->GetBucketMetadata(bucket); if (!bucket_metadata) { return Status( Status::Code::INTERNAL, "Could not get MetaData for bucket with name " + bucket + " : " + bucket_metadata.status().message()); } // Root case - bucket exists and object path is empty if (object_path.empty()) { *is_dir = true; return Status::Success; } // Check whether it has children. If at least one child, it is a directory for (auto&& object_metadata : client_->ListObjects(bucket, gcs::Prefix(AppendSlash(object_path)))) { if (object_metadata) { *is_dir = true; break; } } return Status::Success; } Status GCSFileSystem::FileModificationTime(const std::string& path, int64_t* mtime_ns) { // We don't need to worry about the case when this is a directory bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (is_dir) { *mtime_ns = 0; return Status::Success; } std::string bucket, object; RETURN_IF_ERROR(ParsePath(path, &bucket, &object)); // Otherwise check the object metadata for update time google::cloud::StatusOr object_metadata = client_->GetObjectMetadata(bucket, object); if (!object_metadata) { return Status( Status::Code::INTERNAL, "Failed to get metadata for " + object + " : " + object_metadata.status().message()); } // Get duration from time point with respect to object clock auto update_time = std::chrono::time_point_cast( object_metadata->updated()) .time_since_epoch() .count(); *mtime_ns = update_time; return Status::Success; } Status GCSFileSystem::GetDirectoryContents( const std::string& path, std::set* contents) { std::string bucket, dir_path; RETURN_IF_ERROR(ParsePath(path, &bucket, &dir_path)); // Append a slash to make it easier to list contents std::string full_dir = AppendSlash(dir_path); // Get objects with prefix equal to full directory path for (auto&& object_metadata : client_->ListObjects(bucket, gcs::Prefix(full_dir))) { if (!object_metadata) { return Status( Status::Code::INTERNAL, "Could not list contents of directory at " + path + " : " + object_metadata.status().message()); } // In the case of empty directories, the directory itself will appear here if (object_metadata->name() == full_dir) { continue; } // We have to make sure that subdirectory contents do not appear here std::string name = object_metadata->name(); int item_start = name.find(full_dir) + full_dir.size(); // GCS response prepends parent directory name int item_end = name.find("/", item_start); // Let set take care of subdirectory contents std::string item = name.substr(item_start, item_end - item_start); contents->insert(item); } return Status::Success; } Status GCSFileSystem::GetDirectorySubdirs( const std::string& path, std::set* subdirs) { RETURN_IF_ERROR(GetDirectoryContents(path, subdirs)); // Erase non-directory entries... for (auto iter = subdirs->begin(); iter != subdirs->end();) { bool is_dir; RETURN_IF_ERROR(IsDirectory(JoinPath({path, *iter}), &is_dir)); if (!is_dir) { iter = subdirs->erase(iter); } else { ++iter; } } return Status::Success; } Status GCSFileSystem::GetDirectoryFiles( const std::string& path, std::set* files) { RETURN_IF_ERROR(GetDirectoryContents(path, files)); // Erase directory entries... for (auto iter = files->begin(); iter != files->end();) { bool is_dir; RETURN_IF_ERROR(IsDirectory(JoinPath({path, *iter}), &is_dir)); if (is_dir) { iter = files->erase(iter); } else { ++iter; } } return Status::Success; } Status GCSFileSystem::ReadTextFile(const std::string& path, std::string* contents) { bool exists; RETURN_IF_ERROR(FileExists(path, &exists)); if (!exists) { return Status(Status::Code::INTERNAL, "File does not exist at " + path); } std::string bucket, object; ParsePath(path, &bucket, &object); gcs::ObjectReadStream stream = client_->ReadObject(bucket, object); if (!stream) { return Status( Status::Code::INTERNAL, "Failed to open object read stream for " + path + " : " + stream.status().message()); } std::string data = ""; char c; while (stream.get(c)) { data += c; } *contents = data; return Status::Success; } Status GCSFileSystem::LocalizePath( const std::string& path, std::shared_ptr* localized) { bool exists; RETURN_IF_ERROR(FileExists(path, &exists)); if (!exists) { return Status( Status::Code::INTERNAL, "directory or file does not exist at " + path); } bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (!is_dir) { return Status( Status::Code::UNSUPPORTED, "GCS file localization not yet implemented " + path); } std::string tmp_folder; RETURN_IF_ERROR( triton::core::MakeTemporaryDirectory(FileSystemType::LOCAL, &tmp_folder)); localized->reset(new LocalizedPath(path, tmp_folder)); std::set contents, filenames; RETURN_IF_ERROR(GetDirectoryContents(path, &filenames)); for (auto itr = filenames.begin(); itr != filenames.end(); ++itr) { contents.insert(JoinPath({path, *itr})); } while (contents.size() != 0) { std::set tmp_contents = contents; contents.clear(); for (auto iter = tmp_contents.begin(); iter != tmp_contents.end(); ++iter) { bool is_subdir; std::string gcs_fpath = *iter; std::string gcs_removed_path = gcs_fpath.substr(path.size()); std::string local_fpath = JoinPath({(*localized)->Path(), gcs_removed_path}); RETURN_IF_ERROR(IsDirectory(gcs_fpath, &is_subdir)); if (is_subdir) { // Create local mirror of sub-directories #ifdef _WIN32 int status = mkdir(const_cast(local_fpath.c_str())); #else int status = mkdir( const_cast(local_fpath.c_str()), S_IRUSR | S_IWUSR | S_IXUSR); #endif if (status == -1) { return Status( Status::Code::INTERNAL, "Failed to create local folder: " + local_fpath + ", errno:" + strerror(errno)); } // Add sub-directories and deeper files to contents std::set subdir_contents; RETURN_IF_ERROR(GetDirectoryContents(gcs_fpath, &subdir_contents)); for (auto itr = subdir_contents.begin(); itr != subdir_contents.end(); ++itr) { contents.insert(JoinPath({gcs_fpath, *itr})); } } else { // Create local copy of file std::string file_bucket, file_object; RETURN_IF_ERROR(ParsePath(gcs_fpath, &file_bucket, &file_object)); // Send a request to read the object gcs::ObjectReadStream filestream = client_->ReadObject(file_bucket, file_object); if (!filestream) { return Status( Status::Code::INTERNAL, "Failed to get object at " + *iter + " : " + filestream.status().message()); } std::string gcs_removed_path = (*iter).substr(path.size()); std::string local_file_path = JoinPath({(*localized)->Path(), gcs_removed_path}); std::ofstream output_file(local_file_path.c_str(), std::ios::binary); output_file << filestream.rdbuf(); output_file.close(); } } } return Status::Success; } Status GCSFileSystem::WriteTextFile( const std::string& path, const std::string& contents) { return Status( Status::Code::UNSUPPORTED, "Write text file operation not yet implemented " + path); } Status GCSFileSystem::WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) { return Status( Status::Code::UNSUPPORTED, "Write text file operation not yet implemented " + path); } Status GCSFileSystem::MakeDirectory(const std::string& dir, const bool recursive) { return Status( Status::Code::UNSUPPORTED, "Make temporary directory operation not yet implemented"); } Status GCSFileSystem::MakeTemporaryDirectory(std::string* temp_dir) { return Status( Status::Code::UNSUPPORTED, "Make temporary directory operation not yet implemented"); } Status GCSFileSystem::DeletePath(const std::string& path) { return Status( Status::Code::UNSUPPORTED, "Delete path operation not yet implemented"); } #endif // TRITON_ENABLE_GCS #ifdef TRITON_ENABLE_AZURE_STORAGE namespace as = azure::storage_lite; const std::string AS_URL_PATTERN = "as://([^/]+)/([^/?]+)(?:/([^?]*))?(\\?.*)?"; struct ASCredential { std::string account_str_; std::string account_key_; ASCredential(); // from env var ASCredential(triton::common::TritonJson::Value& cred_json); }; ASCredential::ASCredential() { const auto to_str = [](const char* s) -> std::string { return (s != nullptr ? std::string(s) : ""); }; const char* account_str = std::getenv("AZURE_STORAGE_ACCOUNT"); const char* account_key = std::getenv("AZURE_STORAGE_KEY"); account_str_ = to_str(account_str); account_key_ = to_str(account_key); } ASCredential::ASCredential(triton::common::TritonJson::Value& cred_json) { triton::common::TritonJson::Value account_str_json, account_key_json; if (cred_json.Find("account_str", &account_str_json)) account_str_json.AsString(&account_str_); if (cred_json.Find("account_key", &account_key_json)) account_key_json.AsString(&account_key_); } class ASFileSystem : public FileSystem { public: ASFileSystem(const std::string& path, const ASCredential& as_cred); Status CheckClient(); // unify with S3 interface Status CheckClient(const std::string& path) { return CheckClient(); } Status FileExists(const std::string& path, bool* exists) override; Status IsDirectory(const std::string& path, bool* is_dir) override; Status FileModificationTime( const std::string& path, int64_t* mtime_ns) override; Status GetDirectoryContents( const std::string& path, std::set* contents) override; Status GetDirectorySubdirs( const std::string& path, std::set* subdirs) override; Status GetDirectoryFiles( const std::string& path, std::set* files) override; Status ReadTextFile(const std::string& path, std::string* contents) override; Status LocalizePath( const std::string& path, std::shared_ptr* localized) override; Status WriteTextFile( const std::string& path, const std::string& contents) override; Status WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) override; Status MakeDirectory(const std::string& dir, const bool recursive) override; Status MakeTemporaryDirectory(std::string* temp_dir) override; Status DeletePath(const std::string& path) override; private: Status ParsePath( const std::string& path, std::string* bucket, std::string* object); std::shared_ptr client_; Status ListDirectory( const std::string& path, const std::string& dir_path, std::function< Status(const as::list_blobs_segmented_item&, const std::string&)> func); Status DownloadFolder( const std::string& container, const std::string& path, const std::string& dest); re2::RE2 as_regex_; }; Status ASFileSystem::ParsePath( const std::string& path, std::string* container, std::string* object) { std::string host_name, query; if (!RE2::FullMatch(path, as_regex_, &host_name, container, object, &query)) { return Status( Status::Code::INTERNAL, "Invalid azure storage path: " + path); } return Status::Success; } ASFileSystem::ASFileSystem(const std::string& path, const ASCredential& as_cred) : as_regex_(AS_URL_PATTERN) { std::shared_ptr account = nullptr; std::string host_name, container, blob_path, query; if (RE2::FullMatch( path, as_regex_, &host_name, &container, &blob_path, &query)) { size_t pos = host_name.rfind(".blob.core.windows.net"); std::string account_name; if (as_cred.account_str_.empty()) { if (pos != std::string::npos) { account_name = host_name.substr(0, pos); } else { account_name = host_name; } } else { account_name = as_cred.account_str_; } std::shared_ptr cred; if (!as_cred.account_key_.empty()) { // Shared Key cred = std::make_shared( account_name, as_cred.account_key_); } else { cred = std::make_shared(); } account = std::make_shared( account_name, cred, /* use_https */ true); client_ = std::make_shared(account, /*max_concurrency*/ 16); } } Status ASFileSystem::CheckClient() { if (client_ == nullptr) { return Status( Status::Code::INTERNAL, "Unable to create Azure filesystem client. Check account credentials."); } return Status::Success; } Status ASFileSystem::FileModificationTime(const std::string& path, int64_t* mtime_ns) { as::blob_client_wrapper bc(client_); std::string container, object_path; RETURN_IF_ERROR(ParsePath(path, &container, &object_path)); auto blobProperty = bc.get_blob_property(container, object_path); if (errno != 0) { return Status( Status::Code::INTERNAL, "Unable to get blob property for file at " + path + ", errno:" + strerror(errno)); } auto time = std::chrono::system_clock::from_time_t(blobProperty.last_modified); auto update_time = std::chrono::time_point_cast(time) .time_since_epoch() .count(); *mtime_ns = update_time; return Status::Success; }; Status ASFileSystem::ListDirectory( const std::string& container, const std::string& dir_path, std::function< Status(const as::list_blobs_segmented_item&, const std::string&)> func) { as::blob_client_wrapper bc(client_); // Append a slash to make it easier to list contents std::string full_dir = AppendSlash(dir_path); auto blobs = bc.list_blobs_segmented(container, "/", "", full_dir); if (errno != 0) { return Status( Status::Code::INTERNAL, "Failed to get contents of directory " + dir_path + ", errno:" + strerror(errno)); } for (auto&& item : blobs.blobs) { std::string name = item.name; int item_start = name.find(full_dir) + full_dir.size(); int item_end = name.find("/", item_start); // Let set take care of subdirectory contents std::string subfile = name.substr(item_start, item_end - item_start); auto status = func(item, subfile); if (!status.IsOk()) { return status; } } return Status::Success; } Status ASFileSystem::GetDirectoryContents( const std::string& path, std::set* contents) { auto func = [&](const as::list_blobs_segmented_item& item, const std::string& dir) { contents->insert(dir); return Status::Success; }; std::string container, dir_path; RETURN_IF_ERROR(ParsePath(path, &container, &dir_path)); return ListDirectory(container, dir_path, func); } Status ASFileSystem::GetDirectorySubdirs( const std::string& path, std::set* subdirs) { auto func = [&](const as::list_blobs_segmented_item& item, const std::string& dir) { if (item.is_directory) { subdirs->insert(dir); } return Status::Success; }; std::string container, dir_path; RETURN_IF_ERROR(ParsePath(path, &container, &dir_path)); return ListDirectory(container, dir_path, func); } Status ASFileSystem::GetDirectoryFiles( const std::string& path, std::set* files) { auto func = [&](const as::list_blobs_segmented_item& item, const std::string& file) { if (!item.is_directory) { files->insert(file); } return Status::Success; }; std::string container, dir_path; RETURN_IF_ERROR(ParsePath(path, &container, &dir_path)); return ListDirectory(container, dir_path, func); } Status ASFileSystem::IsDirectory(const std::string& path, bool* is_dir) { *is_dir = false; std::string container, object_path; RETURN_IF_ERROR(ParsePath(path, &container, &object_path)); as::blob_client_wrapper bc(client_); auto blobs = bc.list_blobs_segmented(container, "/", "", object_path, 1); if (errno != 0) { return Status( Status::Code::INTERNAL, "Failed to check if directory at " + path + ", errno:" + strerror(errno)); } *is_dir = blobs.blobs.size() > 0; return Status::Success; }; Status ASFileSystem::ReadTextFile(const std::string& path, std::string* contents) { as::blob_client_wrapper bc(client_); std::string container, object_path; RETURN_IF_ERROR(ParsePath(path, &container, &object_path)); using namespace azure::storage_lite; std::ostringstream out_stream; bc.download_blob_to_stream(container, object_path, 0, 0, out_stream); if (errno != 0) { return Status( Status::Code::INTERNAL, "Failed to fetch file stream at " + path + ", errno:" + strerror(errno)); } *contents = out_stream.str(); return Status::Success; } Status ASFileSystem::FileExists(const std::string& path, bool* exists) { *exists = false; std::string container, object; RETURN_IF_ERROR(ParsePath(path, &container, &object)); as::blob_client_wrapper bc(client_); auto blobs = bc.list_blobs_segmented(container, "/", "", object, 1); if (errno != 0) { return Status( Status::Code::INTERNAL, "Failed to check if file exists at " + path + ", errno:" + strerror(errno)); } if (blobs.blobs.size() > 0) { *exists = true; } return Status::Success; } Status ASFileSystem::DownloadFolder( const std::string& container, const std::string& path, const std::string& dest) { as::blob_client_wrapper bc(client_); auto func = [&](const as::list_blobs_segmented_item& item, const std::string& dir) { auto local_path = JoinPath({dest, dir}); auto blob_path = JoinPath({path, dir}); if (item.is_directory) { int status = mkdir( const_cast(local_path.c_str()), S_IRUSR | S_IWUSR | S_IXUSR); if (status == -1) { return Status( Status::Code::INTERNAL, "Failed to create local folder: " + local_path + ", errno:" + strerror(errno)); } auto ret = DownloadFolder(container, blob_path, local_path); if (!ret.IsOk()) { return ret; } } else { time_t last_modified; bc.download_blob_to_file(container, blob_path, local_path, last_modified); if (errno != 0) { return Status( Status::Code::INTERNAL, "Failed to download file at " + blob_path + ", errno:" + strerror(errno)); } } return Status::Success; }; return ListDirectory(container, path, func); } Status ASFileSystem::LocalizePath( const std::string& path, std::shared_ptr* localized) { bool exists; RETURN_IF_ERROR(FileExists(path, &exists)); if (!exists) { return Status( Status::Code::INTERNAL, "directory or file does not exist at " + path); } bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (!is_dir) { return Status( Status::Code::UNSUPPORTED, "AS file localization not yet implemented " + path); } std::string folder_template = "/tmp/folderXXXXXX"; char* tmp_folder = mkdtemp(const_cast(folder_template.c_str())); if (tmp_folder == nullptr) { return Status( Status::Code::INTERNAL, "Failed to create local temp folder: " + folder_template + ", errno:" + strerror(errno)); } localized->reset(new LocalizedPath(path, tmp_folder)); std::string dest(folder_template); as::blob_client_wrapper bc(client_); std::string container, object; RETURN_IF_ERROR(ParsePath(path, &container, &object)); return DownloadFolder(container, object, dest); } Status ASFileSystem::WriteTextFile( const std::string& path, const std::string& contents) { std::stringstream ss(contents); std::istream is(ss.rdbuf()); std::string container, object; RETURN_IF_ERROR(ParsePath(path, &container, &object)); std::vector> metadata; auto ret = client_->upload_block_blob_from_stream(container, object, is, metadata) .get(); if (!ret.success()) { return Status( Status::Code::INTERNAL, "Failed to upload blob, Error: " + ret.error().code + ", " + ret.error().code_name); } return Status::Success; } Status ASFileSystem::WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) { return Status( Status::Code::UNSUPPORTED, "Write text file operation not yet implemented " + path); } Status ASFileSystem::MakeDirectory(const std::string& dir, const bool recursive) { return Status( Status::Code::UNSUPPORTED, "Make directory operation not yet implemented"); } Status ASFileSystem::MakeTemporaryDirectory(std::string* temp_dir) { return Status( Status::Code::UNSUPPORTED, "Make temporary directory operation not yet implemented"); } Status ASFileSystem::DeletePath(const std::string& path) { return Status( Status::Code::UNSUPPORTED, "Delete path operation not yet implemented"); } #endif // TRITON_ENABLE_AZURE_STORAGE #ifdef TRITON_ENABLE_S3 namespace s3 = Aws::S3; struct S3Credential { std::string secret_key_; std::string key_id_; std::string region_; std::string session_token_; std::string profile_name_; S3Credential(); // from env var S3Credential(triton::common::TritonJson::Value& cred_json); }; S3Credential::S3Credential() { const auto to_str = [](const char* s) -> std::string { return (s != nullptr ? std::string(s) : ""); }; const char* secret_key = std::getenv("AWS_SECRET_ACCESS_KEY"); const char* key_id = std::getenv("AWS_ACCESS_KEY_ID"); const char* region = std::getenv("AWS_DEFAULT_REGION"); const char* session_token = std::getenv("AWS_SESSION_TOKEN"); const char* profile = std::getenv("AWS_PROFILE"); secret_key_ = to_str(secret_key); key_id_ = to_str(key_id); region_ = to_str(region); session_token_ = to_str(session_token); profile_name_ = to_str(profile); } S3Credential::S3Credential(triton::common::TritonJson::Value& cred_json) { triton::common::TritonJson::Value secret_key_json, key_id_json, region_json, session_token_json, profile_json; if (cred_json.Find("secret_key", &secret_key_json)) secret_key_json.AsString(&secret_key_); if (cred_json.Find("key_id", &key_id_json)) key_id_json.AsString(&key_id_); if (cred_json.Find("region", ®ion_json)) region_json.AsString(®ion_); if (cred_json.Find("session_token", &session_token_json)) session_token_json.AsString(&session_token_); if (cred_json.Find("profile", &profile_json)) profile_json.AsString(&profile_name_); } class S3FileSystem : public FileSystem { public: S3FileSystem(const std::string& s3_path, const S3Credential& s3_cred); Status CheckClient(const std::string& s3_path); Status FileExists(const std::string& path, bool* exists) override; Status IsDirectory(const std::string& path, bool* is_dir) override; Status FileModificationTime( const std::string& path, int64_t* mtime_ns) override; Status GetDirectoryContents( const std::string& path, std::set* contents) override; Status GetDirectorySubdirs( const std::string& path, std::set* subdirs) override; Status GetDirectoryFiles( const std::string& path, std::set* files) override; Status ReadTextFile(const std::string& path, std::string* contents) override; Status LocalizePath( const std::string& path, std::shared_ptr* localized) override; Status WriteTextFile( const std::string& path, const std::string& contents) override; Status WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) override; Status MakeDirectory(const std::string& dir, const bool recursive) override; Status MakeTemporaryDirectory(std::string* temp_dir) override; Status DeletePath(const std::string& path) override; private: Status ParsePath( const std::string& path, std::string* bucket, std::string* object); Status CleanPath(const std::string& s3_path, std::string* clean_path); std::unique_ptr client_; // init after Aws::InitAPI is called re2::RE2 s3_regex_; }; Status S3FileSystem::ParsePath( const std::string& path, std::string* bucket, std::string* object) { // Cleanup extra slashes std::string clean_path; RETURN_IF_ERROR(CleanPath(path, &clean_path)); // Get the bucket name and the object path. Return error if path is malformed std::string protocol, host_name, host_port; if (!RE2::FullMatch( clean_path, s3_regex_, &protocol, &host_name, &host_port, bucket, object)) { int bucket_start = clean_path.find("s3://") + strlen("s3://"); int bucket_end = clean_path.find("/", bucket_start); // If there isn't a slash, the address has only the bucket if (bucket_end > bucket_start) { *bucket = clean_path.substr(bucket_start, bucket_end - bucket_start); *object = clean_path.substr(bucket_end + 1); } else { *bucket = clean_path.substr(bucket_start); *object = ""; } } else { // Erase leading '/' that is left behind in object name if ((*object)[0] == '/') { object->erase(0, 1); } } if (bucket->empty()) { return Status( Status::Code::INTERNAL, "No bucket name found in path: " + path); } return Status::Success; } Status S3FileSystem::CleanPath(const std::string& s3_path, std::string* clean_path) { // Must handle paths with s3 prefix size_t start = s3_path.find("s3://"); std::string path = ""; if (start != std::string::npos) { path = s3_path.substr(start + strlen("s3://")); *clean_path = "s3://"; } else { path = s3_path; *clean_path = ""; } // Must handle paths with https:// or http:// prefix size_t https_start = path.find("https://"); if (https_start != std::string::npos) { path = path.substr(https_start + strlen("https://")); *clean_path += "https://"; } else { size_t http_start = path.find("http://"); if (http_start != std::string::npos) { path = path.substr(http_start + strlen("http://")); *clean_path += "http://"; } } // Remove trailing slashes size_t rtrim_length = path.find_last_not_of('/'); if (rtrim_length == std::string::npos) { return Status( Status::Code::INVALID_ARG, "Invalid bucket name: '" + path + "'"); } // Remove leading slashes size_t ltrim_length = path.find_first_not_of('/'); if (ltrim_length == std::string::npos) { return Status( Status::Code::INVALID_ARG, "Invalid bucket name: '" + path + "'"); } // Remove extra internal slashes std::string true_path = path.substr(ltrim_length, rtrim_length + 1); std::vector slash_locations; bool previous_slash = false; for (size_t i = 0; i < true_path.size(); i++) { if (true_path[i] == '/') { if (!previous_slash) { *clean_path += true_path[i]; } previous_slash = true; } else { *clean_path += true_path[i]; previous_slash = false; } } return Status::Success; } S3FileSystem::S3FileSystem( const std::string& s3_path, const S3Credential& s3_cred) : s3_regex_( "s3://(http://|https://|)([0-9a-zA-Z\\-.]+):([0-9]+)/" "([0-9a-z.\\-]+)(((/[0-9a-zA-Z.\\-_]+)*)?)") { // init aws api if not already Aws::SDKOptions options; static std::once_flag onceFlag; std::call_once(onceFlag, [&options] { Aws::InitAPI(options); }); Aws::Client::ClientConfiguration config; Aws::Auth::AWSCredentials credentials; // check vars for S3 credentials -> aws profile -> default if (!s3_cred.secret_key_.empty() && !s3_cred.key_id_.empty()) { credentials.SetAWSAccessKeyId(s3_cred.key_id_.c_str()); credentials.SetAWSSecretKey(s3_cred.secret_key_.c_str()); if (!s3_cred.session_token_.empty()) { credentials.SetSessionToken(s3_cred.session_token_.c_str()); } config = Aws::Client::ClientConfiguration(); if (!s3_cred.region_.empty()) { config.region = s3_cred.region_.c_str(); } } else if (!s3_cred.profile_name_.empty()) { config = Aws::Client::ClientConfiguration(s3_cred.profile_name_.c_str()); } else { config = Aws::Client::ClientConfiguration("default"); } // Cleanup extra slashes std::string clean_path; LOG_STATUS_ERROR(CleanPath(s3_path, &clean_path), "failed to parse S3 path"); std::string protocol, host_name, host_port, bucket, object; if (RE2::FullMatch( clean_path, s3_regex_, &protocol, &host_name, &host_port, &bucket, &object)) { config.endpointOverride = Aws::String(host_name + ":" + host_port); if (protocol == "https://") { config.scheme = Aws::Http::Scheme::HTTPS; } else { config.scheme = Aws::Http::Scheme::HTTP; } } if (!s3_cred.secret_key_.empty() && !s3_cred.key_id_.empty()) { client_ = std::make_unique( credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, /*useVirtualAdressing*/ false); } else { client_ = std::make_unique( config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, /*useVirtualAdressing*/ false); } } Status S3FileSystem::CheckClient(const std::string& s3_path) { std::string bucket, object_path; RETURN_IF_ERROR(ParsePath(s3_path, &bucket, &object_path)); // check if can connect to the bucket s3::Model::HeadBucketRequest head_request; head_request.WithBucket(bucket.c_str()); if (!client_->HeadBucket(head_request).IsSuccess()) { return Status( Status::Code::INTERNAL, "Unable to create S3 filesystem client. Check account credentials."); } return Status::Success; } Status S3FileSystem::FileExists(const std::string& path, bool* exists) { *exists = false; // S3 doesn't make objects for directories, so it could still be a directory bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (is_dir) { *exists = is_dir; return Status::Success; } std::string bucket, object; RETURN_IF_ERROR(ParsePath(path, &bucket, &object)); // Construct request for object metadata s3::Model::HeadObjectRequest head_request; head_request.SetBucket(bucket.c_str()); head_request.SetKey(object.c_str()); auto head_object_outcome = client_->HeadObject(head_request); if (!head_object_outcome.IsSuccess()) { if (head_object_outcome.GetError().GetErrorType() != s3::S3Errors::RESOURCE_NOT_FOUND) { return Status( Status::Code::INTERNAL, "Could not get MetaData for object at " + path + " due to exception: " + head_object_outcome.GetError().GetExceptionName() + ", error message: " + head_object_outcome.GetError().GetMessage()); } } else { *exists = true; } return Status::Success; } Status S3FileSystem::IsDirectory(const std::string& path, bool* is_dir) { *is_dir = false; std::string bucket, object_path; RETURN_IF_ERROR(ParsePath(path, &bucket, &object_path)); // Check if the bucket exists s3::Model::HeadBucketRequest head_request; head_request.WithBucket(bucket.c_str()); auto head_bucket_outcome = client_->HeadBucket(head_request); if (!head_bucket_outcome.IsSuccess()) { return Status( Status::Code::INTERNAL, "Could not get MetaData for bucket with name " + bucket + " due to exception: " + head_bucket_outcome.GetError().GetExceptionName() + ", error message: " + head_bucket_outcome.GetError().GetMessage()); } // Root case - bucket exists and object path is empty if (object_path.empty()) { *is_dir = true; return Status::Success; } // List the objects in the bucket s3::Model::ListObjectsRequest list_objects_request; list_objects_request.SetBucket(bucket.c_str()); list_objects_request.SetPrefix(AppendSlash(object_path).c_str()); auto list_objects_outcome = client_->ListObjects(list_objects_request); if (list_objects_outcome.IsSuccess()) { *is_dir = !list_objects_outcome.GetResult().GetContents().empty(); } else { return Status( Status::Code::INTERNAL, "Failed to list objects with prefix " + path + " due to exception: " + list_objects_outcome.GetError().GetExceptionName() + ", error message: " + list_objects_outcome.GetError().GetMessage()); } return Status::Success; } Status S3FileSystem::FileModificationTime(const std::string& path, int64_t* mtime_ns) { // We don't need to worry about the case when this is a directory bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (is_dir) { *mtime_ns = 0; return Status::Success; } std::string bucket, object; RETURN_IF_ERROR(ParsePath(path, &bucket, &object)); // Send a request for the objects metadata s3::Model::HeadObjectRequest head_request; head_request.SetBucket(bucket.c_str()); head_request.SetKey(object.c_str()); // If request succeeds, copy over the modification time auto head_object_outcome = client_->HeadObject(head_request); if (head_object_outcome.IsSuccess()) { *mtime_ns = head_object_outcome.GetResult().GetLastModified().Millis() * NANOS_PER_MILLIS; } else { return Status( Status::Code::INTERNAL, "Failed to get modification time for object at " + path + " due to exception: " + head_object_outcome.GetError().GetExceptionName() + ", error message: " + head_object_outcome.GetError().GetMessage()); } return Status::Success; } Status S3FileSystem::GetDirectoryContents( const std::string& path, std::set* contents) { // Parse bucket and dir_path std::string bucket, dir_path, full_dir; RETURN_IF_ERROR(ParsePath(path, &bucket, &dir_path)); std::string true_path = "s3://" + bucket + '/' + dir_path; // Capture the full path to facilitate content listing full_dir = AppendSlash(dir_path); // Issue request for objects with prefix s3::Model::ListObjectsRequest objects_request; objects_request.SetBucket(bucket.c_str()); objects_request.SetPrefix(full_dir.c_str()); auto list_objects_outcome = client_->ListObjects(objects_request); if (list_objects_outcome.IsSuccess()) { Aws::Vector object_list = list_objects_outcome.GetResult().GetContents(); for (auto const& s3_object : object_list) { // In the case of empty directories, the directory itself will appear here if (s3_object.GetKey().c_str() == full_dir) { continue; } // We have to make sure that subdirectory contents do not appear here std::string name(s3_object.GetKey().c_str()); int item_start = name.find(full_dir) + full_dir.size(); // S3 response prepends parent directory name int item_end = name.find("/", item_start); // Let set take care of subdirectory contents std::string item = name.substr(item_start, item_end - item_start); contents->insert(item); } } else { return Status( Status::Code::INTERNAL, "Could not list contents of directory at " + true_path + " due to exception: " + list_objects_outcome.GetError().GetExceptionName() + ", error message: " + list_objects_outcome.GetError().GetMessage()); } return Status::Success; } Status S3FileSystem::GetDirectorySubdirs( const std::string& path, std::set* subdirs) { // Parse bucket and dir_path std::string bucket, dir_path; RETURN_IF_ERROR(ParsePath(path, &bucket, &dir_path)); std::string true_path = "s3://" + bucket + '/' + dir_path; RETURN_IF_ERROR(GetDirectoryContents(true_path, subdirs)); // Erase non-directory entries... for (auto iter = subdirs->begin(); iter != subdirs->end();) { bool is_dir; RETURN_IF_ERROR(IsDirectory(JoinPath({true_path, *iter}), &is_dir)); if (!is_dir) { iter = subdirs->erase(iter); } else { ++iter; } } return Status::Success; } Status S3FileSystem::GetDirectoryFiles( const std::string& path, std::set* files) { // Parse bucket and dir_path std::string bucket, dir_path; RETURN_IF_ERROR(ParsePath(path, &bucket, &dir_path)); std::string true_path = "s3://" + bucket + '/' + dir_path; RETURN_IF_ERROR(GetDirectoryContents(true_path, files)); // Erase directory entries... for (auto iter = files->begin(); iter != files->end();) { bool is_dir; RETURN_IF_ERROR(IsDirectory(JoinPath({true_path, *iter}), &is_dir)); if (is_dir) { iter = files->erase(iter); } else { ++iter; } } return Status::Success; } Status S3FileSystem::ReadTextFile(const std::string& path, std::string* contents) { bool exists; RETURN_IF_ERROR(FileExists(path, &exists)); if (!exists) { return Status(Status::Code::INTERNAL, "File does not exist at " + path); } std::string bucket, object; RETURN_IF_ERROR(ParsePath(path, &bucket, &object)); // Send a request for the objects metadata s3::Model::GetObjectRequest object_request; object_request.SetBucket(bucket.c_str()); object_request.SetKey(object.c_str()); auto get_object_outcome = client_->GetObject(object_request); if (get_object_outcome.IsSuccess()) { auto& object_result = get_object_outcome.GetResultWithOwnership().GetBody(); std::string data = ""; char c; while (object_result.get(c)) { data += c; } *contents = data; } else { return Status( Status::Code::INTERNAL, "Failed to get object at " + path + " due to exception: " + get_object_outcome.GetError().GetExceptionName() + ", error message: " + get_object_outcome.GetError().GetMessage()); } return Status::Success; } Status S3FileSystem::LocalizePath( const std::string& path, std::shared_ptr* localized) { // Check if the directory or file exists bool exists; RETURN_IF_ERROR(FileExists(path, &exists)); if (!exists) { return Status( Status::Code::INTERNAL, "directory or file does not exist at " + path); } // Cleanup extra slashes std::string clean_path; RETURN_IF_ERROR(CleanPath(path, &clean_path)); // Remove protocol and host name and port std::string effective_path, protocol, host_name, host_port, bucket, object; if (RE2::FullMatch( clean_path, s3_regex_, &protocol, &host_name, &host_port, &bucket, &object)) { effective_path = "s3://" + bucket + object; } else { effective_path = path; } // Create temporary directory std::string tmp_folder; RETURN_IF_ERROR( triton::core::MakeTemporaryDirectory(FileSystemType::LOCAL, &tmp_folder)); // Specify contents to be downloaded std::set contents; bool is_dir; RETURN_IF_ERROR(IsDirectory(path, &is_dir)); if (is_dir) { // Set localized path localized->reset(new LocalizedPath(effective_path, tmp_folder)); // Specify the entire directory to be downloaded std::set filenames; RETURN_IF_ERROR(GetDirectoryContents(effective_path, &filenames)); for (auto itr = filenames.begin(); itr != filenames.end(); ++itr) { contents.insert(JoinPath({effective_path, *itr})); } } else { // Set localized path std::string filename = effective_path.substr(effective_path.find_last_of('/') + 1); localized->reset( new LocalizedPath(effective_path, JoinPath({tmp_folder, filename}))); // Specify only the file to be downloaded contents.insert(effective_path); } // Download all specified contents and nested contents while (contents.size() != 0) { std::set tmp_contents = contents; contents.clear(); for (auto iter = tmp_contents.begin(); iter != tmp_contents.end(); ++iter) { std::string s3_fpath = *iter; std::string s3_removed_path = s3_fpath.substr(effective_path.size()); std::string local_fpath = s3_removed_path.empty() ? (*localized)->Path() : JoinPath({(*localized)->Path(), s3_removed_path}); bool is_subdir; RETURN_IF_ERROR(IsDirectory(s3_fpath, &is_subdir)); if (is_subdir) { // Create local mirror of sub-directories #ifdef _WIN32 int status = mkdir(const_cast(local_fpath.c_str())); #else int status = mkdir( const_cast(local_fpath.c_str()), S_IRUSR | S_IWUSR | S_IXUSR); #endif if (status == -1) { return Status( Status::Code::INTERNAL, "Failed to create local folder: " + local_fpath + ", errno:" + strerror(errno)); } // Add sub-directories and deeper files to contents std::set subdir_contents; RETURN_IF_ERROR(GetDirectoryContents(s3_fpath, &subdir_contents)); for (auto itr = subdir_contents.begin(); itr != subdir_contents.end(); ++itr) { contents.insert(JoinPath({s3_fpath, *itr})); } } else { // Create local copy of file std::string file_bucket, file_object; RETURN_IF_ERROR(ParsePath(s3_fpath, &file_bucket, &file_object)); s3::Model::GetObjectRequest object_request; object_request.SetBucket(file_bucket.c_str()); object_request.SetKey(file_object.c_str()); auto get_object_outcome = client_->GetObject(object_request); if (get_object_outcome.IsSuccess()) { auto& retrieved_file = get_object_outcome.GetResultWithOwnership().GetBody(); std::ofstream output_file(local_fpath.c_str(), std::ios::binary); output_file << retrieved_file.rdbuf(); output_file.close(); } else { return Status( Status::Code::INTERNAL, "Failed to get object at " + s3_fpath + " due to exception: " + get_object_outcome.GetError().GetExceptionName() + ", error message: " + get_object_outcome.GetError().GetMessage()); } } } } return Status::Success; } Status S3FileSystem::WriteTextFile( const std::string& path, const std::string& contents) { return Status( Status::Code::UNSUPPORTED, "Write text file operation not yet implemented " + path); } Status S3FileSystem::WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) { return Status( Status::Code::UNSUPPORTED, "Write text file operation not yet implemented " + path); } Status S3FileSystem::MakeDirectory(const std::string& dir, const bool recursive) { return Status( Status::Code::UNSUPPORTED, "Make directory operation not yet implemented"); } Status S3FileSystem::MakeTemporaryDirectory(std::string* temp_dir) { return Status( Status::Code::UNSUPPORTED, "Make temporary directory operation not yet implemented"); } Status S3FileSystem::DeletePath(const std::string& path) { return Status( Status::Code::UNSUPPORTED, "Delete path operation not yet implemented"); } #endif // TRITON_ENABLE_S3 class FileSystemManager { public: Status GetFileSystem( const std::string& path, std::shared_ptr& file_system); Status GetFileSystem( FileSystemType type, std::shared_ptr& file_system); FileSystemManager(); private: template Status GetFileSystem( const std::string& path, CacheType& cache, std::shared_ptr& file_system); template Status ReturnErrorOrReload( const Status& load_status, const Status& error_status, const std::string& path, CacheType& cache, std::shared_ptr& file_system); Status LoadCredentials(bool flush_cache = false); template static void LoadCredential( triton::common::TritonJson::Value& creds_json, const char* fs_type, CacheType& cache); template static void SortCache( std::vector>>& cache); template static Status GetLongestMatchingNameIndex( const std::vector>>& cache, const std::string& path, size_t& idx); std::shared_ptr local_fs_; std::mutex mu_; // protect concurrent access into variables bool is_cached_; // if name and credential is cached, lazy load file system // cloud credential cache should be sorted in descending name length order // [(name_long, credential, file_system), (name, ...)] #ifdef TRITON_ENABLE_GCS std::vector< std::tuple>> gs_cache_; #endif // TRITON_ENABLE_GCS #ifdef TRITON_ENABLE_S3 std::vector< std::tuple>> s3_cache_; #endif // TRITON_ENABLE_S3 #ifdef TRITON_ENABLE_AZURE_STORAGE std::vector< std::tuple>> as_cache_; #endif // TRITON_ENABLE_AZURE_STORAGE }; FileSystemManager::FileSystemManager() : local_fs_(new LocalFileSystem()), is_cached_(false) { } Status FileSystemManager::GetFileSystem( const std::string& path, std::shared_ptr& file_system) { // Check if this is a GCS path (gs://$BUCKET_NAME) if (!path.empty() && !path.rfind("gs://", 0)) { #ifndef TRITON_ENABLE_GCS return Status( Status::Code::INTERNAL, "gs:// file-system not supported. To enable, build with " "-DTRITON_ENABLE_GCS=ON."); #else return GetFileSystem< std::vector>>, GCSCredential, GCSFileSystem>(path, gs_cache_, file_system); #endif // TRITON_ENABLE_GCS } // Check if this is an S3 path (s3://$BUCKET_NAME) if (!path.empty() && !path.rfind("s3://", 0)) { #ifndef TRITON_ENABLE_S3 return Status( Status::Code::INTERNAL, "s3:// file-system not supported. To enable, build with " "-DTRITON_ENABLE_S3=ON."); #else return GetFileSystem< std::vector>>, S3Credential, S3FileSystem>(path, s3_cache_, file_system); #endif // TRITON_ENABLE_S3 } // Check if this is an Azure Storage path if (!path.empty() && !path.rfind("as://", 0)) { #ifndef TRITON_ENABLE_AZURE_STORAGE return Status( Status::Code::INTERNAL, "as:// file-system not supported. To enable, build with " "-DTRITON_ENABLE_AZURE_STORAGE=ON."); #else return GetFileSystem< std::vector>>, ASCredential, ASFileSystem>(path, as_cache_, file_system); #endif // TRITON_ENABLE_AZURE_STORAGE } // Assume path is for local filesystem file_system = local_fs_; return Status::Success; } Status FileSystemManager::GetFileSystem( FileSystemType type, std::shared_ptr& file_system) { // only LOCAL and GCS are not path-dependent and can be accessed by type switch (type) { case FileSystemType::LOCAL: return GetFileSystem("", file_system); case FileSystemType::GCS: return GetFileSystem("gs://", file_system); case FileSystemType::S3: return Status( Status::Code::UNSUPPORTED, "S3 filesystem cannot be accessed by type"); case FileSystemType::AS: return Status( Status::Code::UNSUPPORTED, "AS filesystem cannot be accessed by type"); default: return Status(Status::Code::UNSUPPORTED, "Unsupported filesystem type"); } } template Status FileSystemManager::GetFileSystem( const std::string& path, CacheType& cache, std::shared_ptr& file_system) { const Status& cred_status = LoadCredentials(); if (cred_status.IsOk() || cred_status.StatusCode() == Status::Code::ALREADY_EXISTS) { // Find credential size_t idx; const Status& match_status = GetLongestMatchingNameIndex(cache, path, idx); if (!match_status.IsOk()) { return ReturnErrorOrReload( cred_status, match_status, path, cache, file_system); } // Find or lazy load file system std::shared_ptr fs = std::get<2>(cache[idx]); if (fs == nullptr) { std::string cred_name = std::get<0>(cache[idx]); CredentialType cred = std::get<1>(cache[idx]); fs = std::make_shared(path, cred); cache[idx] = std::make_tuple(cred_name, cred, fs); } // Check client const Status& client_status = fs->CheckClient(path); if (!client_status.IsOk()) { return ReturnErrorOrReload( cred_status, client_status, path, cache, file_system); } // Return client file_system = fs; return Status::Success; } return cred_status; } template Status FileSystemManager::ReturnErrorOrReload( const Status& load_status, const Status& error_status, const std::string& path, CacheType& cache, std::shared_ptr& file_system) { if (load_status.StatusCode() == Status::Code::ALREADY_EXISTS) { return error_status; } LoadCredentials(true); // flush cache return GetFileSystem( path, cache, file_system); } // return status meaning: // - SUCCESS, "" -> loaded credential from file // - ALREADY_EXISTS, "Cached" -> credential already loaded Status FileSystemManager::LoadCredentials(bool flush_cache) { // prevent concurrent access into class variables std::lock_guard lock(mu_); // check if credential is already cached if (is_cached_ && !flush_cache) { return Status(Status::Code::ALREADY_EXISTS, "Cached"); } const char* file_path_c_str = std::getenv("TRITON_CLOUD_CREDENTIAL_PATH"); if (file_path_c_str != nullptr) { // Load from credential file std::string file_path = std::string(file_path_c_str); LOG_VERBOSE(1) << "Reading cloud credential from " << file_path; triton::common::TritonJson::Value creds_json; std::string cred_file_content; RETURN_IF_ERROR(local_fs_->ReadTextFile(file_path, &cred_file_content)); RETURN_IF_ERROR(creds_json.Parse(cred_file_content)); #ifdef TRITON_ENABLE_GCS // load GCS credentials LoadCredential< std::vector>>, GCSCredential, GCSFileSystem>(creds_json, "gs", gs_cache_); #endif // TRITON_ENABLE_GCS #ifdef TRITON_ENABLE_S3 // load S3 credentials LoadCredential< std::vector>>, S3Credential, S3FileSystem>(creds_json, "s3", s3_cache_); #endif // TRITON_ENABLE_S3 #ifdef TRITON_ENABLE_AZURE_STORAGE // load AS credentials LoadCredential< std::vector>>, ASCredential, ASFileSystem>(creds_json, "as", as_cache_); #endif // TRITON_ENABLE_AZURE_STORAGE } else { // Load from environment variables LOG_VERBOSE(1) << "TRITON_CLOUD_CREDENTIAL_PATH environment variable is " "not set, reading from environment variables"; #ifdef TRITON_ENABLE_GCS // load GCS credentials gs_cache_.clear(); gs_cache_.push_back( std::make_tuple("", GCSCredential(), std::shared_ptr())); #endif // TRITON_ENABLE_GCS #ifdef TRITON_ENABLE_S3 // load S3 credentials s3_cache_.clear(); s3_cache_.push_back( std::make_tuple("", S3Credential(), std::shared_ptr())); #endif // TRITON_ENABLE_S3 #ifdef TRITON_ENABLE_AZURE_STORAGE // load AS credentials as_cache_.clear(); as_cache_.push_back( std::make_tuple("", ASCredential(), std::shared_ptr())); #endif // TRITON_ENABLE_AZURE_STORAGE } is_cached_ = true; return Status::Success; } template void FileSystemManager::LoadCredential( triton::common::TritonJson::Value& creds_json, const char* fs_type, CacheType& cache) { cache.clear(); triton::common::TritonJson::Value creds_fs_json; if (creds_json.Find(fs_type, &creds_fs_json)) { std::vector cred_names; creds_fs_json.Members(&cred_names); for (size_t i = 0; i < cred_names.size(); i++) { std::string cred_name = cred_names[i]; triton::common::TritonJson::Value cred_json; creds_fs_json.Find(cred_name.c_str(), &cred_json); cache.push_back(std::make_tuple( cred_name, CredentialType(cred_json), std::shared_ptr())); } SortCache(cache); } } template void FileSystemManager::SortCache( std::vector>>& cache) { std::sort( cache.begin(), cache.end(), [](std::tuple< std::string, CredentialType, std::shared_ptr> a, std::tuple< std::string, CredentialType, std::shared_ptr> b) { return std::get<0>(a).size() >= std::get<0>(b).size(); }); } template Status FileSystemManager::GetLongestMatchingNameIndex( const std::vector>>& cache, const std::string& path, size_t& idx) { for (size_t i = 0; i < cache.size(); i++) { if (!path.rfind(std::get<0>(cache[i]), 0)) { idx = i; LOG_VERBOSE(1) << "Using credential " + std::get<0>(cache[i]) + " for path " + path; return Status::Success; } } return Status( Status::Code::NOT_FOUND, "Cannot match credential for path " + path); } static FileSystemManager fsm_; } // namespace // FIXME: Windows support '/'? If so, the below doesn't need to change bool IsAbsolutePath(const std::string& path) { return !path.empty() && (path[0] == '/'); } std::string JoinPath(std::initializer_list segments) { std::string joined; for (const auto& seg : segments) { if (joined.empty()) { joined = seg; } else if (IsAbsolutePath(seg)) { if (joined[joined.size() - 1] == '/') { joined.append(seg.substr(1)); } else { joined.append(seg); } } else { // !IsAbsolutePath(seg) if (joined[joined.size() - 1] != '/') { joined.append("/"); } joined.append(seg); } } return joined; } std::string BaseName(const std::string& path) { if (path.empty()) { return path; } size_t last = path.size() - 1; while ((last > 0) && (path[last] == '/')) { last -= 1; } if (path[last] == '/') { return std::string(); } const size_t idx = path.find_last_of("/", last); if (idx == std::string::npos) { return path.substr(0, last + 1); } return path.substr(idx + 1, last - idx); } std::string DirName(const std::string& path) { if (path.empty()) { return path; } size_t last = path.size() - 1; while ((last > 0) && (path[last] == '/')) { last -= 1; } if (path[last] == '/') { return std::string("/"); } const size_t idx = path.find_last_of("/", last); if (idx == std::string::npos) { return std::string("."); } if (idx == 0) { return std::string("/"); } return path.substr(0, idx); } Status FileExists(const std::string& path, bool* exists) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->FileExists(path, exists); } Status IsDirectory(const std::string& path, bool* is_dir) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->IsDirectory(path, is_dir); } Status FileModificationTime(const std::string& path, int64_t* mtime_ns) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->FileModificationTime(path, mtime_ns); } Status GetDirectoryContents(const std::string& path, std::set* contents) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->GetDirectoryContents(path, contents); } Status GetDirectorySubdirs(const std::string& path, std::set* subdirs) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->GetDirectorySubdirs(path, subdirs); } Status GetDirectoryFiles( const std::string& path, const bool skip_hidden_files, std::set* files) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); std::set all_files; RETURN_IF_ERROR(fs->GetDirectoryFiles(path, &all_files)); // Remove the hidden files for (auto f : all_files) { if ((f[0] != '.') || (!skip_hidden_files)) { files->insert(f); } } return Status::Success; } Status ReadTextFile(const std::string& path, std::string* contents) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->ReadTextFile(path, contents); } Status ReadTextProto(const std::string& path, google::protobuf::Message* msg) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); std::string contents; RETURN_IF_ERROR(fs->ReadTextFile(path, &contents)); if (!google::protobuf::TextFormat::ParseFromString(contents, msg)) { return Status( Status::Code::INTERNAL, "failed to read text proto from " + path); } return Status::Success; } Status LocalizePath(const std::string& path, std::shared_ptr* localized) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->LocalizePath(path, localized); } Status WriteTextProto(const std::string& path, const google::protobuf::Message& msg) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); std::string prototxt; if (!google::protobuf::TextFormat::PrintToString(msg, &prototxt)) { return Status( Status::Code::INTERNAL, "failed to write text proto to " + path); } return fs->WriteTextFile(path, prototxt); } Status WriteBinaryFile( const std::string& path, const char* contents, const size_t content_len) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->WriteBinaryFile(path, contents, content_len); } Status ReadBinaryProto(const std::string& path, google::protobuf::MessageLite* msg) { std::string msg_str; RETURN_IF_ERROR(ReadTextFile(path, &msg_str)); google::protobuf::io::CodedInputStream coded_stream( reinterpret_cast(msg_str.c_str()), msg_str.size()); coded_stream.SetTotalBytesLimit(INT_MAX); if (!msg->ParseFromCodedStream(&coded_stream)) { return Status( Status::Code::INTERNAL, "Can't parse " + path + " as binary proto"); } return Status::Success; } Status MakeDirectory(const std::string& dir, const bool recursive) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(dir, fs)); return fs->MakeDirectory(dir, recursive); } Status MakeTemporaryDirectory(const FileSystemType type, std::string* temp_dir) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(type, fs)); return fs->MakeTemporaryDirectory(temp_dir); } Status DeletePath(const std::string& path) { std::shared_ptr fs; RETURN_IF_ERROR(fsm_.GetFileSystem(path, fs)); return fs->DeletePath(path); } Status GetFileSystemType(const std::string& path, FileSystemType* type) { if (path.empty()) { return Status( Status::Code::INVALID_ARG, "Can not infer filesystem type from empty path"); } #ifdef TRITON_ENABLE_GCS // Check if this is a GCS path (gs://$BUCKET_NAME) if (!path.rfind("gs://", 0)) { *type = FileSystemType::GCS; return Status::Success; } #endif // TRITON_ENABLE_GCS #ifdef TRITON_ENABLE_S3 // Check if this is an S3 path (s3://$BUCKET_NAME) if (!path.rfind("s3://", 0)) { *type = FileSystemType::S3; return Status::Success; } #endif // TRITON_ENABLE_S3 #ifdef TRITON_ENABLE_AZURE_STORAGE // Check if this is an Azure Storage path if (!path.rfind("as://", 0)) { *type = FileSystemType::AS; return Status::Success; } #endif // TRITON_ENABLE_AZURE_STORAGE // Assume path is for local filesystem *type = FileSystemType::LOCAL; return Status::Success; } const std::string& FileSystemTypeString(const FileSystemType type) { static const std::string local_str("LOCAL"); static const std::string gcs_str("GCS"); static const std::string s3_str("S3"); static const std::string as_str("AS"); static const std::string unknown_str("UNKNOWN"); switch (type) { case FileSystemType::LOCAL: return local_str; case FileSystemType::GCS: return gcs_str; case FileSystemType::S3: return s3_str; case FileSystemType::AS: return as_str; default: return unknown_str; } } }} // namespace triton::core