Commit beeb6e0f authored by Markus Cozowicz, committed by Guolin Ke
Browse files

Added additional APIs to better support JNI on Spark (#2032)

* added API changes required for JNI performance optimizations (e.g. predict is 3-4x faster)

* removed commented variables

* removed commented header

* renamed method to make it obvious it is created for Spark

* fixed comment alignment

* replaced GetPrimitiveArrayCritical with GetIntArrayElements for training. fixed dead-lock on databricks
parent 95246cda
......@@ -152,6 +152,25 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSR(const void* indptr,
const DatasetHandle reference,
DatasetHandle* out);
/*!
* \brief create a dataset from CSR format through callbacks
* \param get_row_funptr pointer to std::function<void(int idx, std::vector<std::pair<int, double>>& ret)>.
*        Called for every row and expected to clear and fill ret with (column index, value) pairs.
*        Note: the callback is invoked from inside an OpenMP parallel loop, so it may be called
*        concurrently from several threads and must be safe for that.
* \param num_rows number of rows
* \param num_col number of columns
* \param parameters additional parameters
* \param reference used to align bin mapper with other dataset, nullptr means not used
* \param out created dataset
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSRFunc(void* get_row_funptr,
int num_rows,
int64_t num_col,
const char* parameters,
const DatasetHandle reference,
DatasetHandle* out);
/*!
* \brief create a dataset from CSC format
* \param col_ptr pointer to col headers
......@@ -622,7 +641,7 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterCalcNumPredict(BoosterHandle handle,
/*!
* \brief make prediction for an new data set
* Note: should pre-allocate memory for out_result,
* for noraml and raw score: its length is equal to num_class * num_data
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param indptr pointer to row headers
......@@ -658,10 +677,51 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSR(BoosterHandle handle,
int64_t* out_len,
double* out_result);
/*!
* \brief make prediction for a new data set. This method re-uses the internal predictor structure
* from previous calls and is optimized for single row invocation.
* Note: should pre-allocate memory for out_result,
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* Note: the internal predictor is created on the first call for a given handle and re-used afterwards,
* so predict_type, num_iteration and parameter of later calls do not reconfigure it.
* \param handle handle
* \param indptr pointer to row headers
* \param indptr_type type of indptr, can be C_API_DTYPE_INT32 or C_API_DTYPE_INT64
* \param indices findex
* \param data fvalue
* \param data_type type of data pointer, can be C_API_DTYPE_FLOAT32 or C_API_DTYPE_FLOAT64
* \param nindptr number of rows in the matrix + 1
* \param nelem number of nonzero elements in the matrix
* \param num_col number of columns (currently not used by the implementation)
* \param predict_type
* C_API_PREDICT_NORMAL: normal prediction, with transform (if needed)
* C_API_PREDICT_RAW_SCORE: raw score
* C_API_PREDICT_LEAF_INDEX: leaf index
* \param num_iteration number of iteration for prediction, <= 0 means no limit
* \param parameter other parameters for prediction, e.g. early stopping for prediction.
* \param out_len length of the output result (number of values written for the single row)
* \param out_result pointer to the output array, memory must be allocated before calling this function
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSRSingleRow(BoosterHandle handle,
const void* indptr,
int indptr_type,
const int32_t* indices,
const void* data,
int data_type,
int64_t nindptr,
int64_t nelem,
int64_t num_col,
int predict_type,
int num_iteration,
const char* parameter,
int64_t* out_len,
double* out_result);
/*!
* \brief make prediction for an new data set
* Note: should pre-allocate memory for out_result,
* for noraml and raw score: its length is equal to num_class * num_data
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param col_ptr pointer to col headers
......@@ -700,7 +760,7 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSC(BoosterHandle handle,
/*!
* \brief make prediction for an new data set
* Note: should pre-allocate memory for out_result,
* for noraml and raw score: its length is equal to num_class * num_data
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param data pointer to the data space
......@@ -730,6 +790,38 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForMat(BoosterHandle handle,
int64_t* out_len,
double* out_result);
/*!
* \brief make prediction for a new data set. This method re-uses the internal predictor structure
* from previous calls and is optimized for single row invocation.
* Note: should pre-allocate memory for out_result,
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* Note: there is no row-count parameter; data is treated as exactly one row.
* Note: the internal predictor is created on the first call for a given handle and re-used afterwards,
* so predict_type, num_iteration and parameter of later calls do not reconfigure it.
* \param handle handle
* \param data pointer to the data space
* \param data_type type of data pointer, can be C_API_DTYPE_FLOAT32 or C_API_DTYPE_FLOAT64
* \param ncol number of columns
* \param is_row_major 1 for row major, 0 for column major
* \param predict_type
* C_API_PREDICT_NORMAL: normal prediction, with transform (if needed)
* C_API_PREDICT_RAW_SCORE: raw score
* C_API_PREDICT_LEAF_INDEX: leaf index
* \param num_iteration number of iteration for prediction, <= 0 means no limit
* \param parameter other parameters for prediction, e.g. early stopping for prediction.
* \param out_len length of the output result (number of values written for the single row)
* \param out_result pointer to the output array, memory must be allocated before calling this function
* \return 0 when succeed, -1 when failure happens
*/LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForMatSingleRow(BoosterHandle handle,
const void* data,
int data_type,
int ncol,
int is_row_major,
int predict_type,
int num_iteration,
const char* parameter,
int64_t* out_len,
double* out_result);
/*!
* \brief save model into file
* \param handle handle
......
......@@ -197,6 +197,41 @@ class Booster {
boosting_->RollbackOneIter();
}
/*!
* \brief Predict a single row with a predictor that is built once and then cached on this Booster.
* \param num_iteration number of iterations used when the cached predictor is first built
* \param predict_type one of the C_API_PREDICT_* constants, used when the cached predictor is first built
* \param get_row_fun row accessor; only row index 0 is requested
* \param config supplies the prediction early-stop settings when the cached predictor is first built
* \param out_result caller-allocated buffer receiving the prediction values
* \param out_len receives the number of values produced for one row
*
* NOTE(review): once the predictor exists, num_iteration, predict_type and config of later calls
* are not looked at again; callers that need different settings get the first call's settings.
*/
void PredictSingleRow(int num_iteration, int predict_type,
                      std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
                      const Config& config,
                      double* out_result, int64_t* out_len) {
  std::lock_guard<std::mutex> lock(mutex_);

  if (!single_row_predictor_) {
    // translate the requested prediction type into the three predictor flags;
    // any other value (normal prediction) leaves all of them false
    bool want_leaf_index = false;
    bool want_raw_score = false;
    bool want_contrib = false;
    switch (predict_type) {
      case C_API_PREDICT_LEAF_INDEX:
        want_leaf_index = true;
        break;
      case C_API_PREDICT_RAW_SCORE:
        want_raw_score = true;
        break;
      case C_API_PREDICT_CONTRIB:
        want_contrib = true;
        break;
      default:
        break;
    }

    // TODO: config could be optimized away... (maybe using lambda callback?)
    single_row_predictor_.reset(new Predictor(boosting_.get(), num_iteration,
                                              want_raw_score, want_leaf_index, want_contrib,
                                              config.pred_early_stop,
                                              config.pred_early_stop_freq,
                                              config.pred_early_stop_margin));
    single_row_num_pred_in_one_row_ =
        boosting_->NumPredictOneRow(num_iteration, want_leaf_index, want_contrib);
    single_row_predict_function_ = single_row_predictor_->GetPredictFunction();
  }

  // fetch the only row and write its prediction straight into the caller's buffer
  auto row_features = get_row_fun(0);
  double* write_pos = out_result;
  single_row_predict_function_(row_features, write_pos);

  *out_len = single_row_num_pred_in_one_row_;
}
void Predict(int num_iteration, int predict_type, int nrow,
std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
const Config& config,
......@@ -326,6 +361,10 @@ class Booster {
private:
const Dataset* train_data_;
std::unique_ptr<Boosting> boosting_;
/*! \brief Predictor cached for single-row prediction, created on the first PredictSingleRow call */
std::unique_ptr<Predictor> single_row_predictor_;
/*! \brief Prediction function obtained from single_row_predictor_ */
PredictFunction single_row_predict_function_;
/*! \brief Number of values predicted for one row by the cached predictor, set when it is created */
int64_t single_row_num_pred_in_one_row_;
/*! \brief All configs */
Config config_;
/*! \brief Metric for training data */
......@@ -665,6 +704,77 @@ int LGBM_DatasetCreateFromCSR(const void* indptr,
API_END();
}
/*!
* \brief Create a dataset by pulling sparse rows from a caller-supplied callback.
*
* When no reference dataset is given, a random sample of rows is read first to construct
* the bin mappers; otherwise the new dataset is aligned with the reference. Afterwards every
* row is requested once from the callback and pushed into the dataset.
*
* \param get_row_funptr pointer to std::function<void(int idx, std::vector<std::pair<int, double>>& ret)>;
*        must not be null. It is called from an OpenMP parallel loop, possibly concurrently.
* \param num_rows number of rows
* \param num_col number of columns; sampled column indices must lie in [0, num_col)
* \param parameters additional parameters
* \param reference used to align bin mapper with other dataset, nullptr means not used
* \param out created dataset
* \return 0 when succeed, -1 when failure happens
*/
int LGBM_DatasetCreateFromCSRFunc(void* get_row_funptr,
                                  int num_rows,
                                  int64_t num_col,
                                  const char* parameters,
                                  const DatasetHandle reference,
                                  DatasetHandle* out) {
  API_BEGIN();
  // report a null callback as a regular API failure instead of dereferencing it
  CHECK(get_row_funptr != nullptr);
  auto get_row_fun = *static_cast<std::function<void(int idx, std::vector<std::pair<int, double>>&)>*>(get_row_funptr);
  auto param = Config::Str2Map(parameters);
  Config config;
  config.Set(param);
  if (config.num_threads > 0) {
    omp_set_num_threads(config.num_threads);
  }
  std::unique_ptr<Dataset> ret;
  int32_t nrow = num_rows;
  if (reference == nullptr) {
    // sample data first
    Random rand(config.data_random_seed);
    int sample_cnt = static_cast<int>(nrow < config.bin_construct_sample_cnt ? nrow : config.bin_construct_sample_cnt);
    auto sample_indices = rand.Sample(nrow, sample_cnt);
    sample_cnt = static_cast<int>(sample_indices.size());
    std::vector<std::vector<double>> sample_values(num_col);
    std::vector<std::vector<int>> sample_idx(num_col);
    // local buffer to re-use memory
    std::vector<std::pair<int, double>> buffer;
    for (size_t i = 0; i < sample_indices.size(); ++i) {
      auto idx = sample_indices[i];
      get_row_fun(static_cast<int>(idx), buffer);
      for (std::pair<int, double>& inner_data : buffer) {
        // the column index is used to index the per-column sample vectors below,
        // so both bounds must hold (a negative index would read/write out of range)
        CHECK(inner_data.first >= 0);
        CHECK(inner_data.first < num_col);
        if (std::fabs(inner_data.second) > kZeroThreshold || std::isnan(inner_data.second)) {
          sample_values[inner_data.first].emplace_back(inner_data.second);
          sample_idx[inner_data.first].emplace_back(static_cast<int>(i));
        }
      }
    }
    DatasetLoader loader(config, nullptr, 1, nullptr);
    ret.reset(loader.CostructFromSampleData(Common::Vector2Ptr<double>(sample_values).data(),
                                            Common::Vector2Ptr<int>(sample_idx).data(),
                                            static_cast<int>(sample_values.size()),
                                            Common::VectorSize<double>(sample_values).data(),
                                            sample_cnt, nrow));
  } else {
    ret.reset(new Dataset(nrow));
    ret->CreateValid(
      reinterpret_cast<const Dataset*>(reference));
  }
  OMP_INIT_EX();
  // declared outside the loop and listed as private so that each thread gets its own buffer
  std::vector<std::pair<int, double>> threadBuffer;
  #pragma omp parallel for schedule(static) private(threadBuffer)
  for (int i = 0; i < num_rows; ++i) {
    OMP_LOOP_EX_BEGIN();
    {
      const int tid = omp_get_thread_num();
      get_row_fun(i, threadBuffer);
      ret->PushOneRow(tid, i, threadBuffer);
    }
    OMP_LOOP_EX_END();
  }
  OMP_THROW_EX();
  ret->FinishLoad();
  *out = ret.release();
  API_END();
}
int LGBM_DatasetCreateFromCSC(const void* col_ptr,
int col_ptr_type,
const int32_t* indices,
......@@ -1175,6 +1285,35 @@ int LGBM_BoosterPredictForCSR(BoosterHandle handle,
API_END();
}
/*!
* \brief Single-row CSR prediction entry point.
*
* Parses the parameter string, applies the requested thread count, wraps the CSR buffers
* in a row accessor and forwards to Booster::PredictSingleRow. The column-count argument
* is accepted for interface symmetry but is unnamed and not used here.
* \return 0 when succeed, -1 when failure happens
*/
int LGBM_BoosterPredictForCSRSingleRow(BoosterHandle handle,
                                       const void* indptr,
                                       int indptr_type,
                                       const int32_t* indices,
                                       const void* data,
                                       int data_type,
                                       int64_t nindptr,
                                       int64_t nelem,
                                       int64_t,
                                       int predict_type,
                                       int num_iteration,
                                       const char* parameter,
                                       int64_t* out_len,
                                       double* out_result) {
  API_BEGIN();
  // build the prediction config from the textual parameter list
  auto parsed_params = Config::Str2Map(parameter);
  Config predict_config;
  predict_config.Set(parsed_params);
  if (predict_config.num_threads > 0) {
    omp_set_num_threads(predict_config.num_threads);
  }
  // row accessor over the caller's CSR buffers
  auto csr_row_reader = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
  Booster* booster = reinterpret_cast<Booster*>(handle);
  booster->PredictSingleRow(num_iteration, predict_type, csr_row_reader,
                            predict_config, out_result, out_len);
  API_END();
}
int LGBM_BoosterPredictForCSC(BoosterHandle handle,
const void* col_ptr,
int col_ptr_type,
......@@ -1252,6 +1391,31 @@ int LGBM_BoosterPredictForMat(BoosterHandle handle,
API_END();
}
/*!
* \brief Single-row dense prediction entry point.
*
* Parses the parameter string, applies the requested thread count, wraps the dense buffer
* as a one-row matrix and forwards to Booster::PredictSingleRow.
* \return 0 when succeed, -1 when failure happens
*/
int LGBM_BoosterPredictForMatSingleRow(BoosterHandle handle,
                                       const void* data,
                                       int data_type,
                                       int32_t ncol,
                                       int is_row_major,
                                       int predict_type,
                                       int num_iteration,
                                       const char* parameter,
                                       int64_t* out_len,
                                       double* out_result) {
  API_BEGIN();
  // build the prediction config from the textual parameter list
  auto parsed_params = Config::Str2Map(parameter);
  Config predict_config;
  predict_config.Set(parsed_params);
  if (predict_config.num_threads > 0) {
    omp_set_num_threads(predict_config.num_threads);
  }
  // the row count is fixed to 1: data holds exactly one row of ncol values
  auto dense_row_reader = RowPairFunctionFromDenseMatric(data, 1, ncol, data_type, is_row_major);
  Booster* booster = reinterpret_cast<Booster*>(handle);
  booster->PredictSingleRow(num_iteration, predict_type, dense_row_reader,
                            predict_config, out_result, out_len);
  API_END();
}
int LGBM_BoosterSaveModel(BoosterHandle handle,
int start_iteration,
int num_iteration,
......
......@@ -16,6 +16,10 @@
%include "cpointer.i"
%include "carrays.i"
// Input typemap for parameters declared as `JNIEnv *jenv`: the parameter consumes no
// argument from the Java side (numinputs=0) and is assigned from the `jenv` variable
// instead, so the %inline helpers below can call JNI functions directly.
%typemap(in, numinputs=0) JNIEnv *jenv %{
$1 = jenv;
%}
%inline %{
char * LGBM_BoosterSaveModelToStringSWIG(BoosterHandle handle,
int start_iteration,
......@@ -48,6 +52,138 @@
return nullptr;
}
return dst;
}
/*!
* \brief JNI helper: predict one dense row that is passed as a Java double[].
*
* The array contents are accessed through GetPrimitiveArrayCritical for the duration
* of the native prediction call and released without copy-back (JNI_ABORT).
* NOTE(review): data is always a Java double[]; presumably callers pass
* C_API_DTYPE_FLOAT64 as data_type - confirm against the Java side.
* \return result of LGBM_BoosterPredictForMatSingleRow, or -1 if the array contents
*         could not be obtained from the JVM
*/
int LGBM_BoosterPredictForMatSingle(JNIEnv *jenv,
                                    jdoubleArray data,
                                    BoosterHandle handle,
                                    int data_type,
                                    int ncol,
                                    int is_row_major,
                                    int predict_type,
                                    int num_iteration,
                                    const char* parameter,
                                    int64_t* out_len,
                                    double* out_result) {
  double* data0 = static_cast<double*>(jenv->GetPrimitiveArrayCritical(data, nullptr));
  if (data0 == nullptr) {
    // the JVM could not provide the array; nothing was acquired, so nothing to release
    return -1;
  }

  const int ret = LGBM_BoosterPredictForMatSingleRow(handle, data0, data_type, ncol, is_row_major, predict_type,
                                                     num_iteration, parameter, out_len, out_result);

  // JNI_ABORT: the native side only reads the data, no copy-back needed
  jenv->ReleasePrimitiveArrayCritical(data, data0, JNI_ABORT);

  return ret;
}
/*!
* \brief JNI helper: predict one sparse row given as Java int[] indices and double[] values.
*
* Builds a two-entry row-pointer array {0, numNonZeros} and forwards to
* LGBM_BoosterPredictForCSRSingleRow while both Java arrays are held through
* GetPrimitiveArrayCritical. Arrays are released without copy-back (JNI_ABORT).
* NOTE(review): the row-pointer array built here is int32_t while indptr_type is forwarded
* from the caller unchanged; presumably callers always pass C_API_DTYPE_INT32 - confirm.
* \return result of LGBM_BoosterPredictForCSRSingleRow, or -1 if the contents of either
*         array could not be obtained from the JVM
*/
int LGBM_BoosterPredictForCSRSingle(JNIEnv *jenv,
                                    jintArray indices,
                                    jdoubleArray values,
                                    int numNonZeros,
                                    BoosterHandle handle,
                                    int indptr_type,
                                    int data_type,
                                    int64_t nelem,
                                    int64_t num_col,
                                    int predict_type,
                                    int num_iteration,
                                    const char* parameter,
                                    int64_t* out_len,
                                    double* out_result) {
  // Alternatives
  // - GetIntArrayElements: performs copy
  // - GetDirectBufferAddress: fails on wrapped array
  // Some words of warning for GetPrimitiveArrayCritical
  // https://stackoverflow.com/questions/23258357/whats-the-trade-off-between-using-getprimitivearraycritical-and-getprimitivety

  int* indices0 = static_cast<int*>(jenv->GetPrimitiveArrayCritical(indices, nullptr));
  if (indices0 == nullptr) {
    // nothing acquired yet, so nothing to release
    return -1;
  }
  double* values0 = static_cast<double*>(jenv->GetPrimitiveArrayCritical(values, nullptr));
  if (values0 == nullptr) {
    // give back the array that was already acquired before bailing out
    jenv->ReleasePrimitiveArrayCritical(indices, indices0, JNI_ABORT);
    return -1;
  }

  // single row: it starts at offset 0 and ends after numNonZeros entries
  int32_t ind[2] = { 0, numNonZeros };

  const int ret = LGBM_BoosterPredictForCSRSingleRow(handle, ind, indptr_type, indices0, values0, data_type, 2,
                                                     nelem, num_col, predict_type, num_iteration, parameter, out_len, out_result);

  // release in reverse order of acquisition
  jenv->ReleasePrimitiveArrayCritical(values, values0, JNI_ABORT);
  jenv->ReleasePrimitiveArrayCritical(indices, indices0, JNI_ABORT);

  return ret;
}
#include <vector>
#include <functional>
// Per-row bookkeeping for one sparse vector whose Java arrays are held by native code.
// Field order matters: instances are created with positional brace initialization.
struct CSRDirect {
// Java int[] holding the column indices of the row (needed again to release indices0)
jintArray indices;
// Java double[] holding the values of the row (needed again to release values0)
jdoubleArray values;
// native pointer to the elements of `indices`
int* indices0;
// native pointer to the elements of `values`
double* values0;
// number of entries in the row (taken from the length of `indices`)
int size;
};
/*!
* \brief JNI helper: create a dataset from a Java array of Spark SparseVector objects.
*
* All rows are resolved to native element pointers up front on the calling thread, because
* the row callback is later invoked from multiple threads that are not attached to the JVM.
* The callback then only copies from those native pointers.
* NOTE(review): the row length is taken from the indices array; this assumes values has
* the same length - confirm this holds for every SparseVector passed in.
* \param jenv JNI environment of the calling thread
* \param arrayOfSparseVector Java array with one SparseVector per row
* \param num_rows number of rows
* \param num_col number of columns
* \param parameters additional parameters
* \param reference used to align bin mapper with other dataset, nullptr means not used
* \param out created dataset
* \return result of LGBM_DatasetCreateFromCSRFunc, or -1 if num_rows is negative or any
*         JNI lookup / array access fails (already acquired arrays are released first)
*/
int LGBM_DatasetCreateFromCSRSpark(JNIEnv *jenv,
                                   jobjectArray arrayOfSparseVector,
                                   int num_rows,
                                   int64_t num_col,
                                   const char* parameters,
                                   const DatasetHandle reference,
                                   DatasetHandle* out) {
  // a negative count would turn into a huge reserve() request below
  if (num_rows < 0) {
    return -1;
  }

  jclass sparseVectorClass = jenv->FindClass("org/apache/spark/ml/linalg/SparseVector");
  if (sparseVectorClass == nullptr) {
    return -1;
  }
  jmethodID sparseVectorIndices = jenv->GetMethodID(sparseVectorClass, "indices", "()[I");
  if (sparseVectorIndices == nullptr) {
    return -1;
  }
  jmethodID sparseVectorValues = jenv->GetMethodID(sparseVectorClass, "values", "()[D");
  if (sparseVectorValues == nullptr) {
    return -1;
  }

  std::vector<CSRDirect> jniCache;
  jniCache.reserve(num_rows);

  // gives back every array acquired so far; used on both the success and the failure path
  auto release_cached_rows = [&]() {
    for (auto& jc : jniCache) {
      // jenv->ReleasePrimitiveArrayCritical(jc.values, jc.values0, JNI_ABORT);
      // jenv->ReleasePrimitiveArrayCritical(jc.indices, jc.indices0, JNI_ABORT);
      jenv->ReleaseDoubleArrayElements(jc.values, jc.values0, JNI_ABORT);
      jenv->ReleaseIntArrayElements(jc.indices, jc.indices0, JNI_ABORT);
    }
  };

  // this needs to be done ahead of time as row_func is invoked from multiple threads
  // these threads would have to be registered with the JVM and also unregistered.
  // It is not clear if that can be achieved with OpenMP
  for (int i = 0; i < num_rows; i++) {
    // get the row
    jobject objSparseVec = jenv->GetObjectArrayElement(arrayOfSparseVector, i);
    if (objSparseVec == nullptr) {
      release_cached_rows();
      return -1;
    }

    // get the indices and values; skip the second call if the first one already failed
    jintArray indices = static_cast<jintArray>(jenv->CallObjectMethod(objSparseVec, sparseVectorIndices));
    jdoubleArray values = nullptr;
    if (indices != nullptr) {
      values = static_cast<jdoubleArray>(jenv->CallObjectMethod(objSparseVec, sparseVectorValues));
    }
    // the row object itself is not needed any more; drop its local reference right away
    // so that the number of live local references does not grow by three per row
    jenv->DeleteLocalRef(objSparseVec);
    if (indices == nullptr || values == nullptr) {
      release_cached_rows();
      return -1;
    }

    int size = jenv->GetArrayLength(indices);

    // Note: when testing on larger data (e.g. 288k rows per partition and 36mio rows total)
    // using GetPrimitiveArrayCritical resulted in a dead-lock
    // lock arrays
    // int* indices0 = (int*)jenv->GetPrimitiveArrayCritical(indices, 0);
    // double* values0 = (double*)jenv->GetPrimitiveArrayCritical(values, 0);
    // in test-usecase an alternative to GetPrimitiveArrayCritical as it performs copies
    int* indices0 = jenv->GetIntArrayElements(indices, nullptr);
    if (indices0 == nullptr) {
      release_cached_rows();
      return -1;
    }
    double* values0 = jenv->GetDoubleArrayElements(values, nullptr);
    if (values0 == nullptr) {
      // this row is not in the cache yet, so release its indices explicitly
      jenv->ReleaseIntArrayElements(indices, indices0, JNI_ABORT);
      release_cached_rows();
      return -1;
    }

    jniCache.push_back({indices, values, indices0, values0, size});
  }

  // type is important here as we want a std::function, rather than a lambda
  std::function<void(int idx, std::vector<std::pair<int, double>>& ret)> row_func = [&](int row_num, std::vector<std::pair<int, double>>& ret) {
    auto& jc = jniCache[row_num];
    ret.clear();  // reset size, but not free()
    ret.reserve(jc.size);  // make sure we have enough allocated

    // copy data
    for (int k = 0; k < jc.size; ++k) {
      ret.emplace_back(jc.indices0[k], jc.values0[k]);
    }
  };

  int ret = LGBM_DatasetCreateFromCSRFunc(&row_func, num_rows, num_col, parameters, reference, out);

  release_cached_rows();

  return ret;
}
%}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment