Unverified Commit 78ddf6eb authored by cclauss, committed by GitHub

Merge branch 'master' into patch-6

parents 50cb0365 1f34fcaf
"""Tests for common.rollout."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from common import rollout as rollout_lib # brain coder
class RolloutTest(tf.test.TestCase):
def MakeRollout(self, states, actions, rewards, values=None, terminated=True):
rollout = rollout_lib.Rollout()
rollout.add_many(
states=states, actions=actions, rewards=rewards, values=values,
terminated=terminated)
return rollout
def testDiscount(self):
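    # Worked expectation: discount(r, gamma)[t] = sum_k gamma**k * r[t + k].
    # With gamma=0.5 and rewards [0, 1, 0, 0, 1], the reward at index 4
    # contributes 0.5**(4 - t) at every step t, and the reward at index 1
    # additionally contributes 0.5**(1 - t) for t <= 1.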
discounted = np.array([1.0 / 2 ** n for n in range(4, -1, -1)])
discounted[:2] += [1.0 / 2 ** n for n in range(1, -1, -1)]
self.assertTrue(np.array_equal(
rollout_lib.discount([0.0, 1.0, 0.0, 0.0, 1.0], 0.50),
discounted))
self.assertTrue(np.array_equal(
rollout_lib.discount(np.array([0.0, 1.0, 0.0, 0.0, 1.0]), 0.50),
discounted))
def testDiscountedAdvantageAndRewards(self):
# lambda=1, No bootstrapping.
values = [0.1, 0.5, 0.5, 0.25]
(empirical_values,
generalized_advantage) = rollout_lib.discounted_advantage_and_rewards(
[0.0, 0.0, 0.0, 1.0],
values,
gamma=0.75,
lambda_=1.0)
expected_discounted_r = (
np.array([1.0 * 0.75 ** n for n in range(3, -1, -1)]))
expected_adv = expected_discounted_r - values
self.assertTrue(np.array_equal(empirical_values, expected_discounted_r))
self.assertTrue(np.allclose(generalized_advantage, expected_adv))
# lambda=1, With bootstrapping.
values = [0.1, 0.5, 0.5, 0.25, 0.75]
(empirical_values,
generalized_advantage) = rollout_lib.discounted_advantage_and_rewards(
[0.0, 0.0, 0.0, 1.0],
values,
gamma=0.75,
lambda_=1.0)
expected_discounted_r = (
np.array([0.75 * 0.75 ** n for n in range(4, 0, -1)])
+ np.array([1.0 * 0.75 ** n for n in range(3, -1, -1)]))
expected_adv = expected_discounted_r - values[:-1]
self.assertTrue(np.array_equal(empirical_values, expected_discounted_r))
self.assertTrue(np.allclose(generalized_advantage, expected_adv))
# lambda=0.5, With bootstrapping.
values = [0.1, 0.5, 0.5, 0.25, 0.75]
rewards = [0.0, 0.0, 0.0, 1.0]
l = 0.5 # lambda
g = 0.75 # gamma
(empirical_values,
generalized_advantage) = rollout_lib.discounted_advantage_and_rewards(
rewards,
values,
gamma=g,
lambda_=l)
expected_discounted_r = (
np.array([0.75 * g ** n for n in range(4, 0, -1)])
+ np.array([1.0 * g ** n for n in range(3, -1, -1)]))
expected_adv = [0.0] * len(values)
for t in range(3, -1, -1):
delta_t = rewards[t] + g * values[t + 1] - values[t]
expected_adv[t] = delta_t + g * l * expected_adv[t + 1]
expected_adv = expected_adv[:-1]
self.assertTrue(np.array_equal(empirical_values, expected_discounted_r))
self.assertTrue(np.allclose(generalized_advantage, expected_adv))
def testProcessRollouts(self):
g = 0.95
rollouts = [
self.MakeRollout(
states=[3, 6, 9],
actions=[1, 2, 3],
rewards=[1.0, -1.0, 0.5],
values=[0.5, 0.5, 0.1]),
self.MakeRollout(
states=[10],
actions=[5],
rewards=[1.0],
values=[0.5])]
batch = rollout_lib.process_rollouts(rollouts, gamma=g)
self.assertEqual(2, batch.batch_size)
self.assertEqual(3, batch.max_time)
self.assertEqual([3, 1], batch.episode_lengths)
self.assertEqual([0.5, 1.0], batch.total_rewards)
self.assertEqual(
[[3, 6, 9], [10, 0, 0]],
batch.states.tolist())
self.assertEqual(
[[1, 2, 3], [5, 0, 0]],
batch.actions.tolist())
rew1, rew2 = rollouts[0].rewards, rollouts[1].rewards
expected_discounted_rewards = [
[rew1[0] + g * rew1[1] + g * g * rew1[2],
rew1[1] + g * rew1[2],
rew1[2]],
[rew2[0], 0.0, 0.0]]
expected_advantages = [
[dr - v
for dr, v
in zip(expected_discounted_rewards[0], rollouts[0].values)],
[expected_discounted_rewards[1][0] - rollouts[1].values[0], 0.0, 0.0]]
self.assertTrue(
np.allclose(expected_discounted_rewards, batch.discounted_r))
self.assertTrue(
np.allclose(expected_advantages, batch.discounted_adv))
if __name__ == '__main__':
tf.test.main()
"""Schedule functions for controlling hparams over time."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from abc import ABCMeta
from abc import abstractmethod
import math
from common import config_lib # brain coder
class Schedule(object):
"""Schedule is a function which sets a hyperparameter's value over time.
  For example, a schedule can be used to decay an hparam, or oscillate it over
  time.
  This object is constructed with an instance of config_lib.Config (the
  contents are specific to each class implementation). For example, if this is
  a decay schedule, the config may specify the rate of decay and decay start
  time. The object instance is then called like a function, mapping global
  step (an integer counting how many calls to the train op have been made) to
  the hparam value.
Properties of a schedule function f(t):
0) Domain of t is the non-negative integers (t may be 0).
1) Range of f is the reals.
2) Schedule functions can assume that they will be called in time order. This
allows schedules to be stateful.
  3) Schedule functions should be deterministic. Two schedule instances with
     the same config must always give the same value for each t, regardless of
     which t values they were previously called on. Users may call f(t) with
     arbitrary (positive) time jumps. In particular, multiple schedule
     instances used in replica training will behave identically.
4) Duplicate successive calls on the same time are allowed.
"""
__metaclass__ = ABCMeta
@abstractmethod
def __init__(self, config):
"""Construct this schedule with a config specific to each class impl.
Args:
config: An instance of config_lib.Config.
"""
pass
@abstractmethod
def __call__(self, global_step):
"""Map `global_step` to a value.
`global_step` is an integer counting how many calls to the train op have
been made across all replicas (hence why it is global). Implementations
may assume calls to be made in time order, i.e. `global_step` now >=
previous `global_step` values.
Args:
global_step: Non-negative integer.
Returns:
Hparam value at this step. A number.
"""
pass
class ConstSchedule(Schedule):
"""Constant function.
config:
const: Constant value at every step.
f(t) = const.
"""
def __init__(self, config):
super(ConstSchedule, self).__init__(config)
self.const = config.const
def __call__(self, global_step):
return self.const
class LinearDecaySchedule(Schedule):
"""Linear decay function.
config:
initial: Decay starts from this value.
final: Decay ends at this value.
start_time: Step when decay starts. Constant before it.
end_time: When decay ends. Constant after it.
f(t) is a linear function when start_time <= t <= end_time, with slope of
(final - initial) / (end_time - start_time). f(t) = initial
when t <= start_time. f(t) = final when t >= end_time.
If start_time == end_time, this becomes a step function.
"""
def __init__(self, config):
super(LinearDecaySchedule, self).__init__(config)
self.initial = config.initial
self.final = config.final
self.start_time = config.start_time
self.end_time = config.end_time
if self.end_time < self.start_time:
      raise ValueError('end_time must not be earlier than start_time.')
# Linear interpolation.
self._time_diff = float(self.end_time - self.start_time)
self._diff = float(self.final - self.initial)
self._slope = (
self._diff / self._time_diff if self._time_diff > 0 else float('inf'))
def __call__(self, global_step):
if global_step <= self.start_time:
return self.initial
if global_step > self.end_time:
return self.final
return self.initial + (global_step - self.start_time) * self._slope
class ExponentialDecaySchedule(Schedule):
"""Exponential decay function.
See https://en.wikipedia.org/wiki/Exponential_decay.
Use this decay function to decay over orders of magnitude. For example, to
decay learning rate from 1e-2 to 1e-6. Exponential decay will decay the
exponent linearly.
config:
initial: Decay starts from this value.
final: Decay ends at this value.
start_time: Step when decay starts. Constant before it.
end_time: When decay ends. Constant after it.
f(t) is an exponential decay function when start_time <= t <= end_time. The
decay rate and amplitude are chosen so that f(t) = initial when
t = start_time, and f(t) = final when t = end_time. f(t) is constant for
t < start_time or t > end_time. initial and final must be positive values.
If start_time == end_time, this becomes a step function.
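  Example: with initial=1e-2, final=1e-6, start_time=0, and end_time=100, the
  base-10 exponent decays linearly from -2 to -6, so f(50) = 1e-4.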
"""
def __init__(self, config):
super(ExponentialDecaySchedule, self).__init__(config)
self.initial = config.initial
self.final = config.final
self.start_time = config.start_time
self.end_time = config.end_time
if self.initial <= 0 or self.final <= 0:
raise ValueError('initial and final must be positive numbers.')
# Linear interpolation in log space.
self._linear_fn = LinearDecaySchedule(
config_lib.Config(
initial=math.log(self.initial),
final=math.log(self.final),
start_time=self.start_time,
end_time=self.end_time))
def __call__(self, global_step):
return math.exp(self._linear_fn(global_step))
class SmootherstepDecaySchedule(Schedule):
"""Smootherstep decay function.
  A sigmoid-like transition from initial to final values; a smoother
  transition than the linear and exponential decays, hence the name.
See https://en.wikipedia.org/wiki/Smoothstep.
config:
initial: Decay starts from this value.
final: Decay ends at this value.
start_time: Step when decay starts. Constant before it.
end_time: When decay ends. Constant after it.
f(t) is fully defined here:
https://en.wikipedia.org/wiki/Smoothstep#Variations.
  f(t) is smooth: its first and second derivatives exist everywhere (both are
  zero at the endpoints of the transition).
"""
def __init__(self, config):
super(SmootherstepDecaySchedule, self).__init__(config)
self.initial = config.initial
self.final = config.final
self.start_time = config.start_time
self.end_time = config.end_time
if self.end_time < self.start_time:
      raise ValueError('end_time must not be earlier than start_time.')
self._time_diff = float(self.end_time - self.start_time)
self._diff = float(self.final - self.initial)
def __call__(self, global_step):
if global_step <= self.start_time:
return self.initial
if global_step > self.end_time:
return self.final
x = (global_step - self.start_time) / self._time_diff
# Smootherstep
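    # The quintic 6x^5 - 15x^4 + 10x^3 maps [0, 1] onto [0, 1] with zero
    # first and second derivatives at both endpoints; the result is then
    # rescaled to run from initial to final.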
return self.initial + x * x * x * (x * (x * 6 - 15) + 10) * self._diff
class HardOscillatorSchedule(Schedule):
"""Hard oscillator function.
config:
high: Max value of the oscillator. Value at constant plateaus.
low: Min value of the oscillator. Value at constant valleys.
start_time: Global step when oscillation starts. Constant before this.
period: Width of one oscillation, i.e. number of steps over which the
oscillation takes place.
transition_fraction: Fraction of the period spent transitioning between high
and low values. 50% of this time is spent rising, and 50% of this time
is spent falling. 50% of the remaining time is spent constant at the
high value, and 50% of the remaining time is spent constant at the low
value. transition_fraction = 1.0 means the entire period is spent
rising and falling. transition_fraction = 0.0 means no time is spent
rising and falling, i.e. the function jumps instantaneously between
high and low.
f(t) = high when t < start_time.
f(t) is periodic when t >= start_time, with f(t + period) = f(t).
f(t) is linear with positive slope when rising, and negative slope when
falling. At the start of the period t0, f(t0) = high and begins to descend.
At the middle of the period f is low and is constant until the ascension
begins. f then rises from low to high and is constant again until the period
repeats.
Note: when transition_fraction is 0, f starts the period low and ends high.
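  Worked example: with high=2, low=0, start_time=100, period=10, and
  transition_fraction=0.5, f descends from 2 at t=100 to 0 at t=102.5, holds
  at 0 until t=105, rises back to 2 by t=107.5, and holds at 2 until the
  period repeats at t=110.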
"""
def __init__(self, config):
super(HardOscillatorSchedule, self).__init__(config)
self.high = config.high
self.low = config.low
self.start_time = config.start_time
self.period = float(config.period)
self.transition_fraction = config.transition_fraction
self.half_transition_fraction = config.transition_fraction / 2.0
if self.transition_fraction < 0 or self.transition_fraction > 1.0:
raise ValueError('transition_fraction must be between 0 and 1.0')
if self.period <= 0:
raise ValueError('period must be positive')
self._slope = (
float(self.high - self.low) / self.half_transition_fraction
if self.half_transition_fraction > 0 else float('inf'))
def __call__(self, global_step):
if global_step < self.start_time:
return self.high
period_pos = ((global_step - self.start_time) / self.period) % 1.0
if period_pos >= 0.5:
# ascending
period_pos -= 0.5
if period_pos < self.half_transition_fraction:
return self.low + period_pos * self._slope
else:
return self.high
else:
# descending
if period_pos < self.half_transition_fraction:
return self.high - period_pos * self._slope
else:
return self.low
_NAME_TO_CONFIG = {
'const': ConstSchedule,
'linear_decay': LinearDecaySchedule,
'exp_decay': ExponentialDecaySchedule,
'smooth_decay': SmootherstepDecaySchedule,
'hard_osc': HardOscillatorSchedule,
}
def make_schedule(config):
"""Schedule factory.
Given `config` containing a `fn` property, a Schedule implementation is
instantiated with `config`. See `_NAME_TO_CONFIG` for `fn` options.
Args:
config: Config with a `fn` option that specifies which Schedule
implementation to use. `config` is passed into the constructor.
Returns:
A Schedule impl instance.
"""
schedule_class = _NAME_TO_CONFIG[config.fn]
return schedule_class(config)
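# Illustrative usage sketch (the values follow from LinearDecaySchedule above):
#
#   config = config_lib.Config(
#       fn='linear_decay', initial=1.0, final=0.1, start_time=0, end_time=1000)
#   decay = make_schedule(config)
#   decay(0)     # => 1.0
#   decay(500)   # => 0.55, halfway along the linear ramp.
#   decay(1000)  # => 0.1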
"""Tests for common.schedules."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from math import exp
from math import sqrt
import numpy as np
from six.moves import xrange
import tensorflow as tf
from common import config_lib # brain coder
from common import schedules # brain coder
class SchedulesTest(tf.test.TestCase):
def ScheduleTestHelper(self, config, schedule_subtype, io_values):
"""Run common checks for schedules.
Args:
config: Config object which is passed into schedules.make_schedule.
schedule_subtype: The expected schedule type to be instantiated.
io_values: List of (input, output) pairs. Must be in ascending input
order. No duplicate inputs.
"""
# Check that make_schedule makes the correct type.
f = schedules.make_schedule(config)
self.assertTrue(isinstance(f, schedule_subtype))
# Check that multiple instances returned from make_schedule behave the same.
fns = [schedules.make_schedule(config) for _ in xrange(3)]
# Check that all the inputs map to the right outputs.
for i, o in io_values:
for f in fns:
f_out = f(i)
self.assertTrue(
np.isclose(o, f_out),
'Wrong value at input %d. Expected %s, got %s' % (i, o, f_out))
# Check that a subset of the io_values are still correct.
f = schedules.make_schedule(config)
subseq = [io_values[i**2] for i in xrange(int(sqrt(len(io_values))))]
if subseq[-1] != io_values[-1]:
subseq.append(io_values[-1])
for i, o in subseq:
f_out = f(i)
self.assertTrue(
np.isclose(o, f_out),
'Wrong value at input %d. Expected %s, got %s' % (i, o, f_out))
# Check duplicate calls.
f = schedules.make_schedule(config)
for i, o in io_values:
for _ in xrange(3):
f_out = f(i)
self.assertTrue(
np.isclose(o, f_out),
'Duplicate calls at input %d are not equal. Expected %s, got %s'
% (i, o, f_out))
def testConstSchedule(self):
self.ScheduleTestHelper(
config_lib.Config(fn='const', const=5),
schedules.ConstSchedule,
[(0, 5), (1, 5), (10, 5), (20, 5), (100, 5), (1000000, 5)])
def testLinearDecaySchedule(self):
self.ScheduleTestHelper(
config_lib.Config(fn='linear_decay', initial=2, final=0, start_time=10,
end_time=20),
schedules.LinearDecaySchedule,
[(0, 2), (1, 2), (10, 2), (11, 1.8), (15, 1), (19, 0.2), (20, 0),
(100000, 0)])
# Test step function.
self.ScheduleTestHelper(
config_lib.Config(fn='linear_decay', initial=2, final=0, start_time=10,
end_time=10),
schedules.LinearDecaySchedule,
[(0, 2), (1, 2), (10, 2), (11, 0), (15, 0)])
def testExponentialDecaySchedule(self):
self.ScheduleTestHelper(
config_lib.Config(fn='exp_decay', initial=exp(-1), final=exp(-6),
start_time=10, end_time=20),
schedules.ExponentialDecaySchedule,
[(0, exp(-1)), (1, exp(-1)), (10, exp(-1)), (11, exp(-1/2. - 1)),
(15, exp(-5/2. - 1)), (19, exp(-9/2. - 1)), (20, exp(-6)),
(100000, exp(-6))])
# Test step function.
self.ScheduleTestHelper(
config_lib.Config(fn='exp_decay', initial=exp(-1), final=exp(-6),
start_time=10, end_time=10),
schedules.ExponentialDecaySchedule,
[(0, exp(-1)), (1, exp(-1)), (10, exp(-1)), (11, exp(-6)),
(15, exp(-6))])
def testSmootherstepDecaySchedule(self):
self.ScheduleTestHelper(
config_lib.Config(fn='smooth_decay', initial=2, final=0, start_time=10,
end_time=20),
schedules.SmootherstepDecaySchedule,
[(0, 2), (1, 2), (10, 2), (11, 1.98288), (15, 1), (19, 0.01712),
(20, 0), (100000, 0)])
# Test step function.
self.ScheduleTestHelper(
config_lib.Config(fn='smooth_decay', initial=2, final=0, start_time=10,
end_time=10),
schedules.SmootherstepDecaySchedule,
[(0, 2), (1, 2), (10, 2), (11, 0), (15, 0)])
def testHardOscillatorSchedule(self):
self.ScheduleTestHelper(
config_lib.Config(fn='hard_osc', high=2, low=0, start_time=100,
period=10, transition_fraction=0.5),
schedules.HardOscillatorSchedule,
[(0, 2), (1, 2), (10, 2), (100, 2), (101, 1.2), (102, 0.4), (103, 0),
(104, 0), (105, 0), (106, 0.8), (107, 1.6), (108, 2), (109, 2),
(110, 2), (111, 1.2), (112, 0.4), (115, 0), (116, 0.8), (119, 2),
(120, 2), (100001, 1.2), (100002, 0.4), (100005, 0), (100006, 0.8),
(100010, 2)])
# Test instantaneous step.
self.ScheduleTestHelper(
config_lib.Config(fn='hard_osc', high=2, low=0, start_time=100,
period=10, transition_fraction=0),
schedules.HardOscillatorSchedule,
[(0, 2), (1, 2), (10, 2), (99, 2), (100, 0), (104, 0), (105, 2),
(106, 2), (109, 2), (110, 0)])
if __name__ == '__main__':
tf.test.main()
"""Utility classes and functions."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import bisect
from collections import deque
from six.moves import cPickle
import heapq
import random
from absl import logging
import numpy as np
import six
from six.moves import xrange
import tensorflow as tf
def tuple_to_record(tuple_, record_type):
  """Instantiate `record_type` from a tuple of positional attribute values."""
  return record_type(**dict(zip(record_type.__slots__, tuple_)))
def make_record(type_name, attributes, defaults=None):
"""Factory for mutable record classes.
A record acts just like a collections.namedtuple except slots are writable.
One exception is that record classes are not equivalent to tuples or other
record classes of the same length.
Note, each call to `make_record` produces a unique type. Two calls will make
different types even if `type_name` is the same each time.
Args:
type_name: Name of the record type to create.
attributes: List of names of each record attribute. The order of the list
is preserved.
defaults: (optional) default values for attributes. A dict mapping attribute
names to values.
Returns:
A new record type.
Raises:
ValueError: If,
`defaults` is not a dict,
`attributes` contains duplicate names,
`defaults` keys are not contained in `attributes`.
"""
if defaults is None:
defaults = {}
if not isinstance(defaults, dict):
raise ValueError('defaults must be a dict.')
attr_set = set(attributes)
if len(attr_set) < len(attributes):
raise ValueError('No duplicate attributes allowed.')
if not set(defaults.keys()).issubset(attr_set):
raise ValueError('Default attributes must be given in the attributes list.')
class RecordClass(object):
"""A record type.
Acts like mutable tuple with named slots.
"""
__slots__ = list(attributes)
_defaults = dict(defaults)
def __init__(self, *args, **kwargs):
if len(args) > len(self.__slots__):
raise ValueError('Too many arguments. %s has length %d.'
% (type(self).__name__, len(self.__slots__)))
for attr, val in self._defaults.items():
setattr(self, attr, val)
for i, arg in enumerate(args):
setattr(self, self.__slots__[i], arg)
for attr, val in kwargs.items():
setattr(self, attr, val)
for attr in self.__slots__:
if not hasattr(self, attr):
raise ValueError('Required attr "%s" is not set.' % attr)
def __len__(self):
return len(self.__slots__)
def __iter__(self):
for attr in self.__slots__:
yield getattr(self, attr)
def __getitem__(self, index):
return getattr(self, self.__slots__[index])
def __setitem__(self, index, value):
return setattr(self, self.__slots__[index], value)
def __eq__(self, other):
# Types must be equal as well as values.
return (isinstance(other, type(self))
and all(a == b for a, b in zip(self, other)))
def __str__(self):
return '%s(%s)' % (
type(self).__name__,
', '.join(attr + '=' + str(getattr(self, attr))
for attr in self.__slots__))
def __repr__(self):
return str(self)
RecordClass.__name__ = type_name
return RecordClass
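# Illustrative usage sketch for make_record:
#
#   Point = make_record('Point', ['x', 'y'], defaults={'y': 0})
#   p = Point(1)   # x=1; y falls back to its default of 0.
#   p.y = 5        # Unlike a namedtuple, slots are writable.
#   list(p)        # => [1, 5]; records also support len() and indexing.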
# Making minibatches.
def stack_pad(tensors, pad_axes=None, pad_to_lengths=None, dtype=np.float32,
pad_value=0):
"""Stack tensors along 0-th dim and pad them to be the same shape.
Args:
tensors: Any list of iterables (python list, numpy array, etc). Can be 1D
or multi-D iterables.
pad_axes: An int or list of ints. Axes to pad along.
pad_to_lengths: Length in each dimension. If pad_axes was an int, this is an
int or None. If pad_axes was a list of ints, this is a list of mixed int
and None types with the same length, or None. A None length means the
maximum length among the given tensors is used.
dtype: Type of output numpy array. Defaults to np.float32.
pad_value: Value to use for padding. Defaults to 0.
Returns:
Numpy array containing the tensors stacked along the 0-th dimension and
padded along the specified dimensions.
Raises:
ValueError: If the tensors do not have equal shapes along non-padded
dimensions.
"""
tensors = [np.asarray(t) for t in tensors]
max_lengths = [max(l) for l in zip(*[t.shape for t in tensors])]
same_axes = dict(enumerate(max_lengths))
if pad_axes is None:
pad_axes = []
  if isinstance(pad_axes, six.integer_types):
    if pad_to_lengths is not None:
      max_lengths[pad_axes] = pad_to_lengths
    # Remove the pad axis from the same-shape check even when no explicit
    # length is given, so the max length among the tensors is used.
    del same_axes[pad_axes]
  else:
    if pad_to_lengths is None:
      pad_to_lengths = [None] * len(pad_axes)
    for i, l in zip(pad_axes, pad_to_lengths):
      if l is not None:
        max_lengths[i] = l
      del same_axes[i]
same_axes_items = same_axes.items()
dest = np.full([len(tensors)] + max_lengths, pad_value, dtype=dtype)
for i, t in enumerate(tensors):
for j, l in same_axes_items:
if t.shape[j] != l:
raise ValueError(
'Tensor at index %d does not have size %d along axis %d'
% (i, l, j))
    dest[tuple([i] + [slice(0, d) for d in t.shape])] = t
return dest
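# Illustrative usage sketch for stack_pad:
#
#   stack_pad([[1, 2], [3]], pad_axes=0)
#   # => array([[1., 2.],
#   #           [3., 0.]], dtype=float32)  (padded to the longest tensor)
#
#   stack_pad([[1, 2], [3]], pad_axes=0, pad_to_lengths=3)
#   # => array([[1., 2., 0.],
#   #           [3., 0., 0.]], dtype=float32)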
class RandomQueue(deque):
  """Bounded FIFO queue supporting uniform random sampling of its items."""
def __init__(self, capacity):
super(RandomQueue, self).__init__([], capacity)
self.capacity = capacity
def random_sample(self, sample_size):
idx = np.random.choice(len(self), sample_size)
return [self[i] for i in idx]
def push(self, item):
# Append to right. Oldest element will be popped from left.
self.append(item)
class MPQItemContainer(object):
"""Class for holding an item with its score.
Defines a comparison function for use in the heap-queue.
"""
def __init__(self, score, item, extra_data):
self.item = item
self.score = score
self.extra_data = extra_data
  def __lt__(self, other):
    # heapq only needs __lt__; the Python 2 __cmp__/cmp protocol was removed
    # in Python 3, so order containers by score explicitly.
    assert isinstance(other, type(self))
    return self.score < other.score
def __iter__(self):
"""Allows unpacking like a tuple."""
yield self.score
yield self.item
yield self.extra_data
def __repr__(self):
"""String representation of this item.
`extra_data` is not included in the representation. We are assuming that
`extra_data` is not easily interpreted by a human (if it was, it should be
hashable, like a string or tuple).
Returns:
String representation of `self`.
"""
return str((self.score, self.item))
def __str__(self):
return repr(self)
class MaxUniquePriorityQueue(object):
"""A maximum priority queue where duplicates are not added.
The top items by score remain in the queue. When the capacity is reached,
the lowest scored item in the queue will be dropped.
This implementation differs from a typical priority queue, in that the minimum
score is popped, instead of the maximum. Largest scores remain stuck in the
queue. This is useful for accumulating the best known items from a population.
The items used to determine uniqueness must be hashable, but additional
non-hashable data may be stored with each item.
"""
def __init__(self, capacity):
self.capacity = capacity
self.heap = []
self.unique_items = set()
def push(self, score, item, extra_data=None):
"""Push an item onto the queue.
    If the queue is at capacity, the item with the smallest score will be
    dropped. Note that each item is assumed to have exactly one score; a
    duplicate item is ignored even if it arrives with a different score.
Args:
score: Number used to prioritize items in the queue. Largest scores are
kept in the queue.
item: A hashable item to be stored. Duplicates of this item will not be
added to the queue.
      extra_data: Extra (possibly non-hashable) data to store with the item.
"""
if item in self.unique_items:
return
if len(self.heap) >= self.capacity:
_, popped_item, _ = heapq.heappushpop(
self.heap, MPQItemContainer(score, item, extra_data))
self.unique_items.add(item)
self.unique_items.remove(popped_item)
else:
heapq.heappush(self.heap, MPQItemContainer(score, item, extra_data))
self.unique_items.add(item)
def pop(self):
"""Pop the item with the lowest score.
Returns:
score: Item's score.
item: The item that was popped.
extra_data: Any extra data stored with the item.
"""
if not self.heap:
return ()
score, item, extra_data = heapq.heappop(self.heap)
self.unique_items.remove(item)
return score, item, extra_data
def get_max(self):
"""Peek at the item with the highest score.
Returns:
Same as `pop`.
"""
if not self.heap:
return ()
score, item, extra_data = heapq.nlargest(1, self.heap)[0]
return score, item, extra_data
def get_min(self):
"""Peek at the item with the lowest score.
Returns:
Same as `pop`.
"""
if not self.heap:
return ()
score, item, extra_data = heapq.nsmallest(1, self.heap)[0]
return score, item, extra_data
def random_sample(self, sample_size):
"""Randomly select items from the queue.
This does not modify the queue.
Items are drawn from a uniform distribution, and not weighted by score.
Args:
sample_size: Number of random samples to draw. The same item can be
sampled multiple times.
Returns:
List of sampled items (of length `sample_size`). Each element in the list
is a tuple: (item, extra_data).
"""
idx = np.random.choice(len(self.heap), sample_size)
return [(self.heap[i].item, self.heap[i].extra_data) for i in idx]
def iter_in_order(self):
"""Iterate over items in the queue from largest score to smallest.
Yields:
item: Hashable item.
extra_data: Extra data stored with the item.
"""
for _, item, extra_data in heapq.nlargest(len(self.heap), self.heap):
yield item, extra_data
def __len__(self):
return len(self.heap)
def __iter__(self):
for _, item, _ in self.heap:
yield item
def __repr__(self):
return '[' + ', '.join(repr(c) for c in self.heap) + ']'
def __str__(self):
return repr(self)
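# Illustrative usage sketch for MaxUniquePriorityQueue:
#
#   q = MaxUniquePriorityQueue(capacity=2)
#   q.push(1.0, 'a')
#   q.push(3.0, 'b')
#   q.push(2.0, 'c')  # At capacity: lowest-scored 'a' is dropped.
#   q.push(9.9, 'b')  # Duplicate item: ignored, even with a new score.
#   q.get_max()       # => (3.0, 'b', None)
#   q.pop()           # => (2.0, 'c', None)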
class RouletteWheel(object):
"""Randomly samples stored objects proportionally to their given weights.
Stores objects and weights. Acts like a roulette wheel where each object is
given a slice of the roulette disk proportional to its weight.
This can be used as a replay buffer where past experiences are sampled
proportionally to their weights. A good choice of "weight" for reinforcement
learning is exp(reward / temperature) where temperature -> inf makes the
distribution more uniform and temperature -> 0 makes the distribution more
peaky.
To prevent experiences from being overweighted by appearing in the replay
buffer multiple times, a "unique mode" is supported where duplicate
experiences are ignored. In unique mode, weights can be quickly retrieved from
keys.
"""
def __init__(self, unique_mode=False, save_file=None):
"""Construct empty RouletteWheel.
If `save_file` is not None, and the file already exists on disk, whatever
is in the file will be loaded into this instance. This allows jobs using
RouletteWheel to resume after preemption.
Args:
unique_mode: If True, puts this RouletteWheel into unique mode, where
objects are added with hashable keys, so that duplicates are ignored.
save_file: Optional file path to save to. Must be a string containing
an absolute path to a file, or None. File will be Python pickle
format.
"""
self.unique_mode = unique_mode
self.objects = []
self.weights = []
self.partial_sums = []
if self.unique_mode:
self.keys_to_weights = {}
self.save_file = save_file
self.save_to_disk_buffer = []
if save_file is not None and tf.gfile.Exists(save_file):
# Load from disk.
      with tf.gfile.FastGFile(save_file, 'r') as f:
        count = 0
        while True:
try:
obj, weight, key = cPickle.load(f)
except EOFError:
break
else:
self.add(obj, weight, key)
count += 1
logging.info('Loaded %d samples from disk.', count)
# Clear buffer since these items are already on disk.
self.save_to_disk_buffer = []
def __iter__(self):
return iter(zip(self.objects, self.weights))
def __len__(self):
return len(self.objects)
def is_empty(self):
"""Returns whether there is anything in the roulette wheel."""
return not self.partial_sums
@property
def total_weight(self):
"""Total cumulative weight across all objects."""
if self.partial_sums:
return self.partial_sums[-1]
return 0.0
  def has_key(self, key):
    if not self.unique_mode:
      raise RuntimeError('has_key method can only be called in unique mode.')
    return key in self.keys_to_weights
  def get_weight(self, key):
    if not self.unique_mode:
      raise RuntimeError('get_weight method can only be called in unique mode.')
    return self.keys_to_weights[key]
def add(self, obj, weight, key=None):
"""Add one object and its weight to the roulette wheel.
Args:
obj: Any object to be stored.
weight: A non-negative float. The given object will be drawn with
probability proportional to this weight when sampling.
key: This argument is only used when in unique mode. To allow `obj` to
be an unhashable type, like list, a separate hashable key is given.
Each `key` should be unique to each `obj`. `key` is used to check if
`obj` has been added to the roulette wheel before.
Returns:
True if the object was added, False if it was not added due to it being
a duplicate (this only happens in unique mode).
Raises:
ValueError: If `weight` is negative.
ValueError: If `key` is not given when in unique mode, or if `key` is
given when not in unique mode.
"""
if weight < 0:
raise ValueError('Weight must be non-negative')
if self.unique_mode:
if key is None:
raise ValueError(
'Hashable key required for objects when unique mode is enabled.')
if key in self.keys_to_weights:
# Weight updates are not allowed. Ignore the given value of `weight`.
return False
self.keys_to_weights[key] = weight
elif key is not None:
raise ValueError(
'key argument should not be used when unique mode is disabled.')
self.objects.append(obj)
self.weights.append(weight)
self.partial_sums.append(self.total_weight + weight)
if self.save_file is not None:
# Record new item in buffer.
self.save_to_disk_buffer.append((obj, weight, key))
return True
def add_many(self, objs, weights, keys=None):
"""Add many object and their weights to the roulette wheel.
Arguments are the same as the `add` method, except each is a list. Lists
must all be the same length.
Args:
objs: List of objects to be stored.
weights: List of non-negative floats. See `add` method.
keys: List of hashable keys. This argument is only used when in unique
mode. See `add` method.
Returns:
Number of objects added. This number will be less than the number of
objects provided if we are in unique mode and some keys are already
in the roulette wheel.
Raises:
ValueError: If `keys` argument is provided when unique_mode == False, or
is not provided when unique_mode == True.
ValueError: If any of the lists are not the same length.
ValueError: If any of the weights are negative.
"""
if keys is not None and not self.unique_mode:
raise ValueError('Not in unique mode. Do not provide keys.')
elif keys is None and self.unique_mode:
raise ValueError('In unique mode. You must provide hashable keys.')
if keys and len(objs) != len(keys):
raise ValueError('Number of objects does not equal number of keys.')
if len(objs) != len(weights):
raise ValueError('Number of objects does not equal number of weights.')
return sum([self.add(obj, weights[i], key=keys[i] if keys else None)
for i, obj in enumerate(objs)])
def sample(self):
"""Spin the roulette wheel.
Randomly select an object with probability proportional to its weight.
Returns:
object: The selected object.
weight: The weight of the selected object.
Raises:
RuntimeError: If the roulette wheel is empty.
"""
if self.is_empty():
raise RuntimeError('Trying to sample from empty roulette wheel.')
spin = random.random() * self.total_weight
# Binary search.
i = bisect.bisect_right(self.partial_sums, spin)
if i == len(self.partial_sums):
      # This should not happen since random.random() is always strictly less
      # than 1.0 and the last partial sum equals self.total_weight. It can
      # still happen due to floating-point rounding error; in that case simply
      # select the last object.
      i -= 1
return self.objects[i], self.weights[i]
def sample_many(self, count):
"""Spin the roulette wheel `count` times and return the results."""
if self.is_empty():
raise RuntimeError('Trying to sample from empty roulette wheel.')
return [self.sample() for _ in xrange(count)]
def incremental_save(self, log_info=False):
"""Write new entries to disk.
This performs an append operation on the `save_file` given in the
constructor. Any entries added since the last call to `incremental_save`
will be appended to the file.
If a new RouletteWheel is constructed with the same `save_file`, all the
entries written there will be automatically loaded into the instance.
This is useful when a job resumes after preemption.
Args:
log_info: If True, info about this operation will be logged.
Raises:
RuntimeError: If `save_file` given in the constructor is None.
"""
if self.save_file is None:
raise RuntimeError('Cannot call incremental_save. `save_file` is None.')
if log_info:
logging.info('Saving %d new samples to disk.',
len(self.save_to_disk_buffer))
    with tf.gfile.FastGFile(self.save_file, 'a') as f:
for entry in self.save_to_disk_buffer:
cPickle.dump(entry, f)
# Clear the buffer.
self.save_to_disk_buffer = []
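# Illustrative usage sketch for RouletteWheel:
#
#   wheel = RouletteWheel()
#   wheel.add('exploit', 2.0)
#   wheel.add('explore', 1.0)
#   obj, weight = wheel.sample()  # 'exploit' roughly 2/3 of the time.
#
# In unique mode each object needs a hashable key; duplicates are ignored:
#
#   wheel = RouletteWheel(unique_mode=True)
#   wheel.add([1, 2, 3], 1.0, key='123')
#   wheel.add([1, 2, 3], 5.0, key='123')  # Ignored; weights are not updated.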
licenses(["notice"])
package(default_visibility = [
"//learning/brain/research/neural_coder:__subpackages__",
])
load("@subpar//:subpar.bzl", "par_binary")
par_binary(
name = "run",
srcs = ["run.py"],
deps = [
":defaults",
":ga_train",
":pg_train",
# absl dep :app
# absl dep /flags
# absl dep /logging
],
)
par_binary(
name = "tune",
srcs = ["tune.py"],
deps = [
":defaults",
":run",
# file dep
# absl dep :app
# absl dep /flags
# absl dep /logging
# numpy dep
# tensorflow dep
],
)
py_library(
name = "ga_train",
srcs = ["ga_train.py"],
deps = [
":data",
":defaults",
":ga_lib",
":results_lib",
# file dep
# absl dep /flags
# absl dep /logging
# numpy dep
# tensorflow dep
"//common:utils", # project
],
)
py_library(
name = "ga_lib",
srcs = ["ga_lib.py"],
deps = [
":misc",
# absl dep /flags
# absl dep /logging
# numpy dep
"//common:bf", # project
"//common:utils", # project
],
)
py_test(
name = "ga_train_test",
srcs = ["ga_train_test.py"],
deps = [
":defaults",
":run",
# absl dep /flags
# tensorflow dep
],
)
py_library(
name = "pg_train",
srcs = ["pg_train.py"],
deps = [
":data",
":defaults",
":pg_agent",
":results_lib",
# file dep
# absl dep /flags
# absl dep /logging
# tensorflow dep
# tensorflow internal dep # build_cleaner: keep
],
)
py_library(
name = "pg_agent",
srcs = ["pg_agent.py"],
deps = [
":misc",
# file dep
# absl dep /logging
# numpy dep
# tensorflow dep
"//common:rollout", # project
"//common:utils", # project
],
)
py_test(
name = "pg_agent_test",
srcs = ["pg_agent_test.py"],
deps = [
":data",
":defaults",
":misc",
":pg_agent",
":pg_train",
# absl dep /logging
# numpy dep
# tensorflow dep
"//common:utils", # project
],
)
py_library(
name = "defaults",
srcs = ["defaults.py"],
deps = [
# absl dep /logging
"//common:config_lib", # project
],
)
py_library(
name = "misc",
srcs = ["misc.py"],
)
py_library(
name = "data",
srcs = ["data.py"],
deps = [
":code_tasks",
# absl dep /logging
],
)
py_library(
name = "code_tasks",
srcs = ["code_tasks.py"],
deps = [
":misc",
":test_tasks",
# absl dep /logging
# numpy dep
"//common:bf", # project
"//common:reward", # project
],
)
py_test(
name = "code_tasks_test",
srcs = ["code_tasks_test.py"],
deps = [
":code_tasks",
":defaults",
# numpy dep
# tensorflow dep
],
)
py_library(
name = "test_tasks",
srcs = ["test_tasks.py"],
deps = [
":misc",
"//common:reward", # project
],
)
py_test(
name = "test_tasks_test",
srcs = ["test_tasks_test.py"],
deps = [
":misc",
":test_tasks",
# numpy dep
# tensorflow dep
],
)
py_test(
name = "pg_train_test",
size = "large",
srcs = ["pg_train_test.py"],
deps = [
":defaults",
":run",
# absl dep /logging
# tensorflow dep
],
)
py_library(
name = "results_lib",
srcs = ["results_lib.py"],
deps = [
# file dep
# tensorflow dep
],
)
py_test(
name = "results_lib_test",
srcs = ["results_lib_test.py"],
deps = [
":results_lib",
# tensorflow dep
],
)
par_binary(
name = "aggregate_experiment_results",
srcs = ["aggregate_experiment_results.py"],
deps = [
":misc",
":results_lib",
# file dep
# absl dep :app
# absl dep /flags
# numpy dep
# tensorflow dep
],
)
par_binary(
name = "aggregate_tuning_results",
srcs = ["aggregate_tuning_results.py"],
deps = [
# file dep
# absl dep :app
# absl dep /flags
# tensorflow dep
],
)
r"""After running tuning, use this script to aggregate the results.
Usage:
OUT_DIR="<my_tuning_dir>"
bazel run -c opt single_task:aggregate_tuning_results -- \
  --alsologtostderr \
  --tuning_dir="$OUT_DIR"
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ast
import os
from absl import app
from absl import flags
import tensorflow as tf
FLAGS = flags.FLAGS
flags.DEFINE_string(
'tuning_dir', '',
'Absolute path where results tuning trial folders are found.')
def main(argv):
del argv # Unused.
try:
trial_dirs = tf.gfile.ListDirectory(FLAGS.tuning_dir)
except tf.errors.NotFoundError:
print('Tuning directory %s does not exist.' % (FLAGS.tuning_dir,))
return
metrics = []
for trial_dir in trial_dirs:
tuning_results_file = os.path.join(
FLAGS.tuning_dir, trial_dir, 'tuning_results.txt')
if tf.gfile.Exists(tuning_results_file):
with tf.gfile.FastGFile(tuning_results_file, 'r') as reader:
for line in reader:
metrics.append(ast.literal_eval(line.replace(': nan,', ': 0.0,')))
if not metrics:
print('No trials found.')
return
num_trials = [m['num_trials'] for m in metrics]
assert all(n == num_trials[0] for n in num_trials)
num_trials = num_trials[0]
print('Found %d completed trials out of %d' % (len(metrics), num_trials))
# Sort by objective descending.
sorted_trials = sorted(metrics, key=lambda m: -m['objective'])
for i, metrics in enumerate(sorted_trials):
hparams = metrics['hparams']
keys = sorted(hparams.keys())
print(
str(i).ljust(4) + ': '
+ '{0:.2f}'.format(metrics['objective']).ljust(10)
+ '['
+ ','.join(['{}={}'.format(k, hparams[k]).ljust(24) for k in keys])
+ ']')
if __name__ == '__main__':
app.run(main)