Unverified Commit b5502d19 authored by Frank Fineis, committed by GitHub

[dask] add support for eval sets and custom eval functions (#4101)



* es WiP, need to add eval_sample_weight and eval_group

* add weight, group to dask es. WiP.

* dask es reorg

* Update python-package/lightgbm/dask.py

_train_part model.fit args to lines
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update tests/python_package_test/test_dask.py

_train_part model.fit args to lines, pt2
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py

_train_part model.fit args to lines pt3
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update tests/python_package_test/test_dask.py

dask_model.fit args to lines
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update tests/python_package_test/test_dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py

use is instead of id()
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update tests/python_package_test/test_dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update tests/python_package_test/test_dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* Update python-package/lightgbm/dask.py
Co-authored-by: James Lamb <jaylamb20@gmail.com>

* applying changes to eval_set PR WiP

* dask support for eval_names, eval_metric, eval_stopping_rounds

* add evals_result checks and other eval_set attribute-related test checks. need to merge master - WiP

* fix lint errors in test_dask.py

* drop group_shape from _lgbmmodel_doc_fit.format for non-rankers, add support for eval_at for dask ranker

* add eval_at to test_dask eval_set ranker tests

* add back group_shape to lgbmmodel docs, tighten tests

* drop random eval weights from early stopping, probably causing training to terminate too early

* add eval data templates to sklearn fit docs, add eval data docs to dask

* add n_features to _create_data, eval_set tests stop w/ desirable tree counts

* import alphabetically

* add back get_worker for eval_set error handling

* test_dask argmin typo

* push forgotten eval_names bugfix

* eval_stopping_rounds -> early_stopping_rounds, fix failing non-es test

* change default eval_at to tuple 1-5

* re-drop get_worker

* drop early stopping support from eval_set commits, move eval_set worker check prior to client.submit

* add eval_class_weight and eval_init_score to lightgbm/dask, WiP

* clean up eval_set tests, allow user to specify fewer eval_names, clswghts than eval_sets

* remove redundant backslash

* lint fixes

* fix eval_at, eval_metric duplication, let eval_at be Iterable not just Tuple

* use all data_outputs for test_eval_set tests

* undo newlines from first pr

* add custom_eval_metric test, correct issue with eval_at and metric names

* move _constant_metric outside of test

* dataset reference names instead of __strings__

* add padding to eval_set parts so each part has the same len(eval_set)

* eval set code clean up

* revert n_evals to be max len eval_set across all parts on worker

* pylint errors in _DatasetNames

* more pylint fixes

* pylinting...

* add by pytest.mark, mistakenly deleted during merge conflict resolution

* address code review comments

* add _pad_eval_names to handle nondeterministic evals_result_ valid set names

* change not evaluated evals_result_ test criteria

* address fit eval docs issues, switch _DatasetNames to Enum

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update python-package/lightgbm/dask.py
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* update eval_metrics, eval_at dask fit docstr to match sklearn, make tests reflect that l2 (rmse), logloss in evals_result_ by default

* address eval_set dict keys naming in docstr and training eval_set naming issue

* in test_dask check for obj-default metric names in eval_results, remove check for training key

* lint fixes for _pad_eval_names

* remove unnecessary line break in _pad_eval_names docstr

* use Enum.member syntax not Enum.member.name

* remove str from supported eval_at types

* add whitespace and remove DaskDataframes mention from eval_ param docstrs in _train

* remove "of shape = [n_samples]" from group_shape docs

* add eval_at base_doc in DaskLGBMRanker.fit

* remove excess paren from eval_names docs in _train

* make requested changes to test_dask.py

* remove Optional() wrapper on eval_at

* add _lgbmmodel_doc_custom_eval_note to dask.py fit.__doc__

* fix ordering of .sklearn imports to attempt lint fix

* dask custom eval note to f-string pt1
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* dask custom eval note to f-string pt 2
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* dask custom eval note to f-string pt 3
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>
Co-authored-by: James Lamb <jaylamb20@gmail.com>
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>
parent bb39bc99
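Below is a minimal usage sketch of the interface added by this change (illustrative only, not part of this commit). It assumes a local Dask cluster and random data, and passes one validation set plus a custom eval function to ``DaskLGBMRegressor.fit()``; the names ``mae`` and ``valid`` are arbitrary.

```python
import dask.array as da
from distributed import Client, LocalCluster
from lightgbm.dask import DaskLGBMRegressor


def mae(y_true, y_pred):
    # custom eval function in the sklearn convention: (eval_name, eval_result, is_higher_better)
    return 'mae', float(abs(y_true - y_pred).mean()), False


if __name__ == '__main__':
    client = Client(LocalCluster(n_workers=2))

    # illustrative random data; any Dask Array / DataFrame / Series collections work
    dX = da.random.random((1000, 10), chunks=(100, 10))
    dy = da.random.random((1000,), chunks=(100,))
    dX_valid = da.random.random((200, 10), chunks=(100, 10))
    dy_valid = da.random.random((200,), chunks=(100,))

    model = DaskLGBMRegressor(client=client, n_estimators=10)
    model.fit(
        dX,
        dy,
        eval_set=[(dX_valid, dy_valid)],
        eval_names=['valid'],
        eval_metric=['l1', mae]
    )

    # per-iteration results for the default, built-in and custom metrics, e.g. 'l2', 'l1', 'mae'
    print(model.evals_result_['valid'].keys())
```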
@@ -9,7 +9,8 @@ It is based on dask-lightgbm, which was based on dask-xgboost.
import socket
from collections import defaultdict
from copy import deepcopy
from enum import Enum, auto
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
from urllib.parse import urlparse
import numpy as np
@@ -18,7 +19,8 @@ import scipy.sparse as ss
from .basic import _LIB, LightGBMError, _choose_param_value, _ConfigAliases, _log_info, _log_warning, _safe_call
from .compat import (DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED, Client, LGBMNotFittedError, concat,
dask_Array, dask_DataFrame, dask_Series, default_client, delayed, pd_DataFrame, pd_Series, wait)
from .sklearn import (LGBMClassifier, LGBMModel, LGBMRanker, LGBMRegressor, _lgbmmodel_doc_custom_eval_note,
_lgbmmodel_doc_fit, _lgbmmodel_doc_predict)
_DaskCollection = Union[dask_Array, dask_DataFrame, dask_Series]
_DaskMatrixLike = Union[dask_Array, dask_DataFrame]
@@ -27,6 +29,18 @@ _DaskPart = Union[np.ndarray, pd_DataFrame, pd_Series, ss.spmatrix]
_PredictionDtype = Union[Type[np.float32], Type[np.float64], Type[np.int32], Type[np.int64]]
class _DatasetNames(Enum):
"""Placeholder names used by lightgbm.dask internals to say 'also evaluate the training data'.
Avoid duplicating the training data when the validation set refers to elements of training data.
"""
TRAINSET = auto()
SAMPLE_WEIGHT = auto()
INIT_SCORE = auto()
GROUP = auto()
def _get_dask_client(client: Optional[Client]) -> Client:
"""Choose a Dask client to use.
@@ -71,6 +85,25 @@ def _concat(seq: List[_DaskPart]) -> _DaskPart:
raise TypeError(f'Data must be one of: numpy arrays, pandas dataframes, sparse matrices (from scipy). Got {type(seq[0])}.')
def _remove_list_padding(*args: Any) -> List[List[Any]]:
return [[z for z in arg if z is not None] for arg in args]
def _pad_eval_names(lgbm_model: LGBMModel, required_names: Optional[List[str]] = None) -> LGBMModel:
"""Append missing (key, value) pairs to a LightGBM model's evals_result_ and best_score_ OrderedDict attrs based on a set of required eval_set names.
Allows users to rely on expected eval_set names being present when fitting DaskLGBM estimators with ``eval_set``.
"""
not_evaluated = 'not evaluated'
for eval_name in required_names:
if eval_name not in lgbm_model.evals_result_:
lgbm_model.evals_result_[eval_name] = not_evaluated
if eval_name not in lgbm_model.best_score_:
lgbm_model.best_score_[eval_name] = not_evaluated
return lgbm_model
def _train_part(
params: Dict[str, Any],
model_factory: Type[LGBMModel],
@@ -111,16 +144,140 @@ def _train_part(
else:
init_score = None
# construct local eval_set data.
n_evals = max(len(x.get('eval_set', [])) for x in list_of_parts)
eval_names = kwargs.pop('eval_names', None)
eval_class_weight = kwargs.get('eval_class_weight')
local_eval_set = None
local_eval_names = None
local_eval_sample_weight = None
local_eval_init_score = None
local_eval_group = None
if n_evals:
has_eval_sample_weight = any(x.get('eval_sample_weight') is not None for x in list_of_parts)
has_eval_init_score = any(x.get('eval_init_score') is not None for x in list_of_parts)
local_eval_set = []
evals_result_names = []
if has_eval_sample_weight:
local_eval_sample_weight = []
if has_eval_init_score:
local_eval_init_score = []
if is_ranker:
local_eval_group = []
# store indices of eval_set components that were not contained within local parts.
missing_eval_component_idx = []
# consolidate parts of each individual eval component.
for i in range(n_evals):
x_e = []
y_e = []
w_e = []
init_score_e = []
g_e = []
for part in list_of_parts:
if not part.get('eval_set'):
continue
# require that eval_name exists in evaluated result data in case dropped due to padding.
# in distributed training the 'training' eval_set is not detected, will have name 'valid_<index>'.
if eval_names:
evals_result_name = eval_names[i]
else:
evals_result_name = f'valid_{i}'
eval_set = part['eval_set'][i]
if eval_set is _DatasetNames.TRAINSET:
x_e.append(part['data'])
y_e.append(part['label'])
else:
x_e.extend(eval_set[0])
y_e.extend(eval_set[1])
if evals_result_name not in evals_result_names:
evals_result_names.append(evals_result_name)
eval_weight = part.get('eval_sample_weight')
if eval_weight:
if eval_weight[i] is _DatasetNames.SAMPLE_WEIGHT:
w_e.append(part['weight'])
else:
w_e.extend(eval_weight[i])
eval_init_score = part.get('eval_init_score')
if eval_init_score:
if eval_init_score[i] is _DatasetNames.INIT_SCORE:
init_score_e.append(part['init_score'])
else:
init_score_e.extend(eval_init_score[i])
eval_group = part.get('eval_group')
if eval_group:
if eval_group[i] is _DatasetNames.GROUP:
g_e.append(part['group'])
else:
g_e.extend(eval_group[i])
# filter padding from eval parts then _concat each eval_set component.
x_e, y_e, w_e, init_score_e, g_e = _remove_list_padding(x_e, y_e, w_e, init_score_e, g_e)
if x_e:
local_eval_set.append((_concat(x_e), _concat(y_e)))
else:
missing_eval_component_idx.append(i)
continue
if w_e:
local_eval_sample_weight.append(_concat(w_e))
if init_score_e:
local_eval_init_score.append(_concat(init_score_e))
if g_e:
local_eval_group.append(_concat(g_e))
# reconstruct eval_set fit args/kwargs depending on which components of eval_set are on worker.
eval_component_idx = [i for i in range(n_evals) if i not in missing_eval_component_idx]
if eval_names:
local_eval_names = [eval_names[i] for i in eval_component_idx]
if eval_class_weight:
kwargs['eval_class_weight'] = [eval_class_weight[i] for i in eval_component_idx]
try:
model = model_factory(**params)
if is_ranker:
model.fit(
data,
label,
sample_weight=weight,
init_score=init_score,
group=group,
eval_set=local_eval_set,
eval_sample_weight=local_eval_sample_weight,
eval_init_score=local_eval_init_score,
eval_group=local_eval_group,
eval_names=local_eval_names,
**kwargs
)
else:
model.fit(
data,
label,
sample_weight=weight,
init_score=init_score,
eval_set=local_eval_set,
eval_sample_weight=local_eval_sample_weight,
eval_init_score=local_eval_init_score,
eval_names=local_eval_names,
**kwargs
)
finally:
_safe_call(_LIB.LGBM_NetworkFree())
if n_evals:
# ensure that expected keys for evals_result_ and best_score_ exist regardless of padding.
model = _pad_eval_names(model, required_names=evals_result_names)
return model if return_model else None
@@ -218,6 +375,14 @@ def _train(
sample_weight: Optional[_DaskVectorLike] = None,
init_score: Optional[_DaskVectorLike] = None,
group: Optional[_DaskVectorLike] = None,
eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
eval_names: Optional[List[str]] = None,
eval_sample_weight: Optional[List[_DaskCollection]] = None,
eval_class_weight: Optional[List[Union[dict, str]]] = None,
eval_init_score: Optional[List[_DaskCollection]] = None,
eval_group: Optional[List[_DaskCollection]] = None,
eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
eval_at: Optional[Iterable[int]] = None,
**kwargs: Any
) -> LGBMModel:
"""Inner train routine.
@@ -244,6 +409,29 @@ def _train(
sum(group) = n_samples.
For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups,
where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc.
eval_set : list of (X, y) tuples of Dask data collections or None, optional (default=None)
List of (X, y) tuple pairs to use as validation sets.
Note that not all workers may receive chunks of every eval set within ``eval_set``. When the returned
lightgbm estimator is not trained using any chunks of a particular eval set, its corresponding component
of ``evals_result_`` and ``best_score_`` will be 'not evaluated'.
eval_names : list of strings or None, optional (default=None)
Names of eval_set.
eval_sample_weight : list of Dask Arrays, Dask Series or None, optional (default=None)
Weights for each validation set in eval_set.
eval_class_weight : list of dict or str, or None, optional (default=None)
Class weights, one dict or str for each validation set in eval_set.
eval_init_score : list of Dask Arrays, Dask Series or None, optional (default=None)
Initial model score for each validation set in eval_set.
eval_group : list of Dask Arrays, Dask Series or None, optional (default=None)
Group/query for each validation set in eval_set.
eval_metric : string, callable, list or None, optional (default=None)
If string, it should be a built-in evaluation metric to use.
If callable, it should be a custom evaluation metric, see note below for more details.
If list, it can be a list of built-in metrics, a list of custom evaluation metrics, or a mix of both.
In either case, the ``metric`` from the Dask model parameters (or inferred from the objective) will be evaluated and used as well.
Default: 'l2' for DaskLGBMRegressor, 'binary(multi)_logloss' for DaskLGBMClassifier, 'ndcg' for DaskLGBMRanker.
eval_at : iterable of int, optional (default=None)
The evaluation positions of the specified ranking metric.
**kwargs
Other parameters passed to ``fit`` method of the local underlying model.
@@ -339,6 +527,116 @@ def _train(
for i in range(n_parts):
parts[i]['init_score'] = init_score_parts[i]
# eval_set will be re-constructed into smaller lists of (X, y) tuples, where
# X and y are each delayed sub-lists of the original eval dask Collections.
if eval_set:
# find maximum number of parts in an individual eval set so that we can
# pad eval sets when they come in different sizes.
n_largest_eval_parts = max(x[0].npartitions for x in eval_set)
eval_sets = defaultdict(list)
if eval_sample_weight:
eval_sample_weights = defaultdict(list)
if eval_group:
eval_groups = defaultdict(list)
if eval_init_score:
eval_init_scores = defaultdict(list)
for i, (X_eval, y_eval) in enumerate(eval_set):
n_this_eval_parts = X_eval.npartitions
# when individual eval set is equivalent to training data, skip recomputing parts.
if X_eval is data and y_eval is label:
for parts_idx in range(n_parts):
eval_sets[parts_idx].append(_DatasetNames.TRAINSET)
else:
eval_x_parts = _split_to_parts(data=X_eval, is_matrix=True)
eval_y_parts = _split_to_parts(data=y_eval, is_matrix=False)
for j in range(n_largest_eval_parts):
parts_idx = j % n_parts
# add None-padding for individual eval_set member if it is smaller than the largest member.
if j < n_this_eval_parts:
x_e = eval_x_parts[j]
y_e = eval_y_parts[j]
else:
x_e = None
y_e = None
if j < n_parts:
# first time a chunk of this eval set is added to this part.
eval_sets[parts_idx].append(([x_e], [y_e]))
else:
# append additional chunks of this eval set to this part.
eval_sets[parts_idx][-1][0].append(x_e)
eval_sets[parts_idx][-1][1].append(y_e)
if eval_sample_weight:
if eval_sample_weight[i] is sample_weight:
for parts_idx in range(n_parts):
eval_sample_weights[parts_idx].append(_DatasetNames.SAMPLE_WEIGHT)
else:
eval_w_parts = _split_to_parts(data=eval_sample_weight[i], is_matrix=False)
# ensure that all evaluation parts map uniquely to one part.
for j in range(n_largest_eval_parts):
if j < n_this_eval_parts:
w_e = eval_w_parts[j]
else:
w_e = None
parts_idx = j % n_parts
if j < n_parts:
eval_sample_weights[parts_idx].append([w_e])
else:
eval_sample_weights[parts_idx][-1].append(w_e)
if eval_init_score:
if eval_init_score[i] is init_score:
for parts_idx in range(n_parts):
eval_init_scores[parts_idx].append(_DatasetNames.INIT_SCORE)
else:
eval_init_score_parts = _split_to_parts(data=eval_init_score[i], is_matrix=False)
for j in range(n_largest_eval_parts):
if j < n_this_eval_parts:
init_score_e = eval_init_score_parts[j]
else:
init_score_e = None
parts_idx = j % n_parts
if j < n_parts:
eval_init_scores[parts_idx].append([init_score_e])
else:
eval_init_scores[parts_idx][-1].append(init_score_e)
if eval_group:
if eval_group[i] is group:
for parts_idx in range(n_parts):
eval_groups[parts_idx].append(_DatasetNames.GROUP)
else:
eval_g_parts = _split_to_parts(data=eval_group[i], is_matrix=False)
for j in range(n_largest_eval_parts):
if j < n_this_eval_parts:
g_e = eval_g_parts[j]
else:
g_e = None
parts_idx = j % n_parts
if j < n_parts:
eval_groups[parts_idx].append([g_e])
else:
eval_groups[parts_idx][-1].append(g_e)
# assign sub-eval_set components to worker parts.
for parts_idx, e_set in eval_sets.items():
parts[parts_idx]['eval_set'] = e_set
if eval_sample_weight:
parts[parts_idx]['eval_sample_weight'] = eval_sample_weights[parts_idx]
if eval_init_score:
parts[parts_idx]['eval_init_score'] = eval_init_scores[parts_idx]
if eval_group:
parts[parts_idx]['eval_group'] = eval_groups[parts_idx]
# Start computation in the background
parts = list(map(delayed, parts))
parts = client.compute(parts)
@@ -355,6 +653,32 @@ def _train(
for key, workers in who_has.items():
worker_map[next(iter(workers))].append(key_to_part_dict[key])
# Check that all workers were provided some of eval_set. Otherwise warn user that validation
# data artifacts may not be populated depending on worker returning final estimator.
if eval_set:
for worker in worker_map:
has_eval_set = False
for part in worker_map[worker]:
if 'eval_set' in part.result():
has_eval_set = True
break
if not has_eval_set:
_log_warning(
f"Worker {worker} was not allocated eval_set data. Therefore evals_result_ and best_score_ data may be unreliable. "
"Try rebalancing data across workers."
)
# assign general validation set settings to fit kwargs.
if eval_names:
kwargs['eval_names'] = eval_names
if eval_class_weight:
kwargs['eval_class_weight'] = eval_class_weight
if eval_metric:
kwargs['eval_metric'] = eval_metric
if eval_at:
kwargs['eval_at'] = eval_at
master_worker = next(iter(worker_map))
worker_ncores = client.ncores()
@@ -608,11 +932,23 @@ class _DaskLGBMModel:
sample_weight: Optional[_DaskVectorLike] = None,
init_score: Optional[_DaskVectorLike] = None,
group: Optional[_DaskVectorLike] = None,
eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
eval_names: Optional[List[str]] = None,
eval_sample_weight: Optional[List[_DaskCollection]] = None,
eval_class_weight: Optional[List[Union[dict, str]]] = None,
eval_init_score: Optional[List[_DaskCollection]] = None,
eval_group: Optional[List[_DaskCollection]] = None,
eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
eval_at: Optional[Iterable[int]] = None,
early_stopping_rounds: Optional[int] = None,
**kwargs: Any
) -> "_DaskLGBMModel":
if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)):
raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask')
if early_stopping_rounds is not None:
raise RuntimeError('early_stopping_rounds is not currently supported in lightgbm.dask')
params = self.get_params(True)
params.pop("client", None)
@@ -625,6 +961,14 @@ class _DaskLGBMModel:
sample_weight=sample_weight,
init_score=init_score,
group=group,
eval_set=eval_set,
eval_names=eval_names,
eval_sample_weight=eval_sample_weight,
eval_class_weight=eval_class_weight,
eval_init_score=eval_init_score,
eval_group=eval_group,
eval_metric=eval_metric,
eval_at=eval_at,
**kwargs
)
@@ -725,15 +1069,31 @@ class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel):
y: _DaskCollection,
sample_weight: Optional[_DaskVectorLike] = None,
init_score: Optional[_DaskVectorLike] = None,
eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
eval_names: Optional[List[str]] = None,
eval_sample_weight: Optional[List[_DaskCollection]] = None,
eval_class_weight: Optional[List[Union[dict, str]]] = None,
eval_init_score: Optional[List[_DaskCollection]] = None,
eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
early_stopping_rounds: Optional[int] = None,
**kwargs: Any
) -> "DaskLGBMClassifier":
"""Docstring is inherited from the lightgbm.LGBMClassifier.fit."""
if early_stopping_rounds is not None:
raise RuntimeError('early_stopping_rounds is not currently supported in lightgbm.dask')
return self._lgb_dask_fit(
model_factory=LGBMClassifier,
X=X,
y=y,
sample_weight=sample_weight,
init_score=init_score,
eval_set=eval_set,
eval_names=eval_names,
eval_sample_weight=eval_sample_weight,
eval_class_weight=eval_class_weight,
eval_init_score=eval_init_score,
eval_metric=eval_metric,
**kwargs
)
@@ -742,16 +1102,27 @@ class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel):
y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]",
sample_weight_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
init_score_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
group_shape="Dask Array or Dask Series or None, optional (default=None)",
eval_sample_weight_shape="list of Dask Arrays or Dask Series or None, optional (default=None)",
eval_init_score_shape="list of Dask Arrays or Dask Series or None, optional (default=None)",
eval_group_shape="list of Dask Arrays or Dask Series or None, optional (default=None)"
)
# DaskLGBMClassifier does not support group, eval_group, early_stopping_rounds.
_base_doc = (_base_doc[:_base_doc.find('group :')]
+ _base_doc[_base_doc.find('eval_set :'):])
_base_doc = (_base_doc[:_base_doc.find('eval_group :')]
+ _base_doc[_base_doc.find('eval_metric :'):])
_base_doc = (_base_doc[:_base_doc.find('early_stopping_rounds :')]
+ _base_doc[_base_doc.find('verbose :'):])
# DaskLGBMClassifier support for callbacks and init_model is not tested
fit.__doc__ = f"""{_base_doc[:_base_doc.find('callbacks :')]}**kwargs
Other parameters passed through to ``LGBMClassifier.fit()``.
{_lgbmmodel_doc_custom_eval_note}
""" """
def predict(self, X: _DaskMatrixLike, **kwargs: Any) -> dask_Array: def predict(self, X: _DaskMatrixLike, **kwargs: Any) -> dask_Array:
...@@ -875,15 +1246,29 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel): ...@@ -875,15 +1246,29 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel):
y: _DaskCollection, y: _DaskCollection,
sample_weight: Optional[_DaskVectorLike] = None, sample_weight: Optional[_DaskVectorLike] = None,
init_score: Optional[_DaskVectorLike] = None, init_score: Optional[_DaskVectorLike] = None,
eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
eval_names: Optional[List[str]] = None,
eval_sample_weight: Optional[List[_DaskCollection]] = None,
eval_init_score: Optional[List[_DaskCollection]] = None,
eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
early_stopping_rounds: Optional[int] = None,
**kwargs: Any
) -> "DaskLGBMRegressor":
"""Docstring is inherited from the lightgbm.LGBMRegressor.fit."""
if early_stopping_rounds is not None:
raise RuntimeError('early_stopping_rounds is not currently supported in lightgbm.dask')
return self._lgb_dask_fit(
model_factory=LGBMRegressor,
X=X,
y=y,
sample_weight=sample_weight,
init_score=init_score,
eval_set=eval_set,
eval_names=eval_names,
eval_sample_weight=eval_sample_weight,
eval_init_score=eval_init_score,
eval_metric=eval_metric,
**kwargs
)
@@ -892,16 +1277,30 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel):
y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]",
sample_weight_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
init_score_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
group_shape="Dask Array or Dask Series or None, optional (default=None)",
eval_sample_weight_shape="list of Dask Arrays or Dask Series or None, optional (default=None)",
eval_init_score_shape="list of Dask Arrays or Dask Series or None, optional (default=None)",
eval_group_shape="list of Dask Arrays or Dask Series or None, optional (default=None)"
)
# DaskLGBMRegressor does not support group, eval_class_weight, eval_group, early_stopping_rounds.
_base_doc = (_base_doc[:_base_doc.find('group :')]
+ _base_doc[_base_doc.find('eval_set :'):])
_base_doc = (_base_doc[:_base_doc.find('eval_class_weight :')]
+ _base_doc[_base_doc.find('eval_init_score :'):])
_base_doc = (_base_doc[:_base_doc.find('eval_group :')]
+ _base_doc[_base_doc.find('eval_metric :'):])
_base_doc = (_base_doc[:_base_doc.find('early_stopping_rounds :')]
+ _base_doc[_base_doc.find('verbose :'):])
# DaskLGBMRegressor support for callbacks and init_model is not tested
fit.__doc__ = f"""{_base_doc[:_base_doc.find('callbacks :')]}**kwargs
Other parameters passed through to ``LGBMRegressor.fit()``.
{_lgbmmodel_doc_custom_eval_note}
""" """
def predict(self, X: _DaskMatrixLike, **kwargs) -> dask_Array: def predict(self, X: _DaskMatrixLike, **kwargs) -> dask_Array:
...@@ -1008,9 +1407,20 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel): ...@@ -1008,9 +1407,20 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
sample_weight: Optional[_DaskVectorLike] = None, sample_weight: Optional[_DaskVectorLike] = None,
init_score: Optional[_DaskVectorLike] = None, init_score: Optional[_DaskVectorLike] = None,
group: Optional[_DaskVectorLike] = None, group: Optional[_DaskVectorLike] = None,
eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
eval_names: Optional[List[str]] = None,
eval_sample_weight: Optional[List[_DaskCollection]] = None,
eval_init_score: Optional[List[_DaskCollection]] = None,
eval_group: Optional[List[_DaskCollection]] = None,
eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
eval_at: Iterable[int] = (1, 2, 3, 4, 5),
early_stopping_rounds: Optional[int] = None,
**kwargs: Any
) -> "DaskLGBMRanker":
"""Docstring is inherited from the lightgbm.LGBMRanker.fit."""
if early_stopping_rounds is not None:
raise RuntimeError('early_stopping_rounds is not currently supported in lightgbm.dask')
return self._lgb_dask_fit(
model_factory=LGBMRanker,
X=X,
@@ -1018,6 +1428,13 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
sample_weight=sample_weight,
init_score=init_score,
group=group,
eval_set=eval_set,
eval_names=eval_names,
eval_sample_weight=eval_sample_weight,
eval_init_score=eval_init_score,
eval_group=eval_group,
eval_metric=eval_metric,
eval_at=eval_at,
**kwargs
)
@@ -1026,16 +1443,26 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]",
sample_weight_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
init_score_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
group_shape="Dask Array or Dask Series or None, optional (default=None)",
eval_sample_weight_shape="list of Dask Arrays or Dask Series or None, optional (default=None)",
eval_init_score_shape="list of Dask Arrays or Dask Series or None, optional (default=None)",
eval_group_shape="list of Dask Arrays or Dask Series or None, optional (default=None)"
)
# DaskLGBMRanker does not support eval_class_weight or early stopping
_base_doc = (_base_doc[:_base_doc.find('eval_class_weight :')]
+ _base_doc[_base_doc.find('eval_init_score :'):])
_base_doc = (_base_doc[:_base_doc.find('early_stopping_rounds :')]
+ "eval_at : iterable of int, optional (default=(1, 2, 3, 4, 5))\n"
+ f"{' ':8}The evaluation positions of the specified metric.\n"
+ f"{' ':4}{_base_doc[_base_doc.find('verbose :'):]}")
# DaskLGBMRanker support for callbacks and init_model is not tested
fit.__doc__ = f"""{_base_doc[:_base_doc.find('callbacks :')]}**kwargs
Other parameters passed through to ``LGBMRanker.fit()``.
{_lgbmmodel_doc_custom_eval_note}
""" """
def predict(self, X: _DaskMatrixLike, **kwargs: Any) -> dask_Array: def predict(self, X: _DaskMatrixLike, **kwargs: Any) -> dask_Array:
@@ -207,13 +207,13 @@ _lgbmmodel_doc_fit = (
A list of (X, y) tuple pairs to use as validation sets.
eval_names : list of strings or None, optional (default=None)
Names of eval_set.
eval_sample_weight : {eval_sample_weight_shape}
Weights of eval data.
eval_class_weight : list or None, optional (default=None)
Class weights of eval data.
eval_init_score : {eval_init_score_shape}
Init score of eval data.
eval_group : {eval_group_shape}
Group data of eval data.
eval_metric : string, callable, list or None, optional (default=None)
If string, it should be a built-in evaluation metric to use.
@@ -718,7 +718,10 @@ class LGBMModel(_LGBMModelBase):
y_shape="array-like of shape = [n_samples]",
sample_weight_shape="array-like of shape = [n_samples] or None, optional (default=None)",
init_score_shape="array-like of shape = [n_samples] or None, optional (default=None)",
group_shape="array-like or None, optional (default=None)",
eval_sample_weight_shape="list of arrays or None, optional (default=None)",
eval_init_score_shape="list of arrays or None, optional (default=None)",
eval_group_shape="list of arrays or None, optional (default=None)"
) + "\n\n" + _lgbmmodel_doc_custom_eval_note ) + "\n\n" + _lgbmmodel_doc_custom_eval_note
def predict(self, X, raw_score=False, start_iteration=0, num_iteration=None, def predict(self, X, raw_score=False, start_iteration=0, num_iteration=None,
@@ -214,6 +214,13 @@ def _accuracy_score(dy_true, dy_pred):
return da.average(dy_true == dy_pred).compute()
def _constant_metric(dy_true, dy_pred):
metric_name = 'constant_metric'
value = 0.708
is_higher_better = False
return metric_name, value, is_higher_better
def _pickle(obj, filepath, serializer):
if serializer == 'pickle':
with open(filepath, 'wb') as f:
@@ -745,6 +752,231 @@ def test_ranker(output, group, boosting_type, tree_learner, cluster):
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
@pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('eval_sizes', [[0.5, 1, 1.5], [0]])
@pytest.mark.parametrize('eval_names_prefix', ['specified', None])
def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix, cluster):
if task == 'ranking' and output == 'scipy_csr_matrix':
pytest.skip('LGBMRanker is not currently tested on sparse matrices')
with Client(cluster) as client:
# Use larger trainset to prevent premature stopping due to zero loss, causing num_trees() < n_estimators.
# Use small chunk_size to avoid single-worker allocation of eval data partitions.
n_samples = 1000
chunk_size = 10
n_eval_sets = len(eval_sizes)
eval_set = []
eval_sample_weight = []
eval_class_weight = None
eval_init_score = None
if eval_names_prefix:
eval_names = [f'{eval_names_prefix}_{i}' for i in range(len(eval_sizes))]
else:
eval_names = None
X, y, w, g, dX, dy, dw, dg = _create_data(
objective=task,
n_samples=n_samples,
output=output,
chunk_size=chunk_size
)
if task == 'ranking':
eval_metrics = ['ndcg']
eval_at = (5, 6)
eval_metric_names = [f'ndcg@{k}' for k in eval_at]
eval_group = []
else:
# test eval_class_weight, eval_init_score on binary-classification task.
# Note: objective's default `metric` will be evaluated in evals_result_ in addition to all eval_metrics.
if task == 'binary-classification':
eval_metrics = ['binary_error', 'auc']
eval_metric_names = ['binary_logloss', 'binary_error', 'auc']
eval_class_weight = []
eval_init_score = []
elif task == 'multiclass-classification':
eval_metrics = ['multi_error']
eval_metric_names = ['multi_logloss', 'multi_error']
elif task == 'regression':
eval_metrics = ['l1']
eval_metric_names = ['l2', 'l1']
# create eval_sets by creating new datasets or copying training data.
for eval_size in eval_sizes:
if eval_size == 1:
y_e = y
dX_e = dX
dy_e = dy
dw_e = dw
dg_e = dg
else:
n_eval_samples = max(chunk_size, int(n_samples * eval_size))
_, y_e, _, _, dX_e, dy_e, dw_e, dg_e = _create_data(
objective=task,
n_samples=n_eval_samples,
output=output,
chunk_size=chunk_size
)
eval_set.append((dX_e, dy_e))
eval_sample_weight.append(dw_e)
if task == 'ranking':
eval_group.append(dg_e)
if task == 'binary-classification':
n_neg = np.sum(y_e == 0)
n_pos = np.sum(y_e == 1)
eval_class_weight.append({0: n_neg / n_pos, 1: n_pos / n_neg})
init_score_value = np.log(np.mean(y_e) / (1 - np.mean(y_e)))
if 'dataframe' in output:
d_init_score = dy_e.map_partitions(lambda x: pd.Series([init_score_value] * x.size))
else:
d_init_score = dy_e.map_blocks(lambda x: np.repeat(init_score_value, x.size))
eval_init_score.append(d_init_score)
fit_trees = 50
params = {
"random_state": 42,
"n_estimators": fit_trees,
"num_leaves": 2
}
model_factory = task_to_dask_factory[task]
dask_model = model_factory(
client=client,
**params
)
fit_params = {
'X': dX,
'y': dy,
'eval_set': eval_set,
'eval_names': eval_names,
'eval_sample_weight': eval_sample_weight,
'eval_init_score': eval_init_score,
'eval_metric': eval_metrics,
'verbose': True
}
if task == 'ranking':
fit_params.update(
{'group': dg,
'eval_group': eval_group,
'eval_at': eval_at}
)
elif task == 'binary-classification':
fit_params.update({'eval_class_weight': eval_class_weight})
if eval_sizes == [0]:
with pytest.warns(UserWarning, match='Worker (.*) was not allocated eval_set data. Therefore evals_result_ and best_score_ data may be unreliable.'):
dask_model.fit(**fit_params)
else:
dask_model = dask_model.fit(**fit_params)
# total number of trees scales up for ova classifier.
if task == 'multiclass-classification':
model_trees = fit_trees * dask_model.n_classes_
else:
model_trees = fit_trees
# check that early stopping was not applied.
assert dask_model.booster_.num_trees() == model_trees
assert dask_model.best_iteration_ is None
# checks that evals_result_ and best_score_ contain expected data and eval_set names.
evals_result = dask_model.evals_result_
best_scores = dask_model.best_score_
assert len(evals_result) == n_eval_sets
assert len(best_scores) == n_eval_sets
for eval_name in evals_result:
assert eval_name in dask_model.best_score_
if eval_names:
assert eval_name in eval_names
# check that each eval_name and metric exists for all eval sets, allowing for the
# case when a worker receives a fully-padded eval_set component which is not evaluated.
if evals_result[eval_name] != 'not evaluated':
for metric in eval_metric_names:
assert metric in evals_result[eval_name]
assert metric in best_scores[eval_name]
assert len(evals_result[eval_name][metric]) == fit_trees
@pytest.mark.parametrize('task', ['binary-classification', 'regression', 'ranking'])
def test_eval_set_with_custom_eval_metric(task, cluster):
with Client(cluster) as client:
n_samples = 1000
n_eval_samples = int(n_samples * 0.5)
chunk_size = 10
output = 'array'
X, y, w, g, dX, dy, dw, dg = _create_data(
objective=task,
n_samples=n_samples,
output=output,
chunk_size=chunk_size
)
_, _, _, _, dX_e, dy_e, _, dg_e = _create_data(
objective=task,
n_samples=n_eval_samples,
output=output,
chunk_size=chunk_size
)
if task == 'ranking':
eval_at = (5, 6)
eval_metrics = ['ndcg', _constant_metric]
eval_metric_names = [f'ndcg@{k}' for k in eval_at] + ['constant_metric']
elif task == 'binary-classification':
eval_metrics = ['binary_error', 'auc', _constant_metric]
eval_metric_names = ['binary_logloss', 'binary_error', 'auc', 'constant_metric']
else:
eval_metrics = ['l1', _constant_metric]
eval_metric_names = ['l2', 'l1', 'constant_metric']
fit_trees = 50
params = {
"random_state": 42,
"n_estimators": fit_trees,
"num_leaves": 2
}
model_factory = task_to_dask_factory[task]
dask_model = model_factory(
client=client,
**params
)
eval_set = [(dX_e, dy_e)]
fit_params = {
'X': dX,
'y': dy,
'eval_set': eval_set,
'eval_metric': eval_metrics
}
if task == 'ranking':
fit_params.update(
{'group': dg,
'eval_group': [dg_e],
'eval_at': eval_at}
)
dask_model = dask_model.fit(**fit_params)
eval_name = 'valid_0'
evals_result = dask_model.evals_result_
assert len(evals_result) == 1
assert eval_name in evals_result
for metric in eval_metric_names:
assert metric in evals_result[eval_name]
assert len(evals_result[eval_name][metric]) == fit_trees
np.testing.assert_allclose(evals_result[eval_name]['constant_metric'], 0.708)
@pytest.mark.parametrize('task', tasks)
def test_training_works_if_client_not_provided_or_set_after_construction(task, cluster):
with Client(cluster) as client: