Unverified Commit 5312b955 authored by Nikita Titov's avatar Nikita Titov Committed by GitHub
Browse files

[dask] fix Dask docstrings and mimic sklearn wrapper importing way (#3855)

* fix Dask docstrings and mimic sklearn importing way

* Update .vsts-ci.yml

* revert CI checks

* use import aliases for Dask classes

* check Dask is installed in _predict() func

* fix lint issues introduced during resolving merge conflicts

* Update dask.py
parent 56b99d4c
...@@ -14,7 +14,7 @@ from typing import Any, Dict ...@@ -14,7 +14,7 @@ from typing import Any, Dict
import numpy as np import numpy as np
import scipy.sparse import scipy.sparse
from .compat import PANDAS_INSTALLED, DataFrame, Series, is_dtype_sparse, DataTable from .compat import PANDAS_INSTALLED, DataFrame, Series, concat, is_dtype_sparse, DataTable
from .libpath import find_lib_path from .libpath import find_lib_path
...@@ -2081,7 +2081,6 @@ class Dataset: ...@@ -2081,7 +2081,6 @@ class Dataset:
if not PANDAS_INSTALLED: if not PANDAS_INSTALLED:
raise LightGBMError("Cannot add features to DataFrame type of raw data " raise LightGBMError("Cannot add features to DataFrame type of raw data "
"without pandas installed") "without pandas installed")
from pandas import concat
if isinstance(other.data, np.ndarray): if isinstance(other.data, np.ndarray):
self.data = concat((self.data, DataFrame(other.data)), self.data = concat((self.data, DataFrame(other.data)),
axis=1, ignore_index=True) axis=1, ignore_index=True)
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
"""pandas""" """pandas"""
try: try:
from pandas import Series, DataFrame from pandas import Series, DataFrame, concat
from pandas.api.types import is_sparse as is_dtype_sparse from pandas.api.types import is_sparse as is_dtype_sparse
PANDAS_INSTALLED = True PANDAS_INSTALLED = True
except ImportError: except ImportError:
...@@ -19,6 +19,7 @@ except ImportError: ...@@ -19,6 +19,7 @@ except ImportError:
pass pass
concat = None
is_dtype_sparse = None is_dtype_sparse = None
"""matplotlib""" """matplotlib"""
...@@ -108,9 +109,25 @@ except ImportError: ...@@ -108,9 +109,25 @@ except ImportError:
"""dask""" """dask"""
try: try:
from dask import array from dask import delayed
from dask import dataframe from dask.array import Array as dask_Array
from dask.distributed import Client from dask.dataframe import _Frame as dask_Frame
from dask.distributed import Client, default_client, get_worker, wait
DASK_INSTALLED = True DASK_INSTALLED = True
except ImportError: except ImportError:
DASK_INSTALLED = False DASK_INSTALLED = False
delayed = None
Client = object
default_client = None
get_worker = None
wait = None
class dask_Array:
"""Dummy class for dask.array.Array."""
pass
class dask_Frame:
"""Dummy class for ddask.dataframe._Frame."""
pass
...@@ -13,16 +13,12 @@ from typing import Dict, Iterable ...@@ -13,16 +13,12 @@ from typing import Dict, Iterable
from urllib.parse import urlparse from urllib.parse import urlparse
import numpy as np import numpy as np
import pandas as pd
import scipy.sparse as ss import scipy.sparse as ss
from dask import array as da
from dask import dataframe as dd
from dask import delayed
from dask.distributed import Client, default_client, get_worker, wait
from .basic import _choose_param_value, _ConfigAliases, _LIB, _log_warning, _safe_call, LightGBMError from .basic import _choose_param_value, _ConfigAliases, _LIB, _log_warning, _safe_call, LightGBMError
from .compat import DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED from .compat import (PANDAS_INSTALLED, DataFrame, Series, concat,
SKLEARN_INSTALLED,
DASK_INSTALLED, dask_Frame, dask_Array, delayed, Client, default_client, get_worker, wait)
from .sklearn import LGBMClassifier, LGBMRegressor, LGBMRanker from .sklearn import LGBMClassifier, LGBMRegressor, LGBMRanker
...@@ -46,7 +42,7 @@ def _find_open_port(worker_ip: str, local_listen_port: int, ports_to_skip: Itera ...@@ -46,7 +42,7 @@ def _find_open_port(worker_ip: str, local_listen_port: int, ports_to_skip: Itera
Returns Returns
------- -------
result : int port : int
A free port on the machine referenced by ``worker_ip``. A free port on the machine referenced by ``worker_ip``.
""" """
max_tries = 1000 max_tries = 1000
...@@ -81,7 +77,7 @@ def _find_ports_for_workers(client: Client, worker_addresses: Iterable[str], loc ...@@ -81,7 +77,7 @@ def _find_ports_for_workers(client: Client, worker_addresses: Iterable[str], loc
client : dask.distributed.Client client : dask.distributed.Client
Dask client. Dask client.
worker_addresses : Iterable[str] worker_addresses : Iterable[str]
An iterable of addresses for workers in the cluster. These are strings of the form ``<protocol>://<host>:port`` An iterable of addresses for workers in the cluster. These are strings of the form ``<protocol>://<host>:port``.
local_listen_port : int local_listen_port : int
First port to try when searching for open ports. First port to try when searching for open ports.
...@@ -109,8 +105,8 @@ def _find_ports_for_workers(client: Client, worker_addresses: Iterable[str], loc ...@@ -109,8 +105,8 @@ def _find_ports_for_workers(client: Client, worker_addresses: Iterable[str], loc
def _concat(seq): def _concat(seq):
if isinstance(seq[0], np.ndarray): if isinstance(seq[0], np.ndarray):
return np.concatenate(seq, axis=0) return np.concatenate(seq, axis=0)
elif isinstance(seq[0], (pd.DataFrame, pd.Series)): elif isinstance(seq[0], (DataFrame, Series)):
return pd.concat(seq, axis=0) return concat(seq, axis=0)
elif isinstance(seq[0], ss.spmatrix): elif isinstance(seq[0], ss.spmatrix):
return ss.vstack(seq, format='csr') return ss.vstack(seq, format='csr')
else: else:
...@@ -152,9 +148,9 @@ def _train_part(params, model_factory, list_of_parts, worker_address_to_port, re ...@@ -152,9 +148,9 @@ def _train_part(params, model_factory, list_of_parts, worker_address_to_port, re
try: try:
model = model_factory(**params) model = model_factory(**params)
if is_ranker: if is_ranker:
model.fit(data, y=label, sample_weight=weight, group=group, **kwargs) model.fit(data, label, sample_weight=weight, group=group, **kwargs)
else: else:
model.fit(data, y=label, sample_weight=weight, **kwargs) model.fit(data, label, sample_weight=weight, **kwargs)
finally: finally:
_safe_call(_LIB.LGBM_NetworkFree()) _safe_call(_LIB.LGBM_NetworkFree())
...@@ -178,13 +174,16 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group ...@@ -178,13 +174,16 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group
Parameters Parameters
---------- ----------
client: dask.Client - client client : dask.distributed.Client
X : dask array of shape = [n_samples, n_features] Dask client.
data : dask array of shape = [n_samples, n_features]
Input feature matrix. Input feature matrix.
y : dask array of shape = [n_samples] label : dask array of shape = [n_samples]
The target values (class labels in classification, real numbers in regression). The target values (class labels in classification, real numbers in regression).
params : dict params : dict
Parameters passed to constructor of the local underlying model.
model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
Class of the local underlying model.
sample_weight : array-like of shape = [n_samples] or None, optional (default=None) sample_weight : array-like of shape = [n_samples] or None, optional (default=None)
Weights of training data. Weights of training data.
group : array-like or None, optional (default=None) group : array-like or None, optional (default=None)
...@@ -193,6 +192,13 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group ...@@ -193,6 +192,13 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group
sum(group) = n_samples. sum(group) = n_samples.
For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups, For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups,
where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc. where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc.
**kwargs
Other parameters passed to ``fit`` method of the local underlying model.
Returns
-------
model : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
Returns fitted underlying model.
""" """
params = deepcopy(params) params = deepcopy(params)
...@@ -298,7 +304,7 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group ...@@ -298,7 +304,7 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group
def _predict_part(part, model, raw_score, pred_proba, pred_leaf, pred_contrib, **kwargs): def _predict_part(part, model, raw_score, pred_proba, pred_leaf, pred_contrib, **kwargs):
data = part.values if isinstance(part, pd.DataFrame) else part data = part.values if isinstance(part, DataFrame) else part
if data.shape[0] == 0: if data.shape[0] == 0:
result = np.array([]) result = np.array([])
...@@ -319,11 +325,11 @@ def _predict_part(part, model, raw_score, pred_proba, pred_leaf, pred_contrib, * ...@@ -319,11 +325,11 @@ def _predict_part(part, model, raw_score, pred_proba, pred_leaf, pred_contrib, *
**kwargs **kwargs
) )
if isinstance(part, pd.DataFrame): if isinstance(part, DataFrame):
if pred_proba or pred_contrib: if pred_proba or pred_contrib:
result = pd.DataFrame(result, index=part.index) result = DataFrame(result, index=part.index)
else: else:
result = pd.Series(result, index=part.index, name='predictions') result = Series(result, index=part.index, name='predictions')
return result return result
...@@ -335,20 +341,34 @@ def _predict(model, data, raw_score=False, pred_proba=False, pred_leaf=False, pr ...@@ -335,20 +341,34 @@ def _predict(model, data, raw_score=False, pred_proba=False, pred_leaf=False, pr
Parameters Parameters
---------- ----------
model : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class model : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
Fitted underlying model.
data : dask array of shape = [n_samples, n_features] data : dask array of shape = [n_samples, n_features]
Input feature matrix. Input feature matrix.
raw_score : bool, optional (default=False)
Whether to predict raw scores.
pred_proba : bool, optional (default=False) pred_proba : bool, optional (default=False)
Should method return results of ``predict_proba`` (``pred_proba=True``) or ``predict`` (``pred_proba=False``). Should method return results of ``predict_proba`` (``pred_proba=True``) or ``predict`` (``pred_proba=False``).
pred_leaf : bool, optional (default=False) pred_leaf : bool, optional (default=False)
Whether to predict leaf index. Whether to predict leaf index.
pred_contrib : bool, optional (default=False) pred_contrib : bool, optional (default=False)
Whether to predict feature contributions. Whether to predict feature contributions.
dtype : np.dtype dtype : np.dtype, optional (default=np.float32)
Dtype of the output. Dtype of the output.
kwargs : dict **kwargs
Other parameters passed to ``predict`` or ``predict_proba`` method. Other parameters passed to ``predict`` or ``predict_proba`` method.
Returns
-------
predicted_result : dask array of shape = [n_samples] or shape = [n_samples, n_classes]
The predicted values.
X_leaves : dask arrayof shape = [n_samples, n_trees] or shape = [n_samples, n_trees * n_classes]
If ``pred_leaf=True``, the predicted leaf of every tree for each sample.
X_SHAP_values : dask array of shape = [n_samples, n_features + 1] or shape = [n_samples, (n_features + 1) * n_classes] or list with n_classes length of such objects
If ``pred_contrib=True``, the feature contributions for each sample.
""" """
if isinstance(data, dd._Frame): if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)):
raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask')
if isinstance(data, dask_Frame):
return data.map_partitions( return data.map_partitions(
_predict_part, _predict_part,
model=model, model=model,
...@@ -358,7 +378,7 @@ def _predict(model, data, raw_score=False, pred_proba=False, pred_leaf=False, pr ...@@ -358,7 +378,7 @@ def _predict(model, data, raw_score=False, pred_proba=False, pred_leaf=False, pr
pred_contrib=pred_contrib, pred_contrib=pred_contrib,
**kwargs **kwargs
).values ).values
elif isinstance(data, da.Array): elif isinstance(data, dask_Array):
if pred_proba: if pred_proba:
kwargs['chunks'] = (data.chunks[0], (model.n_classes_,)) kwargs['chunks'] = (data.chunks[0], (model.n_classes_,))
else: else:
...@@ -378,12 +398,9 @@ def _predict(model, data, raw_score=False, pred_proba=False, pred_leaf=False, pr ...@@ -378,12 +398,9 @@ def _predict(model, data, raw_score=False, pred_proba=False, pred_leaf=False, pr
class _DaskLGBMModel: class _DaskLGBMModel:
def __init__(self): def _fit(self, model_factory, X, y, sample_weight=None, group=None, client=None, **kwargs):
if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)): if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)):
raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask') raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask')
def _fit(self, model_factory, X, y=None, sample_weight=None, group=None, client=None, **kwargs):
"""Docstring is inherited from the LGBMModel."""
if client is None: if client is None:
client = default_client() client = default_client()
...@@ -422,7 +439,7 @@ class _DaskLGBMModel: ...@@ -422,7 +439,7 @@ class _DaskLGBMModel:
class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel): class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel):
"""Distributed version of lightgbm.LGBMClassifier.""" """Distributed version of lightgbm.LGBMClassifier."""
def fit(self, X, y=None, sample_weight=None, client=None, **kwargs): def fit(self, X, y, sample_weight=None, client=None, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMClassifier.fit.""" """Docstring is inherited from the lightgbm.LGBMClassifier.fit."""
return self._fit( return self._fit(
model_factory=LGBMClassifier, model_factory=LGBMClassifier,
...@@ -433,7 +450,12 @@ class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel): ...@@ -433,7 +450,12 @@ class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel):
**kwargs **kwargs
) )
fit.__doc__ = LGBMClassifier.fit.__doc__ _base_doc = LGBMClassifier.fit.__doc__
_before_init_score, _init_score, _after_init_score = _base_doc.partition('init_score :')
fit.__doc__ = (_before_init_score
+ 'client : dask.distributed.Client or None, optional (default=None)\n'
+ ' ' * 12 + 'Dask client.\n'
+ ' ' * 8 + _init_score + _after_init_score)
def predict(self, X, **kwargs): def predict(self, X, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMClassifier.predict.""" """Docstring is inherited from the lightgbm.LGBMClassifier.predict."""
...@@ -463,14 +485,15 @@ class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel): ...@@ -463,14 +485,15 @@ class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel):
Returns Returns
------- -------
model : lightgbm.LGBMClassifier model : lightgbm.LGBMClassifier
Local underlying model.
""" """
return self._to_local(LGBMClassifier) return self._to_local(LGBMClassifier)
class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel): class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel):
"""Docstring is inherited from the lightgbm.LGBMRegressor.""" """Distributed version of lightgbm.LGBMRegressor."""
def fit(self, X, y=None, sample_weight=None, client=None, **kwargs): def fit(self, X, y, sample_weight=None, client=None, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMRegressor.fit.""" """Docstring is inherited from the lightgbm.LGBMRegressor.fit."""
return self._fit( return self._fit(
model_factory=LGBMRegressor, model_factory=LGBMRegressor,
...@@ -481,7 +504,12 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel): ...@@ -481,7 +504,12 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel):
**kwargs **kwargs
) )
fit.__doc__ = LGBMRegressor.fit.__doc__ _base_doc = LGBMRegressor.fit.__doc__
_before_init_score, _init_score, _after_init_score = _base_doc.partition('init_score :')
fit.__doc__ = (_before_init_score
+ 'client : dask.distributed.Client or None, optional (default=None)\n'
+ ' ' * 12 + 'Dask client.\n'
+ ' ' * 8 + _init_score + _after_init_score)
def predict(self, X, **kwargs): def predict(self, X, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMRegressor.predict.""" """Docstring is inherited from the lightgbm.LGBMRegressor.predict."""
...@@ -499,14 +527,15 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel): ...@@ -499,14 +527,15 @@ class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel):
Returns Returns
------- -------
model : lightgbm.LGBMRegressor model : lightgbm.LGBMRegressor
Local underlying model.
""" """
return self._to_local(LGBMRegressor) return self._to_local(LGBMRegressor)
class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel): class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
"""Docstring is inherited from the lightgbm.LGBMRanker.""" """Distributed version of lightgbm.LGBMRanker."""
def fit(self, X, y=None, sample_weight=None, init_score=None, group=None, client=None, **kwargs): def fit(self, X, y, sample_weight=None, init_score=None, group=None, client=None, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMRanker.fit.""" """Docstring is inherited from the lightgbm.LGBMRanker.fit."""
if init_score is not None: if init_score is not None:
raise RuntimeError('init_score is not currently supported in lightgbm.dask') raise RuntimeError('init_score is not currently supported in lightgbm.dask')
...@@ -521,7 +550,12 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel): ...@@ -521,7 +550,12 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
**kwargs **kwargs
) )
fit.__doc__ = LGBMRanker.fit.__doc__ _base_doc = LGBMRanker.fit.__doc__
_before_eval_set, _eval_set, _after_eval_set = _base_doc.partition('eval_set :')
fit.__doc__ = (_before_eval_set
+ 'client : dask.distributed.Client or None, optional (default=None)\n'
+ ' ' * 12 + 'Dask client.\n'
+ ' ' * 8 + _eval_set + _after_eval_set)
def predict(self, X, **kwargs): def predict(self, X, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMRanker.predict.""" """Docstring is inherited from the lightgbm.LGBMRanker.predict."""
...@@ -535,5 +569,6 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel): ...@@ -535,5 +569,6 @@ class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
Returns Returns
------- -------
model : lightgbm.LGBMRanker model : lightgbm.LGBMRanker
Local underlying model.
""" """
return self._to_local(LGBMRanker) return self._to_local(LGBMRanker)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment