CIFAR-10 is a common benchmark in machine learning for image recognition.
http://www.cs.toronto.edu/~kriz/cifar.html
Code in this directory focuses on how to use TensorFlow Estimators to train and evaluate a CIFAR-10 ResNet model on:
* A single host with one CPU;
* A single host with multiple GPUs;
* Multiple hosts with CPU or multiple GPUs;
Before trying to run the model, we highly encourage you to read the rest of this README.
## Prerequisites
1. Install TensorFlow version 1.2.1 or later with GPU support.
You can see how to do it [here](https://www.tensorflow.org/install/).
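For example, one possible pip-based install looks like the following. This is only an illustration; it assumes CUDA and cuDNN are already set up, and the install guide linked above remains the authoritative reference.
```shell
# Illustrative only: install a GPU-enabled TensorFlow build via pip.
# CUDA and cuDNN must already be installed; see the install guide for details.
$ pip install --upgrade "tensorflow-gpu>=1.2.1"
```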
2. Download the CIFAR-10 dataset.
```shell
$ curl -o cifar-10-python.tar.gz https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
$ tar xzf cifar-10-python.tar.gz
```
After running the commands above, you should see the following files in the folder where the data was downloaded.
``` shell
$ ls -R cifar-10-batches-py
batches.meta data_batch_1 data_batch_2 data_batch_3
data_batch_4 data_batch_5 readme.html test_batch
```
3. Generate TFRecord files.
```shell
# This generates a TFRecord file for each of the training, validation and test subsets found in the input_dir.
# You can see more details in generate_cifar10_tfrecords.py.
$ python generate_cifar10_tfrecords.py --input_dir=/prefix/to/downloaded/data/cifar-10-batches-py \
--output_dir=/prefix/to/downloaded/data/cifar-10-batches-py
```
After running the command above, you should see the following new files in the output_dir.
``` shell
$ ls -R cifar-10-batches-py
train.tfrecords validation.tfrecords eval.tfrecords
```
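If you want to sanity-check the conversion, you can count the records in each generated file. The snippet below is only an illustrative sketch (the paths reuse the example output_dir above) and is not part of the tutorial code:
```python
import tensorflow as tf

# Count the records in each generated TFRecord file.
# Expected counts: train=45000, validation=5000, eval=10000.
for name in ['train', 'validation', 'eval']:
    path = '/prefix/to/downloaded/data/cifar-10-batches-py/%s.tfrecords' % name
    count = sum(1 for _ in tf.python_io.tf_record_iterator(path))
    print('%s: %d examples' % (name, count))
```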
## How to run in local mode
```
# Run the model on CPU only. After training, it runs the evaluation.
$ python cifar10_main.py --data_dir=/prefix/to/downloaded/data/cifar-10-batches-py \
--model_dir=/tmp/cifar10 \
--is_cpu_ps=True \
--num_gpus=0 \
--train_steps=1000
# Run the model on 2 GPUs using CPU as parameter server. After training, it runs the evaluation.
$ python cifar10_main.py --data_dir=/prefix/to/downloaded/data/cifar-10-batches-py \
--model_dir=/tmp/cifar10 \
--is_cpu_ps=True \
--force_gpu_compatible=True \
--num_gpus=2 \
--train_steps=1000
# Run the model on 2 GPUs using GPU as parameter server.
# It will run an experiment, which in a local setting basically means it will stop training
# a couple of times to perform evaluation.
$ python cifar10_main.py --data_dir=/prefix/to/downloaded/data/cifar-10-batches-py \
--model_dir=/tmp/cifar10 \
--is_cpu_ps=False \
--force_gpu_compatible=True \
--num_gpus=2 \
--train_steps=1000 \
--run_experiment=True
# There are more command line flags to play with; check cifar10_main.py for details.
```
## How to run in distributed mode
### Set TF_CONFIG
Assuming you already have multiple hosts configured, all you need is a `TF_CONFIG`
environment variable on each host. You can set up the hosts manually or check [tensorflow/ecosystem](https://github.com/tensorflow/ecosystem) for instructions on how to set up a cluster.
The `TF_CONFIG` environment variable will be used by the `RunConfig` to know the existing hosts and their roles: `master`, `ps` or `worker`.
Here's an example of `TF_CONFIG`.
```python
cluster = {'master': ['master-ip:8000'],
           'ps': ['ps-ip:8000'],
           'worker': ['worker-ip:8000']}

TF_CONFIG = json.dumps(
    {'cluster': cluster,
     'task': {'type': 'master', 'index': 0},
     'model_dir': 'gs://<bucket_path>/<dir_path>',
     'environment': 'cloud'})
```
*Cluster*
The cluster spec is basically a dictionary that describes all of the tasks in the cluster. More about it [here](https://www.tensorflow.org/deploy/distributed).
In this cluster spec we are defining a cluster with 1 master, 1 ps and 1 worker.
* `ps`: stores the model parameters. All workers can read/write/update the parameters via the ps.
Because some models are extremely large, the parameters can be sharded across multiple ps tasks (each ps stores a subset).
* `worker`: does the training.
* `master`: basically a special worker; it does training, but also saves and restores checkpoints and runs evaluation.
*Task*
The task defines the role of the current node; in this example the node is the master with index 0
in the cluster spec. The task will be different on each node. An example of the `TF_CONFIG` for a worker would be:
```python
cluster = {'master': ['master-ip:8000'],
           'ps': ['ps-ip:8000'],
           'worker': ['worker-ip:8000']}

TF_CONFIG = json.dumps(
    {'cluster': cluster,
     'task': {'type': 'worker', 'index': 0},
     'model_dir': 'gs://<bucket_path>/<dir_path>',
     'environment': 'cloud'})
```
*Model_dir*
This is the path where the master will save the checkpoints, graph and TensorBoard files.
For a multi-host environment you may want to use a distributed file system; Google Storage and DFS are supported.
*Environment*
By default the environment is *local*; for a distributed setting we need to change it to *cloud*.
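For example, a minimal sketch of setting the variable on the master host before launching the script (the addresses and bucket path are the placeholders from the example above) could look like:
```shell
# Illustrative only: export TF_CONFIG on the master host.
# Use the matching "task" entry (worker, ps) on the other hosts.
$ export TF_CONFIG='{
  "cluster": {
    "master": ["master-ip:8000"],
    "ps": ["ps-ip:8000"],
    "worker": ["worker-ip:8000"]
  },
  "task": {"type": "master", "index": 0},
  "model_dir": "gs://<bucket_path>/<dir_path>",
  "environment": "cloud"
}'
```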
### Running the script
Once you have a `TF_CONFIG` configured properly on each host, you're ready to run in a distributed setting.
#### Master
```shell
# Run this on master:
# Runs an Experiment in sync mode on 4 GPUs using CPU as parameter server for 40000 steps.
# It will run evaluation a couple of times during training.
# The num_workers argument is used only to update the learning rate correctly.
# Make sure the model_dir is the same as defined in the TF_CONFIG.
$ python cifar10_main.py --data_dir=gs://path/cifar-10-batches-py \
--model_dir=gs://path/model_dir/ \
--is_cpu_ps=True \
--force_gpu_compatible=True \
--num_gpus=4 \
--train_steps=40000 \
--sync=True \
--run_experiment=True \
--num_workers=2
```
*Output:*
```shell
INFO:tensorflow:Using model_dir in TF_CONFIG: gs://path/model_dir/
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 1, '_keep_checkpoint_max': 5, '_task_type': u'master', '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fd16fb2be10>, '_model_dir': 'gs://path/model_dir/', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_session_config': intra_op_parallelism_threads: 1
gpu_options {
}
allow_soft_placement: true
, '_tf_random_seed': None, '_environment': u'cloud', '_num_worker_replicas': 1, '_task_id': 0, '_save_summary_steps': 100, '_tf_config': gpu_options {
per_process_gpu_memory_fraction: 1.0
}
, '_evaluation_master': '', '_master': u'grpc://master-ip:8000'}
...
2017-08-01 19:59:26.496208: I tensorflow/core/common_runtime/gpu/gpu_device.cc:940] Found device 0 with properties:
name: Tesla K80
major: 3 minor: 7 memoryClockRate (GHz) 0.8235
pciBusID 0000:00:04.0
Total memory: 11.17GiB
Free memory: 11.09GiB
2017-08-01 19:59:26.775660: I tensorflow/core/common_runtime/gpu/gpu_device.cc:940] Found device 1 with properties:
name: Tesla K80
major: 3 minor: 7 memoryClockRate (GHz) 0.8235
pciBusID 0000:00:05.0
Total memory: 11.17GiB
Free memory: 11.10GiB
...
2017-08-01 19:59:29.675171: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:8000
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_1/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_2/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_3/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_4/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_5/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_6/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1/avg_pool/: (?, 16, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_1/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_2/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_3/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_4/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_5/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_6/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1/avg_pool/: (?, 32, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_1/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_2/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_3/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_4/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_5/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_6/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/global_avg_pool/: (?, 64)
INFO:tensorflow:image after unit resnet/tower_0/fully_connected/: (?, 11)
INFO:tensorflow:SyncReplicasV2: replicas_to_aggregate=1; total_num_replicas=1
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Restoring parameters from gs://path/model_dir/model.ckpt-0
2017-08-01 19:59:37.560775: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 156fcb55fe6648d6 with config:
intra_op_parallelism_threads: 1
gpu_options {
per_process_gpu_memory_fraction: 1
}
allow_soft_placement: true
INFO:tensorflow:Saving checkpoints for 1 into gs://path/model_dir/model.ckpt.
INFO:tensorflow:loss = 1.20682, step = 1
INFO:tensorflow:loss = 1.20682, learning_rate = 0.1
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_1/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_2/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_3/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_4/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_5/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_6/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1/avg_pool/: (?, 16, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_1/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_2/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_3/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_4/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_5/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_6/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1/avg_pool/: (?, 32, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_1/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_2/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_3/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_4/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_5/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_6/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/global_avg_pool/: (?, 64)
INFO:tensorflow:image after unit resnet/tower_0/fully_connected/: (?, 11)
INFO:tensorflow:SyncReplicasV2: replicas_to_aggregate=2; total_num_replicas=2
INFO:tensorflow:Starting evaluation at 2017-08-01-20:00:14
2017-08-01 20:00:15.745881: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:0) -> (device: 0, name: Tesla K80, pci bus id: 0000:00:04.0)
2017-08-01 20:00:15.745949: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:1) -> (device: 1, name: Tesla K80, pci bus id: 0000:00:05.0)
2017-08-01 20:00:15.745958: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:2) -> (device: 2, name: Tesla K80, pci bus id: 0000:00:06.0)
2017-08-01 20:00:15.745964: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:3) -> (device: 3, name: Tesla K80, pci bus id: 0000:00:07.0)
2017-08-01 20:00:15.745969: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:4) -> (device: 4, name: Tesla K80, pci bus id: 0000:00:08.0)
2017-08-01 20:00:15.745975: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:5) -> (device: 5, name: Tesla K80, pci bus id: 0000:00:09.0)
2017-08-01 20:00:15.745987: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:6) -> (device: 6, name: Tesla K80, pci bus id: 0000:00:0a.0)
2017-08-01 20:00:15.745997: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Creating TensorFlow device (/gpu:7) -> (device: 7, name: Tesla K80, pci bus id: 0000:00:0b.0)
INFO:tensorflow:Restoring parameters from gs://path/model_dir/model.ckpt-10023
INFO:tensorflow:Evaluation [1/100]
INFO:tensorflow:Evaluation [2/100]
INFO:tensorflow:Evaluation [3/100]
INFO:tensorflow:Evaluation [4/100]
INFO:tensorflow:Evaluation [5/100]
INFO:tensorflow:Evaluation [6/100]
INFO:tensorflow:Evaluation [7/100]
INFO:tensorflow:Evaluation [8/100]
INFO:tensorflow:Evaluation [9/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [11/100]
INFO:tensorflow:Evaluation [12/100]
INFO:tensorflow:Evaluation [13/100]
...
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Finished evaluation at 2017-08-01-20:00:31
INFO:tensorflow:Saving dict for global step 1: accuracy = 0.0994, global_step = 1, loss = 630.425
```
#### Worker
```shell
# Run this on worker:
# Runs an Experiment in sync mode on 4 GPUs using CPU as parameter server for 40000 steps.
# It will run evaluation a couple of times during training.
# Make sure the model_dir is the same as defined in the TF_CONFIG.
$ python cifar10_main.py --data_dir=gs://path/cifar-10-batches-py \
--model_dir=gs://path/model_dir/ \
--is_cpu_ps=True \
--force_gpu_compatible=True \
--num_gpus=4 \
--train_steps=40000 \
--sync=True \
--run_experiment=True
```
*Output:*
```shell
INFO:tensorflow:Using model_dir in TF_CONFIG: gs://path/model_dir/
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600,
'_num_ps_replicas': 1, '_keep_checkpoint_max': 5, '_task_type': u'worker',
'_is_chief': False, '_cluster_spec':
<tensorflow.python.training.server_lib.ClusterSpec object at 0x7f6918438e10>,
'_model_dir': 'gs://<path>/model_dir/',
'_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000,
'_session_config': intra_op_parallelism_threads: 1
gpu_options {
}
allow_soft_placement: true
, '_tf_random_seed': None, '_environment': u'cloud', '_num_worker_replicas': 1,
'_task_id': 0, '_save_summary_steps': 100, '_tf_config': gpu_options {
per_process_gpu_memory_fraction: 1.0
}
...
2017-08-01 19:59:26.496208: I tensorflow/core/common_runtime/gpu/gpu_device.cc:940] Found device 0 with properties:
name: Tesla K80
major: 3 minor: 7 memoryClockRate (GHz) 0.8235
pciBusID 0000:00:04.0
Total memory: 11.17GiB
Free memory: 11.09GiB
2017-08-01 19:59:26.775660: I tensorflow/core/common_runtime/gpu/gpu_device.cc:940] Found device 1 with properties:
name: Tesla K80
major: 3 minor: 7 memoryClockRate (GHz) 0.8235
pciBusID 0000:00:05.0
Total memory: 11.17GiB
Free memory: 11.10GiB
...
2017-08-01 19:59:29.675171: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:8000
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_1/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_2/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_3/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_4/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_5/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage/residual_v1_6/: (?, 16, 32, 32)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1/avg_pool/: (?, 16, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_1/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_2/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_3/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_4/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_5/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_1/residual_v1_6/: (?, 32, 16, 16)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1/avg_pool/: (?, 32, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_1/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_2/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_3/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_4/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_5/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/stage_2/residual_v1_6/: (?, 64, 8, 8)
INFO:tensorflow:image after unit resnet/tower_0/global_avg_pool/: (?, 64)
INFO:tensorflow:image after unit resnet/tower_0/fully_connected/: (?, 11)
INFO:tensorflow:SyncReplicasV2: replicas_to_aggregate=2; total_num_replicas=2
INFO:tensorflow:Create CheckpointSaverHook.
2017-07-31 22:38:04.629150: I
tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting
for response from worker: /job:master/replica:0/task:0
2017-07-31 22:38:09.263492: I
tensorflow/core/distributed_runtime/master_session.cc:999] Start master
session cc58f93b1e259b0c with config:
intra_op_parallelism_threads: 1
gpu_options {
per_process_gpu_memory_fraction: 1
}
allow_soft_placement: true
INFO:tensorflow:loss = 5.82382, step = 0
INFO:tensorflow:loss = 5.82382, learning_rate = 0.8
INFO:tensorflow:Average examples/sec: 1116.92 (1116.92), step = 10
INFO:tensorflow:Average examples/sec: 1233.73 (1377.83), step = 20
INFO:tensorflow:Average examples/sec: 1485.43 (2509.3), step = 30
INFO:tensorflow:Average examples/sec: 1680.27 (2770.39), step = 40
INFO:tensorflow:Average examples/sec: 1825.38 (2788.78), step = 50
INFO:tensorflow:Average examples/sec: 1929.32 (2697.27), step = 60
INFO:tensorflow:Average examples/sec: 2015.17 (2749.05), step = 70
INFO:tensorflow:loss = 37.6272, step = 79 (19.554 sec)
INFO:tensorflow:loss = 37.6272, learning_rate = 0.8 (19.554 sec)
INFO:tensorflow:Average examples/sec: 2074.92 (2618.36), step = 80
INFO:tensorflow:Average examples/sec: 2132.71 (2744.13), step = 90
INFO:tensorflow:Average examples/sec: 2183.38 (2777.21), step = 100
INFO:tensorflow:Average examples/sec: 2224.4 (2739.03), step = 110
INFO:tensorflow:Average examples/sec: 2240.28 (2431.26), step = 120
INFO:tensorflow:Average examples/sec: 2272.12 (2739.32), step = 130
INFO:tensorflow:Average examples/sec: 2300.68 (2750.03), step = 140
INFO:tensorflow:Average examples/sec: 2325.81 (2745.63), step = 150
INFO:tensorflow:Average examples/sec: 2347.14 (2721.53), step = 160
INFO:tensorflow:Average examples/sec: 2367.74 (2754.54), step = 170
INFO:tensorflow:loss = 27.8453, step = 179 (18.893 sec)
...
```
#### PS
```shell
# Run this on ps:
# The ps will not do any training, so most of the arguments won't affect the execution.
$ python cifar10_main.py --run_experiment=True --model_dir=gs://path/model_dir/
# There are more command line flags to play with; check cifar10_main.py for details.
```
*Output:*
```shell
INFO:tensorflow:Using model_dir in TF_CONFIG: gs://path/model_dir/
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 1, '_keep_checkpoint_max': 5, '_task_type': u'ps', '_is_chief': False, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f48f1addf90>, '_model_dir': 'gs://path/model_dir/', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_session_config': intra_op_parallelism_threads: 1
gpu_options {
}
allow_soft_placement: true
, '_tf_random_seed': None, '_environment': u'cloud', '_num_worker_replicas': 1, '_task_id': 0, '_save_summary_steps': 100, '_tf_config': gpu_options {
per_process_gpu_memory_fraction: 1.0
}
, '_evaluation_master': '', '_master': u'grpc://master-ip:8000'}
2017-07-31 22:54:58.928088: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job master -> {0 -> master-ip:8000}
2017-07-31 22:54:58.928153: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:8000}
2017-07-31 22:54:58.928160: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> worker-ip:8000}
2017-07-31 22:54:58.929873: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:8000
```
## Visualizing results with TensorBoard
When using Estimators you can also visualize your data in TensorBoard, with no changes in your code. You can use TensorBoard to visualize your TensorFlow graph, plot quantitative metrics about the execution of your graph, and show additional data like images that pass through it.
To do so, point TensorBoard at the `model_dir` you used to train or evaluate your model:
```shell
# Check TensorBoard during or after training.
# Just point TensorBoard to the model_dir you chose in the previous steps,
# e.g. /tmp/cifar10 in the local example above.
$ tensorboard --logdir=/tmp/cifar10
```
## Warnings
When running `cifar10_main.py` with the `--sync=True` argument you may see an error similar to:
```python
File "cifar10_main.py", line 538, in <module>
tf.app.run()
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/platform/app.py", line 48, in run
_sys.exit(main(_sys.argv[:1] + flags_passthrough))
File "cifar10_main.py", line 518, in main
hooks), run_config=config)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/learn_runner.py", line 210, in run
return _execute_schedule(experiment, schedule)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/learn_runner.py", line 47, in _execute_schedule
return task()
File "/usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/experiment.py", line 501, in train_and_evaluate
hooks=self._eval_hooks)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/experiment.py", line 681, in _call_evaluate
hooks=hooks)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 292, in evaluate
name=name)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 638, in _evaluate_model
features, labels, model_fn_lib.ModeKeys.EVAL)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 545, in _call_model_fn
features=features, labels=labels, **kwargs)
File "cifar10_main.py", line 331, in _resnet_model_fn
gradvars, global_step=tf.train.get_global_step())
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/sync_replicas_optimizer.py", line 252, in apply_gradients
variables.global_variables())
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped
return _add_should_use_warning(fn(*args, **kwargs))
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning
wrapped = TFShouldUseWarningWrapper(x)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__
stack = [s.strip() for s in traceback.format_stack()]
```
This should not affect your training, and should be fixed in upcoming releases.
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""CIFAR-10 data set.
See http://www.cs.toronto.edu/~kriz/cifar.html.
"""
import os
import tensorflow as tf
HEIGHT = 32
WIDTH = 32
DEPTH = 3
class Cifar10DataSet(object):
"""Cifar10 data set.
Described by http://www.cs.toronto.edu/~kriz/cifar.html.
"""
def __init__(self, data_dir, subset='train', use_distortion=True):
self.data_dir = data_dir
self.subset = subset
self.use_distortion = use_distortion
def get_filenames(self):
if self.subset in ['train', 'validation', 'eval']:
return [os.path.join(self.data_dir, self.subset + '.tfrecords')]
else:
raise ValueError('Invalid data subset "%s"' % self.subset)
def parser(self, serialized_example):
"""Parses a single tf.Example into image and label tensors."""
# Dimensions of the images in the CIFAR-10 dataset.
# See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
# input format.
features = tf.parse_single_example(
serialized_example,
features={
'image': tf.FixedLenFeature([], tf.string),
'label': tf.FixedLenFeature([], tf.int64),
})
image = tf.decode_raw(features['image'], tf.uint8)
image.set_shape([DEPTH * HEIGHT * WIDTH])
# Reshape from [depth * height * width] to [depth, height, width].
image = tf.cast(
tf.transpose(tf.reshape(image, [DEPTH, HEIGHT, WIDTH]), [1, 2, 0]),
tf.float32)
label = tf.cast(features['label'], tf.int32)
# Custom preprocessing.
image = self.preprocess(image)
return image, label
def make_batch(self, batch_size):
"""Read the images and labels from 'filenames'."""
filenames = self.get_filenames()
# Repeat infinitely.
dataset = tf.contrib.data.TFRecordDataset(filenames).repeat()
# Parse records.
dataset = dataset.map(self.parser, num_threads=batch_size,
output_buffer_size=2 * batch_size)
# Potentially shuffle records.
if self.subset == 'train':
min_queue_examples = int(
Cifar10DataSet.num_examples_per_epoch(self.subset) * 0.4)
# Ensure that the capacity is sufficiently large to provide good random
# shuffling.
dataset = dataset.shuffle(buffer_size=min_queue_examples + 3 * batch_size)
# Batch it up.
dataset = dataset.batch(batch_size)
iterator = dataset.make_one_shot_iterator()
image_batch, label_batch = iterator.get_next()
return image_batch, label_batch
def preprocess(self, image):
"""Preprocess a single image in [height, width, depth] layout."""
if self.subset == 'train' and self.use_distortion:
# Pad 4 pixels on each dimension of feature map, done in mini-batch
image = tf.image.resize_image_with_crop_or_pad(image, 40, 40)
image = tf.random_crop(image, [HEIGHT, WIDTH, DEPTH])
image = tf.image.random_flip_left_right(image)
return image
@staticmethod
def num_examples_per_epoch(subset='train'):
if subset == 'train':
return 45000
elif subset == 'validation':
return 5000
elif subset == 'eval':
return 10000
else:
raise ValueError('Invalid data subset "%s"' % subset)
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""ResNet model for classifying images from CIFAR-10 dataset.
Support single-host training with one or multiple devices.
ResNet as proposed in:
Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
Deep Residual Learning for Image Recognition. arXiv:1512.03385
CIFAR-10 as in:
http://www.cs.toronto.edu/~kriz/cifar.html
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
import operator
import os
import cifar10
import cifar10_model
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import basic_session_run_hooks
from tensorflow.python.training import session_run_hook
from tensorflow.python.training import training_util
tf.logging.set_verbosity(tf.logging.INFO)
FLAGS = tf.flags.FLAGS
tf.flags.DEFINE_string('data_dir', '',
'The directory where the CIFAR-10 input data is stored.')
tf.flags.DEFINE_string('model_dir', '',
'The directory where the model will be stored.')
tf.flags.DEFINE_boolean('is_cpu_ps', True,
'If using CPU as the parameter server.')
tf.flags.DEFINE_integer('num_gpus', 1,
'The number of gpus used. Uses only CPU if set to 0.')
tf.flags.DEFINE_integer('num_layers', 44, 'The number of layers of the model.')
tf.flags.DEFINE_integer('train_steps', 80000,
'The number of steps to use for training.')
tf.flags.DEFINE_integer('train_batch_size', 128, 'Batch size for training.')
tf.flags.DEFINE_integer('eval_batch_size', 100, 'Batch size for validation.')
tf.flags.DEFINE_float('momentum', 0.9, 'Momentum for MomentumOptimizer.')
tf.flags.DEFINE_float('weight_decay', 2e-4, 'Weight decay for convolutions.')
tf.flags.DEFINE_float('learning_rate', 0.1,
'This is the initial learning rate value.'
' The learning rate will decrease during training.'
' For more details check the model_fn implementation'
' in this file.')
tf.flags.DEFINE_boolean('use_distortion_for_training', True,
'If doing image distortion for training.')
tf.flags.DEFINE_boolean('run_experiment', False,
'If True will run an experiment,'
' otherwise will run training and evaluation'
' using the estimator interface.'
' Experiments perform training on several workers in'
' parallel, in other words experiments know how to'
' invoke train and eval in a sensible fashion for'
' distributed training.')
tf.flags.DEFINE_boolean('sync', False,
'If true when running in a distributed environment'
' will run on sync mode.')
tf.flags.DEFINE_integer('num_workers', 1, 'Number of workers.')
# Perf flags
tf.flags.DEFINE_integer('num_intra_threads', 1,
'Number of threads to use for intra-op parallelism.'
' If set to 0, the system will pick an appropriate number.'
' The default is 1 since in this example CPU only handles'
' the input pipeline and gradient aggregation (when'
' --is_cpu_ps). Ops that could potentially benefit'
' from intra-op parallelism are scheduled to run on GPUs.')
tf.flags.DEFINE_integer('num_inter_threads', 0,
'Number of threads to use for inter-op'
' parallelism. If set to 0, the system will pick'
' an appropriate number.')
tf.flags.DEFINE_boolean('force_gpu_compatible', False,
'Whether to enable force_gpu_compatible in'
' GPU_Options. Check'
' tensorflow/core/protobuf/config.proto#L69'
' for details.')
# Debugging flags
tf.flags.DEFINE_boolean('log_device_placement', False,
'Whether to log device placement.')
class ExamplesPerSecondHook(session_run_hook.SessionRunHook):
"""Hook to print out examples per second.
Total time is tracked and then divided by the total number of steps
to get the average step time and then batch_size is used to determine
the running average of examples per second. The examples per second for the
most recent interval is also logged.
"""
def __init__(
self,
batch_size,
every_n_steps=100,
every_n_secs=None,):
"""Initializer for ExamplesPerSecondHook.
Args:
batch_size: Total batch size used to calculate examples/second from
global time.
every_n_steps: Log stats every n steps.
every_n_secs: Log stats every n seconds.
"""
if (every_n_steps is None) == (every_n_secs is None):
raise ValueError('exactly one of every_n_steps'
' and every_n_secs should be provided.')
self._timer = basic_session_run_hooks.SecondOrStepTimer(
every_steps=every_n_steps, every_secs=every_n_secs)
self._step_train_time = 0
self._total_steps = 0
self._batch_size = batch_size
def begin(self):
self._global_step_tensor = training_util.get_global_step()
if self._global_step_tensor is None:
raise RuntimeError(
'Global step should be created to use StepCounterHook.')
def before_run(self, run_context): # pylint: disable=unused-argument
return basic_session_run_hooks.SessionRunArgs(self._global_step_tensor)
def after_run(self, run_context, run_values):
_ = run_context
global_step = run_values.results
if self._timer.should_trigger_for_step(global_step):
elapsed_time, elapsed_steps = self._timer.update_last_triggered_step(
global_step)
if elapsed_time is not None:
steps_per_sec = elapsed_steps / elapsed_time
self._step_train_time += elapsed_time
self._total_steps += elapsed_steps
average_examples_per_sec = self._batch_size * (
self._total_steps / self._step_train_time)
current_examples_per_sec = steps_per_sec * self._batch_size
# Average examples/sec followed by current examples/sec
logging.info('%s: %g (%g), step = %g', 'Average examples/sec',
average_examples_per_sec, current_examples_per_sec,
self._total_steps)
class GpuParamServerDeviceSetter(object):
"""Used with tf.device() to place variables on the least loaded GPU.
A common use for this class is to pass a list of GPU devices, e.g. ['gpu:0',
'gpu:1','gpu:2'], as ps_devices. When each variable is placed, it will be
placed on the least loaded gpu. All other Ops, which will be the computation
Ops, will be placed on the worker_device.
"""
def __init__(self, worker_device, ps_devices):
"""Initializer for GpuParamServerDeviceSetter.
Args:
worker_device: the device to use for computation Ops.
ps_devices: a list of devices to use for Variable Ops. Each variable is
assigned to the least loaded device.
"""
self.ps_devices = ps_devices
self.worker_device = worker_device
self.ps_sizes = [0] * len(self.ps_devices)
def __call__(self, op):
if op.device:
return op.device
if op.type not in ['Variable', 'VariableV2', 'VarHandleOp']:
return self.worker_device
# Gets the least loaded ps_device
device_index, _ = min(enumerate(self.ps_sizes), key=operator.itemgetter(1))
device_name = self.ps_devices[device_index]
var_size = op.outputs[0].get_shape().num_elements()
self.ps_sizes[device_index] += var_size
return device_name
def _create_device_setter(is_cpu_ps, worker, num_gpus):
"""Create device setter object."""
if is_cpu_ps:
# tf.train.replica_device_setter supports placing variables on the CPU, all
# on one GPU, or on ps_servers defined in a cluster_spec.
return tf.train.replica_device_setter(
worker_device=worker, ps_device='/cpu:0', ps_tasks=1)
else:
gpus = ['/gpu:%d' % i for i in range(num_gpus)]
return GpuParamServerDeviceSetter(worker, gpus)
def _resnet_model_fn(features, labels, mode):
"""Resnet model body.
Supports single-host training with one or more GPUs. Parameter distribution can be
either one of the following schemes:
1. CPU is the parameter server and manages gradient updates.
2. Parameters are distributed evenly across all GPUs, and the first GPU
manages gradient updates.
Args:
features: a list of tensors, one for each tower
labels: a list of tensors, one for each tower
mode: ModeKeys.TRAIN or EVAL
Returns:
An EstimatorSpec object.
"""
is_training = (mode == tf.estimator.ModeKeys.TRAIN)
is_cpu_ps = FLAGS.is_cpu_ps
num_gpus = FLAGS.num_gpus
weight_decay = FLAGS.weight_decay
momentum = FLAGS.momentum
tower_features = features
tower_labels = labels
tower_losses = []
tower_gradvars = []
tower_preds = []
if num_gpus != 0:
for i in range(num_gpus):
worker = '/gpu:%d' % i
device_setter = _create_device_setter(is_cpu_ps, worker, FLAGS.num_gpus)
with tf.variable_scope('resnet', reuse=bool(i != 0)):
with tf.name_scope('tower_%d' % i) as name_scope:
with tf.device(device_setter):
_tower_fn(is_training, weight_decay, tower_features[i],
tower_labels[i], tower_losses, tower_gradvars,
tower_preds, False)
if i == 0:
# Only trigger batch_norm moving mean and variance update from the
# 1st tower. Ideally, we should grab the updates from all towers
# but these stats accumulate extremely fast so we can ignore the
# other stats from the other towers without significant detriment.
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS,
name_scope)
else:
with tf.variable_scope('resnet'), tf.device('/cpu:0'):
with tf.name_scope('tower_cpu') as name_scope:
_tower_fn(is_training, weight_decay, tower_features[0], tower_labels[0],
tower_losses, tower_gradvars, tower_preds, True)
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope)
# Now compute global loss and gradients.
gradvars = []
# parameter server here isn't necessarily one server storing the model params.
# (For gpu-as-ps case, model params are distributed evenly across all gpus.)
# It's the server that runs the ops to apply global gradient updates.
ps_device = '/cpu:0' if is_cpu_ps else '/gpu:0'
with tf.device(ps_device):
with tf.name_scope('gradient_averaging'):
loss = tf.reduce_mean(tower_losses, name='loss')
for zipped_gradvars in zip(*tower_gradvars):
# Averaging one var's gradients computed from multiple towers
var = zipped_gradvars[0][1]
grads = [gv[0] for gv in zipped_gradvars]
with tf.device(var.device):
if len(grads) == 1:
avg_grad = grads[0]
else:
avg_grad = tf.multiply(tf.add_n(grads), 1. / len(grads))
gradvars.append((avg_grad, var))
# Suggested learning rate scheduling from
# https://github.com/ppwwyyxx/tensorpack/blob/master/examples/ResNet/cifar10-resnet.py#L155
# users could apply other scheduling.
num_batches_per_epoch = cifar10.Cifar10DataSet.num_examples_per_epoch(
'train') // (FLAGS.train_batch_size * FLAGS.num_workers)
boundaries = [
num_batches_per_epoch * x
for x in np.array([82, 123, 300], dtype=np.int64)
]
staged_lr = [FLAGS.learning_rate * x for x in [1, 0.1, 0.01, 0.002]]
learning_rate = tf.train.piecewise_constant(tf.train.get_global_step(),
boundaries, staged_lr)
# Create a nicely-named tensor for logging
learning_rate = tf.identity(learning_rate, name='learning_rate')
optimizer = tf.train.MomentumOptimizer(
learning_rate=learning_rate, momentum=momentum)
chief_hooks = []
if FLAGS.sync:
optimizer = tf.train.SyncReplicasOptimizer(
optimizer,
replicas_to_aggregate=FLAGS.num_workers)
sync_replicas_hook = optimizer.make_session_run_hook(True)
chief_hooks.append(sync_replicas_hook)
# Create single grouped train op
train_op = [
optimizer.apply_gradients(
gradvars, global_step=tf.train.get_global_step())
]
train_op.extend(update_ops)
train_op = tf.group(*train_op)
predictions = {
'classes':
tf.concat([p['classes'] for p in tower_preds], axis=0),
'probabilities':
tf.concat([p['probabilities'] for p in tower_preds], axis=0)
}
stacked_labels = tf.concat(labels, axis=0)
metrics = {
'accuracy': tf.metrics.accuracy(stacked_labels, predictions['classes'])
}
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op,
training_chief_hooks=chief_hooks,
eval_metric_ops=metrics)
def _tower_fn(is_training, weight_decay, feature, label, tower_losses,
tower_gradvars, tower_preds, is_cpu):
"""Build computation tower for each device (CPU or GPU).
Args:
is_training: true if is training graph.
weight_decay: weight regularization strength, a float.
feature: a Tensor.
label: a Tensor.
tower_losses: a list to be appended with current tower's loss.
tower_gradvars: a list to be appended with current tower's gradients.
tower_preds: a list to be appended with current tower's predictions.
is_cpu: true if build tower on CPU.
"""
data_format = 'channels_last' if is_cpu else 'channels_first'
model = cifar10_model.ResNetCifar10(
FLAGS.num_layers, is_training=is_training, data_format=data_format)
logits = model.forward_pass(feature, input_data_format='channels_last')
tower_pred = {
'classes': tf.argmax(input=logits, axis=1),
'probabilities': tf.nn.softmax(logits)
}
tower_preds.append(tower_pred)
tower_loss = tf.losses.sparse_softmax_cross_entropy(
logits=logits, labels=label)
tower_loss = tf.reduce_mean(tower_loss)
model_params = tf.trainable_variables()
tower_loss += weight_decay * tf.add_n(
[tf.nn.l2_loss(v) for v in model_params])
tower_losses.append(tower_loss)
tower_grad = tf.gradients(tower_loss, model_params)
tower_gradvars.append(zip(tower_grad, model_params))
def input_fn(subset, num_shards):
"""Create input graph for model.
Args:
subset: one of 'train', 'validate' and 'eval'.
num_shards: num of towers participating in data-parallel training.
Returns:
two lists of tensors for features and labels, each of num_shards length.
"""
if subset == 'train':
batch_size = FLAGS.train_batch_size
elif subset == 'validate' or subset == 'eval':
batch_size = FLAGS.eval_batch_size
else:
raise ValueError('Subset must be one of \'train\''
', \'validate\' and \'eval\'')
with tf.device('/cpu:0'):
use_distortion = subset == 'train' and FLAGS.use_distortion_for_training
dataset = cifar10.Cifar10DataSet(FLAGS.data_dir, subset, use_distortion)
image_batch, label_batch = dataset.make_batch(batch_size)
if num_shards <= 1:
# No GPU available or only 1 GPU.
return [image_batch], [label_batch]
# Note that passing num=batch_size is safe here, even though
# dataset.batch(batch_size) can, in some cases, return fewer than batch_size
# examples. This is because it does so only when repeating for a limited
# number of epochs, but our dataset repeats forever.
image_batch = tf.unstack(image_batch, num=batch_size, axis=0)
label_batch = tf.unstack(label_batch, num=batch_size, axis=0)
feature_shards = [[] for i in range(num_shards)]
label_shards = [[] for i in range(num_shards)]
for i in xrange(batch_size):
idx = i % num_shards
feature_shards[idx].append(image_batch[i])
label_shards[idx].append(label_batch[i])
feature_shards = [tf.parallel_stack(x) for x in feature_shards]
label_shards = [tf.parallel_stack(x) for x in label_shards]
return feature_shards, label_shards
# create experiment
def get_experiment_fn(train_input_fn, eval_input_fn, train_steps, eval_steps,
train_hooks):
"""Returns an Experiment function.
Experiments perform training on several workers in parallel,
in other words experiments know how to invoke train and eval in a sensible
fashion for distributed training.
"""
def _experiment_fn(run_config, hparams):
"""Returns an Experiment."""
del hparams # Unused arg.
# Create estimator.
classifier = tf.estimator.Estimator(model_fn=_resnet_model_fn,
config=run_config)
# Create experiment.
experiment = tf.contrib.learn.Experiment(
classifier,
train_input_fn=train_input_fn,
eval_input_fn=eval_input_fn,
train_steps=train_steps,
eval_steps=eval_steps)
# Adding hooks to be used by the estimator on training mode.
experiment.extend_train_hooks(train_hooks)
return experiment
return _experiment_fn
def main(unused_argv):
# The env variable is on deprecation path, default is set to off.
os.environ['TF_SYNC_ON_FINISH'] = '0'
if FLAGS.num_gpus < 0:
raise ValueError(
'Invalid GPU count: \"num_gpus\" must be 0 or a positive integer.')
if FLAGS.num_gpus == 0 and not FLAGS.is_cpu_ps:
raise ValueError(
'No GPU available for use, must use CPU as parameter server.')
if (FLAGS.num_layers - 2) % 6 != 0:
raise ValueError('Invalid num_layers parameter.')
if FLAGS.num_gpus != 0 and FLAGS.train_batch_size % FLAGS.num_gpus != 0:
raise ValueError('train_batch_size must be multiple of num_gpus.')
if FLAGS.num_gpus != 0 and FLAGS.eval_batch_size % FLAGS.num_gpus != 0:
raise ValueError('eval_batch_size must be multiple of num_gpus.')
num_eval_examples = cifar10.Cifar10DataSet.num_examples_per_epoch('eval')
if num_eval_examples % FLAGS.eval_batch_size != 0:
raise ValueError('validation set size must be multiple of eval_batch_size')
train_input_fn = functools.partial(input_fn, subset='train',
num_shards=FLAGS.num_gpus)
eval_input_fn = functools.partial(input_fn, subset='eval',
num_shards=FLAGS.num_gpus)
train_steps = FLAGS.train_steps
eval_steps = num_eval_examples // FLAGS.eval_batch_size
# Session configuration.
sess_config = tf.ConfigProto()
sess_config.allow_soft_placement = True
sess_config.log_device_placement = FLAGS.log_device_placement
sess_config.intra_op_parallelism_threads = FLAGS.num_intra_threads
sess_config.inter_op_parallelism_threads = FLAGS.num_inter_threads
sess_config.gpu_options.force_gpu_compatible = FLAGS.force_gpu_compatible
# Hooks that add extra logging that is useful to see the loss more often in
# the console as well as examples per second.
tensors_to_log = {'learning_rate': 'learning_rate',
'loss': 'gradient_averaging/loss'}
logging_hook = tf.train.LoggingTensorHook(
tensors=tensors_to_log, every_n_iter=100)
examples_sec_hook = ExamplesPerSecondHook(
FLAGS.train_batch_size, every_n_steps=10)
hooks = [logging_hook, examples_sec_hook]
if FLAGS.run_experiment:
config = tf.contrib.learn.RunConfig(model_dir=FLAGS.model_dir)
config = config.replace(session_config=sess_config)
tf.contrib.learn.learn_runner.run(
get_experiment_fn(train_input_fn, eval_input_fn,
train_steps, eval_steps,
hooks), run_config=config)
else:
config = tf.estimator.RunConfig()
config = config.replace(session_config=sess_config)
classifier = tf.estimator.Estimator(
model_fn=_resnet_model_fn, model_dir=FLAGS.model_dir, config=config)
print('Starting to train...')
classifier.train(input_fn=train_input_fn,
steps=train_steps,
hooks=hooks)
print('Starting to evaluate...')
eval_results = classifier.evaluate(
input_fn=eval_input_fn,
steps=eval_steps)
print(eval_results)
if __name__ == '__main__':
tf.app.run()
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Model class for Cifar10 Dataset."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import model_base
class ResNetCifar10(model_base.ResNet):
"""Cifar10 model with ResNetV1 and basic residual block."""
def __init__(self, num_layers, is_training, data_format='channels_first'):
super(ResNetCifar10, self).__init__(is_training, data_format)
self.n = (num_layers - 2) // 6
# Add one in case label starts with 1. No impact if label starts with 0.
self.num_classes = 10 + 1
self.filters = [16, 16, 32, 64]
self.strides = [1, 2, 2]
def forward_pass(self, x, input_data_format='channels_last'):
"""Build the core model within the graph."""
if self._data_format != input_data_format:
if input_data_format == 'channels_last':
# Computation requires channels_first.
x = tf.transpose(x, [0, 3, 1, 2])
else:
# Computation requires channels_last.
x = tf.transpose(x, [0, 2, 3, 1])
# Image standardization.
x = x / 128 - 1
x = self._conv(x, 3, 16, 1)
x = self._batch_norm(x)
x = self._relu(x)
# Use basic (non-bottleneck) block and ResNet V1 (post-activation).
res_func = self._residual_v1
# 3 stages of block stacking.
for i in range(3):
with tf.name_scope('stage'):
for j in range(self.n):
if j == 0:
# First block in a stage, filters and strides may change.
x = res_func(x, 3, self.filters[i], self.filters[i + 1],
self.strides[i])
else:
# Following blocks in a stage, constant filters and unit stride.
x = res_func(x, 3, self.filters[i + 1], self.filters[i + 1], 1)
x = self._global_avg_pool(x)
x = self._fully_connected(x, self.num_classes)
return x
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Read CIFAR-10 data from pickled numpy arrays and write TFExamples.
Generates TFRecord files from the python version of the CIFAR-10 dataset
downloaded from https://www.cs.toronto.edu/~kriz/cifar.html.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import cPickle
import os
import tensorflow as tf
FLAGS = tf.flags.FLAGS
tf.flags.DEFINE_string('input_dir', '',
'Directory where CIFAR10 data is located.')
tf.flags.DEFINE_string('output_dir', '',
'Directory where TFRecords will be saved.'
'The TFRecords will have the same name as'
' the CIFAR10 inputs + .tfrecords.')
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def _bytes_feature(value):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[str(value)]))
def _get_file_names():
"""Returns the file names expected to exist in the input_dir."""
file_names = {}
file_names['train'] = ['data_batch_%d' % i for i in xrange(1, 5)]
file_names['validation'] = ['data_batch_5']
file_names['eval'] = ['test_batch']
return file_names
def read_pickle_from_file(filename):
with open(filename, 'r') as f:
data_dict = cPickle.load(f)
return data_dict
def convert_to_tfrecord(input_files, output_file):
"""Converts a file to tfrecords."""
print('Generating %s' % output_file)
record_writer = tf.python_io.TFRecordWriter(output_file)
for input_file in input_files:
data_dict = read_pickle_from_file(input_file)
data = data_dict['data']
labels = data_dict['labels']
num_entries_in_batch = len(labels)
for i in range(num_entries_in_batch):
example = tf.train.Example(
features=tf.train.Features(feature={
'image': _bytes_feature(data[i].tobytes()),
'label': _int64_feature(labels[i])
}))
record_writer.write(example.SerializeToString())
record_writer.close()
def main(unused_argv):
file_names = _get_file_names()
for mode, files in file_names.items():
input_files = [
os.path.join(FLAGS.input_dir, f) for f in files]
output_file = os.path.join(FLAGS.output_dir, mode + '.tfrecords')
# Convert to Examples and write the result to TFRecords.
convert_to_tfrecord(input_files, output_file)
print('Done!')
if __name__ == '__main__':
tf.app.run(main)
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""ResNet model.
Related papers:
https://arxiv.org/pdf/1603.05027v2.pdf
https://arxiv.org/pdf/1512.03385v1.pdf
https://arxiv.org/pdf/1605.07146v1.pdf
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
FLAGS = tf.flags.FLAGS
tf.flags.DEFINE_float('batch_norm_decay', 0.997, 'Decay for batch norm.')
tf.flags.DEFINE_float('batch_norm_epsilon', 1e-5, 'Epsilon for batch norm.')
class ResNet(object):
"""ResNet model."""
def __init__(self, is_training, data_format):
"""ResNet constructor.
Args:
is_training: if build training or inference model.
data_format: the data_format used during computation.
one of 'channels_first' or 'channels_last'.
"""
self._is_training = is_training
assert data_format in ('channels_first', 'channels_last')
self._data_format = data_format
def forward_pass(self, x):
raise NotImplementedError(
'forward_pass() is implemented in ResNet sub classes')
def _residual_v1(self,
x,
kernel_size,
in_filter,
out_filter,
stride,
activate_before_residual=False):
"""Residual unit with 2 sub layers, using Plan A for shortcut connection."""
del activate_before_residual
with tf.name_scope('residual_v1') as name_scope:
orig_x = x
x = self._conv(x, kernel_size, out_filter, stride)
x = self._batch_norm(x)
x = self._relu(x)
x = self._conv(x, kernel_size, out_filter, 1)
x = self._batch_norm(x)
if in_filter != out_filter:
orig_x = self._avg_pool(orig_x, stride, stride)
pad = (out_filter - in_filter) // 2
if self._data_format == 'channels_first':
orig_x = tf.pad(orig_x, [[0, 0], [pad, pad], [0, 0], [0, 0]])
else:
orig_x = tf.pad(orig_x, [[0, 0], [0, 0], [0, 0], [pad, pad]])
x = self._relu(tf.add(x, orig_x))
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x
def _residual_v2(self,
x,
in_filter,
out_filter,
stride,
activate_before_residual=False):
"""Residual unit with 2 sub layers with preactivation, plan A shortcut."""
with tf.name_scope('residual_v2') as name_scope:
if activate_before_residual:
x = self._batch_norm(x)
x = self._relu(x)
orig_x = x
else:
orig_x = x
x = self._batch_norm(x)
x = self._relu(x)
x = self._conv(x, 3, out_filter, stride)
x = self._batch_norm(x)
x = self._relu(x)
x = self._conv(x, 3, out_filter, [1, 1, 1, 1])
if in_filter != out_filter:
pad = (out_filter - in_filter) // 2
orig_x = self._avg_pool(orig_x, stride, stride)
if self._data_format == 'channels_first':
orig_x = tf.pad(orig_x, [[0, 0], [pad, pad], [0, 0], [0, 0]])
else:
orig_x = tf.pad(orig_x, [[0, 0], [0, 0], [0, 0], [pad, pad]])
x = tf.add(x, orig_x)
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x
def _bottleneck_residual_v2(self,
x,
in_filter,
out_filter,
stride,
activate_before_residual=False):
"""Bottleneck residual unit with 3 sub layers, plan B shortcut."""
with tf.name_scope('bottle_residual_v2') as name_scope:
if activate_before_residual:
x = self._batch_norm(x)
x = self._relu(x)
orig_x = x
else:
orig_x = x
x = self._batch_norm(x)
x = self._relu(x)
x = self._conv(x, 1, out_filter // 4, stride, is_atrous=True)
x = self._batch_norm(x)
x = self._relu(x)
# pad when stride isn't unit
x = self._conv(x, 3, out_filter // 4, 1, is_atrous=True)
x = self._batch_norm(x)
x = self._relu(x)
x = self._conv(x, 1, out_filter, 1, is_atrous=True)
if in_filter != out_filter:
orig_x = self._conv(orig_x, 1, out_filter, stride, is_atrous=True)
x = tf.add(x, orig_x)
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x
def _conv(self, x, kernel_size, filters, strides, is_atrous=False):
"""Convolution."""
padding = 'SAME'
if not is_atrous and strides > 1:
pad = kernel_size - 1
pad_beg = pad // 2
pad_end = pad - pad_beg
if self._data_format == 'channels_first':
x = tf.pad(x, [[0, 0], [0, 0], [pad_beg, pad_end], [pad_beg, pad_end]])
else:
x = tf.pad(x, [[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]])
padding = 'VALID'
return tf.layers.conv2d(
inputs=x,
kernel_size=kernel_size,
filters=filters,
strides=strides,
padding=padding,
use_bias=False,
data_format=self._data_format)
def _batch_norm(self, x):
if self._data_format == 'channels_first':
data_format = 'NCHW'
else:
data_format = 'NHWC'
return tf.contrib.layers.batch_norm(
x,
decay=FLAGS.batch_norm_decay,
center=True,
scale=True,
epsilon=FLAGS.batch_norm_epsilon,
is_training=self._is_training,
fused=True,
data_format=data_format)
def _relu(self, x):
return tf.nn.relu(x)
def _fully_connected(self, x, out_dim):
with tf.name_scope('fully_connected') as name_scope:
x = tf.layers.dense(x, out_dim)
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x
def _avg_pool(self, x, pool_size, stride):
with tf.name_scope('avg_pool') as name_scope:
x = tf.layers.average_pooling2d(
x, pool_size, stride, 'SAME', data_format=self._data_format)
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x
def _max_pool(self, x, pool_size, stride):
with tf.name_scope('max_pool') as name_scope:
x = tf.layers.max_pooling2d(
x, pool_size, stride, padding='SAME', data_format=self._data_format)
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x
def _global_avg_pool(self, x):
with tf.name_scope('global_avg_pool') as name_scope:
assert x.get_shape().ndims == 4
if self._data_format == 'channels_first':
x = tf.reduce_mean(x, [2, 3])
else:
x = tf.reduce_mean(x, [1, 2])
tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
return x