Commit 2694b127 authored by Minjie Wang's avatar Minjie Wang

import ffi solution from TVM

parent 61fa3c6c
/*!
* Copyright (c) 2017 by Contributors
* \file file_util.h
* \brief Minimum file manipulation util for runtime.
*/
#ifndef TVM_RUNTIME_FILE_UTIL_H_
#define TVM_RUNTIME_FILE_UTIL_H_
#include <string>
#include "meta_data.h"
namespace tvm {
namespace runtime {
/*!
* \brief Get file format from given file name or format argument.
* \param file_name The name of the file.
* \param format The format of the file.
*/
std::string GetFileFormat(const std::string& file_name,
const std::string& format);
/*!
* \return the directory in which TVM stores cached files.
* May be set using TVM_CACHE_DIR; defaults to system locations.
*/
std::string GetCacheDir();
/*!
* \brief Get meta file path given file name and format.
* \param file_name The name of the file.
*/
std::string GetMetaFilePath(const std::string& file_name);
/*!
* \brief Get file basename (i.e. without leading directories)
* \param file_name The name of the file.
* \return the base name
*/
std::string GetFileBasename(const std::string& file_name);
/*!
* \brief Load binary file into an in-memory buffer.
* \param file_name The name of the file.
* \param data The buffer to hold the loaded binary data.
*/
void LoadBinaryFromFile(const std::string& file_name,
std::string* data);
/*!
* \brief Save binary data to a file.
* \param file_name The name of the file.
* \param data The binary data to be saved.
*/
void SaveBinaryToFile(const std::string& file_name,
const std::string& data);
/*!
* \brief Save meta data to file.
* \param file_name The name of the file.
* \param fmap The function info map.
*/
void SaveMetaDataToFile(
const std::string& file_name,
const std::unordered_map<std::string, FunctionInfo>& fmap);
/*!
* \brief Load meta data from file.
* \param file_name The name of the file.
* \param fmap The function info map.
*/
void LoadMetaDataFromFile(
const std::string& file_name,
std::unordered_map<std::string, FunctionInfo>* fmap);
} // namespace runtime
} // namespace tvm
#endif // TVM_RUNTIME_FILE_UTIL_H_
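// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): a minimal round
// trip through the helpers declared above. The file paths and the payload
// string are hypothetical example values.
// ---------------------------------------------------------------------------
inline std::string FileUtilExample() {
  // Deduce the file format from the extension when no explicit format is given.
  std::string fmt = tvm::runtime::GetFileFormat("/tmp/kernel.so", "");  // -> "so"
  // Persist a byte buffer, then read it back into memory.
  tvm::runtime::SaveBinaryToFile("/tmp/kernel.bin", "raw-bytes");
  std::string data;
  tvm::runtime::LoadBinaryFromFile("/tmp/kernel.bin", &data);  // data == "raw-bytes"
  return fmt + ":" + data;
}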
/*!
* Copyright (c) 2017 by Contributors
* \file meta_data.h
* \brief Meta data related utilities
*/
#ifndef TVM_RUNTIME_META_DATA_H_
#define TVM_RUNTIME_META_DATA_H_
#include <dmlc/json.h>
#include <dmlc/io.h>
#include <dgl/runtime/packed_func.h>
#include <string>
#include <vector>
#include "runtime_base.h"
namespace tvm {
namespace runtime {
/*! \brief function information needed by device */
struct FunctionInfo {
std::string name;
std::vector<TVMType> arg_types;
std::vector<std::string> thread_axis_tags;
void Save(dmlc::JSONWriter *writer) const;
void Load(dmlc::JSONReader *reader);
void Save(dmlc::Stream *writer) const;
bool Load(dmlc::Stream *reader);
};
} // namespace runtime
} // namespace tvm
namespace dmlc {
DMLC_DECLARE_TRAITS(has_saveload, ::tvm::runtime::FunctionInfo, true);
} // namespace dmlc
#endif // TVM_RUNTIME_META_DATA_H_
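// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): serializing a
// FunctionInfo record to JSON with the dmlc writer it declares Save() for.
// The field values below are hypothetical examples.
// ---------------------------------------------------------------------------
#include <sstream>
inline std::string FunctionInfoJsonExample() {
  tvm::runtime::FunctionInfo finfo;
  finfo.name = "fused_add";  // hypothetical kernel name
  finfo.thread_axis_tags = {"blockIdx.x", "threadIdx.x"};
  std::ostringstream os;
  dmlc::JSONWriter writer(&os);
  finfo.Save(&writer);  // writes the name, arg_types and thread_axis_tags fields
  return os.str();
}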
/*!
* Copyright (c) 2017 by Contributors
* \file module.cc
* \brief TVM module system
*/
#include <dgl/runtime/module.h>
#include <dgl/runtime/registry.h>
#include <dgl/runtime/packed_func.h>
#include <unordered_set>
#include <cstring>
#ifndef _LIBCPP_SGX_CONFIG
#include "file_util.h"
#endif
namespace tvm {
namespace runtime {
void Module::Import(Module other) {
// specially handle rpc
if (!std::strcmp((*this)->type_key(), "rpc")) {
static const PackedFunc* fimport_ = nullptr;
if (fimport_ == nullptr) {
fimport_ = runtime::Registry::Get("rpc._ImportRemoteModule");
CHECK(fimport_ != nullptr);
}
(*fimport_)(*this, other);
return;
}
// cyclic detection.
std::unordered_set<const ModuleNode*> visited{other.node_.get()};
std::vector<const ModuleNode*> stack{other.node_.get()};
while (!stack.empty()) {
const ModuleNode* n = stack.back();
stack.pop_back();
for (const Module& m : n->imports_) {
const ModuleNode* next = m.node_.get();
if (visited.count(next)) continue;
visited.insert(next);
stack.push_back(next);
}
}
CHECK(!visited.count(node_.get()))
<< "Cyclic dependency detected during import";
node_->imports_.emplace_back(std::move(other));
}
Module Module::LoadFromFile(const std::string& file_name,
const std::string& format) {
#ifndef _LIBCPP_SGX_CONFIG
std::string fmt = GetFileFormat(file_name, format);
CHECK(fmt.length() != 0)
<< "Cannot deduce format of file " << file_name;
if (fmt == "dll" || fmt == "dylib" || fmt == "dso") {
fmt = "so";
}
std::string load_f_name = "module.loadfile_" + fmt;
const PackedFunc* f = Registry::Get(load_f_name);
CHECK(f != nullptr)
<< "Loader of " << format << "("
<< load_f_name << ") is not present.";
Module m = (*f)(file_name, format);
return m;
#else
LOG(FATAL) << "SGX does not support LoadFromFile";
#endif
}
void ModuleNode::SaveToFile(const std::string& file_name,
const std::string& format) {
LOG(FATAL) << "Module[" << type_key() << "] does not support SaveToFile";
}
void ModuleNode::SaveToBinary(dmlc::Stream* stream) {
LOG(FATAL) << "Module[" << type_key() << "] does not support SaveToBinary";
}
std::string ModuleNode::GetSource(const std::string& format) {
LOG(FATAL) << "Module[" << type_key() << "] does not support GetSource";
return "";
}
const PackedFunc* ModuleNode::GetFuncFromEnv(const std::string& name) {
auto it = import_cache_.find(name);
if (it != import_cache_.end()) return it->second.get();
PackedFunc pf;
for (Module& m : this->imports_) {
pf = m.GetFunction(name, false);
if (pf != nullptr) break;
}
if (pf == nullptr) {
const PackedFunc* f = Registry::Get(name);
CHECK(f != nullptr)
<< "Cannot find function " << name
<< " in the imported modules or global registry";
return f;
} else {
std::unique_ptr<PackedFunc> f(new PackedFunc(pf));
import_cache_[name] = std::move(f);
return import_cache_.at(name).get();
}
}
bool RuntimeEnabled(const std::string& target) {
std::string f_name;
if (target == "cpu") {
return true;
} else if (target == "cuda" || target == "gpu") {
f_name = "device_api.gpu";
} else if (target == "cl" || target == "opencl" || target == "sdaccel") {
f_name = "device_api.opencl";
} else if (target == "gl" || target == "opengl") {
f_name = "device_api.opengl";
} else if (target == "mtl" || target == "metal") {
f_name = "device_api.metal";
} else if (target == "vulkan") {
f_name = "device_api.vulkan";
} else if (target == "stackvm") {
f_name = "codegen.build_stackvm";
} else if (target == "rpc") {
f_name = "device_api.rpc";
} else if (target == "vpi" || target == "verilog") {
f_name = "device_api.vpi";
} else if (target.length() >= 5 && target.substr(0, 5) == "nvptx") {
f_name = "device_api.gpu";
} else if (target.length() >= 4 && target.substr(0, 4) == "rocm") {
f_name = "device_api.rocm";
} else if (target.length() >= 4 && target.substr(0, 4) == "llvm") {
const PackedFunc* pf = runtime::Registry::Get("codegen.llvm_target_enabled");
if (pf == nullptr) return false;
return (*pf)(target);
} else {
LOG(FATAL) << "Unknown optional runtime " << target;
}
return runtime::Registry::Get(f_name) != nullptr;
}
TVM_REGISTER_GLOBAL("module._Enabled")
.set_body([](TVMArgs args, TVMRetValue *ret) {
*ret = RuntimeEnabled(args[0]);
});
TVM_REGISTER_GLOBAL("module._GetSource")
.set_body([](TVMArgs args, TVMRetValue *ret) {
*ret = args[0].operator Module()->GetSource(args[1]);
});
TVM_REGISTER_GLOBAL("module._ImportsSize")
.set_body([](TVMArgs args, TVMRetValue *ret) {
*ret = static_cast<int64_t>(
args[0].operator Module()->imports().size());
});
TVM_REGISTER_GLOBAL("module._GetImport")
.set_body([](TVMArgs args, TVMRetValue *ret) {
*ret = args[0].operator Module()->
imports().at(args[1].operator int());
});
TVM_REGISTER_GLOBAL("module._GetTypeKey")
.set_body([](TVMArgs args, TVMRetValue *ret) {
*ret = std::string(args[0].operator Module()->type_key());
});
TVM_REGISTER_GLOBAL("module._LoadFromFile")
.set_body([](TVMArgs args, TVMRetValue *ret) {
*ret = Module::LoadFromFile(args[0], args[1]);
});
TVM_REGISTER_GLOBAL("module._SaveToFile")
.set_body([](TVMArgs args, TVMRetValue *ret) {
args[0].operator Module()->
SaveToFile(args[1], args[2]);
});
} // namespace runtime
} // namespace tvm
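// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): loading a
// compiled module and calling one of its functions. The library path and the
// function name are hypothetical examples.
// ---------------------------------------------------------------------------
inline void ModuleLoadExample() {
  tvm::runtime::Module mod =
      tvm::runtime::Module::LoadFromFile("/tmp/compiled.so", "");
  // Query the module itself only; pass true to also search imported modules.
  tvm::runtime::PackedFunc f = mod.GetFunction("my_kernel", false);
  if (f != nullptr) {
    f();  // invoke with no arguments
  }
}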
/*!
* Copyright (c) 2017 by Contributors
* \file module_util.cc
* \brief Utilities for module.
*/
#ifndef _LIBCPP_SGX_CONFIG
#include <dmlc/memory_io.h>
#endif
#include <dgl/runtime/module.h>
#include <dgl/runtime/registry.h>
#include <string>
#include "module_util.h"
namespace tvm {
namespace runtime {
void ImportModuleBlob(const char* mblob, std::vector<Module>* mlist) {
#ifndef _LIBCPP_SGX_CONFIG
CHECK(mblob != nullptr);
uint64_t nbytes = 0;
for (size_t i = 0; i < sizeof(nbytes); ++i) {
uint64_t c = mblob[i];
nbytes |= (c & 0xffUL) << (i * 8);
}
dmlc::MemoryFixedSizeStream fs(
const_cast<char*>(mblob + sizeof(nbytes)), static_cast<size_t>(nbytes));
dmlc::Stream* stream = &fs;
uint64_t size;
CHECK(stream->Read(&size));
for (uint64_t i = 0; i < size; ++i) {
std::string tkey;
CHECK(stream->Read(&tkey));
std::string fkey = "module.loadbinary_" + tkey;
const PackedFunc* f = Registry::Get(fkey);
CHECK(f != nullptr)
<< "Loader of " << tkey << "("
<< fkey << ") is not present.";
Module m = (*f)(static_cast<void*>(stream));
mlist->push_back(m);
}
#else
LOG(FATAL) << "SGX does not support ImportModuleBlob";
#endif
}
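// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): producing a blob
// in the layout ImportModuleBlob() parses above, i.e. an 8-byte little-endian
// length prefix followed by a dmlc stream holding the module count and, per
// module, its type key and SaveToBinary() payload. The helper name is
// hypothetical.
// ---------------------------------------------------------------------------
#ifndef _LIBCPP_SGX_CONFIG
inline std::string MakeModuleBlobExample(const std::vector<Module>& mlist) {
  std::string body;
  dmlc::MemoryStringStream ms(&body);
  dmlc::Stream* stream = &ms;
  uint64_t sz = mlist.size();
  stream->Write(sz);
  for (Module m : mlist) {
    std::string tkey = m->type_key();
    stream->Write(tkey);
    m->SaveToBinary(stream);
  }
  // Prepend the 8-byte little-endian size header that is read back byte-by-byte above.
  std::string blob(sizeof(uint64_t), '\0');
  uint64_t nbytes = body.length();
  for (size_t i = 0; i < sizeof(nbytes); ++i) {
    blob[i] = static_cast<char>((nbytes >> (i * 8)) & 0xffUL);
  }
  return blob + body;
}
#endif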
PackedFunc WrapPackedFunc(BackendPackedCFunc faddr,
const std::shared_ptr<ModuleNode>& sptr_to_self) {
return PackedFunc([faddr, sptr_to_self](TVMArgs args, TVMRetValue* rv) {
int ret = (*faddr)(
const_cast<TVMValue*>(args.values),
const_cast<int*>(args.type_codes),
args.num_args);
CHECK_EQ(ret, 0) << TVMGetLastError();
});
}
} // namespace runtime
} // namespace tvm
/*!
* Copyright (c) 2017 by Contributors
* \file module_util.h
* \brief Helper utilities for module building
*/
#ifndef TVM_RUNTIME_MODULE_UTIL_H_
#define TVM_RUNTIME_MODULE_UTIL_H_
#include <dgl/runtime/module.h>
#include <dgl/runtime/c_runtime_api.h>
#include <dgl/runtime/c_backend_api.h>
#include <vector>
extern "C" {
// Function signature for generated packed function in shared library
typedef int (*BackendPackedCFunc)(void* args,
int* type_codes,
int num_args);
} // extern "C"
namespace tvm {
namespace runtime {
/*!
* \brief Wrap a BackendPackedCFunc to packed function.
* \param faddr The function address
* \param mptr The module pointer node.
*/
PackedFunc WrapPackedFunc(BackendPackedCFunc faddr, const std::shared_ptr<ModuleNode>& mptr);
/*!
* \brief Load and append module blob to module list
* \param mblob The module blob.
* \param module_list The module list to append to
*/
void ImportModuleBlob(const char* mblob, std::vector<Module>* module_list);
/*!
* \brief Utility to initialize context function symbols during startup
* \param flookup A symbol lookup function.
* \tparam FLookup a function of signature string->void*
*/
template<typename FLookup>
void InitContextFunctions(FLookup flookup) {
#define TVM_INIT_CONTEXT_FUNC(FuncName) \
if (auto *fp = reinterpret_cast<decltype(&FuncName)*> \
(flookup("__" #FuncName))) { \
*fp = FuncName; \
}
// Initialize the functions
TVM_INIT_CONTEXT_FUNC(TVMFuncCall);
TVM_INIT_CONTEXT_FUNC(TVMAPISetLastError);
TVM_INIT_CONTEXT_FUNC(TVMBackendGetFuncFromEnv);
TVM_INIT_CONTEXT_FUNC(TVMBackendAllocWorkspace);
TVM_INIT_CONTEXT_FUNC(TVMBackendFreeWorkspace);
TVM_INIT_CONTEXT_FUNC(TVMBackendParallelLaunch);
TVM_INIT_CONTEXT_FUNC(TVMBackendParallelBarrier);
#undef TVM_INIT_CONTEXT_FUNC
}
} // namespace runtime
} // namespace tvm
#endif // TVM_RUNTIME_MODULE_UTIL_H_
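// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): wiring a
// generated library's context symbols through InitContextFunctions(). The
// POSIX dlsym() lookup and the library handle are assumptions of this sketch;
// on a real system the handle would come from dlopen().
// ---------------------------------------------------------------------------
#include <dlfcn.h>
inline void InitContextFunctionsFromHandleExample(void* lib_handle) {
  tvm::runtime::InitContextFunctions(
      [lib_handle](const char* name) -> void* {
        // Resolves "__TVMFuncCall", "__TVMAPISetLastError", ... in the library.
        return dlsym(lib_handle, name);
      });
}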
/*!
* Copyright (c) 2017 by Contributors
* \file ndarray.cc
* \brief NDArray container infrastructure.
*/
#include <dmlc/logging.h>
#include <dgl/runtime/ndarray.h>
#include <dgl/runtime/c_runtime_api.h>
#include <dgl/runtime/device_api.h>
#include "runtime_base.h"
// deleter for arrays used by DLPack exporter
extern "C" void NDArrayDLPackDeleter(DLManagedTensor* tensor);
namespace tvm {
namespace runtime {
inline void VerifyDataType(DLDataType dtype) {
CHECK_GE(dtype.lanes, 1);
if (dtype.code == kDLFloat) {
CHECK_EQ(dtype.bits % 8, 0);
} else {
CHECK_EQ(dtype.bits % 8, 0);
}
CHECK_EQ(dtype.bits & (dtype.bits - 1), 0);
}
inline size_t GetDataSize(const DLTensor& arr) {
size_t size = 1;
for (tvm_index_t i = 0; i < arr.ndim; ++i) {
size *= arr.shape[i];
}
size *= (arr.dtype.bits * arr.dtype.lanes + 7) / 8;
return size;
}
inline size_t GetDataAlignment(const DLTensor& arr) {
size_t align = (arr.dtype.bits / 8) * arr.dtype.lanes;
if (align < kAllocAlignment) return kAllocAlignment;
return align;
}
struct NDArray::Internal {
// Default deleter for the container
static void DefaultDeleter(NDArray::Container* ptr) {
using tvm::runtime::NDArray;
if (ptr->manager_ctx != nullptr) {
static_cast<NDArray::Container*>(ptr->manager_ctx)->DecRef();
} else if (ptr->dl_tensor.data != nullptr) {
tvm::runtime::DeviceAPI::Get(ptr->dl_tensor.ctx)->FreeDataSpace(
ptr->dl_tensor.ctx, ptr->dl_tensor.data);
}
delete ptr;
}
// Deleter for NDArray converted from DLPack.
// This is used for data passed in from an external DLPack (DLManagedTensor)
// that was not allocated inside of TVM.
// It enables us to create an NDArray from memory allocated by other
// DLPack-compatible frameworks.
static void DLPackDeleter(NDArray::Container* ptr) {
DLManagedTensor* tensor = static_cast<DLManagedTensor*>(ptr->manager_ctx);
if (tensor->deleter != nullptr) {
(*tensor->deleter)(tensor);
}
delete ptr;
}
// Local create function which allocates tensor metadata
// but does not allocate space for the data.
static NDArray Create(std::vector<int64_t> shape,
DLDataType dtype,
DLContext ctx) {
VerifyDataType(dtype);
// critical zone
NDArray::Container* data = new NDArray::Container();
data->deleter = DefaultDeleter;
NDArray ret(data);
ret.data_ = data;
// RAII now in effect
// setup shape
data->shape_ = std::move(shape);
data->dl_tensor.shape = dmlc::BeginPtr(data->shape_);
data->dl_tensor.ndim = static_cast<int>(data->shape_.size());
// setup dtype
data->dl_tensor.dtype = dtype;
// setup ctx
data->dl_tensor.ctx = ctx;
return ret;
}
// Implementation of API function
static DLTensor* MoveAsDLTensor(NDArray arr) {
DLTensor* tensor = const_cast<DLTensor*>(arr.operator->());
CHECK(reinterpret_cast<DLTensor*>(arr.data_) == tensor);
arr.data_ = nullptr;
return tensor;
}
// Container to DLManagedTensor
static DLManagedTensor* ToDLPack(NDArray::Container* from) {
CHECK(from != nullptr);
DLManagedTensor* ret = new DLManagedTensor();
ret->dl_tensor = from->dl_tensor;
ret->manager_ctx = from;
from->IncRef();
ret->deleter = NDArrayDLPackDeleter;
return ret;
}
};
NDArray NDArray::CreateView(std::vector<int64_t> shape,
DLDataType dtype) {
CHECK(data_ != nullptr);
CHECK(data_->dl_tensor.strides == nullptr)
<< "Can only create view for compact tensor";
NDArray ret = Internal::Create(shape, dtype, data_->dl_tensor.ctx);
ret.data_->dl_tensor.byte_offset =
this->data_->dl_tensor.byte_offset;
size_t curr_size = GetDataSize(this->data_->dl_tensor);
size_t view_size = GetDataSize(ret.data_->dl_tensor);
CHECK_LE(view_size, curr_size)
<< "Tries to create a view that has bigger memory than current one";
// increase ref count
this->data_->IncRef();
ret.data_->manager_ctx = this->data_;
ret.data_->dl_tensor.data = this->data_->dl_tensor.data;
return ret;
}
DLManagedTensor* NDArray::ToDLPack() const {
return Internal::ToDLPack(data_);
}
NDArray NDArray::Empty(std::vector<int64_t> shape,
DLDataType dtype,
DLContext ctx) {
NDArray ret = Internal::Create(shape, dtype, ctx);
// setup memory content
size_t size = GetDataSize(ret.data_->dl_tensor);
size_t alignment = GetDataAlignment(ret.data_->dl_tensor);
ret.data_->dl_tensor.data =
DeviceAPI::Get(ret->ctx)->AllocDataSpace(
ret->ctx, size, alignment, ret->dtype);
return ret;
}
NDArray NDArray::FromDLPack(DLManagedTensor* tensor) {
NDArray::Container* data = new NDArray::Container();
data->deleter = Internal::DLPackDeleter;
data->manager_ctx = tensor;
data->dl_tensor = tensor->dl_tensor;
return NDArray(data);
}
void NDArray::CopyFromTo(DLTensor* from,
DLTensor* to,
TVMStreamHandle stream) {
size_t from_size = GetDataSize(*from);
size_t to_size = GetDataSize(*to);
CHECK_EQ(from_size, to_size)
<< "TVMArrayCopyFromTo: The size must exactly match";
CHECK(from->ctx.device_type == to->ctx.device_type
|| from->ctx.device_type == kDLCPU
|| to->ctx.device_type == kDLCPU)
<< "Can not copy across different ctx types directly";
// Use the context that is *not* a cpu context to get the correct device
// api manager.
TVMContext ctx = from->ctx.device_type != kDLCPU ? from->ctx : to->ctx;
DeviceAPI::Get(ctx)->CopyDataFromTo(
from->data, static_cast<size_t>(from->byte_offset),
to->data, static_cast<size_t>(to->byte_offset),
from_size, from->ctx, to->ctx, from->dtype, stream);
}
} // namespace runtime
} // namespace tvm
using namespace tvm::runtime;
void NDArrayDLPackDeleter(DLManagedTensor* tensor) {
static_cast<NDArray::Container*>(tensor->manager_ctx)->DecRef();
delete tensor;
}
int TVMArrayAlloc(const tvm_index_t* shape,
int ndim,
int dtype_code,
int dtype_bits,
int dtype_lanes,
int device_type,
int device_id,
TVMArrayHandle* out) {
API_BEGIN();
DLDataType dtype;
dtype.code = static_cast<uint8_t>(dtype_code);
dtype.bits = static_cast<uint8_t>(dtype_bits);
dtype.lanes = static_cast<uint16_t>(dtype_lanes);
DLContext ctx;
ctx.device_type = static_cast<DLDeviceType>(device_type);
ctx.device_id = device_id;
*out = NDArray::Internal::MoveAsDLTensor(
NDArray::Empty(std::vector<int64_t>(shape, shape + ndim), dtype, ctx));
API_END();
}
int TVMArrayFree(TVMArrayHandle handle) {
API_BEGIN();
reinterpret_cast<NDArray::Container*>(handle)->DecRef();
API_END();
}
int TVMArrayCopyFromTo(TVMArrayHandle from,
TVMArrayHandle to,
TVMStreamHandle stream) {
API_BEGIN();
NDArray::CopyFromTo(from, to, stream);
API_END();
}
int TVMArrayFromDLPack(DLManagedTensor* from,
TVMArrayHandle* out) {
API_BEGIN();
*out = NDArray::Internal::MoveAsDLTensor(NDArray::FromDLPack(from));
API_END();
}
int TVMArrayToDLPack(TVMArrayHandle from,
DLManagedTensor** out) {
API_BEGIN();
*out = NDArray::Internal::ToDLPack(reinterpret_cast<NDArray::Container*>(from));
API_END();
}
void TVMDLManagedTensorCallDeleter(DLManagedTensor* dltensor) {
(*(dltensor->deleter))(dltensor);
}
int TVMArrayCopyFromBytes(TVMArrayHandle handle,
void* data,
size_t nbytes) {
API_BEGIN();
TVMContext cpu_ctx;
cpu_ctx.device_type = kDLCPU;
cpu_ctx.device_id = 0;
size_t arr_size = GetDataSize(*handle);
CHECK_EQ(arr_size, nbytes)
<< "TVMArrayCopyFromBytes: size mismatch";
DeviceAPI::Get(handle->ctx)->CopyDataFromTo(
data, 0,
handle->data, static_cast<size_t>(handle->byte_offset),
nbytes, cpu_ctx, handle->ctx, handle->dtype, nullptr);
API_END();
}
int TVMArrayCopyToBytes(TVMArrayHandle handle,
void* data,
size_t nbytes) {
API_BEGIN();
TVMContext cpu_ctx;
cpu_ctx.device_type = kDLCPU;
cpu_ctx.device_id = 0;
size_t arr_size = GetDataSize(*handle);
CHECK_EQ(arr_size, nbytes)
<< "TVMArrayCopyToBytes: size mismatch";
DeviceAPI::Get(handle->ctx)->CopyDataFromTo(
handle->data, static_cast<size_t>(handle->byte_offset),
data, 0,
nbytes, handle->ctx, cpu_ctx, handle->dtype, nullptr);
API_END();
}
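// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): creating an
// NDArray with the C++ API above and exporting it through DLPack. The shape
// and dtype values are hypothetical examples.
// ---------------------------------------------------------------------------
inline void NDArrayDLPackExample() {
  DLDataType dtype;
  dtype.code = kDLFloat;
  dtype.bits = 32;
  dtype.lanes = 1;
  DLContext ctx;
  ctx.device_type = kDLCPU;
  ctx.device_id = 0;
  // A 2x3 float32 tensor on the CPU; memory is allocated through the device API.
  tvm::runtime::NDArray arr = tvm::runtime::NDArray::Empty({2, 3}, dtype, ctx);
  // Export to a DLPack managed tensor; the deleter releases the extra reference.
  DLManagedTensor* dl = arr.ToDLPack();
  dl->deleter(dl);
}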
/*!
* Copyright (c) 2017 by Contributors
* \file pack_args.h
* \brief Utility to pack TVMArgs to other type-erased function calling conventions.
*
* Two type erased function signatures are supported.
* - cuda_style(void** args, int num_args);
* - Pack everything by address
* - metal_style(void** buffers, int num_buffers,
* union_32bit args[N], int num_args);
* - Pack buffer by address, pack rest parameter into 32bit union buffer.
*/
#ifndef TVM_RUNTIME_PACK_ARGS_H_
#define TVM_RUNTIME_PACK_ARGS_H_
#include <dgl/runtime/c_runtime_api.h>
#include <vector>
#include <cstring>
namespace tvm {
namespace runtime {
/*!
* \brief 32-bit argument union type.
* 32 bits are chosen because most GPU APIs do not work well with 64-bit values.
*/
union ArgUnion {
int32_t v_int32;
uint32_t v_uint32;
float v_float32;
};
/*!
* \brief Create a packed function from void addr types.
*
* \param f with signature (TVMArgs args, TVMRetValue* rv, void* void_args)
* \param arg_types The arguments type information.
* \tparam F the function type
*
* \return The wrapped packed function.
*/
template<typename F>
inline PackedFunc PackFuncVoidAddr(F f, const std::vector<TVMType>& arg_types);
/*!
* \brief Create a packed function from a function whose non-buffer arguments are packed into ArgUnion entries.
*
* \param f with signature (TVMArgs args, TVMRetValue* rv, ArgUnion* pack_args)
* \param arg_types The arguments type information.
* \tparam F the function type
*
* \return The wrapped packed function.
*/
template<typename F>
inline PackedFunc PackFuncNonBufferArg(F f, const std::vector<TVMType>& arg_types);
/*!
* \brief Create a packed function from a function that takes packed arguments.
*
* \param f with signature (TVMArgs args, TVMRetValue* rv, void* pack_args, size_t nbytes)
* \param arg_types The argument type information.
* \tparam F the function type
*
* \return The wrapped packed function.
*/
template<typename F>
inline PackedFunc PackFuncPackedArg(F f, const std::vector<TVMType>& arg_types);
/*!
* \brief Extract the number of buffer arguments from the argument types.
* \param arg_types The argument types.
* \return number of buffer arguments
*/
inline size_t NumBufferArgs(const std::vector<TVMType>& arg_types);
// implementation details
namespace detail {
template<typename T, int kSize>
class TempArray {
public:
explicit TempArray(int size) {}
T* data() {
return data_;
}
private:
T data_[kSize];
};
template<typename T>
class TempArray<T, 0> {
public:
explicit TempArray(int size) : data_(size) {}
T* data() {
return data_.data();
}
private:
std::vector<T> data_;
};
/*! \brief conversion code used in void arg. */
enum ArgConvertCode {
INT64_TO_INT64,
INT64_TO_INT32,
INT64_TO_UINT32,
FLOAT64_TO_FLOAT32,
FLOAT64_TO_FLOAT64,
HANDLE_TO_HANDLE
};
inline ArgConvertCode GetArgConvertCode(TVMType t) {
CHECK_EQ(t.lanes, 1U)
<< "Cannot pass vector type argument to devic function for now";
if (t.code == kDLInt) {
if (t.bits == 64U) return INT64_TO_INT64;
if (t.bits == 32U) return INT64_TO_INT32;
} else if (t.code == kDLUInt) {
if (t.bits == 32U) return INT64_TO_UINT32;
} else if (t.code == kDLFloat) {
if (t.bits == 64U) return FLOAT64_TO_FLOAT64;
if (t.bits == 32U) return FLOAT64_TO_FLOAT32;
} else if (t.code == kHandle) {
return HANDLE_TO_HANDLE;
}
LOG(FATAL) << "Cannot handle " << t << " as device function argument";
return HANDLE_TO_HANDLE;
}
template<int N, typename F>
inline PackedFunc PackFuncVoidAddr_(F f, const std::vector<ArgConvertCode>& codes) {
int num_args = static_cast<int>(codes.size());
auto ret = [f, codes, num_args](TVMArgs args, TVMRetValue* ret) {
TempArray<void*, N> addr_(num_args);
TempArray<ArgUnion, N> holder_(num_args);
void** addr = addr_.data();
ArgUnion* holder = holder_.data();
for (int i = 0; i < num_args; ++i) {
switch (codes[i]) {
case INT64_TO_INT64:
case FLOAT64_TO_FLOAT64:
case HANDLE_TO_HANDLE: {
addr[i] = (void*)&(args.values[i]); // NOLINT(*)
break;
}
case INT64_TO_INT32: {
holder[i].v_int32 = static_cast<int32_t>(args.values[i].v_int64);
addr[i] = &(holder[i]);
break;
}
case INT64_TO_UINT32 : {
holder[i].v_uint32 = static_cast<uint32_t>(args.values[i].v_int64);
addr[i] = &(holder[i]);
break;
}
case FLOAT64_TO_FLOAT32: {
holder[i].v_float32 = static_cast<float>(args.values[i].v_float64);
addr[i] = &(holder[i]);
break;
}
}
}
f(args, ret, addr);
};
return PackedFunc(ret);
}
template<int N, typename F>
inline PackedFunc PackFuncNonBufferArg_(
F f, int base, const std::vector<ArgConvertCode>& codes) {
int num_args = static_cast<int>(codes.size());
auto ret = [f, codes, base, num_args](TVMArgs args, TVMRetValue* ret) {
TempArray<ArgUnion, N> holder_(num_args);
ArgUnion* holder = holder_.data();
for (int i = 0; i < num_args; ++i) {
switch (codes[i]) {
case INT64_TO_INT64:
case FLOAT64_TO_FLOAT64: {
LOG(FATAL) << "Donot support 64bit argument to device function"; break;
}
case INT64_TO_INT32: {
holder[i].v_int32 = static_cast<int32_t>(args.values[base + i].v_int64);
break;
}
case INT64_TO_UINT32 : {
holder[i].v_uint32 = static_cast<uint32_t>(args.values[base + i].v_int64);
break;
}
case FLOAT64_TO_FLOAT32: {
holder[i].v_float32 = static_cast<float>(args.values[base + i].v_float64);
break;
}
case HANDLE_TO_HANDLE: {
LOG(FATAL) << "not reached"; break;
}
}
}
f(args, ret, holder);
};
return PackedFunc(ret);
}
template<int N, typename F>
inline PackedFunc PackFuncPackedArg_(
F f, const std::vector<ArgConvertCode>& codes) {
int num_args = static_cast<int>(codes.size());
auto ret = [f, codes, num_args](TVMArgs args, TVMRetValue* ret) {
TempArray<uint64_t, N> pack_(num_args);
int32_t* pack = reinterpret_cast<int32_t*>(pack_.data());
int32_t* ptr = pack;
static_assert(sizeof(TVMValue) == 8, "invariant");
static_assert(sizeof(void*) % sizeof(int32_t) == 0, "invariant");
for (int i = 0; i < num_args; ++i) {
switch (codes[i]) {
case HANDLE_TO_HANDLE: {
std::memcpy(ptr, &(args.values[i].v_handle), sizeof(void*));
ptr += sizeof(void*) / sizeof(int32_t);
break;
}
case INT64_TO_INT64:
case FLOAT64_TO_FLOAT64: {
std::memcpy(ptr, &args.values[i], sizeof(TVMValue));
ptr += 2;
break;
}
case INT64_TO_INT32: {
*ptr = static_cast<int32_t>(args.values[i].v_int64);
++ptr;
break;
}
case INT64_TO_UINT32 : {
*reinterpret_cast<uint32_t*>(ptr) =
static_cast<uint32_t>(args.values[i].v_int64);
++ptr;
break;
}
case FLOAT64_TO_FLOAT32: {
*reinterpret_cast<float*>(ptr) =
static_cast<float>(args.values[i].v_float64);
++ptr;
break;
}
default: {
LOG(FATAL) << "not reached"; break;
}
}
}
f(args, ret, pack, (ptr - pack) * sizeof(int32_t));
};
return PackedFunc(ret);
}
} // namespace detail
template<typename F>
inline PackedFunc PackFuncVoidAddr(F f, const std::vector<TVMType>& arg_types) {
std::vector<detail::ArgConvertCode> codes(arg_types.size());
for (size_t i = 0; i < arg_types.size(); ++i) {
codes[i] = detail::GetArgConvertCode(arg_types[i]);
}
size_t num_void_args = arg_types.size();
// specialization
if (num_void_args <= 4) {
return detail::PackFuncVoidAddr_<4>(f, codes);
} else if (num_void_args <= 8) {
return detail::PackFuncVoidAddr_<8>(f, codes);
} else {
return detail::PackFuncVoidAddr_<0>(f, codes);
}
}
inline size_t NumBufferArgs(const std::vector<TVMType>& arg_types) {
size_t base = arg_types.size();
for (size_t i = 0; i < arg_types.size(); ++i) {
if (arg_types[i].code != kHandle) {
base = i; break;
}
}
for (size_t i = base; i < arg_types.size(); ++i) {
CHECK(arg_types[i].code != kHandle)
<< "Device function need to be organized";
}
return base;
}
template<typename F>
inline PackedFunc PackFuncNonBufferArg(F f, const std::vector<TVMType>& arg_types) {
size_t num_buffer = NumBufferArgs(arg_types);
std::vector<detail::ArgConvertCode> codes;
for (size_t i = num_buffer; i < arg_types.size(); ++i) {
codes.push_back(detail::GetArgConvertCode(arg_types[i]));
}
int base = static_cast<int>(num_buffer);
size_t nargs = codes.size();
// specialization
if (nargs <= 4) {
return detail::PackFuncNonBufferArg_<4>(f, base, codes);
} else {
return detail::PackFuncNonBufferArg_<0>(f, base, codes);
}
}
template<typename F>
inline PackedFunc PackFuncPackedArg(F f, const std::vector<TVMType>& arg_types) {
std::vector<detail::ArgConvertCode> codes;
for (size_t i = 0; i < arg_types.size(); ++i) {
codes.push_back(detail::GetArgConvertCode(arg_types[i]));
}
size_t nargs = codes.size();
// specialization
if (nargs <= 4) {
return detail::PackFuncPackedArg_<4>(f, codes);
} else {
return detail::PackFuncPackedArg_<0>(f, codes);
}
}
} // namespace runtime
} // namespace tvm
#endif // TVM_RUNTIME_PACK_ARGS_H_
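// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): wrapping a
// cuda-style launcher with PackFuncVoidAddr() from the header above. The
// argument types and the empty lambda body are example values only.
// ---------------------------------------------------------------------------
#include <dgl/runtime/packed_func.h>
inline tvm::runtime::PackedFunc MakeVoidAddrPackedExample() {
  using tvm::runtime::PackFuncVoidAddr;
  using tvm::runtime::PackedFunc;
  using tvm::runtime::TVMArgs;
  using tvm::runtime::TVMRetValue;
  // One buffer handle followed by one 32-bit integer scalar.
  std::vector<TVMType> arg_types(2);
  arg_types[0].code = kHandle;  arg_types[0].bits = 64;  arg_types[0].lanes = 1;
  arg_types[1].code = kDLInt;   arg_types[1].bits = 32;  arg_types[1].lanes = 1;
  return PackFuncVoidAddr(
      [](TVMArgs args, TVMRetValue* rv, void** void_args) {
        // void_args[i] points at the i-th (possibly down-converted) argument,
        // matching the cuda_style(void** args, int num_args) convention.
      }, arg_types);
}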
/*!
* Copyright (c) 2017 by Contributors
* \file registry.cc
* \brief The global registry of packed function.
*/
#include <dmlc/logging.h>
#include <dmlc/thread_local.h>
#include <dgl/runtime/registry.h>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <array>
#include "runtime_base.h"
namespace tvm {
namespace runtime {
struct Registry::Manager {
// map storing the functions.
// We deliberately use raw pointers here.
// This is because PackedFunc can contain callbacks into the host language (Python)
// and the resource can become invalid due to the indeterministic order of destruction.
// The resources will only be recycled during program exit.
std::unordered_map<std::string, Registry*> fmap;
// vtable for extension type
std::array<ExtTypeVTable, kExtEnd> ext_vtable;
// mutex
std::mutex mutex;
Manager() {
for (auto& x : ext_vtable) {
x.destroy = nullptr;
}
}
static Manager* Global() {
static Manager inst;
return &inst;
}
};
Registry& Registry::set_body(PackedFunc f) { // NOLINT(*)
func_ = f;
return *this;
}
Registry& Registry::Register(const std::string& name, bool override) { // NOLINT(*)
Manager* m = Manager::Global();
std::lock_guard<std::mutex> lock(m->mutex);
auto it = m->fmap.find(name);
if (it == m->fmap.end()) {
Registry* r = new Registry();
r->name_ = name;
m->fmap[name] = r;
return *r;
} else {
CHECK(override)
<< "Global PackedFunc " << name << " is already registered";
return *it->second;
}
}
bool Registry::Remove(const std::string& name) {
Manager* m = Manager::Global();
std::lock_guard<std::mutex> lock(m->mutex);
auto it = m->fmap.find(name);
if (it == m->fmap.end()) return false;
m->fmap.erase(it);
return true;
}
const PackedFunc* Registry::Get(const std::string& name) {
Manager* m = Manager::Global();
std::lock_guard<std::mutex> lock(m->mutex);
auto it = m->fmap.find(name);
if (it == m->fmap.end()) return nullptr;
return &(it->second->func_);
}
std::vector<std::string> Registry::ListNames() {
Manager* m = Manager::Global();
std::lock_guard<std::mutex> lock(m->mutex);
std::vector<std::string> keys;
keys.reserve(m->fmap.size());
for (const auto &kv : m->fmap) {
keys.push_back(kv.first);
}
return keys;
}
ExtTypeVTable* ExtTypeVTable::Get(int type_code) {
CHECK(type_code > kExtBegin && type_code < kExtEnd);
Registry::Manager* m = Registry::Manager::Global();
ExtTypeVTable* vt = &(m->ext_vtable[type_code]);
CHECK(vt->destroy != nullptr)
<< "Extension type not registered";
return vt;
}
ExtTypeVTable* ExtTypeVTable::RegisterInternal(
int type_code, const ExtTypeVTable& vt) {
CHECK(type_code > kExtBegin && type_code < kExtEnd);
Registry::Manager* m = Registry::Manager::Global();
std::lock_guard<std::mutex> lock(m->mutex);
ExtTypeVTable* pvt = &(m->ext_vtable[type_code]);
pvt[0] = vt;
return pvt;
}
} // namespace runtime
} // namespace tvm
/*! \brief entry to easily hold returning information */
struct TVMFuncThreadLocalEntry {
/*! \brief result holder for returning strings */
std::vector<std::string> ret_vec_str;
/*! \brief result holder for returning string pointers */
std::vector<const char *> ret_vec_charp;
};
/*! \brief Thread local store that can be used to hold return values. */
typedef dmlc::ThreadLocalStore<TVMFuncThreadLocalEntry> TVMFuncThreadLocalStore;
int TVMExtTypeFree(void* handle, int type_code) {
API_BEGIN();
tvm::runtime::ExtTypeVTable::Get(type_code)->destroy(handle);
API_END();
}
int TVMFuncRegisterGlobal(
const char* name, TVMFunctionHandle f, int override) {
API_BEGIN();
tvm::runtime::Registry::Register(name, override != 0)
.set_body(*static_cast<tvm::runtime::PackedFunc*>(f));
API_END();
}
int TVMFuncGetGlobal(const char* name, TVMFunctionHandle* out) {
API_BEGIN();
const tvm::runtime::PackedFunc* fp =
tvm::runtime::Registry::Get(name);
if (fp != nullptr) {
*out = new tvm::runtime::PackedFunc(*fp); // NOLINT(*)
} else {
*out = nullptr;
}
API_END();
}
int TVMFuncListGlobalNames(int *out_size,
const char*** out_array) {
API_BEGIN();
TVMFuncThreadLocalEntry *ret = TVMFuncThreadLocalStore::Get();
ret->ret_vec_str = tvm::runtime::Registry::ListNames();
ret->ret_vec_charp.clear();
for (size_t i = 0; i < ret->ret_vec_str.size(); ++i) {
ret->ret_vec_charp.push_back(ret->ret_vec_str[i].c_str());
}
*out_array = dmlc::BeginPtr(ret->ret_vec_charp);
*out_size = static_cast<int>(ret->ret_vec_str.size());
API_END();
}
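// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): registering and
// looking up a global PackedFunc through the registry implemented above. The
// name "example.add_one" is hypothetical.
// ---------------------------------------------------------------------------
TVM_REGISTER_GLOBAL("example.add_one")
.set_body([](tvm::runtime::TVMArgs args, tvm::runtime::TVMRetValue* rv) {
  *rv = static_cast<int>(args[0]) + 1;
});
// Lookup and invocation, e.g. from the FFI layer:
//   const tvm::runtime::PackedFunc* f =
//       tvm::runtime::Registry::Get("example.add_one");
//   int v = (*f)(41);  // v == 42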
/*!
* Copyright (c) 2016 by Contributors
* \file runtime_base.h
* \brief Base of all C APIs
*/
#ifndef TVM_RUNTIME_RUNTIME_BASE_H_
#define TVM_RUNTIME_RUNTIME_BASE_H_
#include <dgl/runtime/c_runtime_api.h>
#include <stdexcept>
/*! \brief macro to guard beginning and end section of all functions */
#define API_BEGIN() try {
/*! \brief every function starts with API_BEGIN();
and finishes with API_END() or API_END_HANDLE_ERROR */
#define API_END() } catch(std::runtime_error &_except_) { return TVMAPIHandleException(_except_); } return 0; // NOLINT(*)
/*!
* \brief every function starts with API_BEGIN();
* and finishes with API_END() or API_END_HANDLE_ERROR
* The Finalize clause contains the procedure to clean up state when an error happens.
*/
#define API_END_HANDLE_ERROR(Finalize) } catch(std::runtime_error &_except_) { Finalize; return TVMAPIHandleException(_except_); } return 0; // NOLINT(*)
/*!
* \brief handle exceptions thrown out
* \param e the exception
* \return the return value of API after exception is handled
*/
inline int TVMAPIHandleException(const std::runtime_error &e) {
TVMAPISetLastError(e.what());
return -1;
}
#endif // TVM_RUNTIME_RUNTIME_BASE_H_
/*!
* Copyright (c) 2017 by Contributors
* \file system_lib_module.cc
* \brief SystemLib module.
*/
#include <dgl/runtime/registry.h>
#include <dgl/runtime/c_backend_api.h>
#include <mutex>
#include "module_util.h"
namespace tvm {
namespace runtime {
class SystemLibModuleNode : public ModuleNode {
public:
SystemLibModuleNode() = default;
const char* type_key() const final {
return "system_lib";
}
PackedFunc GetFunction(
const std::string& name,
const std::shared_ptr<ModuleNode>& sptr_to_self) final {
std::lock_guard<std::mutex> lock(mutex_);
if (module_blob_ != nullptr) {
// If we previously recorded submodules, load them now.
ImportModuleBlob(reinterpret_cast<const char*>(module_blob_), &imports_);
module_blob_ = nullptr;
}
auto it = tbl_.find(name);
if (it != tbl_.end()) {
return WrapPackedFunc(
reinterpret_cast<BackendPackedCFunc>(it->second), sptr_to_self);
} else {
return PackedFunc();
}
}
void RegisterSymbol(const std::string& name, void* ptr) {
std::lock_guard<std::mutex> lock(mutex_);
if (name == symbol::tvm_module_ctx) {
void** ctx_addr = reinterpret_cast<void**>(ptr);
*ctx_addr = this;
} else if (name == symbol::tvm_dev_mblob) {
// Record pointer to content of submodules to be loaded.
// We defer loading submodules to the first call to GetFunction().
// The reason is that RegisterSymbol() gets called when initializing the
// syslib (i.e. library loading time), and the registries aren't ready
// yet. Therefore, we might not have the functionality to load submodules
// now.
CHECK(module_blob_ == nullptr) << "Resetting mobule blob?";
module_blob_ = ptr;
} else {
auto it = tbl_.find(name);
if (it != tbl_.end() && ptr != it->second) {
LOG(WARNING) << "SystemLib symbol " << name
<< " get overriden to a different address "
<< ptr << "->" << it->second;
}
tbl_[name] = ptr;
}
}
static const std::shared_ptr<SystemLibModuleNode>& Global() {
static std::shared_ptr<SystemLibModuleNode> inst =
std::make_shared<SystemLibModuleNode>();
return inst;
}
private:
// Internal mutex
std::mutex mutex_;
// Internal symbol table
std::unordered_map<std::string, void*> tbl_;
// Module blob to be imported
void* module_blob_{nullptr};
};
TVM_REGISTER_GLOBAL("module._GetSystemLib")
.set_body([](TVMArgs args, TVMRetValue* rv) {
*rv = runtime::Module(SystemLibModuleNode::Global());
});
} // namespace runtime
} // namespace tvm
int TVMBackendRegisterSystemLibSymbol(const char* name, void* ptr) {
tvm::runtime::SystemLibModuleNode::Global()->RegisterSymbol(name, ptr);
return 0;
}
/*!
* Copyright (c) 2017 by Contributors
* \file thread_pool.cc
* \brief Threadpool for multi-threading runtime.
*/
#include <dgl/runtime/c_runtime_api.h>
#include <dgl/runtime/c_backend_api.h>
#include <dgl/runtime/registry.h>
#include <dgl/runtime/packed_func.h>
#include <dgl/runtime/threading_backend.h>
#include <dmlc/thread_local.h>
#include <dmlc/logging.h>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <vector>
#include <string>
#include <cstring>
#include <memory>
#include <sstream>
const constexpr int kL1CacheBytes = 64;
namespace tvm {
namespace runtime {
// stride in the sync counter page, sized to fit a cache line.
constexpr int kSyncStride = 64 / sizeof(std::atomic<int>);
/*!
* \brief Thread local master environment.
*/
class ParallelLauncher {
public:
// Reset the task request.
void Init(FTVMParallelLambda flambda,
void* cdata,
int num_task,
bool need_sync) {
num_pending_.store(num_task);
this->cdata = cdata;
this->flambda = flambda;
this->env.num_task = num_task;
has_error_.store(false);
// reshape
if (static_cast<size_t>(num_task) > par_errors_.size()) {
par_errors_.resize(num_task + 1);
if (need_sync) {
delete[] sync_counter_;
sync_counter_ = new std::atomic<int>[num_task * kSyncStride];
}
}
if (need_sync) {
for (int i = 0; i < num_task; ++i) {
sync_counter_[i * kSyncStride].store(
0, std::memory_order_relaxed);
}
this->env.sync_handle = sync_counter_;
} else {
this->env.sync_handle = nullptr;
}
}
~ParallelLauncher() {
delete[] sync_counter_;
}
// Wait for the pending jobs to finish
int WaitForJobs() {
while (num_pending_.load() != 0) {
tvm::runtime::threading::Yield();
}
if (!has_error_.load()) return 0;
// the following intentionally uses std::string due to
// a security issue raised in the SGX backend
std::string err("");
for (size_t i = 0; i < par_errors_.size(); ++i) {
if (par_errors_[i].length() != 0) {
err += "Task " + std::to_string(i) + " error: " + par_errors_[i] + '\n';
par_errors_[i].clear();
}
}
TVMAPISetLastError(err.c_str());
return -1;
}
// Signal that one job has finished with an error.
void SignalJobError(int task_id) {
num_pending_.fetch_sub(1);
par_errors_[task_id] = TVMGetLastError();
has_error_.store(true);
}
// Signal that one job has finished.
void SignalJobFinish() {
num_pending_.fetch_sub(1);
}
// Get thread local version of the store.
static ParallelLauncher* ThreadLocal() {
return dmlc::ThreadLocalStore<ParallelLauncher>::Get();
}
// The parallel lambda
FTVMParallelLambda flambda;
// The closure data
void* cdata;
// Local env
TVMParallelGroupEnv env;
// Whether this thread is a worker of the pool.
// used to prevent recursive launch.
bool is_worker{false};
private:
// The pending jobs.
std::atomic<int32_t> num_pending_;
// Whether an error has been encountered.
std::atomic<bool> has_error_;
// The counter page.
std::atomic<int32_t>* sync_counter_{nullptr};
// The error message
std::vector<std::string> par_errors_;
};
/*! \brief Lock-free single-producer-single-consumer queue for each thread */
class SpscTaskQueue {
public:
/*! \brief The task entry */
struct Task {
ParallelLauncher* launcher;
int32_t task_id;
};
SpscTaskQueue() :
buffer_(new Task[kRingSize]),
head_(0),
tail_(0) {
}
~SpscTaskQueue() {
delete[] buffer_;
}
/*!
* \brief Push a task into the queue and notify the consumer if it is waiting.
* \param input The task to be enqueued.
*/
void Push(const Task& input) {
while (!Enqueue(input)) {
tvm::runtime::threading::Yield();
}
if (pending_.fetch_add(1) == -1) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.notify_one();
}
}
/*!
* \brief Pop a task out of the queue and condition wait if no tasks.
* \param output The pointer to the task to be dequeued.
* \param spin_count The number of iterations to spin before sleep.
* \return Whether pop is successful (true) or we need to exit now (false).
*/
bool Pop(Task* output, uint32_t spin_count = 300000) {
// Busy wait a bit when the queue is empty.
// If a new task comes to the queue quickly, this wait keeps the worker from going to sleep.
// The default spin count is set by following the typical omp convention
for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
tvm::runtime::threading::Yield();
}
if (pending_.fetch_sub(1) == 0) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] {
return pending_.load() >= 0 || exit_now_.load();
});
}
if (exit_now_.load(std::memory_order_relaxed)) {
return false;
}
const uint32_t head = head_.load(std::memory_order_relaxed);
// sanity check that the queue is not empty
CHECK(tail_.load(std::memory_order_acquire) != head);
*output = buffer_[head];
head_.store((head + 1) % kRingSize, std::memory_order_release);
return true;
}
/*!
* \brief Signal to terminate the worker.
*/
void SignalForKill() {
std::lock_guard<std::mutex> lock(mutex_);
exit_now_.store(true);
cv_.notify_all();
}
protected:
/*!
* \brief Lock-free enqueue.
* \param input The task to be enqueued.
* \return Whether the task is enqueued.
*/
bool Enqueue(const Task& input) {
if (exit_now_.load(std::memory_order_relaxed)) return false;
const uint32_t tail = tail_.load(std::memory_order_relaxed);
if ((tail + 1) % kRingSize != (head_.load(std::memory_order_acquire))) {
buffer_[tail] = input;
tail_.store((tail + 1) % kRingSize, std::memory_order_release);
return true;
}
return false;
}
// the cache line paddings are used to avoid false sharing between atomic variables
typedef char cache_line_pad_t[kL1CacheBytes];
cache_line_pad_t pad0_;
// size of the ring buffer; the queue can hold at most kRingSize - 1 items
// defined as a constant for better compiler optimization
static constexpr const int kRingSize = 2;
// pointer to access the item
Task* const buffer_;
cache_line_pad_t pad1_;
// queue head, where one gets a task from the queue
std::atomic<uint32_t> head_;
cache_line_pad_t pad2_;
// queue tail, where one puts a task into the queue
std::atomic<uint32_t> tail_;
cache_line_pad_t pad3_;
// pending tasks in the queue
std::atomic<int8_t> pending_{0};
cache_line_pad_t pad4_;
// signal for exit now
std::atomic<bool> exit_now_{false};
// internal mutex
std::mutex mutex_;
// cv for consumer
std::condition_variable cv_;
};
// The thread pool
class ThreadPool {
public:
ThreadPool(): num_workers_(tvm::runtime::threading::MaxConcurrency()) {
for (int i = 0; i < num_workers_; ++i) {
// The SpscTaskQueue only hosts ONE item at a time
queues_.emplace_back(std::unique_ptr<SpscTaskQueue>(new SpscTaskQueue()));
}
threads_ = std::unique_ptr<tvm::runtime::threading::ThreadGroup>(
new tvm::runtime::threading::ThreadGroup(
num_workers_, [this](int worker_id) { this->RunWorker(worker_id); },
exclude_worker0_ /* exclude_worker0 */));
num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_);
}
~ThreadPool() {
for (std::unique_ptr<SpscTaskQueue>& q : queues_) {
q->SignalForKill();
}
threads_.reset();
}
int Launch(FTVMParallelLambda flambda,
void* cdata,
int num_task,
int need_sync) {
ParallelLauncher* launcher = ParallelLauncher::ThreadLocal();
CHECK(!launcher->is_worker)
<< "Cannot launch parallel job inside worker, consider fuse then parallel";
if (num_task == 0) {
num_task = num_workers_used_;
}
if (need_sync != 0) {
CHECK_LE(num_task, num_workers_used_)
<< "Request parallel sync task larger than number of threads used "
<< " workers=" << num_workers_used_ << " request=" << num_task;
}
launcher->Init(flambda, cdata, num_task, need_sync != 0);
SpscTaskQueue::Task tsk;
tsk.launcher = launcher;
// if worker0 is taken by the master, queues_[0] is abandoned
for (int i = exclude_worker0_; i < num_task; ++i) {
tsk.task_id = i;
queues_[i]->Push(tsk);
}
// use the master thread to run task 0
if (exclude_worker0_) {
TVMParallelGroupEnv* penv = &(tsk.launcher->env);
if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) {
tsk.launcher->SignalJobFinish();
} else {
tsk.launcher->SignalJobError(0);  // the master thread ran task 0
}
}
int res = launcher->WaitForJobs();
return res;
}
static ThreadPool* ThreadLocal() {
return dmlc::ThreadLocalStore<ThreadPool>::Get();
}
void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) {
// this will also reset the affinity of the ThreadGroup
// may use fewer than the MaxConcurrency number of workers
num_workers_used_ = threads_->Configure(mode, nthreads,
exclude_worker0_);
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction
num_workers_used_ = std::min(num_workers_, num_workers_used_);
}
private:
// Internal worker function.
void RunWorker(int worker_id) {
SpscTaskQueue* queue = queues_[worker_id].get();
SpscTaskQueue::Task task;
ParallelLauncher::ThreadLocal()->is_worker = true;
while (queue->Pop(&task)) {
CHECK(task.launcher != nullptr);
TVMParallelGroupEnv* penv = &(task.launcher->env);
void* cdata = task.launcher->cdata;
if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) {
task.launcher->SignalJobFinish();
} else {
task.launcher->SignalJobError(task.task_id);
}
}
}
int num_workers_;
// number of workers used (can be restricted with affinity pref)
int num_workers_used_;
// if excluding worker 0 and using master to run task 0
#ifndef _LIBCPP_SGX_CONFIG
bool exclude_worker0_{true};
#else
bool exclude_worker0_{false};
#endif
std::vector<std::unique_ptr<SpscTaskQueue> > queues_;
std::unique_ptr<tvm::runtime::threading::ThreadGroup> threads_;
};
TVM_REGISTER_GLOBAL("runtime.config_threadpool")
.set_body([](TVMArgs args, TVMRetValue* rv) {
threading::ThreadGroup::AffinityMode mode =\
static_cast<threading::ThreadGroup::AffinityMode>(\
static_cast<int>(args[0]));
int nthreads = args[1];
ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads);
});
} // namespace runtime
} // namespace tvm
int TVMBackendParallelLaunch(
FTVMParallelLambda flambda,
void* cdata,
int num_task) {
int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(
flambda, cdata, num_task, 1);
return res;
}
int TVMBackendParallelBarrier(int task_id, TVMParallelGroupEnv* penv) {
using tvm::runtime::kSyncStride;
int num_task = penv->num_task;
std::atomic<int>* sync_counter =
reinterpret_cast<std::atomic<int>*>(penv->sync_handle);
int old_counter = sync_counter[task_id * kSyncStride].fetch_add(
1, std::memory_order_release);
for (int i = 0; i < num_task; ++i) {
if (i != task_id) {
while (sync_counter[i * kSyncStride].load(
std::memory_order_relaxed) <= old_counter) {
tvm::runtime::threading::Yield();
}
}
}
std::atomic_thread_fence(std::memory_order_acquire);
return 0;
}
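// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): a data-parallel
// job launched through the thread pool above. The ExampleJobData payload and
// its fields are hypothetical.
// ---------------------------------------------------------------------------
struct ExampleJobData {
  float* out;
  int n;
};
inline int ExampleJob(int task_id, TVMParallelGroupEnv* penv, void* cdata) {
  ExampleJobData* d = static_cast<ExampleJobData*>(cdata);
  // Each task fills a contiguous slice of the output array.
  int chunk = (d->n + penv->num_task - 1) / penv->num_task;
  int begin = task_id * chunk;
  int end = std::min(d->n, begin + chunk);
  for (int i = begin; i < end; ++i) {
    d->out[i] = static_cast<float>(i);
  }
  return 0;  // a non-zero return is reported through SignalJobError()
}
// Launched as:
//   ExampleJobData data = {buffer, 1024};
//   TVMBackendParallelLaunch(ExampleJob, &data, 0 /* use all workers */);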
/*!
* Copyright (c) 2017 by Contributors
* \file thread_storage_scope.h
* \brief Extract thread axis configuration from TVMArgs.
*/
#ifndef TVM_RUNTIME_THREAD_STORAGE_SCOPE_H_
#define TVM_RUNTIME_THREAD_STORAGE_SCOPE_H_
#include <dgl/runtime/packed_func.h>
#include <string>
#include <vector>
namespace tvm {
namespace runtime {
/*!
* \brief Memory hierarchy rank in the storage system
* \note The global rank and shared rank have a one-to-one
* correspondence to the thread rank.
*/
enum class StorageRank {
/*! \brief global memory */
kGlobal = 0,
/*! \brief shared memory among thread group */
kShared = 1,
/*!
* \brief reserved for warp memory.
* This is only used by the programming model.
* Such memory usually does not exist on a GPU;
* instead, it can be simulated with registers and shuffles.
*/
kWarp = 2,
/*! \brief thread local memory */
kLocal = 3
};
/*!
* \param thread_scope_rank The thread scope rank
* \return default storage rank given the thread scope
*/
inline StorageRank DefaultStorageRank(int thread_scope_rank) {
switch (thread_scope_rank) {
case -1: return StorageRank::kGlobal;
case 0: return StorageRank::kShared;
case 1: return StorageRank::kLocal;
default: {
LOG(FATAL) << "unknown rank";
return StorageRank::kGlobal;
}
}
}
/*! \brief class to represent storage scope */
struct StorageScope {
/*! \brief The rank of the storage */
StorageRank rank{StorageRank::kGlobal};
/*! \brief tag for special purpose memory. */
std::string tag;
// comparator
inline bool operator==(const StorageScope& other) const {
return rank == other.rank && tag == other.tag;
}
inline bool operator!=(const StorageScope& other) const {
return !(*this == other);
}
inline std::string to_string() const {
std::string ret;
switch (rank) {
case StorageRank::kGlobal: return "global" + tag;
case StorageRank::kShared: return "shared" + tag;
case StorageRank::kWarp: return "warp" + tag;
case StorageRank::kLocal: return "local" + tag;
default: LOG(FATAL) << "unknown storage scope"; return "";
}
}
/*!
* \brief make storage scope from string
* \param s The string to be parsed.
* \return The storage scope.
*/
static StorageScope make(const std::string& s) {
StorageScope r;
if (s.compare(0, 6, "global") == 0) {
r.rank = StorageRank::kGlobal;
r.tag = s.substr(6, std::string::npos);
} else if (s.compare(0, 6, "shared") == 0) {
r.rank = StorageRank::kShared;
r.tag = s.substr(6, std::string::npos);
} else if (s.compare(0, 4, "warp") == 0) {
r.rank = StorageRank::kWarp;
r.tag = s.substr(4, std::string::npos);
} else if (s.compare(0, 5, "local") == 0) {
r.rank = StorageRank::kLocal;
r.tag = s.substr(5, std::string::npos);
} else {
LOG(FATAL) << "unknown storage scope " << s;
}
return r;
}
};
/*! \brief class to represent thread scope */
struct ThreadScope {
/*! \brief The rank of thread scope */
int rank{0};
/*! \brief the dimension index under the rank */
int dim_index{0};
/*!
* \brief make storage scope from string
* \param s The string to be parsed.
* \return The storage scope.
*/
static ThreadScope make(const std::string& s) {
ThreadScope r;
if (s == "vthread" || s == "cthread") {
// virtual thread at the same level as local
r.rank = 1;
r.dim_index = -1;
} else if (s.compare(0, 9, "blockIdx.") == 0) {
r.rank = 0;
r.dim_index = static_cast<int>(s[9] - 'x');
} else if (s.compare(0, 10, "threadIdx.") == 0) {
r.rank = 1;
r.dim_index = static_cast<int>(s[10] - 'x');
} else {
LOG(FATAL) << "Unknown threadscope " << s;
}
return r;
}
};
/*! \brief workload specification */
struct ThreadWorkLoad {
// array of six sizes: the first three are grid dimensions, the last three are thread-block dimensions.
size_t work_size[6];
/*!
* \param i The block dimension.
* \return i-th block dim
*/
inline size_t block_dim(size_t i) const {
return work_size[i + 3];
}
/*!
* \param i The grid dimension.
* \return i-th grid dim
*/
inline size_t grid_dim(size_t i) const {
return work_size[i];
}
};
/*! \brief Thread axis configuration */
class ThreadAxisConfig {
public:
void Init(size_t base,
const std::vector<std::string>& thread_axis_tags) {
base_ = base;
std::vector<bool> filled(6, false);
for (size_t i = 0; i < thread_axis_tags.size(); ++i) {
const std::string& tag = thread_axis_tags[i];
ThreadScope ts = ThreadScope::make(tag);
arg_index_map_.push_back(ts.rank * 3 + ts.dim_index);
filled[ts.rank * 3 + ts.dim_index] = true;
}
work_dim_ = 1;
for (int i = 0; i < 3; ++i) {
if (filled[i] || filled[i + 3]) {
work_dim_ = i + 1;
}
}
}
// extract workload from arguments.
ThreadWorkLoad Extract(TVMArgs x) const {
ThreadWorkLoad w;
std::fill(w.work_size, w.work_size + 6, 1);
for (size_t i = 0; i < arg_index_map_.size(); ++i) {
w.work_size[arg_index_map_[i]] =
static_cast<size_t>(x.values[base_ + i].v_int64);
}
return w;
}
// return the work dim
size_t work_dim() const {
return work_dim_;
}
private:
/*! \brief base index of the thread-axis arguments in TVMArgs */
size_t base_;
/*! \brief The number of work dimensions */
size_t work_dim_;
/*! \brief The index mapping. */
std::vector<uint32_t> arg_index_map_;
};
} // namespace runtime
} // namespace tvm
namespace std {
template <>
struct hash<::tvm::runtime::StorageScope> {
std::size_t operator()(const ::tvm::runtime::StorageScope& k) const {
return static_cast<size_t>(k.rank);
}
};
} // namespace std
#endif // TVM_RUNTIME_THREAD_STORAGE_SCOPE_H_
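// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): parsing scope
// strings with the helpers above. The string literals are example inputs.
// ---------------------------------------------------------------------------
inline void ScopeParseExample() {
  using tvm::runtime::StorageRank;
  using tvm::runtime::StorageScope;
  using tvm::runtime::ThreadScope;
  StorageScope ss = StorageScope::make("shared");     // rank == StorageRank::kShared
  ThreadScope ts = ThreadScope::make("threadIdx.y");  // rank == 1, dim_index == 1
  CHECK(ss.rank == StorageRank::kShared && ts.dim_index == 1);
}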
/*!
* Copyright (c) 2018 by Contributors
* \file threading_backend.cc
* \brief Native threading backend
*/
#include <dgl/runtime/threading_backend.h>
#include <dmlc/logging.h>
#include <thread>
#include <algorithm>
#if defined(__linux__) || defined(__ANDROID__)
#include <fstream>
#else
#endif
#if defined(__linux__)
#include <sched.h>
#endif
namespace tvm {
namespace runtime {
namespace threading {
class ThreadGroup::Impl {
public:
Impl(int num_workers,
std::function<void(int)> worker_callback,
bool exclude_worker0)
: num_workers_(num_workers) {
CHECK_GE(num_workers, 1)
<< "Requested a non-positive number of worker threads.";
for (int i = exclude_worker0; i < num_workers_; ++i) {
threads_.emplace_back([worker_callback, i] { worker_callback(i); });
}
InitSortedOrder();
}
~Impl() { Join(); }
void Join() {
for (auto& t : threads_) {
if (t.joinable()) t.join();
}
}
int Configure(AffinityMode mode, int nthreads, bool exclude_worker0) {
int num_workers_used = 0;
if (mode == kLittle) {
num_workers_used = little_count_;
} else if (mode == kBig) {
num_workers_used = big_count_;
} else {
// use default
num_workers_used = threading::MaxConcurrency();
}
// if a specific number was given, use that
if (nthreads) {
num_workers_used = nthreads;
}
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction. On CPUs with N logical cores
// and N/2 physical cores this will set affinity to the first N/2 logical
// ones.
num_workers_used = std::min(num_workers_, num_workers_used);
const char *val = getenv("TVM_BIND_THREADS");
if (val == nullptr || atoi(val) == 1) {
// Do not set affinity if there are more workers than found cores
if (sorted_order_.size() >= static_cast<unsigned int>(num_workers_)) {
SetAffinity(exclude_worker0, mode == kLittle);
} else {
LOG(WARNING)
<< "The thread affinity cannot be set when the number of workers"
<< "is larger than the number of available cores in the system.";
}
}
return num_workers_used;
}
private:
// bind worker threads to disjoint cores
// if worker 0 is offloaded to master, i.e. exclude_worker0 is true,
// the master thread is bound to core 0.
void SetAffinity(bool exclude_worker0, bool reverse = false) {
#if defined(__ANDROID__)
#ifndef CPU_SET
#define CPU_SETSIZE 1024
#define __NCPUBITS (8 * sizeof (uint64_t))
typedef struct {
uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
} cpu_set_t;
#define CPU_SET(cpu, cpusetp) \
((cpusetp)->__bits[(cpu)/__NCPUBITS] |= (1UL << ((cpu) % __NCPUBITS)))
#define CPU_ZERO(cpusetp) \
memset((cpusetp), 0, sizeof(cpu_set_t))
#endif
#endif
#if defined(__linux__) || defined(__ANDROID__)
CHECK_GE(sorted_order_.size(), num_workers_);
for (unsigned i = 0; i < threads_.size(); ++i) {
unsigned core_id;
if (reverse) {
core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
} else {
core_id = sorted_order_[i + exclude_worker0];
}
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_id, &cpuset);
#if defined(__ANDROID__)
sched_setaffinity(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset);
#else
pthread_setaffinity_np(threads_[i].native_handle(),
sizeof(cpu_set_t), &cpuset);
#endif
}
if (exclude_worker0) { // bind the master thread to core 0
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
if (reverse) {
CPU_SET(sorted_order_[sorted_order_.size() - 1], &cpuset);
} else {
CPU_SET(sorted_order_[0], &cpuset);
}
#if defined(__ANDROID__)
sched_setaffinity(pthread_self(),
sizeof(cpu_set_t), &cpuset);
#else
pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
#endif
}
#endif
}
void InitSortedOrder() {
unsigned int threads = std::thread::hardware_concurrency();
std::vector<std::pair <unsigned int, int64_t> > max_freqs;
for (unsigned int i = 0; i < threads; ++i) {
int64_t cur_freq = 0;
#if defined(__linux__) || defined(__ANDROID__)
std::ostringstream filepath;
filepath << "/sys/devices/system/cpu/cpu" << i << "/cpufreq/cpuinfo_max_freq";
std::ifstream ifs(filepath.str());
if (!ifs.fail()) {
if (!(ifs >> cur_freq)) {
cur_freq = -1;
}
ifs.close();
}
#endif
max_freqs.push_back(std::make_pair(i, cur_freq));
}
auto fcmpbyfreq = [] (const std::pair<unsigned int, int64_t> &a,
const std::pair<unsigned int, int64_t> &b) {
return a.second == b.second ? a.first < b.first : a.second > b.second;
};
std::sort(max_freqs.begin(), max_freqs.end(), fcmpbyfreq);
int64_t big_freq = max_freqs.begin()->second;
int64_t little_freq = max_freqs.rbegin()->second;
for (auto it = max_freqs.begin(); it != max_freqs.end(); it++) {
sorted_order_.push_back(it->first);
if (big_freq == it->second) {
big_count_++;
}
if (big_freq != little_freq && little_freq == it->second) {
little_count_++;
}
}
if (big_count_ + little_count_ != static_cast<int>(sorted_order_.size())) {
LOG(WARNING) << "more than two frequencies detected!";
}
}
int num_workers_;
std::vector<std::thread> threads_;
std::vector<unsigned int> sorted_order_;
int big_count_ = 0;
int little_count_ = 0;
};
ThreadGroup::ThreadGroup(int num_workers,
std::function<void(int)> worker_callback,
bool exclude_worker0)
: impl_(new ThreadGroup::Impl(num_workers, worker_callback, exclude_worker0)) {}
ThreadGroup::~ThreadGroup() { delete impl_; }
void ThreadGroup::Join() { impl_->Join(); }
int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0) {
return impl_->Configure(mode, nthreads, exclude_worker0);
}
void Yield() {
std::this_thread::yield();
}
int MaxConcurrency() {
int max_concurrency = 1;
const char *val = getenv("TVM_NUM_THREADS");
if (val == nullptr) {
val = getenv("OMP_NUM_THREADS");
}
if (val != nullptr) {
max_concurrency = atoi(val);
} else {
max_concurrency = std::thread::hardware_concurrency();
#if defined(_M_X64) || defined(__x86_64__)
max_concurrency /= 2; // ignore hyper-threading
#endif
}
return std::max(max_concurrency, 1);
}
} // namespace threading
} // namespace runtime
} // namespace tvm
/*!
* Copyright (c) 2017 by Contributors
* \file workspace_pool.cc
* \brief Workspace pool utility.
*/
#include "workspace_pool.h"
namespace tvm {
namespace runtime {
// page size.
constexpr size_t kWorkspacePageSize = 4 << 10;
class WorkspacePool::Pool {
public:
// constructor
Pool() {
// safe guard header on each list.
Entry e;
e.data = nullptr;
e.size = 0;
free_list_.push_back(e);
allocated_.push_back(e);
}
// allocate from pool
void* Alloc(TVMContext ctx, DeviceAPI* device, size_t nbytes) {
// Align the allocation size to the page size.
nbytes = (nbytes + (kWorkspacePageSize - 1)) / kWorkspacePageSize * kWorkspacePageSize;
if (nbytes == 0) nbytes = kWorkspacePageSize;
Entry e;
TVMType type;
type.code = kDLUInt;
type.bits = 8;
type.lanes = 1;
if (free_list_.size() == 2) {
e = free_list_.back();
free_list_.pop_back();
if (e.size < nbytes) {
// resize the page
device->FreeDataSpace(ctx, e.data);
e.data = device->AllocDataSpace(ctx, nbytes, kTempAllocaAlignment, type);
e.size = nbytes;
}
} else if (free_list_.size() == 1) {
e.data = device->AllocDataSpace(ctx, nbytes, kTempAllocaAlignment, type);
e.size = nbytes;
} else {
if (free_list_.back().size >= nbytes) {
// find smallest fit
auto it = free_list_.end() - 2;
for (; it->size >= nbytes; --it) {}
e = *(it + 1);
free_list_.erase(it + 1);
} else {
// resize the page
e = free_list_.back();
free_list_.pop_back();
device->FreeDataSpace(ctx, e.data);
e.data = device->AllocDataSpace(ctx, nbytes, kTempAllocaAlignment, type);
e.size = nbytes;
}
}
allocated_.push_back(e);
return e.data;
}
// free resource back to pool
void Free(void* data) {
Entry e;
if (allocated_.back().data == data) {
// quick path, last allocated.
e = allocated_.back();
allocated_.pop_back();
} else {
int index = static_cast<int>(allocated_.size()) - 2;
for (; index > 0 && allocated_[index].data != data; --index) {}
CHECK_GT(index, 0) << "trying to free things that has not been allocated";
e = allocated_[index];
allocated_.erase(allocated_.begin() + index);
}
if (free_list_.back().size < e.size) {
free_list_.push_back(e);
} else if (free_list_.size() == 2) {
free_list_.push_back(free_list_.back());
free_list_[1] = e;
} else {
size_t i = free_list_.size() - 1;
free_list_.resize(free_list_.size() + 1);
for (; e.size < free_list_[i].size; --i) {
free_list_[i + 1] = free_list_[i];
}
free_list_[i + 1] = e;
}
}
// Release all resources
void Release(TVMContext ctx, DeviceAPI* device) {
CHECK_EQ(allocated_.size(), 1);
for (size_t i = 1; i < free_list_.size(); ++i) {
device->FreeDataSpace(ctx, free_list_[i].data);
}
free_list_.clear();
}
private:
/*! \brief a single entry in the pool */
struct Entry {
void* data;
size_t size;
};
/*! \brief List of free items, sorted from small to big size */
std::vector<Entry> free_list_;
/*! \brief List of allocated items */
std::vector<Entry> allocated_;
};
WorkspacePool::WorkspacePool(DLDeviceType device_type, std::shared_ptr<DeviceAPI> device)
: device_type_(device_type), device_(device) {
}
WorkspacePool::~WorkspacePool() {
for (size_t i = 0; i < array_.size(); ++i) {
if (array_[i] != nullptr) {
TVMContext ctx;
ctx.device_type = device_type_;
ctx.device_id = static_cast<int>(i);
array_[i]->Release(ctx, device_.get());
delete array_[i];
}
}
}
void* WorkspacePool::AllocWorkspace(TVMContext ctx, size_t size) {
if (static_cast<size_t>(ctx.device_id) >= array_.size()) {
array_.resize(ctx.device_id + 1, nullptr);
}
if (array_[ctx.device_id] == nullptr) {
array_[ctx.device_id] = new Pool();
}
return array_[ctx.device_id]->Alloc(ctx, device_.get(), size);
}
void WorkspacePool::FreeWorkspace(TVMContext ctx, void* ptr) {
CHECK(static_cast<size_t>(ctx.device_id) < array_.size() &&
array_[ctx.device_id] != nullptr);
array_[ctx.device_id]->Free(ptr);
}
} // namespace runtime
} // namespace tvm
/*!
* Copyright (c) 2017 by Contributors
* \file workspace_pool.h
* \brief Workspace pool utility.
*/
#ifndef TVM_RUNTIME_WORKSPACE_POOL_H_
#define TVM_RUNTIME_WORKSPACE_POOL_H_
#include <dgl/runtime/device_api.h>
#include <vector>
namespace tvm {
namespace runtime {
/*!
* \brief A workspace pool to manage temporary workspace allocations.
*
* \note We make the following assumptions about backend temporary
* workspace allocation and optimize for them;
* some of these assumptions can be enforced by the compiler.
*
* - Only a few allocations will happen, and space will be released after use.
* - The release order is usually the reverse of the allocation order.
* - The same allocation pattern repeats across different runs.
*/
class WorkspacePool {
public:
/*!
* \brief Create pool with specific device type and device.
* \param device_type The device type.
* \param device The device API.
*/
WorkspacePool(DLDeviceType device_type, std::shared_ptr<DeviceAPI> device);
/*! \brief destructor */
~WorkspacePool();
/*!
* \brief Allocate temporary workspace.
* \param ctx The context of allocation.
* \param size The size to be allocated.
*/
void* AllocWorkspace(TVMContext ctx, size_t size);
/*!
* \brief Free temporary workspace in backend execution.
*
* \param ctx The context of allocation.
* \param ptr The pointer to be freed.
*/
void FreeWorkspace(TVMContext ctx, void* ptr);
private:
class Pool;
/*! \brief pool of device local array */
std::vector<Pool*> array_;
/*! \brief device type this pool supports */
DLDeviceType device_type_;
/*! \brief The device API */
std::shared_ptr<DeviceAPI> device_;
};
} // namespace runtime
} // namespace tvm
#endif // TVM_RUNTIME_WORKSPACE_POOL_H_
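// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the imported TVM sources): allocating and
// releasing a temporary buffer through the pool declared above. Wrapping the
// raw DeviceAPI pointer in a non-owning shared_ptr is done purely for this
// sketch; real device backends construct the pool with their own shared
// DeviceAPI instance.
// ---------------------------------------------------------------------------
inline void WorkspacePoolExample() {
  using tvm::runtime::DeviceAPI;
  using tvm::runtime::WorkspacePool;
  TVMContext ctx;
  ctx.device_type = kDLCPU;
  ctx.device_id = 0;
  std::shared_ptr<DeviceAPI> cpu_api(DeviceAPI::Get(ctx), [](DeviceAPI*) {});
  WorkspacePool pool(kDLCPU, cpu_api);
  void* buf = pool.AllocWorkspace(ctx, 1024);  // rounded up to one 4 KiB page
  pool.FreeWorkspace(ctx, buf);
}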
Subproject commit bee4d1dd8dc1ee4a1fd8fa6a96476c2f8b7492a3
Subproject commit ee773cd6ab2a32c07cf3f09ebaf9205ddf0a616e