# ranking.py — example: distributed learning-to-rank with LightGBM on Dask
import os

import dask.array as da
import numpy as np
from distributed import Client, LocalCluster
from sklearn.datasets import load_svmlight_file

import lightgbm as lgb

if __name__ == "__main__":
    print("loading data")

    # Resolve data paths relative to this script so it works from any CWD.
    data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            '../../lambdarank')
    X, y = load_svmlight_file(os.path.join(data_dir, 'rank.train'))
    # One entry per query: the number of consecutive rows in X that belong
    # to that query (the standard LightGBM "group" encoding).
    group = np.loadtxt(os.path.join(data_dir, 'rank.train.query'))

    print("initializing a Dask cluster")

    # Use context managers so the cluster's worker processes and the client
    # are shut down even if training raises (the original leaked them).
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        print("created a Dask LocalCluster")

        print("distributing training data on the Dask cluster")

        # Split the training data into two partitions on a query boundary:
        # all rows belonging to the first 100 queries form partition 1.
        # Partitions must never split a query, or ranking loss is wrong.
        rows_in_part1 = int(np.sum(group[:100]))
        rows_in_part2 = X.shape[0] - rows_in_part1
        num_features = X.shape[1]

        # Materialize the sparse matrix as a dense ndarray because we're
        # splitting across a sparse boundary to partition the data.
        # toarray() returns a plain ndarray; todense() would return the
        # deprecated np.matrix type.
        X = X.toarray()

        dX = da.from_array(
            x=X,
            chunks=[
                (rows_in_part1, rows_in_part2),
                (num_features,)
            ]
        )
        dy = da.from_array(
            x=y,
            chunks=[
                (rows_in_part1, rows_in_part2),
            ]
        )
        # Chunk the per-query group sizes to match the row partitioning:
        # first 100 queries on worker 1, the rest on worker 2.
        dg = da.from_array(
            x=group,
            chunks=[
                (100, group.size - 100)
            ]
        )

        print("beginning training")

        dask_model = lgb.DaskLGBMRanker(n_estimators=10)
        dask_model.fit(dX, dy, group=dg)
        assert dask_model.fitted_

        print("done training")