# Copyright 2021 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Image classification task definition.""" from typing import Any, Optional, List, Tuple from absl import logging import tensorflow as tf from official.common import dataset_fn from official.core import base_task from official.core import task_factory from official.modeling import tf_utils from official.vision.beta.configs import image_classification as exp_cfg from official.vision.beta.dataloaders import classification_input from official.vision.beta.dataloaders import input_reader_factory from official.vision.beta.dataloaders import tfds_classification_decoders from official.vision.beta.modeling import factory @task_factory.register_task_cls(exp_cfg.ImageClassificationTask) class ImageClassificationTask(base_task.Task): """A task for image classification.""" def build_model(self): """Builds classification model.""" input_specs = tf.keras.layers.InputSpec( shape=[None] + self.task_config.model.input_size) l2_weight_decay = self.task_config.losses.l2_weight_decay # Divide weight decay by 2.0 to match the implementation of tf.nn.l2_loss. # (https://www.tensorflow.org/api_docs/python/tf/keras/regularizers/l2) # (https://www.tensorflow.org/api_docs/python/tf/nn/l2_loss) l2_regularizer = (tf.keras.regularizers.l2( l2_weight_decay / 2.0) if l2_weight_decay else None) model = factory.build_classification_model( input_specs=input_specs, model_config=self.task_config.model, l2_regularizer=l2_regularizer) return model def initialize(self, model: tf.keras.Model): """Loads pretrained checkpoint.""" if not self.task_config.init_checkpoint: return ckpt_dir_or_file = self.task_config.init_checkpoint if tf.io.gfile.isdir(ckpt_dir_or_file): ckpt_dir_or_file = tf.train.latest_checkpoint(ckpt_dir_or_file) # Restoring checkpoint. if self.task_config.init_checkpoint_modules == 'all': ckpt = tf.train.Checkpoint(**model.checkpoint_items) status = ckpt.restore(ckpt_dir_or_file) status.assert_consumed() elif self.task_config.init_checkpoint_modules == 'backbone': ckpt = tf.train.Checkpoint(backbone=model.backbone) status = ckpt.restore(ckpt_dir_or_file) status.expect_partial().assert_existing_objects_matched() else: raise ValueError( "Only 'all' or 'backbone' can be used to initialize the model.") logging.info('Finished loading pretrained checkpoint from %s', ckpt_dir_or_file) def build_inputs(self, params: exp_cfg.DataConfig, input_context: Optional[tf.distribute.InputContext] = None): """Builds classification input.""" num_classes = self.task_config.model.num_classes input_size = self.task_config.model.input_size image_field_key = self.task_config.train_data.image_field_key label_field_key = self.task_config.train_data.label_field_key if params.tfds_name: if params.tfds_name in tfds_classification_decoders.TFDS_ID_TO_DECODER_MAP: decoder = tfds_classification_decoders.TFDS_ID_TO_DECODER_MAP[ params.tfds_name]() else: raise ValueError('TFDS {} is not supported'.format(params.tfds_name)) else: decoder = classification_input.Decoder( image_field_key=image_field_key, label_field_key=label_field_key) parser = classification_input.Parser( output_size=input_size[:2], num_classes=num_classes, image_field_key=image_field_key, label_field_key=label_field_key, aug_rand_hflip=params.aug_rand_hflip, aug_type=params.aug_type, dtype=params.dtype) reader = input_reader_factory.input_reader_generator( params, dataset_fn=dataset_fn.pick_dataset_fn(params.file_type), decoder_fn=decoder.decode, parser_fn=parser.parse_fn(params.is_training)) dataset = reader.read(input_context=input_context) return dataset def build_losses(self, labels: tf.Tensor, model_outputs: tf.Tensor, aux_losses: Optional[Any] = None): """Builds sparse categorical cross entropy loss. Args: labels: Input groundtruth labels. model_outputs: Output logits of the classifier. aux_losses: The auxiliarly loss tensors, i.e. `losses` in tf.keras.Model. Returns: The total loss tensor. """ losses_config = self.task_config.losses if losses_config.one_hot: total_loss = tf.keras.losses.categorical_crossentropy( labels, model_outputs, from_logits=True, label_smoothing=losses_config.label_smoothing) else: total_loss = tf.keras.losses.sparse_categorical_crossentropy( labels, model_outputs, from_logits=True) total_loss = tf_utils.safe_mean(total_loss) if aux_losses: total_loss += tf.add_n(aux_losses) return total_loss def build_metrics(self, training: bool = True): """Gets streaming metrics for training/validation.""" k = self.task_config.evaluation.top_k if self.task_config.losses.one_hot: metrics = [ tf.keras.metrics.CategoricalAccuracy(name='accuracy'), tf.keras.metrics.TopKCategoricalAccuracy( k=k, name='top_{}_accuracy'.format(k))] else: metrics = [ tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy'), tf.keras.metrics.SparseTopKCategoricalAccuracy( k=k, name='top_{}_accuracy'.format(k))] return metrics def train_step(self, inputs: Tuple[Any, Any], model: tf.keras.Model, optimizer: tf.keras.optimizers.Optimizer, metrics: Optional[List[Any]] = None): """Does forward and backward. Args: inputs: A tuple of of input tensors of (features, labels). model: A tf.keras.Model instance. optimizer: The optimizer for this training step. metrics: A nested structure of metrics objects. Returns: A dictionary of logs. """ features, labels = inputs if self.task_config.losses.one_hot: labels = tf.one_hot(labels, self.task_config.model.num_classes) num_replicas = tf.distribute.get_strategy().num_replicas_in_sync with tf.GradientTape() as tape: outputs = model(features, training=True) # Casting output layer as float32 is necessary when mixed_precision is # mixed_float16 or mixed_bfloat16 to ensure output is casted as float32. outputs = tf.nest.map_structure( lambda x: tf.cast(x, tf.float32), outputs) # Computes per-replica loss. loss = self.build_losses( model_outputs=outputs, labels=labels, aux_losses=model.losses) # Scales loss as the default gradients allreduce performs sum inside the # optimizer. scaled_loss = loss / num_replicas # For mixed_precision policy, when LossScaleOptimizer is used, loss is # scaled for numerical stability. if isinstance( optimizer, tf.keras.mixed_precision.LossScaleOptimizer): scaled_loss = optimizer.get_scaled_loss(scaled_loss) tvars = model.trainable_variables grads = tape.gradient(scaled_loss, tvars) # Scales back gradient before apply_gradients when LossScaleOptimizer is # used. if isinstance( optimizer, tf.keras.mixed_precision.LossScaleOptimizer): grads = optimizer.get_unscaled_gradients(grads) optimizer.apply_gradients(list(zip(grads, tvars))) logs = {self.loss: loss} if metrics: self.process_metrics(metrics, labels, outputs) elif model.compiled_metrics: self.process_compiled_metrics(model.compiled_metrics, labels, outputs) logs.update({m.name: m.result() for m in model.metrics}) return logs def validation_step(self, inputs: Tuple[Any, Any], model: tf.keras.Model, metrics: Optional[List[Any]] = None): """Runs validatation step. Args: inputs: A tuple of of input tensors of (features, labels). model: A tf.keras.Model instance. metrics: A nested structure of metrics objects. Returns: A dictionary of logs. """ features, labels = inputs if self.task_config.losses.one_hot: labels = tf.one_hot(labels, self.task_config.model.num_classes) outputs = self.inference_step(features, model) outputs = tf.nest.map_structure(lambda x: tf.cast(x, tf.float32), outputs) loss = self.build_losses(model_outputs=outputs, labels=labels, aux_losses=model.losses) logs = {self.loss: loss} if metrics: self.process_metrics(metrics, labels, outputs) elif model.compiled_metrics: self.process_compiled_metrics(model.compiled_metrics, labels, outputs) logs.update({m.name: m.result() for m in model.metrics}) return logs def inference_step(self, inputs: tf.Tensor, model: tf.keras.Model): """Performs the forward step.""" return model(inputs, training=False)