Unverified Commit 0e57630c authored by Vivek Rathod, committed by GitHub

Merged commit includes the following changes: (#8755)

319052168  by rathodv:

    Change assertAllEqual to assertAllClose for Position Sensitive Crop and Resize to avoid flaky tests.

--
319044492  by rathodv:

    Internal change.

--
319039033  by ronnyvotel:

    Preprocessor ops for DensePose.

--
319035440  by sbeery:

    External beam code with DataFlow Support

--
318899436  by ronnyvotel:

    DensePose library for common operations like scaling, coordinate transformations, and flipping.

--
318833308  by Vivek Rathod:

      Internal Change

--

PiperOrigin-RevId: 319052168
parent b035a227
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""DensePose operations.
DensePose part ids are represented as tensors of shape
[num_instances, num_points] and coordinates are represented as tensors of shape
[num_instances, num_points, 4] where each point holds (y, x, v, u). The location
of the DensePose sampled point is (y, x) in normalized coordinates. The surface
coordinate (in the part coordinate frame) is (v, u). Note that dim 1 of both
tensors may contain padding, since the number of sampled points per instance
is not fixed. The value `num_points` represents the maximum number of sampled
points for an instance in the example.
"""
import os
import scipy.io
import tensorflow.compat.v1 as tf
from object_detection.utils import shape_utils
PART_NAMES = [
b'torso_back', b'torso_front', b'right_hand', b'left_hand', b'left_foot',
b'right_foot', b'right_upper_leg_back', b'left_upper_leg_back',
b'right_upper_leg_front', b'left_upper_leg_front', b'right_lower_leg_back',
b'left_lower_leg_back', b'right_lower_leg_front', b'left_lower_leg_front',
b'left_upper_arm_back', b'right_upper_arm_back', b'left_upper_arm_front',
b'right_upper_arm_front', b'left_lower_arm_back', b'right_lower_arm_back',
b'left_lower_arm_front', b'right_lower_arm_front', b'right_face',
b'left_face',
]
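# The 24 part names above follow the DensePose convention. Left/right pairs in
# this list are swapped by DensePoseHorizontalFlip below, and the ordering must
# line up with the 24 per-part lookup maps in UV_symmetry_transforms.mat.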
_SRC_PATH = ('google3/third_party/tensorflow_models/object_detection/'
'dataset_tools/densepose')
def scale(dp_surface_coords, y_scale, x_scale, scope=None):
"""Scales DensePose coordinates in y and x dimensions.
Args:
dp_surface_coords: a tensor of shape [num_instances, num_points, 4], with
coordinates in (y, x, v, u) format.
y_scale: (float) scalar tensor
x_scale: (float) scalar tensor
scope: name scope.
Returns:
new_dp_surface_coords: a tensor of shape [num_instances, num_points, 4]
"""
with tf.name_scope(scope, 'DensePoseScale'):
y_scale = tf.cast(y_scale, tf.float32)
x_scale = tf.cast(x_scale, tf.float32)
    new_dp_surface_coords = dp_surface_coords * [[[y_scale, x_scale, 1, 1]]]
    return new_dp_surface_coords
def clip_to_window(dp_surface_coords, window, scope=None):
"""Clips DensePose points to a window.
This op clips any input DensePose points to a window.
Args:
dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose surface coordinates in (y, x, v, u) format.
window: a tensor of shape [4] representing the [y_min, x_min, y_max, x_max]
      window to which the op should clip the points.
scope: name scope.
Returns:
new_dp_surface_coords: a tensor of shape [num_instances, num_points, 4].
"""
with tf.name_scope(scope, 'DensePoseClipToWindow'):
y, x, v, u = tf.split(value=dp_surface_coords, num_or_size_splits=4, axis=2)
win_y_min, win_x_min, win_y_max, win_x_max = tf.unstack(window)
y = tf.maximum(tf.minimum(y, win_y_max), win_y_min)
x = tf.maximum(tf.minimum(x, win_x_max), win_x_min)
new_dp_surface_coords = tf.concat([y, x, v, u], 2)
return new_dp_surface_coords
def prune_outside_window(dp_num_points, dp_part_ids, dp_surface_coords, window,
scope=None):
"""Prunes DensePose points that fall outside a given window.
This function replaces points that fall outside the given window with zeros.
See also clip_to_window which clips any DensePose points that fall outside the
given window.
Note that this operation uses dynamic shapes, and therefore is not currently
suitable for TPU.
Args:
dp_num_points: a tensor of shape [num_instances] that indicates how many
(non-padded) DensePose points there are per instance.
dp_part_ids: a tensor of shape [num_instances, num_points] with DensePose
part ids. These part_ids are 0-indexed, where the first non-background
part has index 0.
dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose surface coordinates in (y, x, v, u) format.
window: a tensor of shape [4] representing the [y_min, x_min, y_max, x_max]
window outside of which the op should prune the points.
scope: name scope.
Returns:
new_dp_num_points: a tensor of shape [num_instances] that indicates how many
(non-padded) DensePose points there are per instance after pruning.
new_dp_part_ids: a tensor of shape [num_instances, num_points] with
DensePose part ids. These part_ids are 0-indexed, where the first
non-background part has index 0.
new_dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose surface coordinates after pruning.
"""
with tf.name_scope(scope, 'DensePosePruneOutsideWindow'):
y, x, _, _ = tf.unstack(dp_surface_coords, axis=-1)
win_y_min, win_x_min, win_y_max, win_x_max = tf.unstack(window)
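    # Build a [num_instances, num_points] boolean mask that is True only for
    # points that are (a) within the non-padded prefix of their instance and
    # (b) inside the window.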
num_instances, num_points = shape_utils.combined_static_and_dynamic_shape(
dp_part_ids)
dp_num_points_tiled = tf.tile(dp_num_points[:, tf.newaxis],
multiples=[1, num_points])
range_tiled = tf.tile(tf.range(num_points)[tf.newaxis, :],
multiples=[num_instances, 1])
valid_initial = range_tiled < dp_num_points_tiled
valid_in_window = tf.logical_and(
tf.logical_and(y >= win_y_min, y <= win_y_max),
tf.logical_and(x >= win_x_min, x <= win_x_max))
valid_indices = tf.logical_and(valid_initial, valid_in_window)
new_dp_num_points = tf.math.reduce_sum(
tf.cast(valid_indices, tf.int32), axis=1)
max_num_points = tf.math.reduce_max(new_dp_num_points)
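    # For each instance, compact the surviving points to the front of the
    # points axis, then pad (or clip) back to the new maximum point count so
    # that all instances keep a common shape.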
def gather_and_reshuffle(elems):
dp_part_ids, dp_surface_coords, valid_indices = elems
locs = tf.where(valid_indices)[:, 0]
valid_part_ids = tf.gather(dp_part_ids, locs, axis=0)
valid_part_ids_padded = shape_utils.pad_or_clip_nd(
valid_part_ids, output_shape=[max_num_points])
valid_surface_coords = tf.gather(dp_surface_coords, locs, axis=0)
valid_surface_coords_padded = shape_utils.pad_or_clip_nd(
valid_surface_coords, output_shape=[max_num_points, 4])
return [valid_part_ids_padded, valid_surface_coords_padded]
new_dp_part_ids, new_dp_surface_coords = (
shape_utils.static_or_dynamic_map_fn(
gather_and_reshuffle,
elems=[dp_part_ids, dp_surface_coords, valid_indices],
dtype=[tf.int32, tf.float32],
back_prop=False))
return new_dp_num_points, new_dp_part_ids, new_dp_surface_coords
def change_coordinate_frame(dp_surface_coords, window, scope=None):
"""Changes coordinate frame of the points to be relative to window's frame.
Given a window of the form [y_min, x_min, y_max, x_max] in normalized
coordinates, changes DensePose coordinates to be relative to this window.
  An example use case is data augmentation, where we are given groundtruth
  points and would like to randomly crop the image to some window. In this
  case we need to change the coordinate frame of each sampled point to be
  relative to this new window.
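  For example, with window [0.25, 0.25, 0.75, 0.75], a sampled point at
  (y, x) = (0.5, 0.5) maps to ((0.5 - 0.25) / 0.5, (0.5 - 0.25) / 0.5) =
  (0.5, 0.5) in the new frame, while its (v, u) surface coordinate is
  unchanged.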
Args:
dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose surface coordinates in (y, x, v, u) format.
window: a tensor of shape [4] representing the [y_min, x_min, y_max, x_max]
window we should change the coordinate frame to.
scope: name scope.
Returns:
new_dp_surface_coords: a tensor of shape [num_instances, num_points, 4].
"""
with tf.name_scope(scope, 'DensePoseChangeCoordinateFrame'):
win_height = window[2] - window[0]
win_width = window[3] - window[1]
new_dp_surface_coords = scale(
dp_surface_coords - [window[0], window[1], 0, 0],
1.0 / win_height, 1.0 / win_width)
return new_dp_surface_coords
def to_normalized_coordinates(dp_surface_coords, height, width,
check_range=True, scope=None):
"""Converts absolute DensePose coordinates to normalized in range [0, 1].
This function raises an assertion failed error at graph execution time when
the maximum coordinate is smaller than 1.01 (which means that coordinates are
already normalized). The value 1.01 is to deal with small rounding errors.
Args:
dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose absolute surface coordinates in (y, x, v, u) format.
height: Height of image.
width: Width of image.
check_range: If True, checks if the coordinates are already normalized.
scope: name scope.
Returns:
A tensor of shape [num_instances, num_points, 4] with normalized
coordinates.
"""
with tf.name_scope(scope, 'DensePoseToNormalizedCoordinates'):
height = tf.cast(height, tf.float32)
width = tf.cast(width, tf.float32)
if check_range:
max_val = tf.reduce_max(dp_surface_coords[:, :, :2])
max_assert = tf.Assert(tf.greater(max_val, 1.01),
['max value is lower than 1.01: ', max_val])
with tf.control_dependencies([max_assert]):
width = tf.identity(width)
return scale(dp_surface_coords, 1.0 / height, 1.0 / width)
def to_absolute_coordinates(dp_surface_coords, height, width,
check_range=True, scope=None):
"""Converts normalized DensePose coordinates to absolute pixel coordinates.
  This function raises an assertion-failure error at graph execution time if
  the maximum coordinate value is larger than 1.01, which suggests that the
  coordinates are already absolute.
Args:
dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose normalized surface coordinates in (y, x, v, u) format.
height: Height of image.
width: Width of image.
check_range: If True, checks if the coordinates are normalized or not.
scope: name scope.
Returns:
A tensor of shape [num_instances, num_points, 4] with absolute coordinates.
"""
with tf.name_scope(scope, 'DensePoseToAbsoluteCoordinates'):
height = tf.cast(height, tf.float32)
width = tf.cast(width, tf.float32)
if check_range:
max_val = tf.reduce_max(dp_surface_coords[:, :, :2])
max_assert = tf.Assert(tf.greater_equal(1.01, max_val),
['maximum coordinate value is larger than 1.01: ',
max_val])
with tf.control_dependencies([max_assert]):
width = tf.identity(width)
return scale(dp_surface_coords, height, width)
class DensePoseHorizontalFlip(object):
"""Class responsible for horizontal flipping of parts and surface coords."""
def __init__(self):
"""Constructor."""
uv_symmetry_transforms_path = os.path.join(
tf.resource_loader.get_data_files_path(), '..', 'dataset_tools',
'densepose', 'UV_symmetry_transforms.mat')
data = scipy.io.loadmat(uv_symmetry_transforms_path)
# Create lookup maps which indicate how a VU coordinate changes after a
# horizontal flip.
uv_symmetry_map = {}
for key in ('U_transforms', 'V_transforms'):
uv_symmetry_map_per_part = []
for i in range(data[key].shape[1]):
# The following tensor has shape [256, 256].
map_per_part = tf.constant(data[key][0, i], dtype=tf.float32)
uv_symmetry_map_per_part.append(map_per_part)
uv_symmetry_map[key] = tf.reshape(
tf.stack(uv_symmetry_map_per_part, axis=0), [-1])
# The following dictionary contains flattened lookup maps for the U and V
# coordinates separately. The shape of each is [24 * 256 * 256].
self.uv_symmetries = uv_symmetry_map
    # Create a list that maps each part index to its horizontally flipped part
    # index (0-indexed).
part_symmetries = []
for i, part_name in enumerate(PART_NAMES):
if b'left' in part_name:
part_symmetries.append(PART_NAMES.index(
part_name.replace(b'left', b'right')))
elif b'right' in part_name:
part_symmetries.append(PART_NAMES.index(
part_name.replace(b'right', b'left')))
else:
part_symmetries.append(i)
self.part_symmetries = part_symmetries
def flip_parts_and_coords(self, part_ids, vu):
"""Flips part ids and coordinates.
Args:
part_ids: a [num_instances, num_points] int32 tensor with pre-flipped part
ids. These part_ids are 0-indexed, where the first non-background part
has index 0.
vu: a [num_instances, num_points, 2] float32 tensor with pre-flipped vu
normalized coordinates.
Returns:
new_part_ids: a [num_instances, num_points] int32 tensor with post-flipped
part ids. These part_ids are 0-indexed, where the first non-background
part has index 0.
new_vu: a [num_instances, num_points, 2] float32 tensor with post-flipped
vu coordinates.
"""
num_instances, num_points = shape_utils.combined_static_and_dynamic_shape(
part_ids)
part_ids_flattened = tf.reshape(part_ids, [-1])
new_part_ids_flattened = tf.gather(self.part_symmetries, part_ids_flattened)
new_part_ids = tf.reshape(new_part_ids_flattened,
[num_instances, num_points])
    # Convert the floating point VU coordinates to integer indices on the
    # 256x256 grid. The clamp to 255 keeps a coordinate of exactly 1.0 from
    # indexing past the end of the lookup maps.
    vu = tf.math.minimum(tf.math.maximum(vu, 0.0), 1.0)
    vu_locs = tf.math.minimum(tf.cast(vu * 256., dtype=tf.int32), 255)
vu_locs_flattened = tf.reshape(vu_locs, [-1, 2])
v_locs_flattened, u_locs_flattened = tf.unstack(vu_locs_flattened, axis=1)
# Convert vu_locs into lookup indices (in flattened part symmetries map).
symmetry_lookup_inds = (
part_ids_flattened * 65536 + 256 * v_locs_flattened + u_locs_flattened)
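    # Each part owns a contiguous 256 * 256 = 65536 block of the flattened
    # lookup maps, so the flat index is part_id * 65536 + v * 256 + u.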
# New VU coordinates.
v_new = tf.gather(self.uv_symmetries['V_transforms'], symmetry_lookup_inds)
u_new = tf.gather(self.uv_symmetries['U_transforms'], symmetry_lookup_inds)
new_vu_flattened = tf.stack([v_new, u_new], axis=1)
new_vu = tf.reshape(new_vu_flattened, [num_instances, num_points, 2])
return new_part_ids, new_vu
def flip_horizontal(dp_part_ids, dp_surface_coords, scope=None):
"""Flips the DensePose points horizontally around the flip_point.
This operation flips dense pose annotations horizontally. Note that part ids
and surface coordinates may or may not change as a result of the flip.
Args:
dp_part_ids: a tensor of shape [num_instances, num_points] with DensePose
part ids. These part_ids are 0-indexed, where the first non-background
part has index 0.
dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose surface coordinates in (y, x, v, u) normalized format.
scope: name scope.
Returns:
new_dp_part_ids: a tensor of shape [num_instances, num_points] with
DensePose part ids after flipping.
new_dp_surface_coords: a tensor of shape [num_instances, num_points, 4] with
DensePose surface coordinates after flipping.
"""
with tf.name_scope(scope, 'DensePoseFlipHorizontal'):
# First flip x coordinate.
y, x, vu = tf.split(dp_surface_coords, num_or_size_splits=[1, 1, 2], axis=2)
xflipped = 1.0 - x
# Flip part ids and surface coordinates.
horizontal_flip = DensePoseHorizontalFlip()
new_dp_part_ids, new_vu = horizontal_flip.flip_parts_and_coords(
dp_part_ids, vu)
new_dp_surface_coords = tf.concat([y, xflipped, new_vu], axis=2)
return new_dp_part_ids, new_dp_surface_coords
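# A minimal usage sketch (illustrative only; not part of the original module).
# It scales one instance with two sampled points and clips the result to a
# window; the __main__ guard keeps it inert on import.
if __name__ == '__main__':
  tf.enable_eager_execution()  # No-op where eager execution is already on.
  example_coords = tf.constant(
      [[[0.2, 0.4, 0.5, 0.5], [0.9, 1.0, 0.1, 0.8]]], dtype=tf.float32)
  scaled = scale(example_coords, y_scale=0.5, x_scale=0.5)
  print(clip_to_window(scaled, tf.constant([0.0, 0.0, 0.4, 0.4])))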
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for object_detection.core.densepose_ops."""
import numpy as np
import tensorflow.compat.v1 as tf
from object_detection.core import densepose_ops
from object_detection.utils import test_case
class DensePoseOpsTest(test_case.TestCase):
"""Tests for common DensePose operations."""
def test_scale(self):
def graph_fn():
dp_surface_coords = tf.constant([
[[0.0, 0.0, 0.1, 0.2], [100.0, 200.0, 0.3, 0.4]],
[[50.0, 120.0, 0.5, 0.6], [100.0, 140.0, 0.7, 0.8]]
])
y_scale = tf.constant(1.0 / 100)
x_scale = tf.constant(1.0 / 200)
output = densepose_ops.scale(dp_surface_coords, y_scale, x_scale)
return output
output = self.execute(graph_fn, [])
expected_dp_surface_coords = np.array([
[[0., 0., 0.1, 0.2], [1.0, 1.0, 0.3, 0.4]],
[[0.5, 0.6, 0.5, 0.6], [1.0, 0.7, 0.7, 0.8]]
])
self.assertAllClose(output, expected_dp_surface_coords)
def test_clip_to_window(self):
def graph_fn():
dp_surface_coords = tf.constant([
[[0.25, 0.5, 0.1, 0.2], [0.75, 0.75, 0.3, 0.4]],
[[0.5, 0.0, 0.5, 0.6], [1.0, 1.0, 0.7, 0.8]]
])
window = tf.constant([0.25, 0.25, 0.75, 0.75])
output = densepose_ops.clip_to_window(dp_surface_coords, window)
return output
output = self.execute(graph_fn, [])
expected_dp_surface_coords = np.array([
[[0.25, 0.5, 0.1, 0.2], [0.75, 0.75, 0.3, 0.4]],
[[0.5, 0.25, 0.5, 0.6], [0.75, 0.75, 0.7, 0.8]]
])
self.assertAllClose(output, expected_dp_surface_coords)
def test_prune_outside_window(self):
def graph_fn():
dp_num_points = tf.constant([2, 0, 1])
dp_part_ids = tf.constant([[1, 1], [0, 0], [16, 0]])
dp_surface_coords = tf.constant([
[[0.9, 0.5, 0.1, 0.2], [0.75, 0.75, 0.3, 0.4]],
[[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]],
[[0.8, 0.5, 0.6, 0.6], [0.5, 0.5, 0.7, 0.7]]
])
window = tf.constant([0.25, 0.25, 0.75, 0.75])
new_dp_num_points, new_dp_part_ids, new_dp_surface_coords = (
densepose_ops.prune_outside_window(dp_num_points, dp_part_ids,
dp_surface_coords, window))
return new_dp_num_points, new_dp_part_ids, new_dp_surface_coords
new_dp_num_points, new_dp_part_ids, new_dp_surface_coords = (
self.execute_cpu(graph_fn, []))
expected_dp_num_points = np.array([1, 0, 0])
expected_dp_part_ids = np.array([[1], [0], [0]])
expected_dp_surface_coords = np.array([
[[0.75, 0.75, 0.3, 0.4]],
[[0.0, 0.0, 0.0, 0.0]],
[[0.0, 0.0, 0.0, 0.0]]
])
self.assertAllEqual(new_dp_num_points, expected_dp_num_points)
self.assertAllEqual(new_dp_part_ids, expected_dp_part_ids)
self.assertAllClose(new_dp_surface_coords, expected_dp_surface_coords)
def test_change_coordinate_frame(self):
def graph_fn():
dp_surface_coords = tf.constant([
[[0.25, 0.5, 0.1, 0.2], [0.75, 0.75, 0.3, 0.4]],
[[0.5, 0.0, 0.5, 0.6], [1.0, 1.0, 0.7, 0.8]]
])
window = tf.constant([0.25, 0.25, 0.75, 0.75])
output = densepose_ops.change_coordinate_frame(dp_surface_coords, window)
return output
output = self.execute(graph_fn, [])
expected_dp_surface_coords = np.array([
[[0, 0.5, 0.1, 0.2], [1.0, 1.0, 0.3, 0.4]],
[[0.5, -0.5, 0.5, 0.6], [1.5, 1.5, 0.7, 0.8]]
])
self.assertAllClose(output, expected_dp_surface_coords)
def test_to_normalized_coordinates(self):
def graph_fn():
dp_surface_coords = tf.constant([
[[10., 30., 0.1, 0.2], [30., 45., 0.3, 0.4]],
[[20., 0., 0.5, 0.6], [40., 60., 0.7, 0.8]]
])
output = densepose_ops.to_normalized_coordinates(
dp_surface_coords, 40, 60)
return output
output = self.execute(graph_fn, [])
expected_dp_surface_coords = np.array([
[[0.25, 0.5, 0.1, 0.2], [0.75, 0.75, 0.3, 0.4]],
[[0.5, 0.0, 0.5, 0.6], [1.0, 1.0, 0.7, 0.8]]
])
self.assertAllClose(output, expected_dp_surface_coords)
def test_to_absolute_coordinates(self):
def graph_fn():
dp_surface_coords = tf.constant([
[[0.25, 0.5, 0.1, 0.2], [0.75, 0.75, 0.3, 0.4]],
[[0.5, 0.0, 0.5, 0.6], [1.0, 1.0, 0.7, 0.8]]
])
output = densepose_ops.to_absolute_coordinates(
dp_surface_coords, 40, 60)
return output
output = self.execute(graph_fn, [])
expected_dp_surface_coords = np.array([
[[10., 30., 0.1, 0.2], [30., 45., 0.3, 0.4]],
[[20., 0., 0.5, 0.6], [40., 60., 0.7, 0.8]]
])
self.assertAllClose(output, expected_dp_surface_coords)
def test_horizontal_flip(self):
part_ids_np = np.array([[1, 4], [0, 8]], dtype=np.int32)
surf_coords_np = np.array([
[[0.1, 0.7, 0.2, 0.4], [0.3, 0.8, 0.2, 0.4]],
[[0.0, 0.5, 0.8, 0.7], [0.6, 1.0, 0.7, 0.9]],
], dtype=np.float32)
def graph_fn():
part_ids = tf.constant(part_ids_np, dtype=tf.int32)
surf_coords = tf.constant(surf_coords_np, dtype=tf.float32)
flipped_part_ids, flipped_surf_coords = densepose_ops.flip_horizontal(
part_ids, surf_coords)
flipped_twice_part_ids, flipped_twice_surf_coords = (
densepose_ops.flip_horizontal(flipped_part_ids, flipped_surf_coords))
return (flipped_part_ids, flipped_surf_coords,
flipped_twice_part_ids, flipped_twice_surf_coords)
(flipped_part_ids, flipped_surf_coords, flipped_twice_part_ids,
flipped_twice_surf_coords) = self.execute(graph_fn, [])
expected_flipped_part_ids = [[1, 5], # 1->1, 4->5
[0, 9]] # 0->0, 8->9
expected_flipped_surf_coords_yx = np.array([
[[0.1, 1.0-0.7], [0.3, 1.0-0.8]],
[[0.0, 1.0-0.5], [0.6, 1.0-1.0]],
], dtype=np.float32)
self.assertAllEqual(expected_flipped_part_ids, flipped_part_ids)
self.assertAllClose(expected_flipped_surf_coords_yx,
flipped_surf_coords[:, :, 0:2])
self.assertAllEqual(part_ids_np, flipped_twice_part_ids)
self.assertAllClose(surf_coords_np, flipped_twice_surf_coords, rtol=1e-2,
atol=1e-2)
if __name__ == '__main__':
tf.test.main()
@@ -119,6 +119,24 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
])
return tf.constant(keypoints, dtype=tf.float32)
def createTestDensePose(self):
dp_num_points = tf.constant([1, 3], dtype=tf.int32)
dp_part_ids = tf.constant(
[[4, 0, 0],
[1, 0, 5]], dtype=tf.int32)
dp_surface_coords = tf.constant(
[
# Instance 0.
[[0.1, 0.2, 0.6, 0.7],
[0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0]],
# Instance 1.
[[0.8, 0.9, 0.2, 0.4],
[0.1, 0.3, 0.2, 0.8],
[0.6, 1.0, 0.3, 0.4]],
], dtype=tf.float32)
return dp_num_points, dp_part_ids, dp_surface_coords
def createKeypointFlipPermutation(self):
return [0, 2, 1]
@@ -694,7 +712,11 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
test_masks=True,
test_keypoints=True)
def testRunRandomHorizontalFlipWithMaskAndKeypoints(self):
@parameterized.parameters(
{'include_dense_pose': False},
{'include_dense_pose': True}
)
def testRunRandomHorizontalFlipWithMaskAndKeypoints(self, include_dense_pose):
def graph_fn():
preprocess_options = [(preprocessor.random_horizontal_flip, {})]
@@ -704,6 +726,7 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
boxes = self.createTestBoxes()
masks = self.createTestMasks()
keypoints, keypoint_visibilities = self.createTestKeypoints()
dp_num_point, dp_part_ids, dp_surface_coords = self.createTestDensePose()
keypoint_flip_permutation = self.createKeypointFlipPermutation()
tensor_dict = {
fields.InputDataFields.image:
@@ -717,13 +740,21 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
fields.InputDataFields.groundtruth_keypoint_visibilities:
keypoint_visibilities
}
if include_dense_pose:
tensor_dict.update({
fields.InputDataFields.groundtruth_dp_num_points: dp_num_point,
fields.InputDataFields.groundtruth_dp_part_ids: dp_part_ids,
fields.InputDataFields.groundtruth_dp_surface_coords:
dp_surface_coords
})
preprocess_options = [(preprocessor.random_horizontal_flip, {
'keypoint_flip_permutation': keypoint_flip_permutation
})]
preprocessor_arg_map = preprocessor.get_default_func_arg_map(
include_instance_masks=True,
include_keypoints=True,
include_keypoint_visibilities=True)
include_keypoint_visibilities=True,
include_dense_pose=include_dense_pose)
tensor_dict = preprocessor.preprocess(
tensor_dict, preprocess_options, func_arg_map=preprocessor_arg_map)
boxes = tensor_dict[fields.InputDataFields.groundtruth_boxes]
@@ -731,14 +762,26 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
keypoints = tensor_dict[fields.InputDataFields.groundtruth_keypoints]
keypoint_visibilities = tensor_dict[
fields.InputDataFields.groundtruth_keypoint_visibilities]
return [boxes, masks, keypoints, keypoint_visibilities]
boxes, masks, keypoints, keypoint_visibilities = self.execute_cpu(
graph_fn, [])
self.assertIsNotNone(boxes)
self.assertIsNotNone(masks)
self.assertIsNotNone(keypoints)
self.assertIsNotNone(keypoint_visibilities)
output_tensors = [boxes, masks, keypoints, keypoint_visibilities]
if include_dense_pose:
dp_num_points = tensor_dict[
fields.InputDataFields.groundtruth_dp_num_points]
dp_part_ids = tensor_dict[
fields.InputDataFields.groundtruth_dp_part_ids]
dp_surface_coords = tensor_dict[
fields.InputDataFields.groundtruth_dp_surface_coords]
output_tensors.extend([dp_num_points, dp_part_ids, dp_surface_coords])
return output_tensors
output_tensors = self.execute_cpu(graph_fn, [])
self.assertIsNotNone(output_tensors[0]) # Boxes.
self.assertIsNotNone(output_tensors[1]) # Masks.
    self.assertIsNotNone(output_tensors[2])  # Keypoints.
self.assertIsNotNone(output_tensors[3]) # Keypoint Visibilities.
if include_dense_pose:
self.assertIsNotNone(output_tensors[4]) # DensePose Num Points.
self.assertIsNotNone(output_tensors[5]) # DensePose Part IDs.
    self.assertIsNotNone(output_tensors[6])  # DensePose Surface Coords.
def testRandomVerticalFlip(self):
@@ -1886,6 +1929,65 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
self.assertAllClose(
distorted_keypoints_.flatten(), expected_keypoints.flatten())
def testRunRandomCropImageWithDensePose(self):
def graph_fn():
image = self.createColorfulTestImage()
boxes = self.createTestBoxes()
labels = self.createTestLabels()
weights = self.createTestGroundtruthWeights()
dp_num_points, dp_part_ids, dp_surface_coords = self.createTestDensePose()
tensor_dict = {
fields.InputDataFields.image: image,
fields.InputDataFields.groundtruth_boxes: boxes,
fields.InputDataFields.groundtruth_classes: labels,
fields.InputDataFields.groundtruth_weights: weights,
fields.InputDataFields.groundtruth_dp_num_points: dp_num_points,
fields.InputDataFields.groundtruth_dp_part_ids: dp_part_ids,
fields.InputDataFields.groundtruth_dp_surface_coords:
dp_surface_coords
}
preprocessor_arg_map = preprocessor.get_default_func_arg_map(
include_dense_pose=True)
preprocessing_options = [(preprocessor.random_crop_image, {})]
with mock.patch.object(
tf.image,
'sample_distorted_bounding_box'
) as mock_sample_distorted_bounding_box:
mock_sample_distorted_bounding_box.return_value = (
tf.constant([6, 40, 0], dtype=tf.int32),
tf.constant([134, 340, -1], dtype=tf.int32),
tf.constant([[[0.03, 0.1, 0.7, 0.95]]], dtype=tf.float32))
distorted_tensor_dict = preprocessor.preprocess(
tensor_dict,
preprocessing_options,
func_arg_map=preprocessor_arg_map)
distorted_image = distorted_tensor_dict[fields.InputDataFields.image]
distorted_dp_num_points = distorted_tensor_dict[
fields.InputDataFields.groundtruth_dp_num_points]
distorted_dp_part_ids = distorted_tensor_dict[
fields.InputDataFields.groundtruth_dp_part_ids]
distorted_dp_surface_coords = distorted_tensor_dict[
fields.InputDataFields.groundtruth_dp_surface_coords]
return [distorted_image, distorted_dp_num_points, distorted_dp_part_ids,
distorted_dp_surface_coords]
(distorted_image_, distorted_dp_num_points_, distorted_dp_part_ids_,
distorted_dp_surface_coords_) = self.execute_cpu(graph_fn, [])
expected_dp_num_points = np.array([1, 1])
expected_dp_part_ids = np.array([[4], [0]])
expected_dp_surface_coords = np.array([
[[0.10447761, 0.1176470, 0.6, 0.7]],
[[0.10447761, 0.2352941, 0.2, 0.8]],
])
self.assertAllEqual(distorted_image_.shape, [1, 134, 340, 3])
self.assertAllEqual(distorted_dp_num_points_, expected_dp_num_points)
self.assertAllEqual(distorted_dp_part_ids_, expected_dp_part_ids)
self.assertAllClose(distorted_dp_surface_coords_,
expected_dp_surface_coords)
def testRunRetainBoxesAboveThreshold(self):
def graph_fn():
boxes = self.createTestBoxes()
@@ -2276,7 +2378,11 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
self.assertTrue(np.all((boxes_[:, 3] - boxes_[:, 1]) >= (
padded_boxes_[:, 3] - padded_boxes_[:, 1])))
def testRandomPadImageWithKeypointsAndMasks(self):
@parameterized.parameters(
{'include_dense_pose': False},
{'include_dense_pose': True}
)
def testRandomPadImageWithKeypointsAndMasks(self, include_dense_pose):
def graph_fn():
preprocessing_options = [(preprocessor.normalize_image, {
'original_minval': 0,
@@ -2290,12 +2396,15 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
labels = self.createTestLabels()
masks = self.createTestMasks()
keypoints, _ = self.createTestKeypoints()
_, _, dp_surface_coords = self.createTestDensePose()
tensor_dict = {
fields.InputDataFields.image: images,
fields.InputDataFields.groundtruth_boxes: boxes,
fields.InputDataFields.groundtruth_classes: labels,
fields.InputDataFields.groundtruth_instance_masks: masks,
fields.InputDataFields.groundtruth_keypoints: keypoints,
fields.InputDataFields.groundtruth_dp_surface_coords:
dp_surface_coords
}
tensor_dict = preprocessor.preprocess(tensor_dict, preprocessing_options)
images = tensor_dict[fields.InputDataFields.image]
@@ -2304,7 +2413,8 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
func_arg_map = preprocessor.get_default_func_arg_map(
include_instance_masks=True,
include_keypoints=True,
include_keypoint_visibilities=True)
include_keypoint_visibilities=True,
include_dense_pose=include_dense_pose)
padded_tensor_dict = preprocessor.preprocess(tensor_dict,
preprocessing_options,
func_arg_map=func_arg_map)
@@ -2323,15 +2433,29 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
padded_keypoints_shape = tf.shape(padded_keypoints)
images_shape = tf.shape(images)
padded_images_shape = tf.shape(padded_images)
return [boxes_shape, padded_boxes_shape, padded_masks_shape,
keypoints_shape, padded_keypoints_shape, images_shape,
padded_images_shape, boxes, padded_boxes, keypoints,
padded_keypoints]
(boxes_shape_, padded_boxes_shape_, padded_masks_shape_,
keypoints_shape_, padded_keypoints_shape_, images_shape_,
padded_images_shape_, boxes_, padded_boxes_,
keypoints_, padded_keypoints_) = self.execute_cpu(graph_fn, [])
outputs = [boxes_shape, padded_boxes_shape, padded_masks_shape,
keypoints_shape, padded_keypoints_shape, images_shape,
padded_images_shape, boxes, padded_boxes, keypoints,
padded_keypoints]
if include_dense_pose:
padded_dp_surface_coords = padded_tensor_dict[
fields.InputDataFields.groundtruth_dp_surface_coords]
outputs.extend([dp_surface_coords, padded_dp_surface_coords])
return outputs
outputs = self.execute_cpu(graph_fn, [])
boxes_shape_ = outputs[0]
padded_boxes_shape_ = outputs[1]
padded_masks_shape_ = outputs[2]
keypoints_shape_ = outputs[3]
padded_keypoints_shape_ = outputs[4]
images_shape_ = outputs[5]
padded_images_shape_ = outputs[6]
boxes_ = outputs[7]
padded_boxes_ = outputs[8]
keypoints_ = outputs[9]
padded_keypoints_ = outputs[10]
self.assertAllEqual(boxes_shape_, padded_boxes_shape_)
self.assertAllEqual(keypoints_shape_, padded_keypoints_shape_)
    self.assertTrue((images_shape_[1] >= padded_images_shape_[1] * 0.5).all())
@@ -2347,6 +2471,11 @@ class PreprocessorTest(test_case.TestCase, parameterized.TestCase):
padded_keypoints_[1, :, 0] - padded_keypoints_[0, :, 0])))
self.assertTrue(np.all((keypoints_[1, :, 1] - keypoints_[0, :, 1]) >= (
padded_keypoints_[1, :, 1] - padded_keypoints_[0, :, 1])))
if include_dense_pose:
dp_surface_coords = outputs[11]
padded_dp_surface_coords = outputs[12]
self.assertAllClose(padded_dp_surface_coords[:, :, 2:],
dp_surface_coords[:, :, 2:])
def testRandomAbsolutePadImage(self):
height_padding = 10
......
@@ -43,6 +43,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import copy
import datetime
import io
@@ -51,62 +52,11 @@ import json
import os
from absl import app
from absl import flags
import apache_beam as beam
import numpy as np
import PIL.Image
import six
import tensorflow as tf
from apache_beam import runners
flags.DEFINE_string('input_tfrecord', None, 'TFRecord containing images in '
'tf.Example format for object detection, with bounding'
'boxes and contextual feature embeddings.')
flags.DEFINE_string('output_tfrecord', None,
'TFRecord containing images in tf.Example format, with '
'added contextual memory banks.')
flags.DEFINE_string('sequence_key', None, 'Key to use when grouping sequences: '
'so far supports `image/seq_id` and `image/location`.')
flags.DEFINE_string('time_horizon', None, 'What time horizon to use when '
'splitting the data, if any. Options are: `year`, `month`,'
' `week`, `day `, `hour`, `minute`, `None`.')
flags.DEFINE_integer('subsample_context_features_rate', 0, 'Whether to '
'subsample the context_features, and if so how many to '
'sample. If the rate is set to X, it will sample context '
'from 1 out of every X images. Default is sampling from '
'every image, which is X=0.')
flags.DEFINE_boolean('reduce_image_size', True, 'downsamples images to'
'have longest side max_image_dimension, maintaining aspect'
' ratio')
flags.DEFINE_integer('max_image_dimension', 1024, 'sets max image dimension')
flags.DEFINE_boolean('add_context_features', True, 'adds a memory bank of'
'embeddings to each clip')
flags.DEFINE_boolean('sorted_image_ids', True, 'whether the image source_ids '
'are sortable to deal with date_captured tie-breaks')
flags.DEFINE_string('image_ids_to_keep', 'All', 'path to .json list of image'
'ids to keep, used for ground truth eval creation')
flags.DEFINE_boolean('keep_context_features_image_id_list', False, 'Whether or '
'not to keep a list of the image_ids corresponding to the '
'memory bank')
flags.DEFINE_boolean('keep_only_positives', False, 'Whether or not to '
'keep only positive boxes based on score')
flags.DEFINE_boolean('keep_only_positives_gt', False, 'Whether or not to '
'keep only positive boxes based on gt class')
flags.DEFINE_float('context_features_score_threshold', 0.7, 'What score '
'threshold to use for boxes in context_features')
flags.DEFINE_integer('max_num_elements_in_context_features', 2000, 'Sets max '
'num elements per memory bank')
flags.DEFINE_integer('num_shards', 0, 'Number of output shards.')
flags.DEFINE_string('output_type', 'tf_sequence_example', 'Output type, one of '
'`tf_example`, `tf_sequence_example`')
flags.DEFINE_integer('max_clip_length', None, 'Max length for sequence '
'example outputs.')
FLAGS = flags.FLAGS
DEFAULT_FEATURE_LENGTH = 2057
import tensorflow.compat.v1 as tf
class ReKeyDataFn(beam.DoFn):
@@ -406,7 +356,8 @@ class GenerateContextFn(beam.DoFn):
keep_only_positives_gt=False,
max_num_elements_in_context_features=5000,
pad_context_features=False,
output_type='tf_example', max_clip_length=None):
output_type='tf_example', max_clip_length=None,
context_feature_length=2057):
"""Initialization function.
Args:
@@ -432,6 +383,8 @@
      output_type: What type of output, tf_example or tf_sequence_example
max_clip_length: The maximum length of a sequence example, before
splitting into multiple
context_feature_length: The length of the context feature embeddings
stored in the input data.
"""
self._session = None
self._num_examples_processed = beam.metrics.Metrics.counter(
@@ -456,6 +409,7 @@
self._context_features_score_threshold = context_features_score_threshold
self._max_num_elements_in_context_features = (
max_num_elements_in_context_features)
self._context_feature_length = context_feature_length
self._images_kept = beam.metrics.Metrics.counter(
'sequence_data_generation', 'images_kept')
@@ -506,9 +460,9 @@
context_features_image_id_list.append(example_image_id)
if not example_embedding:
example_embedding.append(np.zeros(DEFAULT_FEATURE_LENGTH))
example_embedding.append(np.zeros(self._context_feature_length))
feature_length = DEFAULT_FEATURE_LENGTH
feature_length = self._context_feature_length
# If the example_list is not empty and image/embedding_length is in the
    # feature dict, feature_length will be assigned to that. Otherwise, it will
@@ -703,7 +657,8 @@
return list_of_examples
def construct_pipeline(input_tfrecord,
def construct_pipeline(pipeline,
input_tfrecord,
output_tfrecord,
sequence_key,
time_horizon=None,
@@ -720,10 +675,12 @@ def construct_pipeline(input_tfrecord,
max_num_elements_in_context_features=5000,
num_shards=0,
output_type='tf_example',
max_clip_length=None):
max_clip_length=None,
context_feature_length=2057):
"""Returns a beam pipeline to run object detection inference.
Args:
pipeline: Initialized beam pipeline.
    input_tfrecord: A TFRecord of tf.train.Example protos containing images.
    output_tfrecord: A TFRecord of tf.train.Example protos that contain the
      images from the input TFRecord with added context memory banks.
@@ -755,91 +712,224 @@
    output_type: What type of output, tf_example or tf_sequence_example
max_clip_length: The maximum length of a sequence example, before
splitting into multiple
context_feature_length: The length of the context feature embeddings stored
in the input data.
"""
def pipeline(root):
if output_type == 'tf_example':
coder = beam.coders.ProtoCoder(tf.train.Example)
elif output_type == 'tf_sequence_example':
coder = beam.coders.ProtoCoder(tf.train.SequenceExample)
else:
raise ValueError('Unsupported output type.')
input_collection = (
root | 'ReadInputTFRecord' >> beam.io.tfrecordio.ReadFromTFRecord(
input_tfrecord,
coder=beam.coders.BytesCoder()))
rekey_collection = input_collection | 'RekeyExamples' >> beam.ParDo(
ReKeyDataFn(sequence_key, time_horizon,
reduce_image_size, max_image_dimension))
grouped_collection = (
rekey_collection | 'GroupBySequenceKey' >> beam.GroupByKey())
grouped_collection = (
grouped_collection | 'ReshuffleGroups' >> beam.Reshuffle())
ordered_collection = (
grouped_collection | 'OrderByFrameNumber' >> beam.ParDo(
SortGroupedDataFn(sequence_key, sorted_image_ids,
max_num_elements_in_context_features)))
ordered_collection = (
ordered_collection | 'ReshuffleSortedGroups' >> beam.Reshuffle())
output_collection = (
ordered_collection | 'AddContextToExamples' >> beam.ParDo(
GenerateContextFn(
sequence_key, add_context_features, image_ids_to_keep,
keep_context_features_image_id_list=(
keep_context_features_image_id_list),
subsample_context_features_rate=subsample_context_features_rate,
keep_only_positives=keep_only_positives,
keep_only_positives_gt=keep_only_positives_gt,
context_features_score_threshold=(
context_features_score_threshold),
max_num_elements_in_context_features=(
max_num_elements_in_context_features),
output_type=output_type,
max_clip_length=max_clip_length)))
output_collection = (
output_collection | 'ReshuffleExamples' >> beam.Reshuffle())
_ = output_collection | 'WritetoDisk' >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord,
num_shards=num_shards,
coder=coder)
return pipeline
def main(_):
"""Runs the Beam pipeline that builds context features.
if output_type == 'tf_example':
coder = beam.coders.ProtoCoder(tf.train.Example)
elif output_type == 'tf_sequence_example':
coder = beam.coders.ProtoCoder(tf.train.SequenceExample)
else:
raise ValueError('Unsupported output type.')
input_collection = (
pipeline | 'ReadInputTFRecord' >> beam.io.tfrecordio.ReadFromTFRecord(
input_tfrecord,
coder=beam.coders.BytesCoder()))
rekey_collection = input_collection | 'RekeyExamples' >> beam.ParDo(
ReKeyDataFn(sequence_key, time_horizon,
reduce_image_size, max_image_dimension))
grouped_collection = (
rekey_collection | 'GroupBySequenceKey' >> beam.GroupByKey())
grouped_collection = (
grouped_collection | 'ReshuffleGroups' >> beam.Reshuffle())
ordered_collection = (
grouped_collection | 'OrderByFrameNumber' >> beam.ParDo(
SortGroupedDataFn(sequence_key, sorted_image_ids,
max_num_elements_in_context_features)))
ordered_collection = (
ordered_collection | 'ReshuffleSortedGroups' >> beam.Reshuffle())
output_collection = (
ordered_collection | 'AddContextToExamples' >> beam.ParDo(
GenerateContextFn(
sequence_key, add_context_features, image_ids_to_keep,
keep_context_features_image_id_list=(
keep_context_features_image_id_list),
subsample_context_features_rate=subsample_context_features_rate,
keep_only_positives=keep_only_positives,
keep_only_positives_gt=keep_only_positives_gt,
context_features_score_threshold=(
context_features_score_threshold),
max_num_elements_in_context_features=(
max_num_elements_in_context_features),
output_type=output_type,
max_clip_length=max_clip_length,
context_feature_length=context_feature_length)))
output_collection = (
output_collection | 'ReshuffleExamples' >> beam.Reshuffle())
_ = output_collection | 'WritetoDisk' >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord,
num_shards=num_shards,
coder=coder)
def parse_args(argv):
"""Command-line argument parser.
Args:
argv: command line arguments
Returns:
beam_args: Arguments for the beam pipeline.
pipeline_args: Arguments for the pipeline options, such as runner type.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_tfrecord',
dest='input_tfrecord',
required=True,
help='TFRecord containing images in tf.Example format for object '
'detection, with bounding boxes and contextual feature embeddings.')
parser.add_argument(
'--output_tfrecord',
dest='output_tfrecord',
required=True,
help='TFRecord containing images in tf.Example format, with added '
'contextual memory banks.')
parser.add_argument(
'--sequence_key',
dest='sequence_key',
default='image/location',
help='Key to use when grouping sequences: so far supports `image/seq_id` '
'and `image/location`.')
parser.add_argument(
'--context_feature_length',
dest='context_feature_length',
      default=2057,
      type=int,
help='The length of the context feature embeddings stored in the input '
'data.')
parser.add_argument(
'--time_horizon',
dest='time_horizon',
default=None,
      help='What time horizon to use when splitting the data, if any. Options '
      'are: `year`, `month`, `week`, `day`, `hour`, `minute`, `None`.')
parser.add_argument(
'--subsample_context_features_rate',
dest='subsample_context_features_rate',
      default=0,
      type=int,
help='Whether to subsample the context_features, and if so how many to '
'sample. If the rate is set to X, it will sample context from 1 out of '
'every X images. Default is sampling from every image, which is X=0.')
parser.add_argument(
'--reduce_image_size',
dest='reduce_image_size',
default=True,
help='downsamples images to have longest side max_image_dimension, '
'maintaining aspect ratio')
parser.add_argument(
'--max_image_dimension',
dest='max_image_dimension',
      default=1024,
      type=int,
help='Sets max image dimension for resizing.')
parser.add_argument(
'--add_context_features',
dest='add_context_features',
default=True,
help='Adds a memory bank of embeddings to each clip')
parser.add_argument(
'--sorted_image_ids',
dest='sorted_image_ids',
default=True,
help='Whether the image source_ids are sortable to deal with '
'date_captured tie-breaks.')
parser.add_argument(
'--image_ids_to_keep',
dest='image_ids_to_keep',
default='All',
help='Path to .json list of image ids to keep, used for ground truth '
'eval creation.')
parser.add_argument(
'--keep_context_features_image_id_list',
dest='keep_context_features_image_id_list',
default=False,
help='Whether or not to keep a list of the image_ids corresponding to '
'the memory bank.')
parser.add_argument(
'--keep_only_positives',
dest='keep_only_positives',
default=False,
help='Whether or not to keep only positive boxes based on score.')
parser.add_argument(
'--context_features_score_threshold',
dest='context_features_score_threshold',
default=0.7,
help='What score threshold to use for boxes in context_features, when '
'`keep_only_positives` is set to `True`.')
parser.add_argument(
'--keep_only_positives_gt',
dest='keep_only_positives_gt',
default=False,
help='Whether or not to keep only positive boxes based on gt class.')
parser.add_argument(
'--max_num_elements_in_context_features',
dest='max_num_elements_in_context_features',
      default=2000,
      type=int,
help='Sets max number of context feature elements per memory bank. '
'If the number of images in the context group is greater than '
'`max_num_elements_in_context_features`, the context group will be split.'
)
parser.add_argument(
'--output_type',
dest='output_type',
default='tf_example',
help='Output type, one of `tf_example`, `tf_sequence_example`.')
parser.add_argument(
'--max_clip_length',
dest='max_clip_length',
default=None,
help='Max length for sequence example outputs.')
parser.add_argument(
'--num_shards',
dest='num_shards',
      default=0,
      type=int,
help='Number of output shards.')
beam_args, pipeline_args = parser.parse_known_args(argv)
return beam_args, pipeline_args
def main(argv=None, save_main_session=True):
"""Runs the Beam pipeline that performs inference.
dirname = os.path.dirname(FLAGS.output_tfrecord)
Args:
argv: Command line arguments.
save_main_session: Whether to save the main session.
"""
args, pipeline_args = parse_args(argv)
pipeline_options = beam.options.pipeline_options.PipelineOptions(
pipeline_args)
pipeline_options.view_as(
beam.options.pipeline_options.SetupOptions).save_main_session = (
save_main_session)
dirname = os.path.dirname(args.output_tfrecord)
tf.io.gfile.makedirs(dirname)
runner.run(
construct_pipeline(FLAGS.input_tfrecord,
FLAGS.output_tfrecord,
FLAGS.sequence_key,
FLAGS.time_horizon,
FLAGS.subsample_context_features_rate,
FLAGS.reduce_image_size,
FLAGS.max_image_dimension,
FLAGS.add_context_features,
FLAGS.sorted_image_ids,
FLAGS.image_ids_to_keep,
FLAGS.keep_context_features_image_id_list,
FLAGS.keep_only_positives,
FLAGS.context_features_score_threshold,
FLAGS.keep_only_positives_gt,
FLAGS.max_num_elements_in_context_features,
FLAGS.num_shards,
FLAGS.output_type,
FLAGS.max_clip_length))
p = beam.Pipeline(options=pipeline_options)
construct_pipeline(
p,
args.input_tfrecord,
args.output_tfrecord,
args.sequence_key,
args.time_horizon,
args.subsample_context_features_rate,
args.reduce_image_size,
args.max_image_dimension,
args.add_context_features,
args.sorted_image_ids,
args.image_ids_to_keep,
args.keep_context_features_image_id_list,
args.keep_only_positives,
args.context_features_score_threshold,
args.keep_only_positives_gt,
args.max_num_elements_in_context_features,
args.output_type,
args.max_clip_length,
args.context_feature_length)
p.run()
if __name__ == '__main__':
flags.mark_flags_as_required([
'input_tfrecord',
'output_tfrecord'
])
app.run(main)
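# Hypothetical invocation sketch (paths are placeholders). Flags defined in
# parse_args are consumed here; any remaining arguments, e.g.
# --runner=DataflowRunner and its Dataflow options, are forwarded to Beam's
# PipelineOptions:
#
#   python add_context_to_examples.py \
#     --input_tfrecord=/path/to/input.tfrecord \
#     --output_tfrecord=/path/to/output.tfrecord \
#     --sequence_key=image/seq_id \
#     --runner=DirectRunner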
@@ -22,13 +22,13 @@ import datetime
import os
import tempfile
import unittest
import apache_beam as beam
import numpy as np
import six
import tensorflow.compat.v1 as tf
from object_detection.dataset_tools.context_rcnn import add_context_to_examples
from object_detection.utils import tf_version
from apache_beam import runners
@contextlib.contextmanager
@@ -329,19 +329,22 @@ class GenerateContextDataTest(tf.test.TestCase):
with InMemoryTFRecord(
[self._create_first_tf_example(),
self._create_second_tf_example()]) as input_tfrecord:
runner = runners.DirectRunner()
temp_dir = tempfile.mkdtemp(dir=os.environ.get('TEST_TMPDIR'))
output_tfrecord = os.path.join(temp_dir, 'output_tfrecord')
sequence_key = six.ensure_binary('image/seq_id')
max_num_elements = 10
num_shards = 1
pipeline = add_context_to_examples.construct_pipeline(
pipeline_options = beam.options.pipeline_options.PipelineOptions(
runner='DirectRunner')
p = beam.Pipeline(options=pipeline_options)
add_context_to_examples.construct_pipeline(
p,
input_tfrecord,
output_tfrecord,
sequence_key,
max_num_elements_in_context_features=max_num_elements,
num_shards=num_shards)
runner.run(pipeline)
p.run()
filenames = tf.io.gfile.glob(output_tfrecord + '-?????-of-?????')
actual_output = []
record_iterator = tf.python_io.tf_record_iterator(path=filenames[0])
@@ -355,20 +358,23 @@
with InMemoryTFRecord(
[self._create_first_tf_example(),
self._create_second_tf_example()]) as input_tfrecord:
runner = runners.DirectRunner()
temp_dir = tempfile.mkdtemp(dir=os.environ.get('TEST_TMPDIR'))
output_tfrecord = os.path.join(temp_dir, 'output_tfrecord')
sequence_key = six.ensure_binary('image/seq_id')
max_num_elements = 10
num_shards = 1
pipeline = add_context_to_examples.construct_pipeline(
pipeline_options = beam.options.pipeline_options.PipelineOptions(
runner='DirectRunner')
p = beam.Pipeline(options=pipeline_options)
add_context_to_examples.construct_pipeline(
p,
input_tfrecord,
output_tfrecord,
sequence_key,
max_num_elements_in_context_features=max_num_elements,
num_shards=num_shards,
output_type='tf_sequence_example')
runner.run(pipeline)
p.run()
filenames = tf.io.gfile.glob(output_tfrecord + '-?????-of-?????')
actual_output = []
record_iterator = tf.python_io.tf_record_iterator(
......
@@ -33,32 +33,19 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import hashlib
import io
import json
import logging
import os
from absl import app
from absl import flags
import apache_beam as beam
import numpy as np
import PIL.Image
import tensorflow.compat.v1 as tf
from apache_beam import runners
from object_detection.utils import dataset_util
flags.DEFINE_string('image_directory', None, 'Directory where images are '
'stored')
flags.DEFINE_string('output_tfrecord_prefix', None,
'TFRecord containing images in tf.Example format.')
flags.DEFINE_string('input_annotations_file', None, 'Path to Coco-CameraTraps'
'style annotations file')
flags.DEFINE_integer('num_images_per_shard',
200,
'The number of images to be stored in each shard.')
FLAGS = flags.FLAGS
class ParseImage(beam.DoFn):
"""A DoFn that parses a COCO-CameraTraps json and emits TFRecords."""
@@ -243,13 +230,14 @@
return [(example)]
def _load_json_data(data_file):
def load_json_data(data_file):
with tf.io.gfile.GFile(data_file, 'r') as fid:
data_dict = json.load(fid)
return data_dict
def create_pipeline(image_directory,
def create_pipeline(pipeline,
image_directory,
input_annotations_file,
output_tfrecord_prefix=None,
num_images_per_shard=200,
@@ -257,68 +245,97 @@
"""Creates a beam pipeline for producing a COCO-CameraTraps Image dataset.
Args:
pipeline: Initialized beam pipeline.
image_directory: Path to image directory
input_annotations_file: Path to a coco-cameratraps annotation file
output_tfrecord_prefix: Absolute path for tfrecord outputs. Final files will
be named {output_tfrecord_prefix}@N.
num_images_per_shard: The number of images to store in each shard
keep_bboxes: Whether to keep any bounding boxes that exist in the json file
Returns:
A Beam pipeline.
"""
logging.info('Reading data from COCO-CameraTraps Dataset.')
data = _load_json_data(input_annotations_file)
data = load_json_data(input_annotations_file)
num_shards = int(np.ceil(float(len(data['images']))/num_images_per_shard))
def pipeline(root):
"""Builds beam pipeline."""
image_examples = (
pipeline | ('CreateCollections') >> beam.Create(
[im['id'] for im in data['images']])
| ('ParseImage') >> beam.ParDo(ParseImage(
image_directory, data['images'], data['annotations'],
data['categories'], keep_bboxes=keep_bboxes)))
_ = (image_examples
| ('Reshuffle') >> beam.Reshuffle()
| ('WriteTfImageExample') >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord_prefix,
num_shards=num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example)))
image_examples = (
root
| ('CreateCollections') >> beam.Create(
[im['id'] for im in data['images']])
| ('ParseImage') >> beam.ParDo(ParseImage(
image_directory, data['images'], data['annotations'],
data['categories'], keep_bboxes=keep_bboxes)))
_ = (image_examples
| ('Reshuffle') >> beam.Reshuffle()
| ('WriteTfImageExample') >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord_prefix,
num_shards=num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example)))
return pipeline
def parse_args(argv):
"""Command-line argument parser.
Args:
argv: command line arguments
Returns:
beam_args: Arguments for the beam pipeline.
pipeline_args: Arguments for the pipeline options, such as runner type.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--image_directory',
dest='image_directory',
required=True,
help='Path to the directory where the images are stored.')
parser.add_argument(
'--output_tfrecord_prefix',
dest='output_tfrecord_prefix',
required=True,
      help='Path and prefix to store TFRecords containing images in '
      'tf.Example format.')
parser.add_argument(
'--input_annotations_file',
dest='input_annotations_file',
required=True,
help='Path to Coco-CameraTraps style annotations file.')
parser.add_argument(
'--num_images_per_shard',
dest='num_images_per_shard',
      default=200,
      type=int,
      help='The number of images to be stored in each output shard.')
beam_args, pipeline_args = parser.parse_known_args(argv)
return beam_args, pipeline_args
def main(argv=None, save_main_session=True):
"""Runs the Beam pipeline that performs inference.
Args:
_: unused
argv: Command line arguments.
save_main_session: Whether to save the main session.
"""
args, pipeline_args = parse_args(argv)
# must create before flags are used
runner = runners.DirectRunner()
pipeline_options = beam.options.pipeline_options.PipelineOptions(
pipeline_args)
pipeline_options.view_as(
beam.options.pipeline_options.SetupOptions).save_main_session = (
save_main_session)
dirname = os.path.dirname(FLAGS.output_tfrecord_prefix)
dirname = os.path.dirname(args.output_tfrecord_prefix)
tf.io.gfile.makedirs(dirname)
runner.run(
create_pipeline(
image_directory=FLAGS.image_directory,
input_annotations_file=FLAGS.input_annotations_file,
output_tfrecord_prefix=FLAGS.output_tfrecord_prefix,
num_images_per_shard=FLAGS.num_images_per_shard))
p = beam.Pipeline(options=pipeline_options)
create_pipeline(
pipeline=p,
image_directory=args.image_directory,
input_annotations_file=args.input_annotations_file,
output_tfrecord_prefix=args.output_tfrecord_prefix,
num_images_per_shard=args.num_images_per_shard)
p.run()
if __name__ == '__main__':
flags.mark_flags_as_required([
'image_directory',
'input_annotations_file',
'output_tfrecord_prefix'
])
app.run(main)
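# Hypothetical invocation sketch (paths are placeholders); unrecognized
# arguments, such as --runner, are forwarded to Beam's PipelineOptions:
#
#   python create_cococameratraps_tfexample_main.py \
#     --image_directory=/path/to/images \
#     --input_annotations_file=/path/to/annotations.json \
#     --output_tfrecord_prefix=/path/to/output/tfrecord \
#     --runner=DirectRunner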
@@ -21,13 +21,14 @@ import json
import os
import tempfile
import unittest
import apache_beam as beam
import numpy as np
from PIL import Image
import tensorflow.compat.v1 as tf
from object_detection.dataset_tools.context_rcnn import create_cococameratraps_tfexample_main
from object_detection.utils import tf_version
from apache_beam import runners
@unittest.skipIf(tf_version.is_tf2(), 'Skipping TF1.X only test.')
@@ -156,16 +157,18 @@ class CreateCOCOCameraTrapsTfexampleTest(tf.test.TestCase):
example.features.feature['image/encoded'].bytes_list.value)
def test_beam_pipeline(self):
runner = runners.DirectRunner()
num_frames = 1
temp_dir = tempfile.mkdtemp(dir=os.environ.get('TEST_TMPDIR'))
json_path = self._create_json_file(temp_dir, num_frames)
output_tfrecord = temp_dir+'/output'
self._write_random_images_to_directory(temp_dir, num_frames)
pipeline = create_cococameratraps_tfexample_main.create_pipeline(
temp_dir, json_path,
pipeline_options = beam.options.pipeline_options.PipelineOptions(
runner='DirectRunner')
p = beam.Pipeline(options=pipeline_options)
create_cococameratraps_tfexample_main.create_pipeline(
p, temp_dir, json_path,
output_tfrecord_prefix=output_tfrecord)
runner.run(pipeline)
p.run()
filenames = tf.io.gfile.glob(output_tfrecord + '-?????-of-?????')
actual_output = []
record_iterator = tf.python_io.tf_record_iterator(path=filenames[0])
@@ -176,17 +179,19 @@
actual_output[0]))
def test_beam_pipeline_bbox(self):
runner = runners.DirectRunner()
num_frames = 1
temp_dir = tempfile.mkdtemp(dir=os.environ.get('TEST_TMPDIR'))
json_path = self._create_json_file(temp_dir, num_frames, keep_bboxes=True)
output_tfrecord = temp_dir+'/output'
self._write_random_images_to_directory(temp_dir, num_frames)
pipeline = create_cococameratraps_tfexample_main.create_pipeline(
temp_dir, json_path,
pipeline_options = beam.options.pipeline_options.PipelineOptions(
runner='DirectRunner')
p = beam.Pipeline(options=pipeline_options)
create_cococameratraps_tfexample_main.create_pipeline(
p, temp_dir, json_path,
output_tfrecord_prefix=output_tfrecord,
keep_bboxes=True)
runner.run(pipeline)
p.run()
filenames = tf.io.gfile.glob(output_tfrecord+'-?????-of-?????')
actual_output = []
record_iterator = tf.python_io.tf_record_iterator(path=filenames[0])
......
@@ -45,26 +45,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import threading
from absl import app
from absl import flags
import apache_beam as beam
import tensorflow.compat.v1 as tf
from apache_beam import runners
flags.DEFINE_string('detection_input_tfrecord', None, 'TFRecord containing '
'images in tf.Example format for object detection.')
flags.DEFINE_string('detection_output_tfrecord', None,
'TFRecord containing detections in tf.Example format.')
flags.DEFINE_string('detection_model_dir', None, 'Path to directory containing '
'an object detection SavedModel.')
flags.DEFINE_float('confidence_threshold', 0.9,
'Min confidence to keep bounding boxes')
flags.DEFINE_integer('num_shards', 0, 'Number of output shards.')
FLAGS = flags.FLAGS
class GenerateDetectionDataFn(beam.DoFn):
@@ -205,58 +191,103 @@ class GenerateDetectionDataFn(beam.DoFn):
return [example]
def construct_pipeline(input_tfrecord, output_tfrecord, model_dir,
def construct_pipeline(pipeline, input_tfrecord, output_tfrecord, model_dir,
confidence_threshold, num_shards):
"""Returns a Beam pipeline to run object detection inference.
Args:
pipeline: Initialized beam pipeline.
input_tfrecord: A TFRecord of tf.train.Example protos containing images.
output_tfrecord: A TFRecord of tf.train.Example protos that contain images
in the input TFRecord and the detections from the model.
model_dir: Path to `saved_model` to use for inference.
confidence_threshold: Threshold to use when keeping detection results.
num_shards: The number of output shards.
"""
input_collection = (
pipeline | 'ReadInputTFRecord' >> beam.io.tfrecordio.ReadFromTFRecord(
input_tfrecord,
coder=beam.coders.BytesCoder()))
output_collection = input_collection | 'RunInference' >> beam.ParDo(
GenerateDetectionDataFn(model_dir, confidence_threshold))
output_collection = output_collection | 'Reshuffle' >> beam.Reshuffle()
_ = output_collection | 'WritetoDisk' >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord,
num_shards=num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example))
def parse_args(argv):
"""Command-line argument parser.
Args:
argv: command line arguments
Returns:
beam_args: Arguments for the beam pipeline.
pipeline_args: Arguments for the pipeline options, such as runner type.
"""
def pipeline(root):
input_collection = (
root | 'ReadInputTFRecord' >> beam.io.tfrecordio.ReadFromTFRecord(
input_tfrecord,
coder=beam.coders.BytesCoder()))
output_collection = input_collection | 'RunInference' >> beam.ParDo(
GenerateDetectionDataFn(model_dir, confidence_threshold))
output_collection = output_collection | 'Reshuffle' >> beam.Reshuffle()
_ = output_collection | 'WritetoDisk' >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord,
num_shards=num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example))
return pipeline
def main(_):
parser = argparse.ArgumentParser()
parser.add_argument(
'--detection_input_tfrecord',
dest='detection_input_tfrecord',
required=True,
help='TFRecord containing images in tf.Example format for object '
'detection.')
parser.add_argument(
'--detection_output_tfrecord',
dest='detection_output_tfrecord',
required=True,
help='TFRecord containing detections in tf.Example format.')
parser.add_argument(
'--detection_model_dir',
dest='detection_model_dir',
required=True,
help='Path to directory containing an object detection SavedModel.')
parser.add_argument(
'--confidence_threshold',
dest='confidence_threshold',
default=0.9,
help='Min confidence to keep bounding boxes.')
parser.add_argument(
'--num_shards',
dest='num_shards',
default=0,
help='Number of output shards.')
beam_args, pipeline_args = parser.parse_known_args(argv)
return beam_args, pipeline_args
def main(argv=None, save_main_session=True):
"""Runs the Beam pipeline that performs inference.
Args:
_: unused
argv: Command line arguments.
save_main_session: Whether to save the main session.
"""
# must create before flags are used
runner = runners.DirectRunner()
dirname = os.path.dirname(FLAGS.detection_output_tfrecord)
args, pipeline_args = parse_args(argv)
pipeline_options = beam.options.pipeline_options.PipelineOptions(
pipeline_args)
pipeline_options.view_as(
beam.options.pipeline_options.SetupOptions).save_main_session = (
save_main_session)
dirname = os.path.dirname(args.detection_output_tfrecord)
tf.io.gfile.makedirs(dirname)
runner.run(
construct_pipeline(FLAGS.detection_input_tfrecord,
FLAGS.detection_output_tfrecord,
FLAGS.detection_model_dir,
FLAGS.confidence_threshold,
FLAGS.num_shards))
p = beam.Pipeline(options=pipeline_options)
construct_pipeline(
p,
args.detection_input_tfrecord,
args.detection_output_tfrecord,
args.detection_model_dir,
args.confidence_threshold,
args.num_shards)
p.run()
if __name__ == '__main__':
flags.mark_flags_as_required([
'detection_input_tfrecord',
'detection_output_tfrecord',
'detection_model_dir'
])
app.run(main)
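The signature change to `construct_pipeline` is the heart of this refactor: instead of returning a nested `pipeline(root)` closure for a runner to materialize, the function now attaches its transforms to a caller-owned `beam.Pipeline`, so the same code runs locally under DirectRunner or remotely on Dataflow depending only on the options the caller supplies. A rough before/after sketch with placeholder transforms (not the real read/inference/write stages):

import apache_beam as beam

# Old shape: return a closure; the runner builds and owns the pipeline.
def construct_pipeline_old(values):
  def pipeline(root):
    _ = root | beam.Create(values) | beam.Map(print)
  return pipeline

# New shape: the caller constructs the Pipeline (choosing the runner via
# options) and this function only wires transforms onto it.
def construct_pipeline_new(pipeline, values):
  _ = pipeline | beam.Create(values) | beam.Map(print)

p = beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
    runner='DirectRunner'))
construct_pipeline_new(p, [1, 2, 3])
p.run()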
@@ -22,6 +22,7 @@ import contextlib
import os
import tempfile
import unittest
import apache_beam as beam
import numpy as np
import six
import tensorflow.compat.v1 as tf
@@ -32,7 +33,6 @@ from object_detection.core import model
from object_detection.dataset_tools.context_rcnn import generate_detection_data
from object_detection.protos import pipeline_pb2
from object_detection.utils import tf_version
from apache_beam import runners
if six.PY2:
import mock # pylint: disable=g-import-not-at-top
@@ -246,16 +246,18 @@ class GenerateDetectionDataTest(tf.test.TestCase):
def test_beam_pipeline(self):
with InMemoryTFRecord([self._create_tf_example()]) as input_tfrecord:
runner = runners.DirectRunner()
temp_dir = tempfile.mkdtemp(dir=os.environ.get('TEST_TMPDIR'))
output_tfrecord = os.path.join(temp_dir, 'output_tfrecord')
saved_model_path = self._export_saved_model()
confidence_threshold = 0.8
num_shards = 1
pipeline = generate_detection_data.construct_pipeline(
input_tfrecord, output_tfrecord, saved_model_path,
pipeline_options = beam.options.pipeline_options.PipelineOptions(
runner='DirectRunner')
p = beam.Pipeline(options=pipeline_options)
generate_detection_data.construct_pipeline(
p, input_tfrecord, output_tfrecord, saved_model_path,
confidence_threshold, num_shards)
runner.run(pipeline)
p.run()
filenames = tf.io.gfile.glob(output_tfrecord + '-?????-of-?????')
actual_output = []
record_iterator = tf.python_io.tf_record_iterator(path=filenames[0])
......
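For reference, the `-?????-of-?????` glob in these tests matches Beam's default shard naming for `WriteToTFRecord` (for example, `output_tfrecord-00000-of-00001`). A short sketch of reading sharded output back the way the assertions above do, assuming records were already written under the illustrative prefix:

import tensorflow.compat.v1 as tf

output_tfrecord = '/tmp/output_tfrecord'  # illustrative prefix
filenames = tf.io.gfile.glob(output_tfrecord + '-?????-of-?????')
examples = []
for record in tf.python_io.tf_record_iterator(path=filenames[0]):
  example = tf.train.Example()
  example.ParseFromString(record)  # each record is a serialized tf.Example
  examples.append(example)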
@@ -47,34 +47,17 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import datetime
import os
import threading
from absl import app
from absl import flags
import apache_beam as beam
import numpy as np
import six
import tensorflow.compat.v1 as tf
from apache_beam import runners
flags.DEFINE_string('embedding_input_tfrecord', None, 'TFRecord containing '
'images in tf.Example format for object detection.')
flags.DEFINE_string('embedding_output_tfrecord', None,
'TFRecord containing embeddings in tf.Example format.')
flags.DEFINE_string('embedding_model_dir', None, 'Path to directory containing '
'an object detection SavedModel with '
'detection_box_classifier_features in the output.')
flags.DEFINE_integer('top_k_embedding_count', 1,
'The number of top k embeddings to add to the memory bank.'
)
flags.DEFINE_integer('bottom_k_embedding_count', 0,
'The number of bottom k embeddings to add to the memory '
'bank.')
flags.DEFINE_integer('num_shards', 0, 'Number of output shards.')
FLAGS = flags.FLAGS
class GenerateEmbeddingDataFn(beam.DoFn):
@@ -321,12 +304,13 @@ class GenerateEmbeddingDataFn(beam.DoFn):
return [example]
def construct_pipeline(input_tfrecord, output_tfrecord, model_dir,
def construct_pipeline(pipeline, input_tfrecord, output_tfrecord, model_dir,
top_k_embedding_count, bottom_k_embedding_count,
num_shards):
"""Returns a beam pipeline to run object detection inference.
Args:
pipeline: Initialized beam pipeline.
input_tfrecord: A TFRecord of tf.train.Example protos containing images.
output_tfrecord: A TFRecord of tf.train.Example protos that contain images
in the input TFRecord and the detections from the model.
@@ -335,44 +319,96 @@ def construct_pipeline(input_tfrecord, output_tfrecord, model_dir,
bottom_k_embedding_count: The number of low-confidence embeddings to store.
num_shards: The number of output shards.
"""
def pipeline(root):
input_collection = (
root | 'ReadInputTFRecord' >> beam.io.tfrecordio.ReadFromTFRecord(
input_tfrecord,
coder=beam.coders.BytesCoder()))
output_collection = input_collection | 'ExtractEmbedding' >> beam.ParDo(
GenerateEmbeddingDataFn(model_dir, top_k_embedding_count,
bottom_k_embedding_count))
output_collection = output_collection | 'Reshuffle' >> beam.Reshuffle()
_ = output_collection | 'WritetoDisk' >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord,
num_shards=num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example))
return pipeline
def main(_):
input_collection = (
pipeline | 'ReadInputTFRecord' >> beam.io.tfrecordio.ReadFromTFRecord(
input_tfrecord,
coder=beam.coders.BytesCoder()))
output_collection = input_collection | 'ExtractEmbedding' >> beam.ParDo(
GenerateEmbeddingDataFn(model_dir, top_k_embedding_count,
bottom_k_embedding_count))
output_collection = output_collection | 'Reshuffle' >> beam.Reshuffle()
_ = output_collection | 'WritetoDisk' >> beam.io.tfrecordio.WriteToTFRecord(
output_tfrecord,
num_shards=num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example))
def parse_args(argv):
"""Command-line argument parser.
Args:
argv: command line arguments
Returns:
beam_args: Arguments for the beam pipeline.
pipeline_args: Arguments for the pipeline options, such as runner type.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--embedding_input_tfrecord',
dest='embedding_input_tfrecord',
required=True,
help='TFRecord containing images in tf.Example format for object '
'detection.')
parser.add_argument(
'--embedding_output_tfrecord',
dest='embedding_output_tfrecord',
required=True,
help='TFRecord containing embeddings in tf.Example format.')
parser.add_argument(
'--embedding_model_dir',
dest='embedding_model_dir',
required=True,
help='Path to directory containing an object detection SavedModel with '
'detection_box_classifier_features in the output.')
parser.add_argument(
'--top_k_embedding_count',
dest='top_k_embedding_count',
default=1,
help='The number of top k embeddings to add to the memory bank.')
parser.add_argument(
'--bottom_k_embedding_count',
dest='bottom_k_embedding_count',
default=0,
help='The number of bottom k embeddings to add to the memory bank.')
parser.add_argument(
'--num_shards',
dest='num_shards',
default=0,
help='Number of output shards.')
beam_args, pipeline_args = parser.parse_known_args(argv)
return beam_args, pipeline_args
def main(argv=None, save_main_session=True):
"""Runs the Beam pipeline that performs inference.
Args:
_: unused
argv: Command line arguments.
save_main_session: Whether to save the main session.
"""
# must create before flags are used
runner = runners.DirectRunner()
args, pipeline_args = parse_args(argv)
pipeline_options = beam.options.pipeline_options.PipelineOptions(
pipeline_args)
pipeline_options.view_as(
beam.options.pipeline_options.SetupOptions).save_main_session = (
save_main_session)
dirname = os.path.dirname(FLAGS.embedding_output_tfrecord)
dirname = os.path.dirname(args.embedding_output_tfrecord)
tf.io.gfile.makedirs(dirname)
runner.run(
construct_pipeline(FLAGS.embedding_input_tfrecord,
FLAGS.embedding_output_tfrecord,
FLAGS.embedding_model_dir, FLAGS.top_k_embedding_count,
FLAGS.bottom_k_embedding_count, FLAGS.num_shards))
p = beam.Pipeline(options=pipeline_options)
construct_pipeline(
p,
args.embedding_input_tfrecord,
args.embedding_output_tfrecord,
args.embedding_model_dir,
args.top_k_embedding_count,
args.bottom_k_embedding_count,
args.num_shards)
p.run()
if __name__ == '__main__':
flags.mark_flags_as_required([
'embedding_input_tfrecord',
'embedding_output_tfrecord',
'embedding_model_dir'
])
app.run(main)
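The `top_k_embedding_count` and `bottom_k_embedding_count` arguments above control how many high- and low-confidence detection embeddings are kept for the Context R-CNN memory bank. A rough NumPy sketch of that selection idea, with made-up scores and embeddings (illustrative only, not the library's implementation):

import numpy as np

scores = np.array([0.9, 0.2, 0.7, 0.4])  # per-detection confidences
embeddings = np.random.rand(4, 8)        # one 8-d embedding per detection

order = np.argsort(-scores)              # indices, most confident first
top_k, bottom_k = 1, 0                   # the script's default counts
keep = np.concatenate([order[:top_k], order[len(order) - bottom_k:]])
memory_bank = embeddings[keep]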
@@ -21,6 +21,7 @@ import contextlib
import os
import tempfile
import unittest
import apache_beam as beam
import numpy as np
import six
import tensorflow.compat.v1 as tf
@@ -30,7 +31,7 @@ from object_detection.core import model
from object_detection.dataset_tools.context_rcnn import generate_embedding_data
from object_detection.protos import pipeline_pb2
from object_detection.utils import tf_version
from apache_beam import runners
if six.PY2:
import mock # pylint: disable=g-import-not-at-top
@@ -314,17 +315,19 @@ class GenerateEmbeddingData(tf.test.TestCase):
def test_beam_pipeline(self):
with InMemoryTFRecord([self._create_tf_example()]) as input_tfrecord:
runner = runners.DirectRunner()
temp_dir = tempfile.mkdtemp(dir=os.environ.get('TEST_TMPDIR'))
output_tfrecord = os.path.join(temp_dir, 'output_tfrecord')
saved_model_path = self._export_saved_model()
top_k_embedding_count = 1
bottom_k_embedding_count = 0
num_shards = 1
pipeline = generate_embedding_data.construct_pipeline(
input_tfrecord, output_tfrecord, saved_model_path,
pipeline_options = beam.options.pipeline_options.PipelineOptions(
runner='DirectRunner')
p = beam.Pipeline(options=pipeline_options)
generate_embedding_data.construct_pipeline(
p, input_tfrecord, output_tfrecord, saved_model_path,
top_k_embedding_count, bottom_k_embedding_count, num_shards)
runner.run(pipeline)
p.run()
filenames = tf.io.gfile.glob(
output_tfrecord + '-?????-of-?????')
actual_output = []
......
@@ -145,7 +145,7 @@ message Ssd {
optional MaskHead mask_head_config = 25;
}
// Next id: 18.
// Next id: 19.
message SsdFeatureExtractor {
reserved 6;
......
@@ -1082,7 +1082,7 @@ class OpsTestPositionSensitiveCropRegions(test_case.TestCase):
return ps_crop_and_pool
output = self.execute(graph_fn, [])
self.assertAllEqual(output, expected_output[crop_size_mult - 1])
self.assertAllClose(output, expected_output[crop_size_mult - 1])
def test_raise_value_error_on_non_square_block_size(self):
num_spatial_bins = [3, 2]
......
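The last hunk is the flakiness fix called out in the commit description: position-sensitive crop-and-resize produces floating-point output, and bit-exact equality across kernels and hardware is not guaranteed, so the assertion is relaxed to a tolerance check (`tf.test.TestCase.assertAllClose` defaults to `rtol=1e-6, atol=1e-6`). The classic illustration of why exact float comparison is fragile:

import numpy as np

a = 0.1 + 0.2  # 0.30000000000000004 in float64
b = 0.3
assert a != b                                # bit-exact equality fails
np.testing.assert_allclose(a, b, rtol=1e-6)  # tolerance check passes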