# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""A script that builds boosted trees over higgs data.

If you haven't already, run data_download.py first to prepare the data.

For more details on this example, see README.md.

Note that the model_dir is cleaned up before starting the training.

Usage:
$ python train_higgs.py --n_trees=100 --max_depth=6 --learning_rate=0.1 \
    --model_dir=/tmp/higgs_model

Note that BoostedTreesClassifier has been available since TensorFlow 1.8.0,
so you need a recent enough version of TensorFlow to run this example.

By default, the training data is the first million of the 11M examples and the
eval data is the last million. These ranges are controlled by --train_start,
--train_count, --eval_start, and --eval_count.
e.g. to train over the first 10 million examples instead of 1 million:
$ python train_higgs.py --n_trees=100 --max_depth=6 --learning_rate=0.1 \
    --model_dir=/tmp/higgs_model --train_count=10000000

Training history and metrics can be inspected using TensorBoard. Point
--logdir at the model_dir used during training (or the default
/tmp/higgs_model):
$ tensorboard --logdir=/tmp/higgs_model
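
If an --export_dir is passed, the trained model is additionally exported as a
SavedModel whose serving signature parses CSV records.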
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

from absl import app as absl_app
from absl import flags
import numpy as np
import tensorflow.compat.v1 as tf

from official.r1.utils.logs import logger
from official.utils.flags import core as flags_core
from official.utils.flags._conventions import help_wrap

NPZ_FILE = "HIGGS.csv.gz.npz"  # numpy compressed file containing "data" array


def read_higgs_data(data_dir, train_start, train_count, eval_start, eval_count):
  """Reads higgs data from csv and returns train and eval data.

  Args:
    data_dir: A string, the directory containing the HIGGS dataset.
    train_start: An integer, the start index of train examples within the data.
    train_count: An integer, the number of train examples within the data.
    eval_start: An integer, the start index of eval examples within the data.
    eval_count: An integer, the number of eval examples within the data.

  Returns:
    A tuple of two numpy arrays: train data and eval data.
  """
  npz_filename = os.path.join(data_dir, NPZ_FILE)
  try:
    # gfile allows numpy to read data from network data sources as well.
    with tf.gfile.Open(npz_filename, "rb") as npz_file:
      with np.load(npz_file) as npz:
        data = npz["data"]
  except tf.errors.NotFoundError as e:
    raise RuntimeError(
        "Error loading data; use data_download.py to prepare the data.\n{}: {}"
        .format(type(e).__name__, e))
  return (data[train_start:train_start+train_count],
          data[eval_start:eval_start+eval_count])


# This showcases how to make input_fn when the input data is available in the
# form of numpy arrays.
def make_inputs_from_np_arrays(features_np, label_np):
  """Makes and returns input_fn and feature_columns from numpy arrays.

  The generated input_fn will return tf.data.Dataset of feature dictionary and a
  label, and feature_columns will consist of the list of
  tf.feature_column.BucketizedColumn.

  Note: for in-memory training, the tf.data.Dataset should contain the whole
  data as a single tensor; don't batch it.

  Args:
    features_np: A numpy ndarray (shape=[batch_size, num_features]) for
        float32 features.
    label_np: A numpy ndarray (shape=[batch_size, 1]) for labels.

  Returns:
    input_fn: A function returning a Dataset of feature dict and label.
    feature_names: A list of feature names.
    feature_columns: A list of tf.feature_column.BucketizedColumn.
  """
  num_features = features_np.shape[1]
  features_np_list = np.split(features_np, num_features, axis=1)
  # 1-based feature names.
  feature_names = ["feature_%02d" % (i + 1) for i in range(num_features)]

  # Create source feature_columns and bucketized_columns.
  def get_bucket_boundaries(feature):
    """Returns bucket boundaries for feature by percentiles."""
    return np.unique(np.percentile(feature, range(0, 100))).tolist()
  source_columns = [
      tf.feature_column.numeric_column(
          feature_name, dtype=tf.float32,
          # Although the HIGGS data has no missing values, in general the
          # default could be set to 0.0 or another reasonable value for
          # missing data.
          default_value=0.0)
      for feature_name in feature_names
  ]
  bucketized_columns = [
      tf.feature_column.bucketized_column(
          source_columns[i],
          boundaries=get_bucket_boundaries(features_np_list[i]))
      for i in range(num_features)
  ]

  # Make an input_fn that extracts source features.
  def input_fn():
    """Returns features as a dictionary of numpy arrays, and a label."""
    features = {
        feature_name: tf.constant(features_np_list[i])
        for i, feature_name in enumerate(feature_names)
    }
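    # from_tensors() creates a single-element Dataset, so the whole training
    # data is delivered as one batch, as required by in-memory training.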
    return tf.data.Dataset.zip((tf.data.Dataset.from_tensors(features),
                                tf.data.Dataset.from_tensors(label_np),))

  return input_fn, feature_names, bucketized_columns


def make_eval_inputs_from_np_arrays(features_np, label_np):
  """Makes eval input as streaming batches."""
  num_features = features_np.shape[1]
  features_np_list = np.split(features_np, num_features, axis=1)
  # 1-based feature names.
  feature_names = ["feature_%02d" % (i + 1) for i in range(num_features)]

  def input_fn():
    features = {
        feature_name: tf.constant(features_np_list[i])
        for i, feature_name in enumerate(feature_names)
    }
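    # Unlike training, evaluation streams the data in batches of 1000 rows
    # using from_tensor_slices().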
    return tf.data.Dataset.zip((
        tf.data.Dataset.from_tensor_slices(features),
        tf.data.Dataset.from_tensor_slices(label_np),)).batch(1000)

  return input_fn


def _make_csv_serving_input_receiver_fn(column_names, column_defaults):
  """Returns serving_input_receiver_fn for csv.

  The input arguments are passed through to `tf.decode_csv()`.

  Args:
    column_names: a list of column names in the order they appear in the
        input csv.
    column_defaults: a list of default values with the same length as
        column_names. Each entry must be either a list of one scalar, or an
        empty list to denote that the corresponding column is required.
        e.g. [[""], [2.5], []] indicates the third column is required while
            the first column must be a string and the second a float/double.

  Returns:
    a serving_input_receiver_fn that handles csv for serving.
  """
  def serving_input_receiver_fn():
    csv = tf.placeholder(dtype=tf.string, shape=[None], name="csv")
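    # Each string fed to the placeholder is one serialized CSV record;
    # tf.decode_csv parses it into per-column tensors using column_defaults.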
    features = dict(zip(column_names, tf.decode_csv(csv, column_defaults)))
    receiver_tensors = {"inputs": csv}
    return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

  return serving_input_receiver_fn


def train_boosted_trees(flags_obj):
  """Train boosted_trees estimator on HIGGS data.

  Args:
    flags_obj: An object containing parsed flag values.
  """
  # Clean up the model directory if present.
  if tf.gfile.Exists(flags_obj.model_dir):
    tf.gfile.DeleteRecursively(flags_obj.model_dir)
  tf.logging.info("## Data loading...")
  train_data, eval_data = read_higgs_data(
      flags_obj.data_dir, flags_obj.train_start, flags_obj.train_count,
      flags_obj.eval_start, flags_obj.eval_count)
  tf.logging.info("## Data loaded; train: {}{}, eval: {}{}".format(
      train_data.dtype, train_data.shape, eval_data.dtype, eval_data.shape))
  # Data consists of one label column followed by 28 feature columns.
  train_input_fn, feature_names, feature_columns = make_inputs_from_np_arrays(
      features_np=train_data[:, 1:], label_np=train_data[:, 0:1])
  eval_input_fn = make_eval_inputs_from_np_arrays(
      features_np=eval_data[:, 1:], label_np=eval_data[:, 0:1])
  tf.logging.info("## Features prepared. Training starts...")

  # Create benchmark logger to log info about the training and metric values
  run_params = {
      "train_start": flags_obj.train_start,
      "train_count": flags_obj.train_count,
      "eval_start": flags_obj.eval_start,
      "eval_count": flags_obj.eval_count,
      "n_trees": flags_obj.n_trees,
      "max_depth": flags_obj.max_depth,
  }
  benchmark_logger = logger.config_benchmark_logger(flags_obj)
  benchmark_logger.log_run_info(
      model_name="boosted_trees",
      dataset_name="higgs",
      run_params=run_params,
      test_id=flags_obj.benchmark_test_id)

  # Though BoostedTreesClassifier is under tf.estimator, the faster in-memory
  # training variant is still only provided as a contrib library.
  from tensorflow.contrib import estimator as contrib_estimator  # pylint: disable=g-import-not-at-top
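  # train_in_memory consumes the whole dataset held in memory at once, which
  # is why train_input_fn returns all examples as a single batch.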
  classifier = contrib_estimator.boosted_trees_classifier_train_in_memory(
      train_input_fn,
      feature_columns,
      model_dir=flags_obj.model_dir or None,
      n_trees=flags_obj.n_trees,
      max_depth=flags_obj.max_depth,
      learning_rate=flags_obj.learning_rate)

  # Evaluation.
  eval_results = classifier.evaluate(eval_input_fn)
  # Benchmark the evaluation results
  benchmark_logger.log_evaluation_result(eval_results)

  # Export the SavedModel with CSV parsing for serving.
  if flags_obj.export_dir is not None:
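    # The exported serving signature accepts serialized CSV rows through the
    # "inputs" tensor defined in the receiver fn; every column defaults to 0.0.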
    classifier.export_savedmodel(
        flags_obj.export_dir,
        _make_csv_serving_input_receiver_fn(
            column_names=feature_names,
            # Columns are all floats.
            column_defaults=[[0.0]] * len(feature_names)),
        strip_default_attrs=True)


def main(_):
  train_boosted_trees(flags.FLAGS)


def define_train_higgs_flags():
  """Add tree related flags as well as training/eval configuration."""
  flags_core.define_base(clean=False, stop_threshold=False, batch_size=False,
                         num_gpu=False, export_dir=True)
  flags_core.define_benchmark()
  flags.adopt_module_key_flags(flags_core)

  flags.DEFINE_integer(
      name="train_start", default=0,
      help=help_wrap("Start index of train examples within the data."))
  flags.DEFINE_integer(
      name="train_count", default=1000000,
      help=help_wrap("Number of train examples within the data."))
  flags.DEFINE_integer(
      name="eval_start", default=10000000,
      help=help_wrap("Start index of eval examples within the data."))
  flags.DEFINE_integer(
      name="eval_count", default=1000000,
      help=help_wrap("Number of eval examples within the data."))

  flags.DEFINE_integer(
      "n_trees", default=100, help=help_wrap("Number of trees to build."))
  flags.DEFINE_integer(
      "max_depth", default=6, help=help_wrap("Maximum depths of each tree."))
  flags.DEFINE_float(
      "learning_rate", default=0.1,
      help=help_wrap("The learning rate."))

  flags_core.set_defaults(data_dir="/tmp/higgs_data",
                          model_dir="/tmp/higgs_model")


if __name__ == "__main__":
  # Training progress and eval results are shown at logging.INFO, so enable it.
  tf.logging.set_verbosity(tf.logging.INFO)
  define_train_higgs_flags()
  absl_app.run(main)