Commit fcbf2288 authored by A. Unique TensorFlower

Merge pull request #9836 from hylee817:yonsei-yt8m

PiperOrigin-RevId: 365507990
parents 506492d5 136cb326
# YouTube-8M TensorFlow Starter Code
This repo contains starter code (written in TensorFlow 2.x) for training and
evaluating machine learning models over the [YouTube-8M][1] dataset.
It is the TensorFlow 2 version of the original starter code,
[YouTube-8M Tensorflow Starter Code][2], which was tested on TensorFlow 1.14.
(The code gives an end-to-end working example for reading the dataset,
training a TensorFlow model, and evaluating the performance of the model.)
Functionality is maintained, with the migrations necessary to run in a
TensorFlow 2 environment.
### Requirements
The starter code requires TensorFlow. If you haven't installed it yet, follow
the instructions on [tensorflow.org][3].
This code has been tested with TensorFlow 2.4.0. Going forward,
we will continue to target the latest released version of TensorFlow.
Please verify that you have Python 3.6+ and TensorFlow 2.4.0 or later
installed by running the following commands:
```sh
python --version
python -c 'import tensorflow as tf; print(tf.__version__)'
```
Refer to the [instructions here][4]
for using the model in this repo. Make sure to add the models folder to your
Python path.
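One way to make the models folder importable, sketched here with a hypothetical checkout path (`/path/to/models` is a placeholder for wherever you cloned tensorflow/models), is to append it to `sys.path` at runtime; exporting it on `PYTHONPATH` works equally well:

```python
import sys

# Hypothetical checkout location -- replace with wherever you cloned
# tensorflow/models.
MODELS_DIR = "/path/to/models"

if MODELS_DIR not in sys.path:
    sys.path.append(MODELS_DIR)
```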
[1]: https://research.google.com/youtube8m/
[2]: https://github.com/google/youtube-8m
[3]: https://www.tensorflow.org/install/
[4]:
https://github.com/tensorflow/models/tree/master/official#running-the-models
#### Using GPUs
If your TensorFlow installation has GPU support
(which should have been provided with `pip install tensorflow` for any version
above TensorFlow 1.15), this code will make use of all of your compatible GPUs.
You can verify your installation by running:
```python
import tensorflow as tf
tf.config.list_physical_devices('GPU')
```
This will print something like the following for each of your compatible
GPUs.
```
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1720]
Found device 0 with properties:
pciBusID: 0000:00:04.0 name: Tesla P100-PCIE-16GB computeCapability: 6.0
coreClock: 1.3285GHz coreCount: 56 deviceMemorySize: 15.90GiB
deviceMemoryBandwidth: 681.88GiB/s
...
```
### Train and inference
Train video-level model on frame-level features and inference at segment-level.
#### Train using the config file
Create a YAML or JSON file specifying the parameters to override.
Working examples can be found in the `yt8m/experiments` directory.
```yaml
task:
  model:
    cluster_size: 2048
    hidden_size: 2048
    add_batch_norm: true
    sample_random_frames: true
    is_training: true
    activation: "relu6"
    pooling_method: "average"
    yt8m_agg_classifier_model: "MoeModel"
  train_data:
    segment_labels: false
    temporal_stride: 1
    num_devices: 1
    input_path: 'gs://youtube8m-ml/2/frame/train/train*.tfrecord'
    num_examples: 3888919
  ...
```
The code can be run in three different modes: `train`, `train_and_eval`, or
`eval`. Run `yt8m_train.py` and specify which mode you wish to execute.
Training is done using frame-level features with video-level labels,
while inference can be done at segment level.
Setting `segment_labels: true` in your configuration forces
segment-level labels to be used in the evaluation/validation phase.
If set to `false`, video-level labels are used for inference.
The following commands will train a model on Google Cloud over frame-level
features.
```bash
python3 yt8m_train.py --mode='train' \
--experiment='yt8m_experiment' \
--model_dir=$MODEL_DIR \
--config_file=$CONFIG_FILE
```
In order to run evaluation after each training epoch,
set the mode to `train_and_eval`.
By default, the train and validation dataset paths on Google Cloud are set to
train: `input_path=gs://youtube8m-ml/2/frame/train/train*.tfrecord` and
validation: `input_path=gs://youtube8m-ml/3/frame/validate/validate*.tfrecord`.
```bash
python3 yt8m_train.py --mode='train_and_eval' \
--experiment='yt8m_experiment' \
--model_dir=$MODEL_DIR \
--config_file=$CONFIG_FILE
```
Running in evaluation mode loads a saved checkpoint from the specified path
and runs the inference task.
```bash
python3 yt8m_train.py --mode='eval' \
--experiment='yt8m_experiment' \
--model_dir=$MODEL_DIR \
--config_file=$CONFIG_FILE
```
Once the job starts executing, you will see output similar to the following:
```
train | step: 15190 | training until step 22785...
train | step: 22785 | steps/sec: 0.4 | output:
{'learning_rate': 0.0049961056,
'model_loss': 0.0012011167,
'total_loss': 0.0013538885,
'training_loss': 0.0013538885}
```
and the following for evaluation:
```
eval | step: 22785 | running 2172 steps of evaluation...
eval | step: 22785 | eval time: 1663.4 | output:
{'avg_hit_at_one': 0.5572835238737471,
'avg_perr': 0.557277077999072,
'gap': 0.768825760186494,
'map': 0.19354554465020685,
'model_loss': 0.0005052475,
'total_loss': 0.0006564412,
'validation_loss': 0.0006564412}
```
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configs package definition."""
from official.vision.beta.projects.yt8m.configs import yt8m
"""Video classification configuration definition."""
from typing import Optional, Tuple
from absl import flags
import dataclasses
from official.core import config_definitions as cfg
from official.core import exp_factory
from official.modeling import hyperparams
from official.modeling import optimization
FLAGS = flags.FLAGS
YT8M_TRAIN_EXAMPLES = 3888919
YT8M_VAL_EXAMPLES = 1112356
# 2/frame -> frame level
# 3/frame -> segment level
YT8M_TRAIN_PATH = 'gs://youtube8m-ml/2/frame/train/train*.tfrecord'
YT8M_VAL_PATH = 'gs://youtube8m-ml/3/frame/validate/validate*.tfrecord'
@dataclasses.dataclass
class DataConfig(cfg.DataConfig):
"""The base configuration for building datasets."""
name: Optional[str] = 'yt8m'
split: Optional[str] = None
feature_sizes: Tuple[int, ...] = (1024, 128)
feature_names: Tuple[str, ...] = ('rgb', 'audio')
segment_size: int = 1
segment_labels: bool = False
temporal_stride: int = 1
max_frames: int = 300
num_frames: int = 300 # set smaller to allow random sample (Parser)
num_classes: int = 3862
num_devices: int = 1
input_path: str = ''
is_training: bool = True
random_seed: int = 123
num_examples: int = -1
def yt8m(is_training):
"""YT8M dataset configs."""
return DataConfig(
num_frames=30,
temporal_stride=1,
segment_labels=False,
segment_size=5,
is_training=is_training,
split='train' if is_training else 'valid',
num_examples=YT8M_TRAIN_EXAMPLES if is_training else YT8M_VAL_EXAMPLES,
input_path=YT8M_TRAIN_PATH if is_training else YT8M_VAL_PATH)
@dataclasses.dataclass
class YT8MModel(hyperparams.Config):
"""The model config."""
cluster_size: int = 2048
hidden_size: int = 2048
add_batch_norm: bool = True
sample_random_frames: bool = True
is_training: bool = True
activation: str = 'relu6'
pooling_method: str = 'average'
yt8m_agg_classifier_model: str = 'MoeModel'
@dataclasses.dataclass
class Losses(hyperparams.Config):
name: str = 'binary_crossentropy'
from_logits: bool = False
label_smoothing: float = 0.0
@dataclasses.dataclass
class YT8MTask(cfg.TaskConfig):
"""The task config."""
model: YT8MModel = YT8MModel()
train_data: DataConfig = yt8m(is_training=True)
validation_data: DataConfig = yt8m(is_training=False)
losses: Losses = Losses()
gradient_clip_norm: float = 1.0
num_readers: int = 8
top_k: int = 20
top_n: Optional[int] = None
def add_trainer(
experiment: cfg.ExperimentConfig,
train_batch_size: int,
eval_batch_size: int,
learning_rate: float = 0.005,
train_epochs: int = 44,
):
"""Add and config a trainer to the experiment config."""
if YT8M_TRAIN_EXAMPLES <= 0:
raise ValueError('Wrong train dataset size {!r}'.format(
experiment.task.train_data))
if YT8M_VAL_EXAMPLES <= 0:
raise ValueError('Wrong validation dataset size {!r}'.format(
experiment.task.validation_data))
experiment.task.train_data.global_batch_size = train_batch_size
experiment.task.validation_data.global_batch_size = eval_batch_size
steps_per_epoch = YT8M_TRAIN_EXAMPLES // train_batch_size
experiment.trainer = cfg.TrainerConfig(
steps_per_loop=steps_per_epoch,
summary_interval=steps_per_epoch,
checkpoint_interval=steps_per_epoch,
train_steps=train_epochs * steps_per_epoch,
validation_steps=YT8M_VAL_EXAMPLES // eval_batch_size,
validation_interval=steps_per_epoch,
optimizer_config=optimization.OptimizationConfig({
'optimizer': {
'type': 'adam',
'adam': {}
},
'learning_rate': {
'type': 'exponential',
'exponential': {
'initial_learning_rate': learning_rate,
'decay_rate': 0.95,
'decay_steps': 1500000,
}
},
}))
return experiment
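The exponential schedule configured above decays the learning rate by a factor of 0.95 every 1,500,000 steps. A minimal sketch of just the decay arithmetic (it mirrors the config values; it is not the Keras schedule implementation itself):

```python
# Decay arithmetic implied by the OptimizationConfig above:
# lr(step) = initial_learning_rate * decay_rate ** (step / decay_steps)
def exponential_lr(step,
                   initial_learning_rate=0.005,
                   decay_rate=0.95,
                   decay_steps=1500000):
  return initial_learning_rate * decay_rate**(step / decay_steps)

exponential_lr(0)        # 0.005
exponential_lr(1500000)  # ~0.00475 (one full decay period)
```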
@exp_factory.register_config_factory('yt8m_experiment')
def yt8m_experiment() -> cfg.ExperimentConfig:
"""Video classification general."""
exp_config = cfg.ExperimentConfig(
runtime=cfg.RuntimeConfig(mixed_precision_dtype='bfloat16'),
task=YT8MTask(),
trainer=cfg.TrainerConfig(),
restrictions=[
'task.train_data.is_training != None',
'task.validation_data.is_training != None',
'task.train_data.num_classes == task.validation_data.num_classes',
'task.train_data.feature_sizes != None',
'task.train_data.feature_names != None',
])
return add_trainer(exp_config, train_batch_size=512, eval_batch_size=512)
"""Contains a collection of util functions for training and evaluating."""
from absl import logging
import numpy
import tensorflow as tf
def Dequantize(feat_vector, max_quantized_value=2, min_quantized_value=-2):
"""Dequantize the feature from the byte format to the float format.
Args:
feat_vector: the input 1-d vector.
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
A float vector which has the same shape as feat_vector.
"""
assert max_quantized_value > min_quantized_value
quantized_range = max_quantized_value - min_quantized_value
scalar = quantized_range / 255.0
bias = (quantized_range / 512.0) + min_quantized_value
return feat_vector * scalar + bias
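As a quick sanity check on the arithmetic above, here is a self-contained re-statement of the same mapping (duplicated rather than imported, so it runs standalone): with the default range [-2, 2], byte 0 maps to -2 + 4/512 and byte 255 maps to roughly +2.

```python
# Mirrors the Dequantize arithmetic above for a scalar byte value.
def dequantize(byte_value, max_quantized_value=2.0, min_quantized_value=-2.0):
  quantized_range = max_quantized_value - min_quantized_value  # 4.0
  scalar = quantized_range / 255.0
  bias = (quantized_range / 512.0) + min_quantized_value  # -1.9921875
  return byte_value * scalar + bias

dequantize(0)    # -1.9921875 (lowest representable value)
dequantize(255)  # ~2.0078125 (highest representable value)
```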
def MakeSummary(name, value):
"""Creates a tf.Summary proto with the given name and value."""
summary = tf.Summary()
val = summary.value.add()
val.tag = str(name)
val.simple_value = float(value)
return summary
def AddGlobalStepSummary(summary_writer,
global_step_val,
global_step_info_dict,
summary_scope="Eval"):
"""Add the global_step summary to the Tensorboard.
Args:
summary_writer: Tensorflow summary_writer.
global_step_val: an int value of the global step.
global_step_info_dict: a dictionary of the evaluation metrics calculated for
a mini-batch.
summary_scope: Train or Eval.
Returns:
A string of this global_step summary
"""
this_hit_at_one = global_step_info_dict["hit_at_one"]
this_perr = global_step_info_dict["perr"]
this_loss = global_step_info_dict["loss"]
examples_per_second = global_step_info_dict.get("examples_per_second", -1)
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Hit@1", this_hit_at_one),
global_step_val)
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Perr", this_perr),
global_step_val)
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Loss", this_loss),
global_step_val)
if examples_per_second != -1:
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Example_Second",
examples_per_second), global_step_val)
summary_writer.flush()
info = (
"global_step {0} | Batch Hit@1: {1:.3f} | Batch PERR: {2:.3f} | Batch "
"Loss: {3:.3f} | Examples_per_sec: {4:.3f}").format(
global_step_val, this_hit_at_one, this_perr, this_loss,
examples_per_second)
return info
def AddEpochSummary(summary_writer,
global_step_val,
epoch_info_dict,
summary_scope="Eval"):
"""Add the epoch summary to the Tensorboard.
Args:
summary_writer: Tensorflow summary_writer.
global_step_val: an int value of the global step.
epoch_info_dict: a dictionary of the evaluation metrics calculated for the
whole epoch.
summary_scope: Train or Eval.
Returns:
A string of this global_step summary
"""
epoch_id = epoch_info_dict["epoch_id"]
avg_hit_at_one = epoch_info_dict["avg_hit_at_one"]
avg_perr = epoch_info_dict["avg_perr"]
avg_loss = epoch_info_dict["avg_loss"]
aps = epoch_info_dict["aps"]
gap = epoch_info_dict["gap"]
mean_ap = numpy.mean(aps)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_Avg_Hit@1", avg_hit_at_one),
global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_Avg_Perr", avg_perr),
global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_Avg_Loss", avg_loss),
global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_MAP", mean_ap), global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_GAP", gap), global_step_val)
summary_writer.flush()
info = ("epoch/eval number {0} | Avg_Hit@1: {1:.3f} | Avg_PERR: {2:.3f} "
"| MAP: {3:.3f} | GAP: {4:.3f} | Avg_Loss: {5:.3f} | num_classes: {6}"
).format(epoch_id, avg_hit_at_one, avg_perr, mean_ap, gap, avg_loss,
len(aps))
return info
def GetListOfFeatureNamesAndSizes(feature_names, feature_sizes):
"""Extract the list of feature names and the dimensionality.
Args:
feature_names: string containing comma separated list of feature names
feature_sizes: string containing comma separated list of feature sizes
Returns:
List of the feature names and list of the dimensionality of each feature.
Elements in the first/second list are strings/integers.
"""
list_of_feature_names = [
feature_names.strip() for feature_names in feature_names.split(",")
]
list_of_feature_sizes = [
int(feature_sizes) for feature_sizes in feature_sizes.split(",")
]
if len(list_of_feature_names) != len(list_of_feature_sizes):
logging.error(
"length of the feature names (=%r) != length of feature "
"sizes (=%r)", str(len(list_of_feature_names)),
str(len(list_of_feature_sizes)))
return list_of_feature_names, list_of_feature_sizes
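The parsing above is plain comma splitting; a standalone sketch of the same behavior (the helper name here is illustrative, not part of the module):

```python
# Splits comma-separated name and size strings into parallel lists,
# mirroring GetListOfFeatureNamesAndSizes above.
def parse_feature_lists(feature_names, feature_sizes):
  names = [name.strip() for name in feature_names.split(",")]
  sizes = [int(size) for size in feature_sizes.split(",")]
  return names, sizes

parse_feature_lists("rgb, audio", "1024,128")
# -> (['rgb', 'audio'], [1024, 128])
```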
def ClipGradientNorms(gradients_to_variables, max_norm):
"""Clips the gradients by the given value.
Args:
gradients_to_variables: A list of gradient to variable pairs (tuples).
max_norm: the maximum norm value.
Returns:
A list of clipped gradient to variable pairs.
"""
clipped_grads_and_vars = []
for grad, var in gradients_to_variables:
if grad is not None:
if isinstance(grad, tf.IndexedSlices):
tmp = tf.clip_by_norm(grad.values, max_norm)
grad = tf.IndexedSlices(tmp, grad.indices, grad.dense_shape)
else:
grad = tf.clip_by_norm(grad, max_norm)
clipped_grads_and_vars.append((grad, var))
return clipped_grads_and_vars
def CombineGradients(tower_grads):
"""Calculate the combined gradient for each shared variable across all towers.
Note that this function provides a synchronization point across all towers.
Args:
tower_grads: List of lists of (gradient, variable) tuples. The outer list is
over individual gradients. The inner list is over the gradient calculation
for each tower.
Returns:
List of pairs of (gradient, variable) where the gradient has been summed
across all towers.
"""
filtered_grads = [
[x for x in grad_list if x[0] is not None] for grad_list in tower_grads
]
final_grads = []
for i in range(len(filtered_grads[0])):
grads = [filtered_grads[t][i] for t in range(len(filtered_grads))]
grad = tf.stack([x[0] for x in grads], 0)
grad = tf.reduce_sum(grad, 0)
final_grads.append((
grad,
filtered_grads[0][i][1],
))
return final_grads
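The cross-tower reduction above can be sketched without TensorFlow: each inner list holds one (gradient, variable) pair per variable, and gradients belonging to the same variable are summed across towers. Scalar gradients stand in for tensors here, and the variable names are illustrative:

```python
# Sums the i-th gradient across towers, pairing it with the variable
# from the first tower (all towers share the same variables).
def combine_gradients(tower_grads):
  combined = []
  for i in range(len(tower_grads[0])):
    grad_sum = sum(tower[i][0] for tower in tower_grads)
    combined.append((grad_sum, tower_grads[0][i][1]))
  return combined

towers = [[(1.0, "w"), (2.0, "b")],
          [(3.0, "w"), (4.0, "b")]]
combine_gradients(towers)  # [(4.0, 'w'), (6.0, 'b')]
```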
"""class YT8MFrameFeatureReader(BaseReader).
Reads TFRecords of SequenceExamples.
The TFRecords must contain SequenceExamples with the sparse int64 'labels'
context feature and a fixed length byte-quantized feature vector, obtained
from the features in 'feature_names'. The quantized features will be mapped
back into a range between min_quantized_value and max_quantized_value.
link for details: https://research.google.com/youtube8m/download.html
"""
from typing import Dict
import tensorflow as tf
from official.vision.beta.configs import video_classification as exp_cfg
from official.vision.beta.dataloaders import decoder
from official.vision.beta.dataloaders import parser
from official.vision.beta.projects.yt8m.dataloaders import utils
def resize_axis(tensor, axis, new_size, fill_value=0):
"""Truncates or pads a tensor to new_size on a given axis.
Truncate or extend tensor such that tensor.shape[axis] == new_size. If the
size increases, the padding will be performed at the end, using fill_value.
Args:
tensor: The tensor to be resized.
axis: An integer representing the dimension to be sliced.
new_size: An integer or 0d tensor representing the new value for
tensor.shape[axis].
fill_value: Value to use to fill any new entries in the tensor. Will be cast
to the type of tensor.
Returns:
The resized tensor.
"""
tensor = tf.convert_to_tensor(tensor)
shape = tf.unstack(tf.shape(tensor))
pad_shape = shape[:]
pad_shape[axis] = tf.maximum(0, new_size - shape[axis])
shape[axis] = tf.minimum(shape[axis], new_size)
shape = tf.stack(shape)
resized = tf.concat([
tf.slice(tensor, tf.zeros_like(shape), shape),
tf.fill(tf.stack(pad_shape), tf.cast(fill_value, tensor.dtype))
], axis)
# Update shape.
new_shape = tensor.shape.as_list() # A copy is being made.
new_shape[axis] = new_size
resized = tf.ensure_shape(resized, new_shape)
return resized
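The truncate-or-pad behavior of resize_axis can be illustrated for axis 0 with plain lists (a simplified sketch; the real function works on tensors of any rank and axis):

```python
# Truncates or pads a list of rows to new_size rows, padding with
# fill_value, analogous to resize_axis(tensor, axis=0, new_size).
def resize_rows(rows, new_size, fill_value=0):
  width = len(rows[0]) if rows else 0
  truncated = rows[:new_size]
  num_pad = new_size - len(truncated)
  return truncated + [[fill_value] * width for _ in range(num_pad)]

resize_rows([[1, 2], [3, 4], [5, 6]], 2)  # [[1, 2], [3, 4]]
resize_rows([[1, 2]], 3)                  # [[1, 2], [0, 0], [0, 0]]
```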
def _process_segment_and_label(video_matrix, num_frames, contexts,
segment_labels, segment_size,
num_classes) -> Dict[str, tf.Tensor]:
"""Processes a batched Tensor of frames.
The same parameters used in process should be used here.
Args:
video_matrix: different features concatenated into one matrix
num_frames: Number of frames per subclip.
contexts: context information extracted from decoder
segment_labels: if we read segment labels instead.
segment_size: the segment_size used for reading segments.
num_classes: a positive integer for the number of classes.
Returns:
output: dictionary containing batch information
"""
# Partition frame-level feature matrix to segment-level feature matrix.
if segment_labels:
start_times = contexts["segment_start_times"].values
# Here we assume all the segments that started at the same start time
# have the same segment_size.
uniq_start_times, seg_idxs = tf.unique(start_times, out_idx=tf.dtypes.int64)
# Range gather matrix, e.g., [[0,1,2],[1,2,3]] for segment_size == 3.
range_mtx = tf.expand_dims(
uniq_start_times, axis=-1) + tf.expand_dims(
tf.range(0, segment_size, dtype=tf.int64), axis=0)
# Shape: [num_segment, segment_size, feature_dim].
batch_video_matrix = tf.gather_nd(video_matrix,
tf.expand_dims(range_mtx, axis=-1))
num_segment = tf.shape(batch_video_matrix)[0]
batch_video_ids = tf.reshape(
tf.tile([contexts["id"]], [num_segment]), (num_segment,))
batch_frames = tf.reshape(
tf.tile([segment_size], [num_segment]), (num_segment,))
batch_frames = tf.cast(tf.expand_dims(batch_frames, 1), tf.float32)
# For segment labels, all labels are not exhaustively rated. So we only
# evaluate the rated labels.
# Label indices for each segment, shape: [num_segment, 2].
label_indices = tf.stack([seg_idxs, contexts["segment_labels"].values],
axis=-1)
label_values = contexts["segment_scores"].values
sparse_labels = tf.sparse.SparseTensor(label_indices, label_values,
(num_segment, num_classes))
batch_labels = tf.sparse.to_dense(sparse_labels, validate_indices=False)
sparse_label_weights = tf.sparse.SparseTensor(
label_indices, tf.ones_like(label_values, dtype=tf.float32),
(num_segment, num_classes))
batch_label_weights = tf.sparse.to_dense(
sparse_label_weights, validate_indices=False)
# output_dict = utils.get_segments(batch_video_matrix, batch_frames, 5)
else:
# Process video-level labels.
label_indices = contexts["labels"].values
sparse_labels = tf.sparse.SparseTensor(
tf.expand_dims(label_indices, axis=-1),
tf.ones_like(contexts["labels"].values, dtype=tf.bool), (num_classes,))
labels = tf.sparse.to_dense(
sparse_labels, default_value=False, validate_indices=False)
# convert to batch format.
batch_video_ids = tf.expand_dims(contexts["id"], 0)
batch_video_matrix = tf.expand_dims(video_matrix, 0)
batch_labels = tf.expand_dims(labels, 0)
batch_frames = tf.expand_dims(num_frames, 0)
batch_label_weights = None
output_dict = {
"video_ids": batch_video_ids,
"video_matrix": batch_video_matrix,
"labels": batch_labels,
"num_frames": batch_frames,
}
if batch_label_weights is not None:
output_dict["label_weights"] = batch_label_weights
return output_dict
def _get_video_matrix(features, feature_size, max_frames, max_quantized_value,
min_quantized_value):
"""Decodes features from an input string and quantizes it.
Args:
features: raw feature values
feature_size: length of each frame feature vector
max_frames: number of frames (rows) in the output feature_matrix
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
feature_matrix: matrix of all frame-features
num_frames: number of frames in the sequence
"""
decoded_features = tf.reshape(
tf.cast(tf.io.decode_raw(features, tf.uint8), tf.float32),
[-1, feature_size])
num_frames = tf.math.minimum(tf.shape(decoded_features)[0], max_frames)
feature_matrix = utils.Dequantize(decoded_features, max_quantized_value,
min_quantized_value)
feature_matrix = resize_axis(feature_matrix, 0, max_frames)
return feature_matrix, num_frames
def _concat_features(features, feature_names, feature_sizes, max_frames,
max_quantized_value, min_quantized_value):
"""Loads (potentially) different types of features and concatenates them.
Args:
features: raw feature values
feature_names: list of feature names
feature_sizes: list of features sizes
max_frames: number of frames in the sequence
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
video_matrix: different features concatenated into one matrix
num_frames: the number of frames in the video
"""
num_features = len(feature_names)
assert num_features > 0, "No feature selected: feature_names is empty!"
assert len(feature_names) == len(feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(feature_names), len(feature_sizes)))
num_frames = -1 # the number of frames in the video
feature_matrices = [None] * num_features # an array of different features
for feature_index in range(num_features):
feature_matrix, num_frames_in_this_feature = _get_video_matrix(
features[feature_names[feature_index]], feature_sizes[feature_index],
max_frames, max_quantized_value, min_quantized_value)
if num_frames == -1:
num_frames = num_frames_in_this_feature
feature_matrices[feature_index] = feature_matrix
# cap the number of frames at self.max_frames
num_frames = tf.minimum(num_frames, max_frames)
# concatenate different features
video_matrix = tf.concat(feature_matrices, 1)
return video_matrix, num_frames
class Decoder(decoder.Decoder):
"""A tf.Example decoder for classification task."""
def __init__(
self,
input_params: exp_cfg.DataConfig,
):
self._segment_labels = input_params.segment_labels
self._feature_names = input_params.feature_names
self._context_features = {
"id": tf.io.FixedLenFeature([], tf.string),
}
if self._segment_labels:
self._context_features.update({
# There is no need to read end-time given we always assume the segment
# has the same size.
"segment_labels": tf.io.VarLenFeature(tf.int64),
"segment_start_times": tf.io.VarLenFeature(tf.int64),
"segment_scores": tf.io.VarLenFeature(tf.float32)
})
else:
self._context_features.update({"labels": tf.io.VarLenFeature(tf.int64)})
self._sequence_features = {
feature_name: tf.io.FixedLenSequenceFeature([], dtype=tf.string)
for feature_name in self._feature_names
}
def decode(self, serialized_example):
"""Parses a single tf.Example into image and label tensors."""
contexts, features = tf.io.parse_single_sequence_example(
serialized_example,
context_features=self._context_features,
sequence_features=self._sequence_features)
return {"contexts": contexts, "features": features}
class Parser(parser.Parser):
"""Parses a video and label dataset.
Takes the decoded raw tensors dict and parses it into a dictionary of
tensors that can be consumed by the model. It is executed after the
decoder.
"""
def __init__(
self,
input_params: exp_cfg.DataConfig,
max_quantized_value=2,
min_quantized_value=-2,
):
self._num_classes = input_params.num_classes
self._segment_size = input_params.segment_size
self._segment_labels = input_params.segment_labels
self._feature_names = input_params.feature_names
self._feature_sizes = input_params.feature_sizes
self.stride = input_params.temporal_stride
self._max_frames = input_params.max_frames
self._num_frames = input_params.num_frames
self._seed = input_params.random_seed
self._max_quantized_value = max_quantized_value
self._min_quantized_value = min_quantized_value
def _parse_train_data(self, decoded_tensors):
"""Parses data for training."""
# loads (potentially) different types of features and concatenates them
self.video_matrix, self.num_frames = _concat_features(
decoded_tensors["features"], self._feature_names, self._feature_sizes,
self._max_frames, self._max_quantized_value, self._min_quantized_value)
output_dict = _process_segment_and_label(self.video_matrix, self.num_frames,
decoded_tensors["contexts"],
self._segment_labels,
self._segment_size,
self._num_classes)
return output_dict
def _parse_eval_data(self, decoded_tensors):
"""Parses data for evaluation."""
# loads (potentially) different types of features and concatenates them
self.video_matrix, self.num_frames = _concat_features(
decoded_tensors["features"], self._feature_names, self._feature_sizes,
self._max_frames, self._max_quantized_value, self._min_quantized_value)
output_dict = _process_segment_and_label(self.video_matrix, self.num_frames,
decoded_tensors["contexts"],
self._segment_labels,
self._segment_size,
self._num_classes)
return output_dict # batched
def parse_fn(self, is_training):
"""Returns a parse fn that reads and parses raw tensors from the decoder.
Args:
is_training: a `bool` to indicate whether it is in training mode.
Returns:
parse: a `callable` that takes the serialized example and generate the
images, labels tuple where labels is a dict of Tensors that contains
labels.
"""
def parse(decoded_tensors):
"""Parses the serialized example data."""
if is_training:
return self._parse_train_data(decoded_tensors)
else:
return self._parse_eval_data(decoded_tensors)
return parse
class PostBatchProcessor():
"""Processes a video and label dataset which is batched."""
def __init__(self, input_params: exp_cfg.DataConfig):
self.segment_labels = input_params.segment_labels
self.num_classes = input_params.num_classes
self.segment_size = input_params.segment_size
def post_fn(self, batched_tensors):
"""Processes batched Tensors."""
video_ids = batched_tensors["video_ids"]
video_matrix = batched_tensors["video_matrix"]
labels = batched_tensors["labels"]
num_frames = batched_tensors["num_frames"]
label_weights = None
if self.segment_labels:
# [batch x num_segment x segment_size x num_features]
# -> [batch * num_segment x segment_size x num_features]
video_ids = tf.reshape(video_ids, [-1])
video_matrix = tf.reshape(video_matrix, [-1, self.segment_size, 1152])
labels = tf.reshape(labels, [-1, self.num_classes])
num_frames = tf.reshape(num_frames, [-1, 1])
label_weights = tf.reshape(batched_tensors["label_weights"],
[-1, self.num_classes])
else:
video_matrix = tf.squeeze(video_matrix)
labels = tf.squeeze(labels)
batched_tensors = {
"video_ids": video_ids,
"video_matrix": video_matrix,
"labels": labels,
"num_frames": num_frames,
}
if label_weights is not None:
batched_tensors["label_weights"] = label_weights
return batched_tensors
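The segment-label branch above only merges the first two axes, turning [batch, num_segment, ...] into [batch * num_segment, ...]. A list-based sketch of that flattening:

```python
# Flattens the leading (batch, num_segment) axes into one axis, as the
# tf.reshape calls in post_fn do for video_ids, labels, etc.
def merge_first_two_axes(batched):
  return [segment for video in batched for segment in video]

batched_ids = [["v0s0", "v0s1"], ["v1s0", "v1s1"]]
merge_first_two_axes(batched_ids)  # ['v0s0', 'v0s1', 'v1s0', 'v1s1']
```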
class TransformBatcher():
"""Performs manual batching on input dataset."""
def __init__(self, input_params: exp_cfg.DataConfig):
self._segment_labels = input_params.segment_labels
self._global_batch_size = input_params.global_batch_size
self._is_training = input_params.is_training
def batch_fn(self, dataset, input_context):
"""Add padding when segment_labels is true."""
per_replica_batch_size = input_context.get_per_replica_batch_size(
self._global_batch_size) if input_context else self._global_batch_size
if not self._segment_labels:
dataset = dataset.batch(per_replica_batch_size, drop_remainder=True)
else:
# add padding
pad_shapes = {
"video_ids": [None],
"video_matrix": [None, None, None],
"labels": [None, None],
"num_frames": [None, None],
"label_weights": [None, None]
}
pad_values = {
"video_ids": None,
"video_matrix": 0.0,
"labels": -1.0,
"num_frames": 0.0,
"label_weights": 0.0
}
dataset = dataset.padded_batch(
per_replica_batch_size,
padded_shapes=pad_shapes,
drop_remainder=True,
padding_values=pad_values)
return dataset
"""Calculate or keep track of the interpolated average precision.
It provides an interface for calculating interpolated average precision for an
entire list or the top-n ranked items. For the definition of the
(non-)interpolated average precision:
http://trec.nist.gov/pubs/trec15/appendices/CE.MEASURES06.pdf
Example usages:
1) Use it as a static function call to directly calculate average precision for
a short ranked list in memory.
```
import random
import numpy as np
p = np.array([random.random() for _ in range(10)])
a = np.array([random.choice([0, 1]) for _ in range(10)])
ap = average_precision_calculator.AveragePrecisionCalculator.ap(p, a)
```
2) Use it as an object for a long ranked list that cannot be stored in memory,
or for the case where partial predictions are observed at a time (e.g.
Tensorflow predictions). In this case, we first call accumulate many times to
process parts of the ranked list. After processing all the parts, we call
peek_ap_at_n.
```
p1 = np.array([random.random() for _ in range(5)])
a1 = np.array([random.choice([0, 1]) for _ in range(5)])
p2 = np.array([random.random() for _ in range(5)])
a2 = np.array([random.choice([0, 1]) for _ in range(5)])
# average precision at n=10
calculator = average_precision_calculator.AveragePrecisionCalculator(10)
calculator.accumulate(p1, a1)
calculator.accumulate(p2, a2)
ap3 = calculator.peek_ap_at_n()
```
```
"""
import heapq
import numbers
import random
import numpy
class AveragePrecisionCalculator(object):
"""Calculate the average precision and average precision at n."""
def __init__(self, top_n=None):
"""Construct an AveragePrecisionCalculator to calculate average precision.
This class is used to calculate the average precision for a single label.
Args:
top_n: A positive integer specifying the average precision at n, or None
to use all provided data points.
Raises:
ValueError: An error occurred when top_n is neither a positive integer
nor None.
"""
if not ((isinstance(top_n, int) and top_n > 0) or top_n is None):
raise ValueError("top_n must be a positive integer or None.")
self._top_n = top_n # average precision at n
self._total_positives = 0  # total number of positives seen so far
self._heap = [] # max heap of (prediction, actual)
@property
def heap_size(self):
"""Gets the heap size maintained in the class."""
return len(self._heap)
@property
def num_accumulated_positives(self):
"""Gets the number of positive samples that have been accumulated."""
return self._total_positives
def accumulate(self, predictions, actuals, num_positives=None):
"""Accumulate the predictions and their ground truth labels.
After the function call, we may call peek_ap_at_n to actually calculate
the average precision.
Note predictions and actuals must have the same shape.
Args:
predictions: a list storing the prediction scores.
actuals: a list storing the ground truth labels. Any value larger than 0
will be treated as a positive, otherwise as a negative.
num_positives: number of positive examples. If the 'predictions' and
'actuals' inputs aren't complete, some true positives may be missing
from them; provide 'num_positives' in that case so recall is tracked
accurately.
Raises:
ValueError: An error occurred when the format of the input is not the
numpy 1-D array or the shape of predictions and actuals does not match.
"""
if len(predictions) != len(actuals):
raise ValueError("the shape of predictions and actuals does not match.")
if num_positives is not None:
if not isinstance(num_positives, numbers.Number) or num_positives < 0:
raise ValueError(
"'num_positives' was provided but it was not a nonnegative number.")
if num_positives is not None:
self._total_positives += num_positives
else:
self._total_positives += numpy.size(
numpy.where(numpy.array(actuals) > 1e-5))
topk = self._top_n
heap = self._heap
for i in range(numpy.size(predictions)):
if topk is None or len(heap) < topk:
heapq.heappush(heap, (predictions[i], actuals[i]))
else:
if predictions[i] > heap[0][0]: # heap[0] is the smallest
heapq.heappop(heap)
heapq.heappush(heap, (predictions[i], actuals[i]))
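The accumulate loop above keeps at most top_n items in a min-heap: `heap[0]` is always the smallest retained prediction, so a new score is kept only if it beats it. A minimal, self-contained sketch of that pattern (function name is illustrative):

```python
import heapq

def keep_top_n(scores, n):
    """Retains the n largest scores; heap[0] is always the smallest kept."""
    heap = []
    for s in scores:
        if len(heap) < n:
            heapq.heappush(heap, s)
        elif s > heap[0]:
            heapq.heapreplace(heap, s)  # pop the smallest, push s in one step
    return sorted(heap, reverse=True)

# keep_top_n([0.2, 0.9, 0.5, 0.7, 0.1], 3) -> [0.9, 0.7, 0.5]
```

`heapq.heapreplace` combines the pop/push pair used above into a single call; both forms maintain the same invariant.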
def clear(self):
"""Clear the accumulated predictions."""
self._heap = []
self._total_positives = 0
def peek_ap_at_n(self):
"""Peek the non-interpolated average precision at n.
Returns:
The non-interpolated average precision at n (0 if nothing has been
accumulated). If n is larger than the length of the ranked list, the
average precision over the whole list is returned.
"""
if self.heap_size <= 0:
return 0
predlists = numpy.array(list(zip(*self._heap)))
ap = self.ap_at_n(
predlists[0],
predlists[1],
n=self._top_n,
total_num_positives=self._total_positives)
return ap
@staticmethod
def ap(predictions, actuals):
"""Calculate the non-interpolated average precision.
Args:
predictions: a numpy 1-D array storing the sparse prediction scores.
actuals: a numpy 1-D array storing the ground truth labels. Any value
larger than 0 will be treated as positives, otherwise as negatives.
Returns:
The non-interpolated average precision. If n is larger than the length
of the ranked list, the average precision over the whole list is
returned.
Raises:
ValueError: An error occurred when the format of the input is not the
numpy 1-D array or the shape of predictions and actuals does not match.
"""
return AveragePrecisionCalculator.ap_at_n(predictions, actuals, n=None)
@staticmethod
def ap_at_n(predictions, actuals, n=20, total_num_positives=None):
"""Calculate the non-interpolated average precision.
Args:
predictions: a numpy 1-D array storing the sparse prediction scores.
actuals: a numpy 1-D array storing the ground truth labels. Any value
larger than 0 will be treated as positives, otherwise as negatives.
n: the top n items to be considered in ap@n.
total_num_positives: (optional) the total number of positives in the
list. If specified, it will be used in the calculation.
Returns:
The non-interpolated average precision at n. If n is larger than the
length of the ranked list, the average precision over the whole list is
returned.
Raises:
ValueError: An error occurred when
1) the format of the input is not the numpy 1-D array;
2) the shape of predictions and actuals does not match;
3) the input n is not a positive integer.
"""
if len(predictions) != len(actuals):
raise ValueError("the shape of predictions and actuals does not match.")
if n is not None:
if not isinstance(n, int) or n <= 0:
raise ValueError("n must be 'None' or a positive integer."
" It was '%s'." % n)
ap = 0.0
predictions = numpy.array(predictions)
actuals = numpy.array(actuals)
# Shuffle to break ties in the scores randomly, so that tied predictions
# do not overestimate the AP.
predictions, actuals = AveragePrecisionCalculator._shuffle(
predictions, actuals)
sortidx = sorted(
range(len(predictions)), key=lambda k: predictions[k], reverse=True)
if total_num_positives is None:
numpos = numpy.size(numpy.where(actuals > 0))
else:
numpos = total_num_positives
if numpos == 0:
return 0
if n is not None:
numpos = min(numpos, n)
delta_recall = 1.0 / numpos
poscount = 0.0
# calculate the ap
r = len(sortidx)
if n is not None:
r = min(r, n)
for i in range(r):
if actuals[sortidx[i]] > 0:
poscount += 1
ap += poscount / (i + 1) * delta_recall
return ap
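As a sanity check on the loop above, here is a hedged, standalone restatement of the same non-interpolated AP formula on plain Python lists (function name is illustrative; the tie-breaking shuffle is omitted):

```python
def average_precision(predictions, actuals):
    """Non-interpolated AP: precision@rank summed at each hit, / num_positives."""
    order = sorted(range(len(predictions)),
                   key=lambda k: predictions[k], reverse=True)
    num_pos = sum(1 for a in actuals if a > 0)
    if num_pos == 0:
        return 0.0
    ap, hits = 0.0, 0
    for rank, idx in enumerate(order, start=1):
        if actuals[idx] > 0:
            hits += 1
            ap += hits / rank / num_pos  # precision@rank * delta_recall
    return ap

# Positives at ranks 1 and 3: AP = (1/1 + 2/3) / 2 = 0.8333...
```

With predictions `[0.9, 0.8, 0.7, 0.6]` and actuals `[1, 0, 1, 0]`, the hits land at ranks 1 and 3, giving (1/1 + 2/3) / 2 = 5/6.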
@staticmethod
def _shuffle(predictions, actuals):
random.seed(0)
suffidx = random.sample(range(len(predictions)), len(predictions))
predictions = predictions[suffidx]
actuals = actuals[suffidx]
return predictions, actuals
@staticmethod
def _zero_one_normalize(predictions, epsilon=1e-7):
"""Normalize the predictions to the range between 0.0 and 1.0.
For some predictions, like SVM predictions, we need to normalize them before
calculating the interpolated average precision. The normalization will not
change the rank in the original list and thus won't change the average
precision.
Args:
predictions: a numpy 1-D array storing the sparse prediction scores.
epsilon: a small constant to avoid denominator being zero.
Returns:
The normalized prediction.
"""
denominator = numpy.max(predictions) - numpy.min(predictions)
ret = (predictions - numpy.min(predictions)) / numpy.maximum(
denominator, epsilon)
return ret
return ret
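Since AP depends only on ranks, this min-max rescaling leaves the metric unchanged, and the epsilon guard protects constant score vectors from a zero denominator. A small numpy sketch of the same transform (standalone; names are illustrative):

```python
import numpy as np

def zero_one_normalize(predictions, epsilon=1e-7):
    """Min-max rescales scores into [0, 1]; epsilon avoids dividing by zero."""
    lo, hi = np.min(predictions), np.max(predictions)
    return (predictions - lo) / np.maximum(hi - lo, epsilon)

normed = zero_one_normalize(np.array([-2.0, 0.0, 3.0]))
# Ranks are preserved, values rescaled: [0.0, 0.4, 1.0]
```

Note that `np.maximum` (elementwise maximum of two values) is the right call here; `np.max(denominator, epsilon)` would treat epsilon as an axis argument.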
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides functions to help with evaluating models."""
import numpy as np
import tensorflow as tf
from official.vision.beta.projects.yt8m.eval_utils import \
average_precision_calculator as ap_calculator
from official.vision.beta.projects.yt8m.eval_utils import \
mean_average_precision_calculator as map_calculator
def flatten(l):
"""Merges a list of lists into a single list."""
# pylint: disable=g-complex-comprehension
return [item for sublist in l for item in sublist]
# pylint: enable=g-complex-comprehension
def calculate_hit_at_one(predictions, actuals):
"""Performs a local (numpy) calculation of the hit at one.
Args:
predictions: Matrix containing the outputs of the model. Dimensions are
'batch' x 'num_classes'.
actuals: Matrix containing the ground truth labels. Dimensions are 'batch' x
'num_classes'.
Returns:
float: The average hit at one across the entire batch.
"""
top_prediction = np.argmax(predictions, 1)
hits = actuals[np.arange(actuals.shape[0]), top_prediction]
return np.average(hits)
def calculate_precision_at_equal_recall_rate(predictions, actuals):
"""Performs a local (numpy) calculation of the PERR.
Args:
predictions: Matrix containing the outputs of the model. Dimensions are
'batch' x 'num_classes'.
actuals: Matrix containing the ground truth labels. Dimensions are 'batch' x
'num_classes'.
Returns:
float: The average precision at equal recall rate across the entire batch.
"""
aggregated_precision = 0.0
num_videos = actuals.shape[0]
for row in np.arange(num_videos):
num_labels = int(np.sum(actuals[row]))
top_indices = np.argpartition(predictions[row], -num_labels)[-num_labels:]
item_precision = 0.0
for label_index in top_indices:
if predictions[row][label_index] > 0:
item_precision += actuals[row][label_index]
item_precision /= top_indices.size
aggregated_precision += item_precision
aggregated_precision /= num_videos
return aggregated_precision
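Concretely, PERR scores each video by how many of its top `num_labels` predictions are correct, where `num_labels` is that video's label count (so precision is measured at the recall cutoff). A hedged numpy sketch with hand-checkable numbers (simplified: it divides directly by the label count):

```python
import numpy as np

def perr(predictions, actuals):
    """Mean over videos of precision among each video's top num_labels scores."""
    total = 0.0
    for row in range(actuals.shape[0]):
        num_labels = int(actuals[row].sum())
        # Indices of the num_labels highest-scoring classes for this video.
        top = np.argpartition(predictions[row], -num_labels)[-num_labels:]
        total += actuals[row][top].sum() / num_labels
    return total / actuals.shape[0]

preds = np.array([[0.9, 0.1, 0.8],
                  [0.2, 0.7, 0.3]])
labels = np.array([[1.0, 1.0, 0.0],
                   [0.0, 1.0, 0.0]])
# Video 0: top-2 = classes {0, 2}, one correct -> 0.5; video 1: class 1 -> 1.0.
# perr(preds, labels) -> 0.75
```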
def calculate_gap(predictions, actuals, top_k=20):
"""Performs a local (numpy) calculation of the global average precision.
Only the top_k predictions are taken for each of the videos.
Args:
predictions: Matrix containing the outputs of the model. Dimensions are
'batch' x 'num_classes'.
actuals: Matrix containing the ground truth labels. Dimensions are 'batch' x
'num_classes'.
top_k: How many predictions to use per video.
Returns:
float: The global average precision.
"""
gap_calculator = ap_calculator.AveragePrecisionCalculator()
sparse_predictions, sparse_labels, num_positives = top_k_by_class(
predictions, actuals, top_k)
gap_calculator.accumulate(
flatten(sparse_predictions), flatten(sparse_labels), sum(num_positives))
return gap_calculator.peek_ap_at_n()
def top_k_by_class(predictions, labels, k=20):
"""Extracts the top k predictions for each video, sorted by class.
Args:
predictions: A numpy matrix containing the outputs of the model. Dimensions
are 'batch' x 'num_classes'.
labels: A numpy matrix containing the ground truth labels.
Dimensions are 'batch' x 'num_classes'.
k: the top k non-zero entries to preserve in each prediction.
Returns:
A tuple (predictions, labels, true_positives). 'predictions' and 'labels'
are lists of lists of floats. 'true_positives' is a list of scalars. The
length of the lists are equal to the number of classes. The entries in the
predictions variable are probability predictions, and
the corresponding entries in the labels variable are the ground truth for
those predictions. The entries in 'true_positives' are the number of true
positives for each class in the ground truth.
Raises:
ValueError: An error occurred when the k is not a positive integer.
"""
if k <= 0:
raise ValueError("k must be a positive integer.")
k = min(k, predictions.shape[1])
num_classes = predictions.shape[1]
prediction_triplets = []
for video_index in range(predictions.shape[0]):
prediction_triplets.extend(
top_k_triplets(predictions[video_index], labels[video_index], k))
out_predictions = [[] for _ in range(num_classes)]
out_labels = [[] for _ in range(num_classes)]
for triplet in prediction_triplets:
out_predictions[triplet[0]].append(triplet[1])
out_labels[triplet[0]].append(triplet[2])
out_true_positives = [np.sum(labels[:, i]) for i in range(num_classes)]
return out_predictions, out_labels, out_true_positives
def top_k_triplets(predictions, labels, k=20):
"""Get the top_k for a 1-d numpy array.
Args:
predictions: A numpy matrix containing the outputs of the model. Dimensions
are 'batch' x 'num_classes'.
labels: A numpy matrix containing the ground truth labels.
Dimensions are 'batch' x 'num_classes'.
k: The number top predictions to pick.
Returns:
a sparse list of tuples in (prediction, class) format.
"""
m = len(predictions)
k = min(k, m)
indices = np.argpartition(predictions, -k)[-k:]
return [(index, predictions[index], labels[index]) for index in indices]
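`np.argpartition` performs an O(m) partial selection: the last k positions of its output index the k largest entries, but in no particular order. A quick illustration of the triplet format returned above (sorted only to make the result deterministic):

```python
import numpy as np

preds = np.array([0.1, 0.9, 0.4, 0.8])
labels = np.array([0, 1, 0, 1])
k = 2
# The last k positions index the k largest scores, in unspecified order.
idx = np.argpartition(preds, -k)[-k:]
triplets = sorted((i, preds[i], labels[i]) for i in idx)
# triplets -> [(1, 0.9, 1), (3, 0.8, 1)]
```

If a fully sorted top-k were needed, a follow-up `np.argsort` on just the k selected entries would keep the cost at O(m + k log k).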
class EvaluationMetrics(object):
"""A class to store the evaluation metrics."""
def __init__(self, num_class, top_k, top_n):
"""Construct an EvaluationMetrics object to store the evaluation metrics.
Args:
num_class: A positive integer specifying the number of classes.
top_k: A positive integer specifying how many predictions are considered
per video.
top_n: A positive Integer specifying the average precision at n, or None
to use all provided data points.
Raises:
ValueError: An error occurred when MeanAveragePrecisionCalculator cannot
be constructed.
"""
self.sum_hit_at_one = 0.0
self.sum_perr = 0.0
self.map_calculator = map_calculator.MeanAveragePrecisionCalculator(
num_class, top_n=top_n)
self.global_ap_calculator = ap_calculator.AveragePrecisionCalculator()
self.top_k = top_k
self.num_examples = 0
self.num_class = num_class
def accumulate(self, predictions, labels):
"""Accumulate the metrics calculated locally for this mini-batch.
Args:
predictions: A numpy matrix containing the outputs of the model.
Dimensions are 'batch' x 'num_classes'.
labels: A numpy matrix containing the ground truth labels. Dimensions are
'batch' x 'num_classes'.
Returns:
dictionary: A dictionary storing the metrics for the mini-batch.
Raises:
ValueError: An error occurred when the shape of predictions and actuals
does not match.
"""
predictions, labels = self._convert_to_numpy(
predictions=predictions[0], groundtruths=labels[0])
batch_size = labels.shape[0]
mean_hit_at_one = calculate_hit_at_one(predictions, labels)
mean_perr = calculate_precision_at_equal_recall_rate(predictions, labels)
# Take the top_k predictions per video.
sparse_predictions, sparse_labels, num_positives = top_k_by_class(
predictions, labels, self.top_k)
self.map_calculator.accumulate(sparse_predictions, sparse_labels,
num_positives)
self.global_ap_calculator.accumulate(
flatten(sparse_predictions), flatten(sparse_labels), sum(num_positives))
self.num_examples += batch_size
self.sum_hit_at_one += mean_hit_at_one * batch_size
self.sum_perr += mean_perr * batch_size
return {"hit_at_one": mean_hit_at_one, "perr": mean_perr}
def get(self):
"""Calculate the evaluation metrics for the whole epoch.
Raises:
ValueError: If no examples were accumulated.
Returns:
dictionary: a dictionary storing the evaluation metrics for the epoch. The
dictionary has the fields: avg_hit_at_one, avg_perr, and
aps (default nan).
"""
if self.num_examples <= 0:
raise ValueError("total_sample must be positive.")
avg_hit_at_one = self.sum_hit_at_one / self.num_examples
avg_perr = self.sum_perr / self.num_examples
aps = self.map_calculator.peek_map_at_n()
mean_ap = sum(aps) / self.num_class
gap = self.global_ap_calculator.peek_ap_at_n()
epoch_info_dict = {
"avg_hit_at_one": avg_hit_at_one,
"avg_perr": avg_perr,
"map": mean_ap,
"gap": gap
}
return epoch_info_dict
def clear(self):
"""Clear the evaluation metrics and reset the EvaluationMetrics object."""
self.sum_hit_at_one = 0.0
self.sum_perr = 0.0
self.map_calculator.clear()
self.global_ap_calculator.clear()
self.num_examples = 0
@property
def name(self):
return "avg_prec_metric"
def _convert_to_numpy(self, groundtruths, predictions):
"""Converts tesnors to numpy arrays."""
if groundtruths is not None:
labels = tf.nest.map_structure(lambda x: x.numpy(), groundtruths)
else:
labels = groundtruths
if predictions is not None:
outputs = tf.nest.map_structure(lambda x: x.numpy(), predictions)
else:
outputs = predictions
labels = labels * 1  # Multiplying by 1 casts boolean arrays to integers.
return outputs, labels
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Calculate the mean average precision.
It provides an interface for calculating mean average precision
for an entire list or the top-n ranked items.
Example usages:
We first call the function accumulate many times to process parts of the ranked
list. After processing all the parts, we call peek_map_at_n
to calculate the mean average precision.
```
import random
import numpy as np
p = np.array([[random.random() for _ in range(50)] for _ in range(1000)])
a = np.array([[random.choice([0, 1]) for _ in range(50)]
for _ in range(1000)])
# mean average precision for 50 classes.
calculator = mean_average_precision_calculator.MeanAveragePrecisionCalculator(
num_class=50)
calculator.accumulate(p, a)
aps = calculator.peek_map_at_n()
```
"""
from official.vision.beta.projects.yt8m.eval_utils import \
average_precision_calculator
class MeanAveragePrecisionCalculator(object):
"""This class is to calculate mean average precision."""
def __init__(self, num_class, filter_empty_classes=True, top_n=None):
"""Construct a calculator to calculate the (macro) average precision.
Args:
num_class: An integer greater than 1 specifying the number of classes.
filter_empty_classes: whether to filter out classes without any positives.
top_n: A positive integer specifying the average precision at n, or None
to use all provided data points.
Raises:
ValueError: An error occurred when num_class is not an integer greater
than 1, or top_n is not a positive integer.
"""
if not isinstance(num_class, int) or num_class <= 1:
raise ValueError("num_class must be an integer greater than 1.")
self._ap_calculators = [] # member of AveragePrecisionCalculator
self._num_class = num_class # total number of classes
self._filter_empty_classes = filter_empty_classes
for _ in range(num_class):
self._ap_calculators.append(
average_precision_calculator.AveragePrecisionCalculator(top_n=top_n))
def accumulate(self, predictions, actuals, num_positives=None):
"""Accumulate the predictions and their ground truth labels.
Args:
predictions: A list of lists storing the prediction scores. The outer
dimension corresponds to classes.
actuals: A list of lists storing the ground truth labels. The dimensions
should correspond to the predictions input. Any value larger than 0 will
be treated as positives, otherwise as negatives.
num_positives: If provided, it is a list of numbers representing the
number of true positives for each class. If not provided, the number of
true positives will be inferred from the 'actuals' array.
Raises:
ValueError: An error occurred when the shape of predictions and actuals
does not match.
"""
if not num_positives:
num_positives = [None for i in range(self._num_class)]
calculators = self._ap_calculators
for i in range(self._num_class):
calculators[i].accumulate(predictions[i], actuals[i], num_positives[i])
def clear(self):
for calculator in self._ap_calculators:
calculator.clear()
def is_empty(self):
return all(
calculator.heap_size == 0 for calculator in self._ap_calculators)
def peek_map_at_n(self):
"""Peek the non-interpolated mean average precision at n.
Returns:
An array of non-interpolated average precision at n (default 0) for each
class.
"""
aps = []
for i in range(self._num_class):
if (not self._filter_empty_classes or
self._ap_calculators[i].num_accumulated_positives > 0):
ap = self._ap_calculators[i].peek_ap_at_n()
aps.append(ap)
return aps
# yt8m config file
task:
model:
cluster_size: 8192
hidden_size: 1024
add_batch_norm: true
sample_random_frames: true
is_training: true
activation: "sigmoid"
pooling_method: "max"
yt8m_agg_classifier_model: "MoeModel"
train_data:
name: 'yt8m'
split: 'train'
feature_sizes: !!python/tuple
- 1024
- 128
feature_names: !!python/tuple
- "rgb"
- "audio"
segment_size: 1
segment_labels: false
temporal_stride: 1
max_frames: 300
num_frames: 300
num_classes: 3862
num_devices: 1
input_path: 'gs://youtube8m-ml/2/frame/train/train*.tfrecord'
is_training: true
random_seed: 123
validation_data:
name: 'yt8m'
split: 'train'
feature_sizes: !!python/tuple
- 1024
- 128
feature_names: !!python/tuple
- "rgb"
- "audio"
segment_size: 1
segment_labels: true
temporal_stride: 1
max_frames: 300
num_frames: 300
num_classes: 3862
num_devices: 1
input_path: 'gs://youtube8m-ml/3/frame/validate/validate*.tfrecord'
is_training: false
random_seed: 123
losses:
name: 'binary_crossentropy'
from_logits: false
label_smoothing: 0.0
gradient_clip_norm: 1.0
num_readers: 8
top_k: 20
# yt8m test config file
task:
model:
cluster_size: 2048
hidden_size: 2048
add_batch_norm: true
sample_random_frames: true
is_training: true
activation: "relu6"
pooling_method: "average"
yt8m_agg_classifier_model: "MoeModel"
train_data:
segment_labels: false
temporal_stride: 1
num_devices: 1
input_path: 'gs://youtube8m-ml/2/frame/train/train*.tfrecord'
num_examples: 8000
validation_data:
segment_size: 5
segment_labels: true
temporal_stride: 1
num_devices: 1
input_path: 'gs://youtube8m-ml/3/frame/validate/validate*.tfrecord'
num_examples: 2000
losses:
name: 'binary_crossentropy'
from_logits: false
label_smoothing: 0.0
gradient_clip_norm: 1.0
num_readers: 8
top_k: 20
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains model definitions."""
import tensorflow as tf
layers = tf.keras.layers
regularizers = tf.keras.regularizers
# The number of mixtures (excluding the dummy 'expert') used for MoeModel.
moe_num_mixtures = 2
class LogisticModel():
"""Logistic model with L2 regularization."""
def create_model(self, model_input, vocab_size, l2_penalty=1e-8):
"""Creates a logistic model.
Args:
model_input: 'batch' x 'num_features' matrix of input features.
vocab_size: The number of classes in the dataset.
l2_penalty: L2 weight regularization ratio.
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
batch_size x num_classes.
"""
output = layers.Dense(
vocab_size,
activation=tf.nn.sigmoid,
kernel_regularizer=regularizers.l2(l2_penalty))(
model_input)
return {"predictions": output}
class MoeModel():
"""A softmax over a mixture of logistic models (with L2 regularization)."""
def create_model(self,
model_input,
vocab_size,
num_mixtures=None,
l2_penalty=1e-8):
"""Creates a Mixture of (Logistic) Experts model.
The model consists of a per-class softmax distribution over a
configurable number of logistic classifiers. One of the classifiers
in the mixture is not trained, and always predicts 0.
Args:
model_input: 'batch_size' x 'num_features' matrix of input features.
vocab_size: The number of classes in the dataset.
num_mixtures: The number of mixtures (excluding a dummy 'expert' that
always predicts the non-existence of an entity).
l2_penalty: How much to penalize the squared magnitudes of parameter
values.
Returns:
A dictionary with a tensor containing the probability predictions
of the model in the 'predictions' key. The dimensions of the tensor
are batch_size x num_classes.
"""
num_mixtures = num_mixtures or moe_num_mixtures
gate_activations = layers.Dense(
vocab_size * (num_mixtures + 1),
activation=None,
bias_initializer=None,
kernel_regularizer=regularizers.l2(l2_penalty))(
model_input)
expert_activations = layers.Dense(
vocab_size * num_mixtures,
activation=None,
kernel_regularizer=regularizers.l2(l2_penalty))(
model_input)
gating_distribution = tf.nn.softmax(
tf.reshape(
gate_activations,
[-1, num_mixtures + 1])) # (Batch * #Labels) x (num_mixtures + 1)
expert_distribution = tf.nn.sigmoid(
tf.reshape(expert_activations,
[-1, num_mixtures])) # (Batch * #Labels) x num_mixtures
final_probabilities_by_class_and_batch = tf.reduce_sum(
gating_distribution[:, :num_mixtures] * expert_distribution, 1)
final_probabilities = tf.reshape(final_probabilities_by_class_and_batch,
[-1, vocab_size])
return {"predictions": final_probabilities}
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""YT8M model definition."""
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.projects.yt8m.configs import yt8m as yt8m_cfg
from official.vision.beta.projects.yt8m.modeling import yt8m_agg_models
from official.vision.beta.projects.yt8m.modeling import yt8m_model_utils as utils
layers = tf.keras.layers
class YT8MModel(tf.keras.Model):
"""A YT8M model class builder."""
def __init__(self,
input_params: yt8m_cfg.YT8MModel,
num_frames=30,
num_classes=3862,
input_specs=layers.InputSpec(shape=[None, None, 1152]),
**kwargs):
"""YT8M initialization function.
Args:
input_params: model configuration parameters
num_frames: `int` number of frames in a single input.
num_classes: `int` number of classes in dataset.
input_specs: `tf.keras.layers.InputSpec` specs of the input tensor.
[batch_size x num_frames x num_features]
**kwargs: keyword arguments to be passed.
"""
self._self_setattr_tracking = False
self._config_dict = {
"input_specs": input_specs,
"num_classes": num_classes,
"num_frames": num_frames,
"input_params": input_params
}
self._num_classes = num_classes
self._input_specs = input_specs
self._act_fn = tf_utils.get_activation(input_params.activation)
self._is_training = input_params.is_training
# [batch_size x num_frames x num_features]
feature_size = input_specs.shape[-1]
# shape 'excluding' batch_size
model_input = tf.keras.Input(shape=self._input_specs.shape[1:])
reshaped_input = tf.reshape(model_input, [-1, feature_size])
tf.summary.histogram("input_hist", model_input)
# configure model
if input_params.add_batch_norm:
reshaped_input = layers.BatchNormalization(
name="input_bn", scale=True, center=True,
trainable=self._is_training)(
reshaped_input)
# activation = reshaped input * cluster weights
activation = layers.Dense(
input_params.cluster_size,
kernel_initializer=tf.random_normal_initializer(
stddev=1 / tf.sqrt(tf.cast(feature_size, tf.float32))))(
reshaped_input)
if input_params.add_batch_norm:
activation = layers.BatchNormalization(
name="cluster_bn",
scale=True,
center=True,
trainable=self._is_training)(
activation)
else:
cluster_biases = tf.Variable(
tf.random_normal_initializer(
stddev=1 / tf.sqrt(tf.cast(feature_size, tf.float32)))(
shape=[input_params.cluster_size]),
name="cluster_biases")
tf.summary.histogram("cluster_biases", cluster_biases)
activation += cluster_biases
activation = self._act_fn(activation)
tf.summary.histogram("cluster_output", activation)
activation = tf.reshape(activation,
[-1, num_frames, input_params.cluster_size])
activation = utils.FramePooling(activation, input_params.pooling_method)
# activation = activation * hidden1_weights
activation = layers.Dense(
input_params.hidden_size,
kernel_initializer=tf.random_normal_initializer(
stddev=1 /
tf.sqrt(tf.cast(input_params.cluster_size, tf.float32))))(
activation)
if input_params.add_batch_norm:
activation = layers.BatchNormalization(
name="hidden1_bn",
scale=True,
center=True,
trainable=self._is_training)(
activation)
else:
hidden1_biases = tf.Variable(
tf.random_normal_initializer(stddev=0.01)(
shape=[input_params.hidden_size]),
name="hidden1_biases")
tf.summary.histogram("hidden1_biases", hidden1_biases)
activation += hidden1_biases
activation = self._act_fn(activation)
tf.summary.histogram("hidden1_output", activation)
aggregated_model = getattr(yt8m_agg_models,
input_params.yt8m_agg_classifier_model)
output = aggregated_model().create_model(
model_input=activation, vocab_size=self._num_classes)
super().__init__(
inputs=model_input, outputs=output.get("predictions"), **kwargs)
@property
def checkpoint_items(self):
"""Returns a dictionary of items to be additionally checkpointed."""
return dict()
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config):
return cls(**config)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for yt8m network."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.projects.yt8m.configs import yt8m as yt8m_cfg
from official.vision.beta.projects.yt8m.modeling import yt8m_model
class YT8MNetworkTest(parameterized.TestCase, tf.test.TestCase):
"""Class for testing yt8m network."""
# test_yt8m_network_creation arbitrary params
@parameterized.parameters((32, 1152)) # 1152 = 1024 + 128
def test_yt8m_network_creation(self, num_frames, feature_dims):
"""Test for creation of a YT8M Model.
Args:
num_frames: number of frames.
feature_dims: indicates total dimension size of the features.
"""
input_specs = tf.keras.layers.InputSpec(shape=[num_frames, feature_dims])
num_classes = 3862
model = yt8m_model.YT8MModel(
input_params=yt8m_cfg.YT8MTask.model,
num_frames=num_frames,
num_classes=num_classes,
input_specs=input_specs)
# batch = 2 -> arbitrary value for test
inputs = np.random.rand(2 * num_frames, feature_dims)
logits = model(inputs)
self.assertAllEqual([2, num_classes], logits.numpy().shape)
def test_serialize_deserialize(self):
model = yt8m_model.YT8MModel(input_params=yt8m_cfg.YT8MTask.model)
config = model.get_config()
new_model = yt8m_model.YT8MModel.from_config(config)
# If the serialization was successful,
# the new config should match the old.
self.assertAllEqual(model.get_config(), new_model.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains a collection of util functions for model construction."""
import tensorflow as tf
def SampleRandomSequence(model_input, num_frames, num_samples):
"""Samples a random sequence of frames of size num_samples.
Args:
model_input: tensor of shape [batch_size x max_frames x feature_size]
num_frames: tensor of shape [batch_size x 1]
num_samples: a scalar indicating the number of samples
Returns:
reshaped model_input in [batch_size x 'num_samples' x feature_size]
"""
batch_size = tf.shape(model_input)[0]
frame_index_offset = tf.tile(
tf.expand_dims(tf.range(num_samples), 0), [batch_size, 1])
max_start_frame_index = tf.maximum(num_frames - num_samples, 0)
start_frame_index = tf.cast(
tf.multiply(
tf.random.uniform([batch_size, 1]),
tf.cast(max_start_frame_index + 1, tf.float32)), tf.int32)
frame_index = tf.minimum(start_frame_index + frame_index_offset,
tf.cast(num_frames - 1, tf.int32))
batch_index = tf.tile(
tf.expand_dims(tf.range(batch_size), 1), [1, num_samples])
index = tf.stack([batch_index, frame_index], 2)
return tf.gather_nd(model_input, index)
def SampleRandomFrames(model_input, num_frames, num_samples):
"""Samples a random set of frames of size num_samples.
Args:
model_input: tensor of shape [batch_size x max_frames x feature_size]
num_frames: tensor of shape [batch_size x 1]
num_samples (int): a scalar indicating the number of samples
Returns:
reshaped model_input in [batch_size x 'num_samples' x feature_size]
"""
batch_size = tf.shape(model_input)[0]
frame_index = tf.cast(
tf.multiply(
tf.random.uniform([batch_size, num_samples]),
tf.tile(tf.cast(num_frames, tf.float32), [1, num_samples])), tf.int32)
batch_index = tf.tile(
tf.expand_dims(tf.range(batch_size), 1), [1, num_samples])
index = tf.stack([batch_index, frame_index], 2)
return tf.gather_nd(model_input, index)
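Both sampling helpers rely on the same indexing trick: pair each sampled frame index with its batch index, then use `tf.gather_nd` to pull the chosen frames per example. A minimal sketch with fixed (non-random) indices, assuming TensorFlow 2.x:

```python
import tensorflow as tf

# Toy input: 2 videos, 4 frames each, 3 features per frame.
model_input = tf.reshape(tf.range(2 * 4 * 3, dtype=tf.float32), [2, 4, 3])
batch_size, num_samples = 2, 2

# Frames chosen per video (fixed here; the utils above draw these randomly).
frame_index = tf.constant([[0, 2], [1, 3]])
batch_index = tf.tile(
    tf.expand_dims(tf.range(batch_size), 1), [1, num_samples])
index = tf.stack([batch_index, frame_index], 2)  # [2, 2, 2] (batch, frame) pairs
sampled = tf.gather_nd(model_input, index)       # [2, 2, 3]
```

Video 0 contributes its frames 0 and 2, video 1 its frames 1 and 3, which is exactly the per-batch gather both sampling functions perform.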
def FramePooling(frames, method):
"""Pools over the frames of a video.
Args:
frames: tensor of shape [batch_size, num_frames, feature_size].
method: string indicating pooling method, one of: "average", "max", or
"none".
Returns:
tensor of shape [batch_size, feature_size] for average or max pooling,
and shape [batch_size*num_frames, feature_size] for "none" pooling.
Raises:
ValueError: if method is other than "average", "max", or "none".
"""
if method == "average":
reduced = tf.reduce_mean(frames, 1)
elif method == "max":
reduced = tf.reduce_max(frames, 1)
elif method == "none":
feature_size = frames.shape.as_list()[2]
reduced = tf.reshape(frames, [-1, feature_size])
else:
raise ValueError("Unrecognized pooling method: %s" % method)
return reduced
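As a quick check of the pooling branches above (a sketch, assuming TensorFlow 2.x):

```python
import tensorflow as tf

frames = tf.constant([[[1., 2.], [3., 4.]]])  # [batch=1, num_frames=2, feature=2]

avg = tf.reduce_mean(frames, 1)   # "average" -> [[2., 3.]]
mx = tf.reduce_max(frames, 1)     # "max"     -> [[3., 4.]]
flat = tf.reshape(frames, [-1, frames.shape.as_list()[2]])  # "none" -> shape [2, 2]
```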
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tasks package definition."""
from official.vision.beta.projects.yt8m.tasks import yt8m_task
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Video classification task definition."""
from absl import logging
import tensorflow as tf
from official.core import base_task
from official.core import input_reader
from official.core import task_factory
from official.modeling import tf_utils
from official.vision.beta.projects.yt8m.configs import yt8m as yt8m_cfg
from official.vision.beta.projects.yt8m.dataloaders import yt8m_input
from official.vision.beta.projects.yt8m.eval_utils import eval_util
from official.vision.beta.projects.yt8m.modeling import yt8m_model_utils as utils
from official.vision.beta.projects.yt8m.modeling.yt8m_model import YT8MModel
@task_factory.register_task_cls(yt8m_cfg.YT8MTask)
class YT8MTask(base_task.Task):
"""A task for video classification."""
def build_model(self):
"""Builds model for YT8M Task."""
train_cfg = self.task_config.train_data
common_input_shape = [None, sum(train_cfg.feature_sizes)]
# [batch_size x num_frames x num_features]
input_specs = tf.keras.layers.InputSpec(shape=[None] + common_input_shape)
logging.info('Build model input %r', common_input_shape)
# Model configuration.
model_config = self.task_config.model
model = YT8MModel(
input_params=model_config,
input_specs=input_specs,
num_frames=train_cfg.num_frames,
num_classes=train_cfg.num_classes)
return model
def build_inputs(self, params: yt8m_cfg.DataConfig, input_context=None):
"""Builds input.
Args:
params: configuration for input data
input_context: indicates information about the compute replicas and input
pipelines
Returns:
dataset: dataset fetched from reader
"""
decoder = yt8m_input.Decoder(input_params=params)
decoder_fn = decoder.decode
parser = yt8m_input.Parser(input_params=params)
parser_fn = parser.parse_fn(params.is_training)
postprocess = yt8m_input.PostBatchProcessor(input_params=params)
postprocess_fn = postprocess.post_fn
transform_batch = yt8m_input.TransformBatcher(input_params=params)
batch_fn = transform_batch.batch_fn
reader = input_reader.InputReader(
params,
dataset_fn=tf.data.TFRecordDataset,
decoder_fn=decoder_fn,
parser_fn=parser_fn,
postprocess_fn=postprocess_fn,
transform_and_batch_fn=batch_fn)
dataset = reader.read(input_context=input_context)
return dataset
def build_losses(self, labels, model_outputs, aux_losses=None):
"""Sigmoid Cross Entropy.
Args:
labels: tensor containing truth labels.
model_outputs: output logits of the classifier.
aux_losses: tensor containing auxiliary loss tensors, i.e. `losses` in
keras.Model.
Returns:
Tensors: The total loss, model loss tensors.
"""
losses_config = self.task_config.losses
model_loss = tf.keras.losses.binary_crossentropy(
labels,
model_outputs,
from_logits=losses_config.from_logits,
label_smoothing=losses_config.label_smoothing)
model_loss = tf_utils.safe_mean(model_loss)
total_loss = model_loss
if aux_losses:
total_loss += tf.add_n(aux_losses)
return total_loss, model_loss
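The loss above is per-example sigmoid cross entropy averaged over the batch. A minimal sketch of the same computation with hand-picked logits, assuming TensorFlow 2.x, `from_logits=True`, no label smoothing, and no aux losses:

```python
import tensorflow as tf

labels = tf.constant([[1., 0.], [0., 1.]])
logits = tf.constant([[10., -10.], [-10., 10.]])  # confident, correct predictions

# binary_crossentropy averages over classes per example; the mean over the
# batch mirrors the safe_mean reduction in build_losses.
per_example = tf.keras.losses.binary_crossentropy(
    labels, logits, from_logits=True, label_smoothing=0.0)
model_loss = tf.reduce_mean(per_example)
```

With confident, correct logits the loss is close to zero; flipping the labels would drive it toward the logit magnitude.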
def build_metrics(self, training=True):
"""Gets streaming metrics for training/validation.
metric: mAP/gAP
top_k: A positive integer specifying how many predictions are considered
per video.
top_n: A positive integer specifying the average precision at n, or None
to use all provided data points.
Args:
training: bool value, true for training mode, false for eval/validation.
Returns:
list of strings that indicate metrics to be used
"""
metrics = []
metric_names = ['total_loss', 'model_loss']
for name in metric_names:
metrics.append(tf.keras.metrics.Mean(name, dtype=tf.float32))
if not training: # Cannot run in train step.
num_classes = self.task_config.validation_data.num_classes
top_k = self.task_config.top_k
top_n = self.task_config.top_n
self.avg_prec_metric = eval_util.EvaluationMetrics(
num_classes, top_k=top_k, top_n=top_n)
return metrics
def train_step(self, inputs, model, optimizer, metrics=None):
"""Does forward and backward.
Args:
inputs: a dictionary of input tensors. output_dict = {
"video_ids": batch_video_ids,
"video_matrix": batch_video_matrix,
"labels": batch_labels,
"num_frames": batch_frames, }
model: the model, forward pass definition.
optimizer: the optimizer for this training step.
metrics: a nested structure of metrics objects.
Returns:
a dictionary of logs.
"""
features, labels = inputs['video_matrix'], inputs['labels']
num_frames = inputs['num_frames']
# Normalize input features.
feature_dim = len(features.shape) - 1
features = tf.nn.l2_normalize(features, feature_dim)
# sample random frames / random sequence
num_frames = tf.cast(num_frames, tf.float32)
sample_frames = self.task_config.train_data.num_frames
if self.task_config.model.sample_random_frames:
features = utils.SampleRandomFrames(features, num_frames, sample_frames)
else:
features = utils.SampleRandomSequence(features, num_frames, sample_frames)
num_replicas = tf.distribute.get_strategy().num_replicas_in_sync
with tf.GradientTape() as tape:
outputs = model(features, training=True)
# Casting the output layer to float32 is necessary when mixed_precision is
# mixed_float16 or mixed_bfloat16 to ensure the output is cast to float32.
outputs = tf.nest.map_structure(lambda x: tf.cast(x, tf.float32), outputs)
# Computes per-replica loss
loss, model_loss = self.build_losses(
model_outputs=outputs, labels=labels, aux_losses=model.losses)
# Scales loss as the default gradients allreduce performs sum inside the
# optimizer.
scaled_loss = loss / num_replicas
# For mixed_precision policy, when LossScaleOptimizer is used, loss is
# scaled for numerical stability.
if isinstance(optimizer,
tf.keras.mixed_precision.experimental.LossScaleOptimizer):
scaled_loss = optimizer.get_scaled_loss(scaled_loss)
tvars = model.trainable_variables
grads = tape.gradient(scaled_loss, tvars)
# Scales back gradient before apply_gradients when LossScaleOptimizer is
# used.
if isinstance(optimizer,
tf.keras.mixed_precision.experimental.LossScaleOptimizer):
grads = optimizer.get_unscaled_gradients(grads)
# Apply gradient clipping.
if self.task_config.gradient_clip_norm > 0:
grads, _ = tf.clip_by_global_norm(grads,
self.task_config.gradient_clip_norm)
optimizer.apply_gradients(list(zip(grads, tvars)))
logs = {self.loss: loss}
all_losses = {'total_loss': loss, 'model_loss': model_loss}
if metrics:
for m in metrics:
m.update_state(all_losses[m.name])
logs.update({m.name: m.result()})
return logs
def validation_step(self, inputs, model, metrics=None):
"""Validatation step.
Args:
inputs: a dictionary of input tensors. output_dict = {
"video_ids": batch_video_ids,
"video_matrix": batch_video_matrix,
"labels": batch_labels,
"num_frames": batch_frames, }
model: the model, forward definition
metrics: a nested structure of metrics objects.
Returns:
a dictionary of logs.
"""
features, labels = inputs['video_matrix'], inputs['labels']
num_frames = inputs['num_frames']
# Normalize input features.
feature_dim = len(features.shape) - 1
features = tf.nn.l2_normalize(features, feature_dim)
# sample random frames (None, 5, 1152) -> (None, 30, 1152)
sample_frames = self.task_config.validation_data.num_frames
if self.task_config.model.sample_random_frames:
features = utils.SampleRandomFrames(features, num_frames, sample_frames)
else:
features = utils.SampleRandomSequence(features, num_frames, sample_frames)
outputs = self.inference_step(features, model)
outputs = tf.nest.map_structure(lambda x: tf.cast(x, tf.float32), outputs)
if self.task_config.validation_data.segment_labels:
# workaround to ignore the unrated labels.
outputs *= inputs['label_weights']
# remove padding
outputs = outputs[~tf.reduce_all(labels == -1, axis=1)]
labels = labels[~tf.reduce_all(labels == -1, axis=1)]
loss, model_loss = self.build_losses(
model_outputs=outputs, labels=labels, aux_losses=model.losses)
logs = {self.loss: loss}
all_losses = {'total_loss': loss, 'model_loss': model_loss}
logs.update({self.avg_prec_metric.name: (labels, outputs)})
if metrics:
for m in metrics:
m.update_state(all_losses[m.name])
logs.update({m.name: m.result()})
return logs
def inference_step(self, inputs, model):
"""Performs the forward step."""
return model(inputs, training=False)
def aggregate_logs(self, state=None, step_logs=None):
if state is None:
state = self.avg_prec_metric
self.avg_prec_metric.accumulate(
labels=step_logs[self.avg_prec_metric.name][0],
predictions=step_logs[self.avg_prec_metric.name][1])
return state
def reduce_aggregated_logs(self, aggregated_logs):
avg_prec_metrics = self.avg_prec_metric.get()
self.avg_prec_metric.clear()
return avg_prec_metrics
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""YT8M model training driver."""
from absl import app
from absl import flags
import gin
from official.common import distribute_utils
from official.common import flags as tfm_flags
from official.core import task_factory
from official.core import train_lib
from official.core import train_utils
from official.modeling import performance
# pylint: disable=unused-import
from official.vision.beta.projects.yt8m.configs import yt8m
from official.vision.beta.projects.yt8m.tasks import yt8m_task
# pylint: enable=unused-import
FLAGS = flags.FLAGS
def main(_):
gin.parse_config_files_and_bindings(FLAGS.gin_file, FLAGS.gin_params)
params = train_utils.parse_configuration(FLAGS)
model_dir = FLAGS.model_dir
if 'train' in FLAGS.mode:
# Pure eval modes do not output yaml files. Otherwise continuous eval job
# may race against the train job for writing the same file.
train_utils.serialize_config(params, model_dir)
# Sets mixed_precision policy. Using 'mixed_float16' or 'mixed_bfloat16'
# can have significant impact on model speeds by utilizing float16 in case of
# GPUs, and bfloat16 in the case of TPUs. loss_scale takes effect only when
# dtype is float16
if params.runtime.mixed_precision_dtype:
performance.set_mixed_precision_policy(params.runtime.mixed_precision_dtype,
params.runtime.loss_scale)
distribution_strategy = distribute_utils.get_distribution_strategy(
distribution_strategy=params.runtime.distribution_strategy,
all_reduce_alg=params.runtime.all_reduce_alg,
num_gpus=params.runtime.num_gpus,
tpu_address=params.runtime.tpu)
with distribution_strategy.scope():
task = task_factory.get_task(params.task, logging_dir=model_dir)
train_lib.run_experiment(
distribution_strategy=distribution_strategy,
task=task,
mode=FLAGS.mode,
params=params,
model_dir=model_dir)
if __name__ == '__main__':
tfm_flags.define_flags()
app.run(main)