# coding: utf-8 """Library with training routines of LightGBM.""" import collections import copy from operator import attrgetter from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union import numpy as np from . import callback from .basic import (Booster, Dataset, LightGBMError, _ArrayLike, _choose_param_value, _ConfigAliases, _InnerPredictor, _log_warning) from .compat import SKLEARN_INSTALLED, _LGBMGroupKFold, _LGBMStratifiedKFold _LGBM_CustomObjectiveFunction = Callable[ [np.ndarray, Dataset], Tuple[_ArrayLike, _ArrayLike] ] _LGBM_CustomMetricFunction = Callable[ [np.ndarray, Dataset], Tuple[str, float, bool] ] def train( params: Dict[str, Any], train_set: Dataset, num_boost_round: int = 100, valid_sets: Optional[List[Dataset]] = None, valid_names: Optional[List[str]] = None, fobj: Optional[_LGBM_CustomObjectiveFunction] = None, feval: Optional[Union[_LGBM_CustomMetricFunction, List[_LGBM_CustomMetricFunction]]] = None, init_model: Optional[Union[str, Path, Booster]] = None, feature_name: Union[List[str], str] = 'auto', categorical_feature: Union[List[str], List[int], str] = 'auto', keep_training_booster: bool = False, callbacks: Optional[List[Callable]] = None ) -> Booster: """Perform the training with given parameters. Parameters ---------- params : dict Parameters for training. train_set : Dataset Data to be trained on. num_boost_round : int, optional (default=100) Number of boosting iterations. valid_sets : list of Dataset, or None, optional (default=None) List of data to be evaluated on during training. valid_names : list of str, or None, optional (default=None) Names of ``valid_sets``. fobj : callable or None, optional (default=None) Customized objective function. Should accept two parameters: preds, train_data, and return (grad, hess). preds : numpy 1-D array The predicted values. Predicted values are returned before any transformation, e.g. they are raw margin instead of probability of positive class for binary task. train_data : Dataset The training dataset. grad : list, numpy 1-D array or pandas Series The value of the first order derivative (gradient) of the loss with respect to the elements of preds for each sample point. hess : list, numpy 1-D array or pandas Series The value of the second order derivative (Hessian) of the loss with respect to the elements of preds for each sample point. For multi-class task, the preds is group by class_id first, then group by row_id. If you want to get i-th row preds in j-th class, the access way is score[j * num_data + i] and you should group grad and hess in this way as well. feval : callable, list of callable, or None, optional (default=None) Customized evaluation function. Each evaluation function should accept two parameters: preds, train_data, and return (eval_name, eval_result, is_higher_better) or list of such tuples. preds : numpy 1-D array The predicted values. If ``fobj`` is specified, predicted values are returned before any transformation, e.g. they are raw margin instead of probability of positive class for binary task in this case. train_data : Dataset The training dataset. eval_name : str The name of evaluation function (without whitespaces). eval_result : float The eval result. is_higher_better : bool Is eval result higher better, e.g. AUC is ``is_higher_better``. For multi-class task, the preds is group by class_id first, then group by row_id. If you want to get i-th row preds in j-th class, the access way is preds[j * num_data + i]. To ignore the default metric corresponding to the used objective, set the ``metric`` parameter to the string ``"None"`` in ``params``. init_model : str, pathlib.Path, Booster or None, optional (default=None) Filename of LightGBM model or Booster instance used for continue training. feature_name : list of str, or 'auto', optional (default="auto") Feature names. If 'auto' and data is pandas DataFrame, data columns names are used. categorical_feature : list of str or int, or 'auto', optional (default="auto") Categorical features. If list of int, interpreted as indices. If list of str, interpreted as feature names (need to specify ``feature_name`` as well). If 'auto' and data is pandas DataFrame, pandas unordered categorical columns are used. All values in categorical features should be less than int32 max value (2147483647). Large values could be memory consuming. Consider using consecutive integers starting from zero. All negative values in categorical features will be treated as missing values. The output cannot be monotonically constrained with respect to a categorical feature. keep_training_booster : bool, optional (default=False) Whether the returned Booster will be used to keep training. If False, the returned value will be converted into _InnerPredictor before returning. This means you won't be able to use ``eval``, ``eval_train`` or ``eval_valid`` methods of the returned Booster. When your model is very large and cause the memory error, you can try to set this param to ``True`` to avoid the model conversion performed during the internal call of ``model_to_string``. You can still use _InnerPredictor as ``init_model`` for future continue training. callbacks : list of callable, or None, optional (default=None) List of callback functions that are applied at each iteration. See Callbacks in Python API for more information. Returns ------- booster : Booster The trained Booster model. """ # create predictor first params = copy.deepcopy(params) if fobj is not None: for obj_alias in _ConfigAliases.get("objective"): params.pop(obj_alias, None) params['objective'] = 'none' for alias in _ConfigAliases.get("num_iterations"): if alias in params: num_boost_round = params.pop(alias) _log_warning(f"Found `{alias}` in params. Will use it instead of argument") params["num_iterations"] = num_boost_round # setting early stopping via global params should be possible params = _choose_param_value( main_param_name="early_stopping_round", params=params, default_value=None ) if params["early_stopping_round"] is None: params.pop("early_stopping_round") first_metric_only = params.get('first_metric_only', False) if num_boost_round <= 0: raise ValueError("num_boost_round should be greater than zero.") predictor: Optional[_InnerPredictor] = None if isinstance(init_model, (str, Path)): predictor = _InnerPredictor(model_file=init_model, pred_parameter=params) elif isinstance(init_model, Booster): predictor = init_model._to_predictor(dict(init_model.params, **params)) init_iteration = predictor.num_total_iteration if predictor is not None else 0 # check dataset if not isinstance(train_set, Dataset): raise TypeError("Training only accepts Dataset object") train_set._update_params(params) \ ._set_predictor(predictor) \ .set_feature_name(feature_name) \ .set_categorical_feature(categorical_feature) is_valid_contain_train = False train_data_name = "training" reduced_valid_sets = [] name_valid_sets = [] if valid_sets is not None: if isinstance(valid_sets, Dataset): valid_sets = [valid_sets] if isinstance(valid_names, str): valid_names = [valid_names] for i, valid_data in enumerate(valid_sets): # reduce cost for prediction training data if valid_data is train_set: is_valid_contain_train = True if valid_names is not None: train_data_name = valid_names[i] continue if not isinstance(valid_data, Dataset): raise TypeError("Training only accepts Dataset object") reduced_valid_sets.append(valid_data._update_params(params).set_reference(train_set)) if valid_names is not None and len(valid_names) > i: name_valid_sets.append(valid_names[i]) else: name_valid_sets.append(f'valid_{i}') # process callbacks if callbacks is None: callbacks_set = set() else: for i, cb in enumerate(callbacks): cb.__dict__.setdefault('order', i - len(callbacks)) callbacks_set = set(callbacks) if "early_stopping_round" in params: callbacks_set.add( callback.early_stopping( stopping_rounds=params["early_stopping_round"], first_metric_only=first_metric_only, verbose=_choose_param_value( main_param_name="verbosity", params=params, default_value=1 ).pop("verbosity") > 0 ) ) callbacks_before_iter_set = {cb for cb in callbacks_set if getattr(cb, 'before_iteration', False)} callbacks_after_iter_set = callbacks_set - callbacks_before_iter_set callbacks_before_iter = sorted(callbacks_before_iter_set, key=attrgetter('order')) callbacks_after_iter = sorted(callbacks_after_iter_set, key=attrgetter('order')) # construct booster try: booster = Booster(params=params, train_set=train_set) if is_valid_contain_train: booster.set_train_data_name(train_data_name) for valid_set, name_valid_set in zip(reduced_valid_sets, name_valid_sets): booster.add_valid(valid_set, name_valid_set) finally: train_set._reverse_update_params() for valid_set in reduced_valid_sets: valid_set._reverse_update_params() booster.best_iteration = 0 # start training for i in range(init_iteration, init_iteration + num_boost_round): for cb in callbacks_before_iter: cb(callback.CallbackEnv(model=booster, params=params, iteration=i, begin_iteration=init_iteration, end_iteration=init_iteration + num_boost_round, evaluation_result_list=None)) booster.update(fobj=fobj) evaluation_result_list = [] # check evaluation result. if valid_sets is not None: if is_valid_contain_train: evaluation_result_list.extend(booster.eval_train(feval)) evaluation_result_list.extend(booster.eval_valid(feval)) try: for cb in callbacks_after_iter: cb(callback.CallbackEnv(model=booster, params=params, iteration=i, begin_iteration=init_iteration, end_iteration=init_iteration + num_boost_round, evaluation_result_list=evaluation_result_list)) except callback.EarlyStopException as earlyStopException: booster.best_iteration = earlyStopException.best_iteration + 1 evaluation_result_list = earlyStopException.best_score break booster.best_score = collections.defaultdict(collections.OrderedDict) for dataset_name, eval_name, score, _ in evaluation_result_list: booster.best_score[dataset_name][eval_name] = score if not keep_training_booster: booster.model_from_string(booster.model_to_string()).free_dataset() return booster class CVBooster: """CVBooster in LightGBM. Auxiliary data structure to hold and redirect all boosters of ``cv`` function. This class has the same methods as Booster class. All method calls are actually performed for underlying Boosters and then all returned results are returned in a list. Attributes ---------- boosters : list of Booster The list of underlying fitted models. best_iteration : int The best iteration of fitted model. """ def __init__(self): """Initialize the CVBooster. Generally, no need to instantiate manually. """ self.boosters = [] self.best_iteration = -1 def _append(self, booster): """Add a booster to CVBooster.""" self.boosters.append(booster) def __getattr__(self, name): """Redirect methods call of CVBooster.""" def handler_function(*args, **kwargs): """Call methods with each booster, and concatenate their results.""" ret = [] for booster in self.boosters: ret.append(getattr(booster, name)(*args, **kwargs)) return ret return handler_function def _make_n_folds(full_data, folds, nfold, params, seed, fpreproc=None, stratified=True, shuffle=True, eval_train_metric=False): """Make a n-fold list of Booster from random indices.""" full_data = full_data.construct() num_data = full_data.num_data() if folds is not None: if not hasattr(folds, '__iter__') and not hasattr(folds, 'split'): raise AttributeError("folds should be a generator or iterator of (train_idx, test_idx) tuples " "or scikit-learn splitter object with split method") if hasattr(folds, 'split'): group_info = full_data.get_group() if group_info is not None: group_info = np.array(group_info, dtype=np.int32, copy=False) flatted_group = np.repeat(range(len(group_info)), repeats=group_info) else: flatted_group = np.zeros(num_data, dtype=np.int32) folds = folds.split(X=np.empty(num_data), y=full_data.get_label(), groups=flatted_group) else: if any(params.get(obj_alias, "") in {"lambdarank", "rank_xendcg", "xendcg", "xe_ndcg", "xe_ndcg_mart", "xendcg_mart"} for obj_alias in _ConfigAliases.get("objective")): if not SKLEARN_INSTALLED: raise LightGBMError('scikit-learn is required for ranking cv') # ranking task, split according to groups group_info = np.array(full_data.get_group(), dtype=np.int32, copy=False) flatted_group = np.repeat(range(len(group_info)), repeats=group_info) group_kfold = _LGBMGroupKFold(n_splits=nfold) folds = group_kfold.split(X=np.empty(num_data), groups=flatted_group) elif stratified: if not SKLEARN_INSTALLED: raise LightGBMError('scikit-learn is required for stratified cv') skf = _LGBMStratifiedKFold(n_splits=nfold, shuffle=shuffle, random_state=seed) folds = skf.split(X=np.empty(num_data), y=full_data.get_label()) else: if shuffle: randidx = np.random.RandomState(seed).permutation(num_data) else: randidx = np.arange(num_data) kstep = int(num_data / nfold) test_id = [randidx[i: i + kstep] for i in range(0, num_data, kstep)] train_id = [np.concatenate([test_id[i] for i in range(nfold) if k != i]) for k in range(nfold)] folds = zip(train_id, test_id) ret = CVBooster() for train_idx, test_idx in folds: train_set = full_data.subset(sorted(train_idx)) valid_set = full_data.subset(sorted(test_idx)) # run preprocessing on the data set if needed if fpreproc is not None: train_set, valid_set, tparam = fpreproc(train_set, valid_set, params.copy()) else: tparam = params cvbooster = Booster(tparam, train_set) if eval_train_metric: cvbooster.add_valid(train_set, 'train') cvbooster.add_valid(valid_set, 'valid') ret._append(cvbooster) return ret def _agg_cv_result(raw_results): """Aggregate cross-validation results.""" cvmap = collections.OrderedDict() metric_type = {} for one_result in raw_results: for one_line in one_result: key = f"{one_line[0]} {one_line[1]}" metric_type[key] = one_line[3] cvmap.setdefault(key, []) cvmap[key].append(one_line[2]) return [('cv_agg', k, np.mean(v), metric_type[k], np.std(v)) for k, v in cvmap.items()] def cv(params, train_set, num_boost_round=100, folds=None, nfold=5, stratified=True, shuffle=True, metrics=None, fobj=None, feval=None, init_model=None, feature_name='auto', categorical_feature='auto', fpreproc=None, seed=0, callbacks=None, eval_train_metric=False, return_cvbooster=False): """Perform the cross-validation with given parameters. Parameters ---------- params : dict Parameters for Booster. train_set : Dataset Data to be trained on. num_boost_round : int, optional (default=100) Number of boosting iterations. folds : generator or iterator of (train_idx, test_idx) tuples, scikit-learn splitter object or None, optional (default=None) If generator or iterator, it should yield the train and test indices for each fold. If object, it should be one of the scikit-learn splitter classes (https://scikit-learn.org/stable/modules/classes.html#splitter-classes) and have ``split`` method. This argument has highest priority over other data split arguments. nfold : int, optional (default=5) Number of folds in CV. stratified : bool, optional (default=True) Whether to perform stratified sampling. shuffle : bool, optional (default=True) Whether to shuffle before splitting data. metrics : str, list of str, or None, optional (default=None) Evaluation metrics to be monitored while CV. If not None, the metric in ``params`` will be overridden. fobj : callable or None, optional (default=None) Customized objective function. Should accept two parameters: preds, train_data, and return (grad, hess). preds : numpy 1-D array The predicted values. Predicted values are returned before any transformation, e.g. they are raw margin instead of probability of positive class for binary task. train_data : Dataset The training dataset. grad : list, numpy 1-D array or pandas Series The value of the first order derivative (gradient) of the loss with respect to the elements of preds for each sample point. hess : list, numpy 1-D array or pandas Series The value of the second order derivative (Hessian) of the loss with respect to the elements of preds for each sample point. For multi-class task, the preds is group by class_id first, then group by row_id. If you want to get i-th row preds in j-th class, the access way is score[j * num_data + i] and you should group grad and hess in this way as well. feval : callable, list of callable, or None, optional (default=None) Customized evaluation function. Each evaluation function should accept two parameters: preds, train_data, and return (eval_name, eval_result, is_higher_better) or list of such tuples. preds : numpy 1-D array The predicted values. If ``fobj`` is specified, predicted values are returned before any transformation, e.g. they are raw margin instead of probability of positive class for binary task in this case. train_data : Dataset The training dataset. eval_name : str The name of evaluation function (without whitespace). eval_result : float The eval result. is_higher_better : bool Is eval result higher better, e.g. AUC is ``is_higher_better``. For multi-class task, the preds is group by class_id first, then group by row_id. If you want to get i-th row preds in j-th class, the access way is preds[j * num_data + i]. To ignore the default metric corresponding to the used objective, set ``metrics`` to the string ``"None"``. init_model : str, pathlib.Path, Booster or None, optional (default=None) Filename of LightGBM model or Booster instance used for continue training. feature_name : list of str, or 'auto', optional (default="auto") Feature names. If 'auto' and data is pandas DataFrame, data columns names are used. categorical_feature : list of str or int, or 'auto', optional (default="auto") Categorical features. If list of int, interpreted as indices. If list of str, interpreted as feature names (need to specify ``feature_name`` as well). If 'auto' and data is pandas DataFrame, pandas unordered categorical columns are used. All values in categorical features should be less than int32 max value (2147483647). Large values could be memory consuming. Consider using consecutive integers starting from zero. All negative values in categorical features will be treated as missing values. The output cannot be monotonically constrained with respect to a categorical feature. fpreproc : callable or None, optional (default=None) Preprocessing function that takes (dtrain, dtest, params) and returns transformed versions of those. seed : int, optional (default=0) Seed used to generate the folds (passed to numpy.random.seed). callbacks : list of callable, or None, optional (default=None) List of callback functions that are applied at each iteration. See Callbacks in Python API for more information. eval_train_metric : bool, optional (default=False) Whether to display the train metric in progress. The score of the metric is calculated again after each training step, so there is some impact on performance. return_cvbooster : bool, optional (default=False) Whether to return Booster models trained on each fold through ``CVBooster``. Returns ------- eval_hist : dict Evaluation history. The dictionary has the following format: {'metric1-mean': [values], 'metric1-stdv': [values], 'metric2-mean': [values], 'metric2-stdv': [values], ...}. If ``return_cvbooster=True``, also returns trained boosters via ``cvbooster`` key. """ if not isinstance(train_set, Dataset): raise TypeError("Training only accepts Dataset object") params = copy.deepcopy(params) if fobj is not None: for obj_alias in _ConfigAliases.get("objective"): params.pop(obj_alias, None) params['objective'] = 'none' for alias in _ConfigAliases.get("num_iterations"): if alias in params: _log_warning(f"Found '{alias}' in params. Will use it instead of 'num_boost_round' argument") num_boost_round = params.pop(alias) params["num_iterations"] = num_boost_round # setting early stopping via global params should be possible params = _choose_param_value( main_param_name="early_stopping_round", params=params, default_value=None ) if params["early_stopping_round"] is None: params.pop("early_stopping_round") first_metric_only = params.get('first_metric_only', False) if num_boost_round <= 0: raise ValueError("num_boost_round should be greater than zero.") if isinstance(init_model, (str, Path)): predictor = _InnerPredictor(model_file=init_model, pred_parameter=params) elif isinstance(init_model, Booster): predictor = init_model._to_predictor(dict(init_model.params, **params)) else: predictor = None if metrics is not None: for metric_alias in _ConfigAliases.get("metric"): params.pop(metric_alias, None) params['metric'] = metrics train_set._update_params(params) \ ._set_predictor(predictor) \ .set_feature_name(feature_name) \ .set_categorical_feature(categorical_feature) results = collections.defaultdict(list) cvfolds = _make_n_folds(train_set, folds=folds, nfold=nfold, params=params, seed=seed, fpreproc=fpreproc, stratified=stratified, shuffle=shuffle, eval_train_metric=eval_train_metric) # setup callbacks if callbacks is None: callbacks = set() else: for i, cb in enumerate(callbacks): cb.__dict__.setdefault('order', i - len(callbacks)) callbacks = set(callbacks) if "early_stopping_round" in params: callbacks.add( callback.early_stopping( stopping_rounds=params["early_stopping_round"], first_metric_only=first_metric_only, verbose=_choose_param_value( main_param_name="verbosity", params=params, default_value=1 ).pop("verbosity") > 0 ) ) callbacks_before_iter = {cb for cb in callbacks if getattr(cb, 'before_iteration', False)} callbacks_after_iter = callbacks - callbacks_before_iter callbacks_before_iter = sorted(callbacks_before_iter, key=attrgetter('order')) callbacks_after_iter = sorted(callbacks_after_iter, key=attrgetter('order')) for i in range(num_boost_round): for cb in callbacks_before_iter: cb(callback.CallbackEnv(model=cvfolds, params=params, iteration=i, begin_iteration=0, end_iteration=num_boost_round, evaluation_result_list=None)) cvfolds.update(fobj=fobj) res = _agg_cv_result(cvfolds.eval_valid(feval)) for _, key, mean, _, std in res: results[f'{key}-mean'].append(mean) results[f'{key}-stdv'].append(std) try: for cb in callbacks_after_iter: cb(callback.CallbackEnv(model=cvfolds, params=params, iteration=i, begin_iteration=0, end_iteration=num_boost_round, evaluation_result_list=res)) except callback.EarlyStopException as earlyStopException: cvfolds.best_iteration = earlyStopException.best_iteration + 1 for k in results: results[k] = results[k][:cvfolds.best_iteration] break if return_cvbooster: results['cvbooster'] = cvfolds return dict(results)