Unverified Commit c9f03bf6 authored by Neal Wu, committed by GitHub

Merge pull request #5870 from ofirnachum/master

Add training and eval code for efficient-hrl
parents 2c181308 052361de
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""Script for evaluating a UVF agent.
To run locally: See run_eval.py
To run on borg: See train_eval.borg
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import tensorflow as tf
slim = tf.contrib.slim
import gin.tf
# pylint: disable=unused-import
import agent
import train
from utils import utils as uvf_utils
from utils import eval_utils
from environments import create_maze_env
# pylint: enable=unused-import
flags = tf.app.flags
flags.DEFINE_string('eval_dir', None,
'Directory for writing logs/summaries during eval.')
flags.DEFINE_string('checkpoint_dir', None,
'Directory containing checkpoints to eval.')
FLAGS = flags.FLAGS
def get_evaluate_checkpoint_fn(master, output_dir, eval_step_fns,
model_rollout_fn, gamma, max_steps_per_episode,
num_episodes_eval, num_episodes_videos,
tuner_hook, generate_videos,
generate_summaries, video_settings):
"""Returns a function that evaluates a given checkpoint.
Args:
master: BNS name of the TensorFlow master
output_dir: The output directory to which the metric summaries are written.
    eval_step_fns: A dictionary, indexed by summary tag name, of functions that
      each return a list of [state, action, reward, discount, transition_type]
      tensors.
    model_rollout_fn: A function that takes (sess, state, action) and returns
      the predicted next state, or None to skip model-rollout evaluation.
gamma: Discount factor for the reward.
max_steps_per_episode: Maximum steps to run each episode for.
num_episodes_eval: Number of episodes to evaluate and average reward over.
num_episodes_videos: Number of episodes to record for video.
tuner_hook: A callable(average reward, global step) that updates a Vizier
tuner trial.
generate_videos: Whether to generate videos of the agent in action.
generate_summaries: Whether to generate summaries.
video_settings: Settings for generating videos of the agent.
Returns:
A function that evaluates a checkpoint.
"""
sess = tf.Session(master, graph=tf.get_default_graph())
sess.run(tf.global_variables_initializer())
sess.run(tf.local_variables_initializer())
summary_writer = tf.summary.FileWriter(output_dir)
def evaluate_checkpoint(checkpoint_path):
"""Performs a one-time evaluation of the given checkpoint.
Args:
checkpoint_path: Checkpoint to evaluate.
Returns:
True if the evaluation process should stop
"""
restore_fn = tf.contrib.framework.assign_from_checkpoint_fn(
checkpoint_path,
uvf_utils.get_all_vars(),
ignore_missing_vars=True,
reshape_variables=False)
assert restore_fn is not None, 'cannot restore %s' % checkpoint_path
restore_fn(sess)
global_step = sess.run(slim.get_global_step())
should_stop = False
max_reward = -1e10
max_meta_reward = -1e10
for eval_tag, (eval_step, env_base,) in sorted(eval_step_fns.items()):
if hasattr(env_base, 'set_sess'):
env_base.set_sess(sess) # set session
if generate_summaries:
tf.logging.info(
'[%s] Computing average reward over %d episodes at global step %d.',
eval_tag, num_episodes_eval, global_step)
(average_reward, last_reward,
average_meta_reward, last_meta_reward, average_success,
states, actions) = eval_utils.compute_average_reward(
sess, env_base, eval_step, gamma, max_steps_per_episode,
num_episodes_eval)
tf.logging.info('[%s] Average reward = %f', eval_tag, average_reward)
tf.logging.info('[%s] Last reward = %f', eval_tag, last_reward)
tf.logging.info('[%s] Average meta reward = %f', eval_tag, average_meta_reward)
tf.logging.info('[%s] Last meta reward = %f', eval_tag, last_meta_reward)
tf.logging.info('[%s] Average success = %f', eval_tag, average_success)
if model_rollout_fn is not None:
preds, model_losses = eval_utils.compute_model_loss(
sess, model_rollout_fn, states, actions)
for i, (pred, state, model_loss) in enumerate(
zip(preds, states, model_losses)):
tf.logging.info('[%s] Model rollout step %d: loss=%f', eval_tag, i,
model_loss)
tf.logging.info('[%s] Model rollout step %d: pred=%s', eval_tag, i,
str(pred.tolist()))
tf.logging.info('[%s] Model rollout step %d: state=%s', eval_tag, i,
str(state.tolist()))
# Report the eval stats to the tuner.
if average_reward > max_reward:
max_reward = average_reward
if average_meta_reward > max_meta_reward:
max_meta_reward = average_meta_reward
for (tag, value) in [('Reward/average_%s_reward', average_reward),
('Reward/last_%s_reward', last_reward),
('Reward/average_%s_meta_reward', average_meta_reward),
('Reward/last_%s_meta_reward', last_meta_reward),
('Reward/average_%s_success', average_success)]:
summary_str = tf.Summary(value=[
tf.Summary.Value(
tag=tag % eval_tag,
simple_value=value)
])
summary_writer.add_summary(summary_str, global_step)
summary_writer.flush()
if generate_videos or should_stop:
# Do a manual reset before generating the video to see the initial
# pose of the robot, towards which the reset controller is moving.
if hasattr(env_base, '_gym_env'):
tf.logging.info('Resetting before recording video')
if hasattr(env_base._gym_env, 'reset_model'):
env_base._gym_env.reset_model() # pylint: disable=protected-access
else:
env_base._gym_env.wrapped_env.reset_model()
video_filename = os.path.join(output_dir, 'videos',
'%s_step_%d.mp4' % (eval_tag,
global_step))
eval_utils.capture_video(sess, eval_step, env_base,
max_steps_per_episode * num_episodes_videos,
video_filename, video_settings,
reset_every=max_steps_per_episode)
should_stop = should_stop or (generate_summaries and tuner_hook and
tuner_hook(max_reward, global_step))
return bool(should_stop)
return evaluate_checkpoint
def get_model_rollout(uvf_agent, tf_env):
"""Model rollout function."""
state_spec = tf_env.observation_spec()[0]
action_spec = tf_env.action_spec()[0]
state_ph = tf.placeholder(dtype=state_spec.dtype, shape=state_spec.shape)
action_ph = tf.placeholder(dtype=action_spec.dtype, shape=action_spec.shape)
merged_state = uvf_agent.merged_state(state_ph)
diff_value = uvf_agent.critic_net(tf.expand_dims(merged_state, 0),
tf.expand_dims(action_ph, 0))[0]
diff_value = tf.cast(diff_value, dtype=state_ph.dtype)
state_ph.shape.assert_is_compatible_with(diff_value.shape)
next_state = state_ph + diff_value
def model_rollout_fn(sess, state, action):
return sess.run(next_state, feed_dict={state_ph: state, action_ph: action})
return model_rollout_fn
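# Illustrative sketch (not part of the original module): model_rollout_fn
# predicts a single next state from a (state, action) pair, so a multi-step
# open-loop rollout can be formed by feeding each prediction back in, which is
# essentially what eval_utils.compute_model_loss does. The helper below is a
# hypothetical example and is never called by this code.
def _example_open_loop_rollout(sess, model_rollout_fn, initial_state, actions):
  """Rolls the learned model forward for len(actions) steps."""
  predictions = [initial_state]
  for action in actions:
    predictions.append(model_rollout_fn(sess, predictions[-1], action))
  return predictions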
def get_eval_step(uvf_agent,
state_preprocess,
tf_env,
action_fn,
meta_action_fn,
environment_steps,
num_episodes,
mode='eval'):
"""Get one-step policy/env stepping ops.
Args:
uvf_agent: A UVF agent.
tf_env: A TFEnvironment.
action_fn: A function to produce actions given current state.
meta_action_fn: A function to produce meta actions given current state.
environment_steps: A variable to count the number of steps in the tf_env.
num_episodes: A variable to count the number of episodes.
mode: a string representing the mode=[train, explore, eval].
Returns:
A collect_experience_op that excute an action and store into the
replay_buffer
"""
tf_env.start_collect()
state = tf_env.current_obs()
action = action_fn(state, context=None)
state_repr = state_preprocess(state)
action_spec = tf_env.action_spec()
action_ph = tf.placeholder(dtype=action_spec.dtype, shape=action_spec.shape)
with tf.control_dependencies([state]):
transition_type, reward, discount = tf_env.step(action_ph)
def increment_step():
return environment_steps.assign_add(1)
def increment_episode():
return num_episodes.assign_add(1)
def no_op_int():
return tf.constant(0, dtype=tf.int64)
step_cond = uvf_agent.step_cond_fn(state, action,
transition_type,
environment_steps, num_episodes)
reset_episode_cond = uvf_agent.reset_episode_cond_fn(
state, action,
transition_type, environment_steps, num_episodes)
reset_env_cond = uvf_agent.reset_env_cond_fn(state, action,
transition_type,
environment_steps, num_episodes)
increment_step_op = tf.cond(step_cond, increment_step, no_op_int)
with tf.control_dependencies([increment_step_op]):
increment_episode_op = tf.cond(reset_episode_cond, increment_episode,
no_op_int)
with tf.control_dependencies([reward, discount]):
next_state = tf_env.current_obs()
next_state_repr = state_preprocess(next_state)
with tf.control_dependencies([increment_episode_op]):
post_reward, post_meta_reward = uvf_agent.cond_begin_episode_op(
tf.logical_not(reset_episode_cond),
[state, action_ph, reward, next_state,
state_repr, next_state_repr],
mode=mode, meta_action_fn=meta_action_fn)
# Important: do manual reset after getting the final reward from the
# unreset environment.
with tf.control_dependencies([post_reward, post_meta_reward]):
cond_reset_op = tf.cond(reset_env_cond,
tf_env.reset,
tf_env.current_time_step)
# Add a dummy control dependency to force the reset_op to run
with tf.control_dependencies(cond_reset_op):
post_reward, post_meta_reward = map(tf.identity, [post_reward, post_meta_reward])
eval_step = [next_state, action_ph, transition_type, post_reward, post_meta_reward, discount, uvf_agent.context_vars, state_repr]
if callable(action):
def step_fn(sess):
action_value = action(sess)
return sess.run(eval_step, feed_dict={action_ph: action_value})
else:
action = uvf_utils.clip_to_spec(action, action_spec)
def step_fn(sess):
action_value = sess.run(action)
return sess.run(eval_step, feed_dict={action_ph: action_value})
return step_fn
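# Hedged usage note (not part of the original module): each call to
# step_fn(sess) advances the environment by one step and returns the eval_step
# list defined above, i.e. [next_state, action, transition_type, post_reward,
# post_meta_reward, discount, context_vars, state_repr]. This is the ordering
# that eval_utils.compute_reward unpacks:
#   (state, action, transition_type, reward, meta_reward,
#    discount, _, _) = step_fn(sess)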
@gin.configurable
def evaluate(checkpoint_dir,
eval_dir,
environment=None,
num_bin_actions=3,
agent_class=None,
meta_agent_class=None,
state_preprocess_class=None,
gamma=1.0,
num_episodes_eval=10,
eval_interval_secs=60,
max_number_of_evaluations=None,
checkpoint_timeout=None,
timeout_fn=None,
tuner_hook=None,
generate_videos=False,
generate_summaries=True,
num_episodes_videos=5,
video_settings=None,
eval_modes=('eval',),
eval_model_rollout=False,
policy_save_dir='policy',
checkpoint_range=None,
checkpoint_path=None,
max_steps_per_episode=None,
evaluate_nohrl=False):
"""Loads and repeatedly evaluates a checkpointed model at a set interval.
Args:
checkpoint_dir: The directory where the checkpoints reside.
eval_dir: Directory to save the evaluation summary results.
environment: A BaseEnvironment to evaluate.
num_bin_actions: Number of bins for discretizing continuous actions.
agent_class: An RL agent class.
meta_agent_class: A Meta agent class.
gamma: Discount factor for the reward.
num_episodes_eval: Number of episodes to evaluate and average reward over.
eval_interval_secs: The number of seconds between each evaluation run.
max_number_of_evaluations: The max number of evaluations. If None the
evaluation continues indefinitely.
checkpoint_timeout: The maximum amount of time to wait between checkpoints.
If left as `None`, then the process will wait indefinitely.
timeout_fn: Optional function to call after a timeout.
tuner_hook: A callable that takes the average reward and global step and
updates a Vizier tuner trial.
generate_videos: Whether to generate videos of the agent in action.
generate_summaries: Whether to generate summaries.
num_episodes_videos: Number of episodes to evaluate for generating videos.
    video_settings: Settings for generating videos of the agent.
eval_modes: A tuple of eval modes.
eval_model_rollout: Evaluate model rollout.
policy_save_dir: Optional sub-directory where the policies are
saved.
checkpoint_range: Optional. If provided, evaluate all checkpoints in
the range.
    checkpoint_path: Optional sub-directory specifying which checkpoint to
      evaluate. If None, will evaluate the most recent checkpoint.
    max_steps_per_episode: Maximum steps to run each episode for.
    evaluate_nohrl: Whether to also evaluate the low-level agent without the
      meta-agent (adds the 'nohrl' eval tags).
  """
tf_env = create_maze_env.TFPyEnvironment(environment)
observation_spec = [tf_env.observation_spec()]
action_spec = [tf_env.action_spec()]
  assert max_steps_per_episode, 'max_steps_per_episode needs to be set'
if agent_class.ACTION_TYPE == 'discrete':
assert False
else:
assert agent_class.ACTION_TYPE == 'continuous'
if meta_agent_class is not None:
assert agent_class.ACTION_TYPE == meta_agent_class.ACTION_TYPE
with tf.variable_scope('meta_agent'):
meta_agent = meta_agent_class(
observation_spec,
action_spec,
tf_env,
)
else:
meta_agent = None
with tf.variable_scope('uvf_agent'):
uvf_agent = agent_class(
observation_spec,
action_spec,
tf_env,
)
uvf_agent.set_meta_agent(agent=meta_agent)
with tf.variable_scope('state_preprocess'):
state_preprocess = state_preprocess_class()
# run both actor and critic once to ensure networks are initialized
# and gin configs will be saved
# pylint: disable=protected-access
temp_states = tf.expand_dims(
tf.zeros(
dtype=uvf_agent._observation_spec.dtype,
shape=uvf_agent._observation_spec.shape), 0)
# pylint: enable=protected-access
temp_actions = uvf_agent.actor_net(temp_states)
uvf_agent.critic_net(temp_states, temp_actions)
# create eval_step_fns for each action function
eval_step_fns = dict()
meta_agent = uvf_agent.meta_agent
for meta in [True] + [False] * evaluate_nohrl:
meta_tag = 'hrl' if meta else 'nohrl'
uvf_agent.set_meta_agent(meta_agent if meta else None)
for mode in eval_modes:
# wrap environment
wrapped_environment = uvf_agent.get_env_base_wrapper(
environment, mode=mode)
action_wrapper = lambda agent_: agent_.action
action_fn = action_wrapper(uvf_agent)
meta_action_fn = action_wrapper(meta_agent)
eval_step_fns['%s_%s' % (mode, meta_tag)] = (get_eval_step(
uvf_agent=uvf_agent,
state_preprocess=state_preprocess,
tf_env=tf_env,
action_fn=action_fn,
meta_action_fn=meta_action_fn,
environment_steps=tf.Variable(
0, dtype=tf.int64, name='environment_steps'),
num_episodes=tf.Variable(0, dtype=tf.int64, name='num_episodes'),
mode=mode), wrapped_environment,)
model_rollout_fn = None
if eval_model_rollout:
model_rollout_fn = get_model_rollout(uvf_agent, tf_env)
tf.train.get_or_create_global_step()
if policy_save_dir:
checkpoint_dir = os.path.join(checkpoint_dir, policy_save_dir)
tf.logging.info('Evaluating policies at %s', checkpoint_dir)
tf.logging.info('Running episodes for max %d steps', max_steps_per_episode)
evaluate_checkpoint_fn = get_evaluate_checkpoint_fn(
'', eval_dir, eval_step_fns, model_rollout_fn, gamma,
max_steps_per_episode, num_episodes_eval, num_episodes_videos, tuner_hook,
generate_videos, generate_summaries, video_settings)
if checkpoint_path is not None:
checkpoint_path = os.path.join(checkpoint_dir, checkpoint_path)
evaluate_checkpoint_fn(checkpoint_path)
elif checkpoint_range is not None:
model_files = tf.gfile.Glob(
os.path.join(checkpoint_dir, 'model.ckpt-*.index'))
tf.logging.info('Found %s policies at %s', len(model_files), checkpoint_dir)
model_files = {
int(f.split('model.ckpt-', 1)[1].split('.', 1)[0]):
os.path.splitext(f)[0]
for f in model_files
}
model_files = {
k: v
for k, v in model_files.items()
if k >= checkpoint_range[0] and k <= checkpoint_range[1]
}
tf.logging.info('Evaluating %d policies at %s',
len(model_files), checkpoint_dir)
for _, checkpoint_path in sorted(model_files.items()):
evaluate_checkpoint_fn(checkpoint_path)
else:
eval_utils.evaluate_checkpoint_repeatedly(
checkpoint_dir,
evaluate_checkpoint_fn,
eval_interval_secs=eval_interval_secs,
max_number_of_evaluations=max_number_of_evaluations,
checkpoint_timeout=checkpoint_timeout,
timeout_fn=timeout_fn)
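# Hedged example (not part of the original module): with
# checkpoint_range=(1000000, 3000000), a file named 'model.ckpt-2000000.index'
# under checkpoint_dir is parsed to the key 2000000 and evaluated via the path
# '<checkpoint_dir>/model.ckpt-2000000', while checkpoints whose step falls
# outside the range are skipped.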
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Random policy on an environment."""
import tensorflow as tf
import numpy as np
import random
from environments import create_maze_env
app = tf.app
flags = tf.flags
logging = tf.logging
FLAGS = flags.FLAGS
flags.DEFINE_string('env', 'AntMaze', 'environment name: AntMaze, AntPush, or AntFall')
flags.DEFINE_integer('episode_length', 500, 'episode length')
flags.DEFINE_integer('num_episodes', 50, 'number of episodes')
def get_goal_sample_fn(env_name):
if env_name == 'AntMaze':
    # NOTE: When evaluating (i.e., for the metrics shown in the paper),
    # we use the commented-out goal sampling function. The uncommented
    # one is only used for training.
    #return lambda: np.array([0., 16.])
return lambda: np.random.uniform((-4, -4), (20, 20))
elif env_name == 'AntPush':
return lambda: np.array([0., 19.])
elif env_name == 'AntFall':
return lambda: np.array([0., 27., 4.5])
else:
assert False, 'Unknown env'
def get_reward_fn(env_name):
if env_name == 'AntMaze':
return lambda obs, goal: -np.sum(np.square(obs[:2] - goal)) ** 0.5
elif env_name == 'AntPush':
return lambda obs, goal: -np.sum(np.square(obs[:2] - goal)) ** 0.5
elif env_name == 'AntFall':
return lambda obs, goal: -np.sum(np.square(obs[:3] - goal)) ** 0.5
else:
assert False, 'Unknown env'
def success_fn(last_reward):
return last_reward > -5.0
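# Illustrative example (not part of the original module): for AntMaze the
# reward is the negative Euclidean distance from the ant's (x, y) position
# (obs[:2]) to the goal, and an episode counts as a success if the final-step
# reward is strictly greater than -5.0:
#   reward_fn = get_reward_fn('AntMaze')
#   reward_fn(np.array([3., 12.]), np.array([0., 16.]))  # == -5.0
#   success_fn(-4.9)  # True
#   success_fn(-5.0)  # False (the threshold is strict)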
class EnvWithGoal(object):
def __init__(self, base_env, env_name):
self.base_env = base_env
self.goal_sample_fn = get_goal_sample_fn(env_name)
self.reward_fn = get_reward_fn(env_name)
self.goal = None
def reset(self):
obs = self.base_env.reset()
self.goal = self.goal_sample_fn()
return np.concatenate([obs, self.goal])
def step(self, a):
obs, _, done, info = self.base_env.step(a)
reward = self.reward_fn(obs, self.goal)
return np.concatenate([obs, self.goal]), reward, done, info
@property
def action_space(self):
return self.base_env.action_space
def run_environment(env_name, episode_length, num_episodes):
env = EnvWithGoal(
create_maze_env.create_maze_env(env_name).gym,
env_name)
def action_fn(obs):
action_space = env.action_space
action_space_mean = (action_space.low + action_space.high) / 2.0
action_space_magn = (action_space.high - action_space.low) / 2.0
random_action = (action_space_mean +
action_space_magn *
np.random.uniform(low=-1.0, high=1.0,
size=action_space.shape))
return random_action
rewards = []
successes = []
for ep in range(num_episodes):
rewards.append(0.0)
successes.append(False)
obs = env.reset()
for _ in range(episode_length):
obs, reward, done, _ = env.step(action_fn(obs))
rewards[-1] += reward
successes[-1] = success_fn(reward)
if done:
break
logging.info('Episode %d reward: %.2f, Success: %d', ep + 1, rewards[-1], successes[-1])
logging.info('Average Reward over %d episodes: %.2f',
num_episodes, np.mean(rewards))
logging.info('Average Success over %d episodes: %.2f',
num_episodes, np.mean(successes))
def main(unused_argv):
logging.set_verbosity(logging.INFO)
run_environment(FLAGS.env, FLAGS.episode_length, FLAGS.num_episodes)
if __name__ == '__main__':
app.run()
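# Example invocation (hedged; the filename 'run_env.py' is an assumption about
# where this random-policy script lives, not something stated in this commit):
#   python run_env.py --env=AntMaze --episode_length=500 --num_episodes=50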
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""Script for evaluating a UVF agent.
To run locally: See scripts/local_eval.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import gin.tf
# pylint: disable=unused-import
import eval as eval_
# pylint: enable=unused-import
flags = tf.app.flags
FLAGS = flags.FLAGS
def main(_):
tf.logging.set_verbosity(tf.logging.INFO)
assert FLAGS.checkpoint_dir, "Flag 'checkpoint_dir' must be set."
assert FLAGS.eval_dir, "Flag 'eval_dir' must be set."
if FLAGS.config_file:
for config_file in FLAGS.config_file:
gin.parse_config_file(config_file)
if FLAGS.params:
gin.parse_config(FLAGS.params)
eval_.evaluate(FLAGS.checkpoint_dir, FLAGS.eval_dir)
if __name__ == "__main__":
tf.app.run()
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""Script for training an RL agent using the UVF algorithm.
To run locally: See scripts/local_train.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import gin.tf
# pylint: disable=unused-import
import train
# pylint: enable=unused-import
flags = tf.app.flags
FLAGS = flags.FLAGS
def main(_):
tf.logging.set_verbosity(tf.logging.INFO)
if FLAGS.config_file:
for config_file in FLAGS.config_file:
gin.parse_config_file(config_file)
if FLAGS.params:
gin.parse_config(FLAGS.params)
assert FLAGS.train_dir, "Flag 'train_dir' must be set."
return train.train_uvf(FLAGS.train_dir)
if __name__ == '__main__':
tf.app.run()
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Script to run run_eval.py locally.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from subprocess import call
import sys
CONFIGS_PATH = 'configs'
CONTEXT_CONFIGS_PATH = 'context/configs'
def main():
bb = './'
base_num_args = 6
if len(sys.argv) < base_num_args:
print(
"usage: python %s <exp_name> <context_setting_gin> <context_gin> "
"<agent_gin> <suite> [params...]"
% sys.argv[0])
sys.exit(0)
exp = sys.argv[1]
context_setting = sys.argv[2]
context = sys.argv[3]
agent = sys.argv[4]
assert sys.argv[5] in ["suite"], "args[5] must be `suite'"
suite = ""
binary = "python {bb}/run_eval{suite}.py ".format(bb=bb, suite=suite)
h = os.environ["HOME"]
ucp = CONFIGS_PATH
ccp = CONTEXT_CONFIGS_PATH
extra = ''
command_str = ("{binary} "
"--logtostderr "
"--checkpoint_dir={h}/tmp/{context_setting}/{context}/{agent}/{exp}/train "
"--eval_dir={h}/tmp/{context_setting}/{context}/{agent}/{exp}/eval "
"--config_file={ucp}/{agent}.gin "
"--config_file={ucp}/eval_{extra}uvf.gin "
"--config_file={ccp}/{context_setting}.gin "
"--config_file={ccp}/{context}.gin ").format(
h=h,
ucp=ucp,
ccp=ccp,
context_setting=context_setting,
context=context,
agent=agent,
extra=extra,
suite=suite,
exp=exp,
binary=binary)
for extra_arg in sys.argv[base_num_args:]:
command_str += "--params='%s' " % extra_arg
print(command_str)
call(command_str, shell=True)
if __name__ == "__main__":
main()
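# Example invocation (hedged; the concrete argument names below reuse the
# illustrative values from scripts/local_train.py further down and assume the
# command is run from the efficient-hrl directory):
#   python scripts/local_eval.py test001 hiro_orig ant_maze base_uvf suite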
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Script to run run_train.py locally.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
from subprocess import call
import sys
CONFIGS_PATH = './configs'
CONTEXT_CONFIGS_PATH = './context/configs'
def main():
bb = './'
base_num_args = 6
if len(sys.argv) < base_num_args:
print(
"usage: python %s <exp_name> <context_setting_gin> <env_context_gin> "
"<agent_gin> <suite> [params...]"
% sys.argv[0])
sys.exit(0)
exp = sys.argv[1] # Name for experiment, e.g. 'test001'
context_setting = sys.argv[2] # Context setting, e.g. 'hiro_orig'
context = sys.argv[3] # Environment-specific context, e.g. 'ant_maze'
agent = sys.argv[4] # Agent settings, e.g. 'base_uvf'
assert sys.argv[5] in ["suite"], "args[5] must be `suite'"
suite = ""
binary = "python {bb}/run_train{suite}.py ".format(bb=bb, suite=suite)
h = os.environ["HOME"]
ucp = CONFIGS_PATH
ccp = CONTEXT_CONFIGS_PATH
extra = ''
port = random.randint(2000, 8000)
command_str = ("{binary} "
"--train_dir={h}/tmp/{context_setting}/{context}/{agent}/{exp}/train "
"--config_file={ucp}/{agent}.gin "
"--config_file={ucp}/train_{extra}uvf.gin "
"--config_file={ccp}/{context_setting}.gin "
"--config_file={ccp}/{context}.gin "
"--summarize_gradients=False "
"--save_interval_secs=60 "
"--save_summaries_secs=1 "
"--master=local "
"--alsologtostderr ").format(h=h, ucp=ucp,
context_setting=context_setting,
context=context, ccp=ccp,
suite=suite, agent=agent, extra=extra,
exp=exp, binary=binary,
port=port)
for extra_arg in sys.argv[base_num_args:]:
command_str += "--params='%s' " % extra_arg
print(command_str)
call(command_str, shell=True)
if __name__ == "__main__":
main()
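# Example invocation (hedged; reusing the illustrative argument values from the
# comments above and assuming the command is run from the efficient-hrl
# directory):
#   python scripts/local_train.py test001 hiro_orig ant_maze base_uvf suite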
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""Script for training an RL agent using the UVF algorithm.
To run locally: See run_train.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import tensorflow as tf
slim = tf.contrib.slim
import gin.tf
# pylint: disable=unused-import
import train_utils
import agent as agent_
from agents import circular_buffer
from utils import utils as uvf_utils
from environments import create_maze_env
# pylint: enable=unused-import
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('goal_sample_strategy', 'sample',
'None, sample, FuN')
LOAD_PATH = None
def collect_experience(tf_env, agent, meta_agent, state_preprocess,
replay_buffer, meta_replay_buffer,
action_fn, meta_action_fn,
environment_steps, num_episodes, num_resets,
episode_rewards, episode_meta_rewards,
store_context,
disable_agent_reset):
"""Collect experience in a tf_env into a replay_buffer using action_fn.
Args:
tf_env: A TFEnvironment.
agent: A UVF agent.
    meta_agent: A Meta Agent.
    state_preprocess: A state preprocessor.
    replay_buffer: A Replay buffer to collect experience in.
    meta_replay_buffer: A Replay buffer to collect meta agent experience in.
    action_fn: A function to produce actions given current state.
    meta_action_fn: A function to produce meta actions given current state.
    environment_steps: A variable to count the number of steps in the tf_env.
    num_episodes: A variable to count the number of episodes.
    num_resets: A variable to count the number of resets.
    episode_rewards: A variable holding the rewards of recent episodes.
    episode_meta_rewards: A variable holding the meta-rewards of recent
      episodes.
    store_context: A boolean specifying whether to store the context in the
      replay buffer.
    disable_agent_reset: A boolean that disables the agent from resetting.
  Returns:
    A collect_experience_op that executes an action and stores the resulting
    transitions into the replay buffers.
tf_env.start_collect()
state = tf_env.current_obs()
state_repr = state_preprocess(state)
action = action_fn(state, context=None)
with tf.control_dependencies([state]):
transition_type, reward, discount = tf_env.step(action)
def increment_step():
return environment_steps.assign_add(1)
def increment_episode():
return num_episodes.assign_add(1)
def increment_reset():
return num_resets.assign_add(1)
def update_episode_rewards(context_reward, meta_reward, reset):
new_episode_rewards = tf.concat(
[episode_rewards[:1] + context_reward, episode_rewards[1:]], 0)
new_episode_meta_rewards = tf.concat(
[episode_meta_rewards[:1] + meta_reward,
episode_meta_rewards[1:]], 0)
return tf.group(
episode_rewards.assign(
tf.cond(reset,
lambda: tf.concat([[0.], episode_rewards[:-1]], 0),
lambda: new_episode_rewards)),
episode_meta_rewards.assign(
tf.cond(reset,
lambda: tf.concat([[0.], episode_meta_rewards[:-1]], 0),
lambda: new_episode_meta_rewards)))
def no_op_int():
return tf.constant(0, dtype=tf.int64)
step_cond = agent.step_cond_fn(state, action,
transition_type,
environment_steps, num_episodes)
reset_episode_cond = agent.reset_episode_cond_fn(
state, action,
transition_type, environment_steps, num_episodes)
reset_env_cond = agent.reset_env_cond_fn(state, action,
transition_type,
environment_steps, num_episodes)
increment_step_op = tf.cond(step_cond, increment_step, no_op_int)
increment_episode_op = tf.cond(reset_episode_cond, increment_episode,
no_op_int)
increment_reset_op = tf.cond(reset_env_cond, increment_reset, no_op_int)
increment_op = tf.group(increment_step_op, increment_episode_op,
increment_reset_op)
with tf.control_dependencies([increment_op, reward, discount]):
next_state = tf_env.current_obs()
next_state_repr = state_preprocess(next_state)
next_reset_episode_cond = tf.logical_or(
agent.reset_episode_cond_fn(
state, action,
transition_type, environment_steps, num_episodes),
tf.equal(discount, 0.0))
if store_context:
context = [tf.identity(var) + tf.zeros_like(var) for var in agent.context_vars]
meta_context = [tf.identity(var) + tf.zeros_like(var) for var in meta_agent.context_vars]
else:
context = []
meta_context = []
with tf.control_dependencies([next_state] + context + meta_context):
if disable_agent_reset:
collect_experience_ops = [tf.no_op()] # don't reset agent
else:
collect_experience_ops = agent.cond_begin_episode_op(
tf.logical_not(reset_episode_cond),
[state, action, reward, next_state,
state_repr, next_state_repr],
mode='explore', meta_action_fn=meta_action_fn)
context_reward, meta_reward = collect_experience_ops
collect_experience_ops = list(collect_experience_ops)
collect_experience_ops.append(
update_episode_rewards(tf.reduce_sum(context_reward), meta_reward,
reset_episode_cond))
meta_action_every_n = agent.tf_context.meta_action_every_n
with tf.control_dependencies(collect_experience_ops):
transition = [state, action, reward, discount, next_state]
meta_action = tf.to_float(
tf.concat(context, -1)) # Meta agent action is low-level context
meta_end = tf.logical_and( # End of meta-transition.
tf.equal(agent.tf_context.t % meta_action_every_n, 1),
agent.tf_context.t > 1)
with tf.variable_scope(tf.get_variable_scope(), reuse=tf.AUTO_REUSE):
states_var = tf.get_variable('states_var',
[meta_action_every_n, state.shape[-1]],
state.dtype)
actions_var = tf.get_variable('actions_var',
[meta_action_every_n, action.shape[-1]],
action.dtype)
state_var = tf.get_variable('state_var', state.shape, state.dtype)
reward_var = tf.get_variable('reward_var', reward.shape, reward.dtype)
meta_action_var = tf.get_variable('meta_action_var',
meta_action.shape, meta_action.dtype)
meta_context_var = [
tf.get_variable('meta_context_var%d' % idx,
meta_context[idx].shape, meta_context[idx].dtype)
for idx in range(len(meta_context))]
actions_var_upd = tf.scatter_update(
actions_var, (agent.tf_context.t - 2) % meta_action_every_n, action)
with tf.control_dependencies([actions_var_upd]):
actions = tf.identity(actions_var) + tf.zeros_like(actions_var)
meta_reward = tf.identity(meta_reward) + tf.zeros_like(meta_reward)
meta_reward = tf.reshape(meta_reward, reward.shape)
reward = 0.1 * meta_reward
meta_transition = [state_var, meta_action_var,
reward_var + reward,
discount * (1 - tf.to_float(next_reset_episode_cond)),
next_state]
meta_transition.extend([states_var, actions])
if store_context: # store current and next context into replay
transition += context + list(agent.context_vars)
meta_transition += meta_context_var + list(meta_agent.context_vars)
meta_step_cond = tf.squeeze(tf.logical_and(step_cond, tf.logical_or(next_reset_episode_cond, meta_end)))
collect_experience_op = tf.group(
replay_buffer.maybe_add(transition, step_cond),
meta_replay_buffer.maybe_add(meta_transition, meta_step_cond),
)
with tf.control_dependencies([collect_experience_op]):
collect_experience_op = tf.cond(reset_env_cond,
tf_env.reset,
tf_env.current_time_step)
meta_period = tf.equal(agent.tf_context.t % meta_action_every_n, 1)
states_var_upd = tf.scatter_update(
states_var, (agent.tf_context.t - 1) % meta_action_every_n,
next_state)
state_var_upd = tf.assign(
state_var,
tf.cond(meta_period, lambda: next_state, lambda: state_var))
reward_var_upd = tf.assign(
reward_var,
tf.cond(meta_period,
lambda: tf.zeros_like(reward_var),
lambda: reward_var + reward))
meta_action = tf.to_float(tf.concat(agent.context_vars, -1))
meta_action_var_upd = tf.assign(
meta_action_var,
tf.cond(meta_period, lambda: meta_action, lambda: meta_action_var))
meta_context_var_upd = [
tf.assign(
meta_context_var[idx],
tf.cond(meta_period,
lambda: meta_agent.context_vars[idx],
lambda: meta_context_var[idx]))
for idx in range(len(meta_context))]
return tf.group(
collect_experience_op,
states_var_upd,
state_var_upd,
reward_var_upd,
meta_action_var_upd,
*meta_context_var_upd)
def sample_best_meta_actions(state_reprs, next_state_reprs, prev_meta_actions,
low_states, low_actions, low_state_reprs,
inverse_dynamics, uvf_agent, k=10):
"""Return meta-actions which approximately maximize low-level log-probs."""
sampled_actions = inverse_dynamics.sample(state_reprs, next_state_reprs, k, prev_meta_actions)
sampled_actions = tf.stop_gradient(sampled_actions)
sampled_log_probs = tf.reshape(uvf_agent.log_probs(
tf.tile(low_states, [k, 1, 1]),
tf.tile(low_actions, [k, 1, 1]),
tf.tile(low_state_reprs, [k, 1, 1]),
[tf.reshape(sampled_actions, [-1, sampled_actions.shape[-1]])]),
[k, low_states.shape[0],
low_states.shape[1], -1])
fitness = tf.reduce_sum(sampled_log_probs, [2, 3])
best_actions = tf.argmax(fitness, 0)
actions = tf.gather_nd(
sampled_actions,
tf.stack([best_actions,
tf.range(prev_meta_actions.shape[0], dtype=tf.int64)], -1))
return actions
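# Hedged sketch (not part of the original module): the gather_nd above picks,
# for each batch element b, the candidate meta-action with the highest summed
# low-level log-probability, i.e. sampled_actions[best_actions[b], b]. In numpy
# terms, with hypothetical arrays sampled_np of shape [k, batch, dim] and
# fitness_np of shape [k, batch]:
#   best = fitness_np.argmax(axis=0)                            # [batch]
#   chosen = sampled_np[best, np.arange(sampled_np.shape[1])]   # [batch, dim]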
@gin.configurable
def train_uvf(train_dir,
environment=None,
num_bin_actions=3,
agent_class=None,
meta_agent_class=None,
state_preprocess_class=None,
inverse_dynamics_class=None,
exp_action_wrapper=None,
replay_buffer=None,
meta_replay_buffer=None,
replay_num_steps=1,
meta_replay_num_steps=1,
critic_optimizer=None,
actor_optimizer=None,
meta_critic_optimizer=None,
meta_actor_optimizer=None,
repr_optimizer=None,
relabel_contexts=False,
meta_relabel_contexts=False,
batch_size=64,
repeat_size=0,
num_episodes_train=2000,
initial_episodes=2,
initial_steps=None,
num_updates_per_observation=1,
num_collect_per_update=1,
num_collect_per_meta_update=1,
gamma=1.0,
meta_gamma=1.0,
reward_scale_factor=1.0,
target_update_period=1,
should_stop_early=None,
clip_gradient_norm=0.0,
summarize_gradients=False,
debug_summaries=False,
log_every_n_steps=100,
prefetch_queue_capacity=2,
policy_save_dir='policy',
save_policy_every_n_steps=1000,
save_policy_interval_secs=0,
replay_context_ratio=0.0,
next_state_as_context_ratio=0.0,
state_index=0,
zero_timer_ratio=0.0,
timer_index=-1,
debug=False,
max_policies_to_save=None,
max_steps_per_episode=None,
load_path=LOAD_PATH):
"""Train an agent."""
tf_env = create_maze_env.TFPyEnvironment(environment)
observation_spec = [tf_env.observation_spec()]
action_spec = [tf_env.action_spec()]
max_steps_per_episode = max_steps_per_episode or tf_env.pyenv.max_episode_steps
  assert max_steps_per_episode, 'max_steps_per_episode needs to be set'
if initial_steps is None:
initial_steps = initial_episodes * max_steps_per_episode
if agent_class.ACTION_TYPE == 'discrete':
assert False
else:
assert agent_class.ACTION_TYPE == 'continuous'
assert agent_class.ACTION_TYPE == meta_agent_class.ACTION_TYPE
with tf.variable_scope('meta_agent'):
meta_agent = meta_agent_class(
observation_spec,
action_spec,
tf_env,
debug_summaries=debug_summaries)
meta_agent.set_replay(replay=meta_replay_buffer)
with tf.variable_scope('uvf_agent'):
uvf_agent = agent_class(
observation_spec,
action_spec,
tf_env,
debug_summaries=debug_summaries)
uvf_agent.set_meta_agent(agent=meta_agent)
uvf_agent.set_replay(replay=replay_buffer)
with tf.variable_scope('state_preprocess'):
state_preprocess = state_preprocess_class()
with tf.variable_scope('inverse_dynamics'):
inverse_dynamics = inverse_dynamics_class(
meta_agent.sub_context_as_action_specs[0])
# Create counter variables
global_step = tf.contrib.framework.get_or_create_global_step()
num_episodes = tf.Variable(0, dtype=tf.int64, name='num_episodes')
num_resets = tf.Variable(0, dtype=tf.int64, name='num_resets')
num_updates = tf.Variable(0, dtype=tf.int64, name='num_updates')
num_meta_updates = tf.Variable(0, dtype=tf.int64, name='num_meta_updates')
episode_rewards = tf.Variable([0.] * 100, name='episode_rewards')
episode_meta_rewards = tf.Variable([0.] * 100, name='episode_meta_rewards')
# Create counter variables summaries
train_utils.create_counter_summaries([
('environment_steps', global_step),
('num_episodes', num_episodes),
('num_resets', num_resets),
('num_updates', num_updates),
('num_meta_updates', num_meta_updates),
('replay_buffer_adds', replay_buffer.get_num_adds()),
('meta_replay_buffer_adds', meta_replay_buffer.get_num_adds()),
])
tf.summary.scalar('avg_episode_rewards',
tf.reduce_mean(episode_rewards[1:]))
tf.summary.scalar('avg_episode_meta_rewards',
tf.reduce_mean(episode_meta_rewards[1:]))
tf.summary.histogram('episode_rewards', episode_rewards[1:])
tf.summary.histogram('episode_meta_rewards', episode_meta_rewards[1:])
# Create init ops
action_fn = uvf_agent.action
action_fn = uvf_agent.add_noise_fn(action_fn, global_step=None)
meta_action_fn = meta_agent.action
meta_action_fn = meta_agent.add_noise_fn(meta_action_fn, global_step=None)
meta_actions_fn = meta_agent.actions
meta_actions_fn = meta_agent.add_noise_fn(meta_actions_fn, global_step=None)
init_collect_experience_op = collect_experience(
tf_env,
uvf_agent,
meta_agent,
state_preprocess,
replay_buffer,
meta_replay_buffer,
action_fn,
meta_action_fn,
environment_steps=global_step,
num_episodes=num_episodes,
num_resets=num_resets,
episode_rewards=episode_rewards,
episode_meta_rewards=episode_meta_rewards,
store_context=True,
disable_agent_reset=False,
)
# Create train ops
collect_experience_op = collect_experience(
tf_env,
uvf_agent,
meta_agent,
state_preprocess,
replay_buffer,
meta_replay_buffer,
action_fn,
meta_action_fn,
environment_steps=global_step,
num_episodes=num_episodes,
num_resets=num_resets,
episode_rewards=episode_rewards,
episode_meta_rewards=episode_meta_rewards,
store_context=True,
disable_agent_reset=False,
)
train_op_list = []
repr_train_op = tf.constant(0.0)
for mode in ['meta', 'nometa']:
if mode == 'meta':
agent = meta_agent
buff = meta_replay_buffer
critic_opt = meta_critic_optimizer
actor_opt = meta_actor_optimizer
relabel = meta_relabel_contexts
num_steps = meta_replay_num_steps
      my_gamma = meta_gamma
n_updates = num_meta_updates
else:
agent = uvf_agent
buff = replay_buffer
critic_opt = critic_optimizer
actor_opt = actor_optimizer
relabel = relabel_contexts
num_steps = replay_num_steps
my_gamma = gamma
n_updates = num_updates
with tf.name_scope(mode):
batch = buff.get_random_batch(batch_size, num_steps=num_steps)
states, actions, rewards, discounts, next_states = batch[:5]
with tf.name_scope('Reward'):
tf.summary.scalar('average_step_reward', tf.reduce_mean(rewards))
rewards *= reward_scale_factor
batch_queue = slim.prefetch_queue.prefetch_queue(
[states, actions, rewards, discounts, next_states] + batch[5:],
capacity=prefetch_queue_capacity,
name='batch_queue')
batch_dequeue = batch_queue.dequeue()
if repeat_size > 0:
batch_dequeue = [
tf.tile(batch, (repeat_size+1,) + (1,) * (batch.shape.ndims - 1))
for batch in batch_dequeue
]
batch_size *= (repeat_size + 1)
states, actions, rewards, discounts, next_states = batch_dequeue[:5]
if mode == 'meta':
low_states = batch_dequeue[5]
low_actions = batch_dequeue[6]
low_state_reprs = state_preprocess(low_states)
state_reprs = state_preprocess(states)
next_state_reprs = state_preprocess(next_states)
if mode == 'meta': # Re-label meta-action
prev_actions = actions
if FLAGS.goal_sample_strategy == 'None':
pass
elif FLAGS.goal_sample_strategy == 'FuN':
actions = inverse_dynamics.sample(state_reprs, next_state_reprs, 1, prev_actions, sc=0.1)
actions = tf.stop_gradient(actions)
elif FLAGS.goal_sample_strategy == 'sample':
actions = sample_best_meta_actions(state_reprs, next_state_reprs, prev_actions,
low_states, low_actions, low_state_reprs,
inverse_dynamics, uvf_agent, k=10)
else:
assert False
if state_preprocess.trainable and mode == 'meta':
# Representation learning is based on meta-transitions, but is trained
# along with low-level policy updates.
repr_loss, _, _ = state_preprocess.loss(states, next_states, low_actions, low_states)
repr_train_op = slim.learning.create_train_op(
repr_loss,
repr_optimizer,
global_step=None,
update_ops=None,
summarize_gradients=summarize_gradients,
clip_gradient_norm=clip_gradient_norm,
variables_to_train=state_preprocess.get_trainable_vars(),)
# Get contexts for training
contexts, next_contexts = agent.sample_contexts(
mode='train', batch_size=batch_size,
state=states, next_state=next_states,
)
      if not relabel:  # Use the original contexts stored in the replay buffer.
contexts, next_contexts = (
batch_dequeue[-2*len(contexts):-1*len(contexts)],
batch_dequeue[-1*len(contexts):])
merged_states = agent.merged_states(states, contexts)
merged_next_states = agent.merged_states(next_states, next_contexts)
if mode == 'nometa':
context_rewards, context_discounts = agent.compute_rewards(
'train', state_reprs, actions, rewards, next_state_reprs, contexts)
elif mode == 'meta': # Meta-agent uses sum of rewards, not context-specific rewards.
_, context_discounts = agent.compute_rewards(
'train', states, actions, rewards, next_states, contexts)
context_rewards = rewards
if agent.gamma_index is not None:
context_discounts *= tf.cast(
tf.reshape(contexts[agent.gamma_index], (-1,)),
dtype=context_discounts.dtype)
else: context_discounts *= my_gamma
critic_loss = agent.critic_loss(merged_states, actions,
context_rewards, context_discounts,
merged_next_states)
critic_loss = tf.reduce_mean(critic_loss)
actor_loss = agent.actor_loss(merged_states, actions,
context_rewards, context_discounts,
merged_next_states)
actor_loss *= tf.to_float( # Only update actor every N steps.
tf.equal(n_updates % target_update_period, 0))
critic_train_op = slim.learning.create_train_op(
critic_loss,
critic_opt,
global_step=n_updates,
update_ops=None,
summarize_gradients=summarize_gradients,
clip_gradient_norm=clip_gradient_norm,
variables_to_train=agent.get_trainable_critic_vars(),)
critic_train_op = uvf_utils.tf_print(
critic_train_op, [critic_train_op],
message='critic_loss',
print_freq=1000,
name='critic_loss')
train_op_list.append(critic_train_op)
if actor_loss is not None:
actor_train_op = slim.learning.create_train_op(
actor_loss,
actor_opt,
global_step=None,
update_ops=None,
summarize_gradients=summarize_gradients,
clip_gradient_norm=clip_gradient_norm,
variables_to_train=agent.get_trainable_actor_vars(),)
actor_train_op = uvf_utils.tf_print(
actor_train_op, [actor_train_op],
message='actor_loss',
print_freq=1000,
name='actor_loss')
train_op_list.append(actor_train_op)
assert len(train_op_list) == 4
# Update targets should happen after the networks have been updated.
with tf.control_dependencies(train_op_list[2:]):
update_targets_op = uvf_utils.periodically(
uvf_agent.update_targets, target_update_period, 'update_targets')
if meta_agent is not None:
with tf.control_dependencies(train_op_list[:2]):
update_meta_targets_op = uvf_utils.periodically(
meta_agent.update_targets, target_update_period, 'update_targets')
assert_op = tf.Assert( # Hack to get training to stop.
tf.less_equal(global_step, 200 + num_episodes_train * max_steps_per_episode),
[global_step])
with tf.control_dependencies([update_targets_op, assert_op]):
train_op = tf.add_n(train_op_list[2:], name='post_update_targets')
# Representation training steps on every low-level policy training step.
train_op += repr_train_op
with tf.control_dependencies([update_meta_targets_op, assert_op]):
meta_train_op = tf.add_n(train_op_list[:2],
name='post_update_meta_targets')
if debug_summaries:
    train_utils.gen_debug_batch_summaries(batch)
slim.summaries.add_histogram_summaries(
uvf_agent.get_trainable_critic_vars(), 'critic_vars')
slim.summaries.add_histogram_summaries(
uvf_agent.get_trainable_actor_vars(), 'actor_vars')
train_ops = train_utils.TrainOps(train_op, meta_train_op,
collect_experience_op)
policy_save_path = os.path.join(train_dir, policy_save_dir, 'model.ckpt')
policy_vars = uvf_agent.get_actor_vars() + meta_agent.get_actor_vars() + [
global_step, num_episodes, num_resets
] + list(uvf_agent.context_vars) + list(meta_agent.context_vars) + state_preprocess.get_trainable_vars()
# add critic vars, since some test evaluation depends on them
policy_vars += uvf_agent.get_trainable_critic_vars() + meta_agent.get_trainable_critic_vars()
policy_saver = tf.train.Saver(
policy_vars, max_to_keep=max_policies_to_save, sharded=False)
lowlevel_vars = (uvf_agent.get_actor_vars() +
uvf_agent.get_trainable_critic_vars() +
state_preprocess.get_trainable_vars())
lowlevel_saver = tf.train.Saver(lowlevel_vars)
def policy_save_fn(sess):
policy_saver.save(
sess, policy_save_path, global_step=global_step, write_meta_graph=False)
if save_policy_interval_secs > 0:
tf.logging.info(
'Wait %d secs after save policy.' % save_policy_interval_secs)
time.sleep(save_policy_interval_secs)
train_step_fn = train_utils.TrainStep(
max_number_of_steps=num_episodes_train * max_steps_per_episode + 100,
num_updates_per_observation=num_updates_per_observation,
num_collect_per_update=num_collect_per_update,
num_collect_per_meta_update=num_collect_per_meta_update,
log_every_n_steps=log_every_n_steps,
policy_save_fn=policy_save_fn,
save_policy_every_n_steps=save_policy_every_n_steps,
should_stop_early=should_stop_early).train_step
local_init_op = tf.local_variables_initializer()
init_targets_op = tf.group(uvf_agent.update_targets(1.0),
meta_agent.update_targets(1.0))
def initialize_training_fn(sess):
"""Initialize training function."""
sess.run(local_init_op)
sess.run(init_targets_op)
if load_path:
tf.logging.info('Restoring low-level from %s' % load_path)
lowlevel_saver.restore(sess, load_path)
global_step_value = sess.run(global_step)
assert global_step_value == 0, 'Global step should be zero.'
collect_experience_call = sess.make_callable(
init_collect_experience_op)
for _ in range(initial_steps):
collect_experience_call()
train_saver = tf.train.Saver(max_to_keep=2, sharded=True)
tf.logging.info('train dir: %s', train_dir)
return slim.learning.train(
train_ops,
train_dir,
train_step_fn=train_step_fn,
save_interval_secs=FLAGS.save_interval_secs,
saver=train_saver,
log_every_n_steps=0,
global_step=global_step,
master="",
is_chief=(FLAGS.task == 0),
save_summaries_secs=FLAGS.save_summaries_secs,
init_fn=initialize_training_fn)
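# Hedged note (not part of the original module): train_uvf is @gin.configurable,
# so its keyword arguments are normally bound through the --config_file and
# --params flags parsed in run_train.py rather than passed directly. Example
# bindings (the values below are illustrative assumptions, not the repository's
# actual configs):
#   train_uvf.gamma = 0.99
#   train_uvf.max_steps_per_episode = 500
#   train_uvf.batch_size = 100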
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r""""""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import namedtuple
import os
import time
import tensorflow as tf
import gin.tf
flags = tf.app.flags
flags.DEFINE_multi_string('config_file', None,
'List of paths to the config files.')
flags.DEFINE_multi_string('params', None,
'Newline separated list of Gin parameter bindings.')
flags.DEFINE_string('train_dir', None,
'Directory for writing logs/summaries during training.')
flags.DEFINE_string('master', 'local',
'BNS name of the TensorFlow master to use.')
flags.DEFINE_integer('task', 0, 'task id')
flags.DEFINE_integer('save_interval_secs', 300, 'The frequency at which '
'checkpoints are saved, in seconds.')
flags.DEFINE_integer('save_summaries_secs', 30, 'The frequency at which '
'summaries are saved, in seconds.')
flags.DEFINE_boolean('summarize_gradients', False,
'Whether to generate gradient summaries.')
FLAGS = flags.FLAGS
TrainOps = namedtuple('TrainOps',
['train_op', 'meta_train_op', 'collect_experience_op'])
class TrainStep(object):
"""Handles training step."""
def __init__(self,
max_number_of_steps=0,
num_updates_per_observation=1,
num_collect_per_update=1,
num_collect_per_meta_update=1,
log_every_n_steps=1,
policy_save_fn=None,
save_policy_every_n_steps=0,
should_stop_early=None):
"""Returns a function that is executed at each step of slim training.
Args:
max_number_of_steps: Optional maximum number of train steps to take.
num_updates_per_observation: Number of updates per observation.
log_every_n_steps: The frequency, in terms of global steps, that the loss
and global step and logged.
policy_save_fn: A tf.Saver().save function to save the policy.
save_policy_every_n_steps: How frequently to save the policy.
should_stop_early: Optional hook to report whether training should stop.
Raises:
ValueError: If policy_save_fn is not provided when
save_policy_every_n_steps > 0.
"""
if save_policy_every_n_steps and policy_save_fn is None:
raise ValueError(
'policy_save_fn is required when save_policy_every_n_steps > 0')
self.max_number_of_steps = max_number_of_steps
self.num_updates_per_observation = num_updates_per_observation
self.num_collect_per_update = num_collect_per_update
self.num_collect_per_meta_update = num_collect_per_meta_update
self.log_every_n_steps = log_every_n_steps
self.policy_save_fn = policy_save_fn
self.save_policy_every_n_steps = save_policy_every_n_steps
self.should_stop_early = should_stop_early
self.last_global_step_val = 0
self.train_op_fn = None
self.collect_and_train_fn = None
tf.logging.info('Training for %d max_number_of_steps',
self.max_number_of_steps)
def train_step(self, sess, train_ops, global_step, _):
"""This function will be called at each step of training.
This represents one step of the DDPG algorithm and can include:
1. collect a <state, action, reward, next_state> transition
2. update the target network
3. train the actor
4. train the critic
Args:
sess: A Tensorflow session.
      train_ops: A TrainOps tuple of train ops to run.
global_step: The global step.
Returns:
A scalar total loss.
A boolean should stop.
"""
start_time = time.time()
if self.train_op_fn is None:
self.train_op_fn = sess.make_callable([train_ops.train_op, global_step])
self.meta_train_op_fn = sess.make_callable([train_ops.meta_train_op, global_step])
self.collect_fn = sess.make_callable([train_ops.collect_experience_op, global_step])
self.collect_and_train_fn = sess.make_callable(
[train_ops.train_op, global_step, train_ops.collect_experience_op])
self.collect_and_meta_train_fn = sess.make_callable(
[train_ops.meta_train_op, global_step, train_ops.collect_experience_op])
for _ in range(self.num_collect_per_update - 1):
self.collect_fn()
for _ in range(self.num_updates_per_observation - 1):
self.train_op_fn()
total_loss, global_step_val, _ = self.collect_and_train_fn()
if (global_step_val // self.num_collect_per_meta_update !=
self.last_global_step_val // self.num_collect_per_meta_update):
self.meta_train_op_fn()
time_elapsed = time.time() - start_time
should_stop = False
if self.max_number_of_steps:
should_stop = global_step_val >= self.max_number_of_steps
if global_step_val != self.last_global_step_val:
if (self.save_policy_every_n_steps and
global_step_val // self.save_policy_every_n_steps !=
self.last_global_step_val // self.save_policy_every_n_steps):
self.policy_save_fn(sess)
if (self.log_every_n_steps and
global_step_val % self.log_every_n_steps == 0):
tf.logging.info(
'global step %d: loss = %.4f (%.3f sec/step) (%d steps/sec)',
global_step_val, total_loss, time_elapsed, 1 / time_elapsed)
self.last_global_step_val = global_step_val
stop_early = bool(self.should_stop_early and self.should_stop_early())
return total_loss, should_stop or stop_early
def create_counter_summaries(counters):
"""Add named summaries to counters, a list of tuples (name, counter)."""
if counters:
with tf.name_scope('Counters/'):
for name, counter in counters:
tf.summary.scalar(name, counter)
def gen_debug_batch_summaries(batch):
"""Generates summaries for the sampled replay batch."""
states, actions, rewards, _, next_states = batch
with tf.name_scope('batch'):
for s in range(states.get_shape()[-1]):
tf.summary.histogram('states_%d' % s, states[:, s])
for s in range(states.get_shape()[-1]):
tf.summary.histogram('next_states_%d' % s, next_states[:, s])
for a in range(actions.get_shape()[-1]):
tf.summary.histogram('actions_%d' % a, actions[:, a])
tf.summary.histogram('rewards', rewards)
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Evaluation utility functions.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import numpy as np
import tensorflow as tf
from collections import namedtuple
logging = tf.logging
import gin.tf
@gin.configurable
def evaluate_checkpoint_repeatedly(checkpoint_dir,
evaluate_checkpoint_fn,
eval_interval_secs=600,
max_number_of_evaluations=None,
checkpoint_timeout=None,
timeout_fn=None):
"""Evaluates a checkpointed model at a set interval."""
if max_number_of_evaluations is not None and max_number_of_evaluations <= 0:
raise ValueError(
'`max_number_of_evaluations` must be either None or a positive number.')
number_of_evaluations = 0
for checkpoint_path in tf.contrib.training.checkpoints_iterator(
checkpoint_dir,
min_interval_secs=eval_interval_secs,
timeout=checkpoint_timeout,
timeout_fn=timeout_fn):
retries = 3
for _ in range(retries):
try:
should_stop = evaluate_checkpoint_fn(checkpoint_path)
break
except tf.errors.DataLossError as e:
logging.warn(
'Encountered a DataLossError while evaluating a checkpoint. This '
'can happen when reading a checkpoint before it is fully written. '
'Retrying...'
)
time.sleep(2.0)
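# Example usage (a sketch only; `my_evaluate_fn` and the directory path are
# hypothetical): `my_evaluate_fn` is any callable that takes a checkpoint path
# and returns True when evaluation should stop.
#   evaluate_checkpoint_repeatedly(
#       checkpoint_dir='/tmp/uvf/train',
#       evaluate_checkpoint_fn=my_evaluate_fn,
#       eval_interval_secs=600)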
def compute_model_loss(sess, model_rollout_fn, states, actions):
"""Computes model loss."""
preds, losses = [], []
preds.append(states[0])
losses.append(0)
for state, action in zip(states[1:], actions[1:]):
pred = model_rollout_fn(sess, preds[-1], action)
loss = np.sqrt(np.sum((state - pred) ** 2))
preds.append(pred)
losses.append(loss)
return preds, losses
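# Example (a sketch; assumes `states` and `actions` are per-step arrays from
# one episode and `model_rollout_fn(sess, state, action)` returns a predicted
# next state):
#   preds, losses = compute_model_loss(sess, model_rollout_fn, states, actions)
#   # losses[t] is the L2 error between the observed state at step t and the
#   # open-loop prediction rolled out from states[0].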
def compute_average_reward(sess, env_base, step_fn, gamma, num_steps,
num_episodes):
"""Computes the discounted reward for a given number of steps.
Args:
sess: The tensorflow session.
env_base: A python environment.
step_fn: A function that takes in `sess` and returns a list of
[state, action, reward, discount, transition_type] values.
gamma: discounting factor to apply to the reward.
num_steps: number of steps to compute the reward over.
num_episodes: number of episodes to average the reward over.
Returns:
average_reward: a scalar of discounted reward.
last_reward: last reward received.
"""
average_reward = 0
average_last_reward = 0
average_meta_reward = 0
average_last_meta_reward = 0
average_success = 0.
states, actions = None, None
for i in range(num_episodes):
env_base.end_episode()
env_base.begin_episode()
(reward, last_reward, meta_reward, last_meta_reward,
states, actions) = compute_reward(
sess, step_fn, gamma, num_steps)
s_reward = last_meta_reward # Navigation
success = (s_reward > -5.0) # When using diff=False
logging.info('Episode = %d, reward = %s, meta_reward = %f, '
'last_reward = %s, last meta_reward = %f, success = %s',
i, reward, meta_reward, last_reward, last_meta_reward,
success)
average_reward += reward
average_last_reward += last_reward
average_meta_reward += meta_reward
average_last_meta_reward += last_meta_reward
average_success += success
average_reward /= num_episodes
average_last_reward /= num_episodes
average_meta_reward /= num_episodes
average_last_meta_reward /= num_episodes
average_success /= num_episodes
return (average_reward, average_last_reward,
average_meta_reward, average_last_meta_reward,
average_success,
states, actions)
def compute_reward(sess, step_fn, gamma, num_steps):
"""Computes the discounted reward for a given number of steps.
Args:
sess: The tensorflow session.
step_fn: A function that takes in `sess` and returns a tuple of
(state, action, transition_type, reward, meta_reward, discount, ...) values.
gamma: discounting factor to apply to the reward.
num_steps: number of steps to compute the reward over.
Returns:
reward: cumulative discounted reward.
last_reward: reward received at the final step.
meta_reward: cumulative discounted meta-reward.
last_meta_reward: meta-reward received at the final step.
states: states visited over the episode.
actions: actions taken over the episode.
"""
total_reward = 0
total_meta_reward = 0
gamma_step = 1
states = []
actions = []
for _ in range(num_steps):
state, action, transition_type, reward, meta_reward, discount, _, _ = step_fn(sess)
total_reward += reward * gamma_step * discount
total_meta_reward += meta_reward * gamma_step * discount
gamma_step *= gamma
states.append(state)
actions.append(action)
return (total_reward, reward, total_meta_reward, meta_reward,
states, actions)
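# Example (a sketch; `env_base` and `step_fn` stand for the environment wrapper
# and per-step runner constructed elsewhere in this package):
#   (avg_reward, avg_last_reward, avg_meta_reward, avg_last_meta_reward,
#    success_rate, states, actions) = compute_average_reward(
#        sess, env_base, step_fn, gamma=0.99, num_steps=500, num_episodes=10)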
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""TensorFlow utility functions.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from copy import deepcopy
import tensorflow as tf
from tf_agents import specs
from tf_agents.utils import common
_tf_print_counts = dict()
_tf_print_running_sums = dict()
_tf_print_running_counts = dict()
_tf_print_ids = 0
def get_contextual_env_base(env_base, begin_ops=None, end_ops=None):
"""Wrap env_base with additional tf ops."""
# pylint: disable=protected-access
def init(self_, env_base):
self_._env_base = env_base
attribute_list = ["_render_mode", "_gym_env"]
for attribute in attribute_list:
if hasattr(env_base, attribute):
setattr(self_, attribute, getattr(env_base, attribute))
if hasattr(env_base, "physics"):
self_._physics = env_base.physics
elif hasattr(env_base, "gym"):
class Physics(object):
def render(self, *args, **kwargs):
return env_base.gym.render("rgb_array")
physics = Physics()
self_._physics = physics
self_.physics = physics
def set_sess(self_, sess):
self_._sess = sess
if hasattr(self_._env_base, "set_sess"):
self_._env_base.set_sess(sess)
def begin_episode(self_):
self_._env_base.reset()
if begin_ops is not None:
self_._sess.run(begin_ops)
def end_episode(self_):
self_._env_base.reset()
if end_ops is not None:
self_._sess.run(end_ops)
return type("ContextualEnvBase", (env_base.__class__,), dict(
__init__=init,
set_sess=set_sess,
begin_episode=begin_episode,
end_episode=end_episode,
))(env_base)
# pylint: enable=protected-access
def merge_specs(specs_):
"""Merge TensorSpecs.
Args:
specs_: List of TensorSpecs to be merged.
Returns:
a TensorSpec: a merged TensorSpec.
"""
shape = specs_[0].shape
dtype = specs_[0].dtype
name = specs_[0].name
for spec in specs_[1:]:
assert shape[1:] == spec.shape[1:], "incompatible shapes: %s, %s" % (
shape, spec.shape)
assert dtype == spec.dtype, "incompatible dtypes: %s, %s" % (
dtype, spec.dtype)
shape = merge_shapes((shape, spec.shape), axis=0)
return specs.TensorSpec(
shape=shape,
dtype=dtype,
name=name,
)
def merge_shapes(shapes, axis=0):
"""Merge TensorShapes.
Args:
shapes: List of TensorShapes to be merged.
axis: optional, the axis along which to merge the shapes.
Returns:
a TensorShape: a merged TensorShape.
"""
assert len(shapes) > 1
dims = deepcopy(shapes[0].dims)
for shape in shapes[1:]:
assert shapes[0].ndims == shape.ndims
dims[axis] += shape.dims[axis]
return tf.TensorShape(dims=dims)
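# Example (a sketch): merging along axis 0 adds up the leading dimension while
# the remaining dimensions must agree.
#   merge_shapes([tf.TensorShape([2, 4]), tf.TensorShape([3, 4])])  # -> [5, 4]
#   merge_specs([specs.TensorSpec([2, 4], tf.float32, 'obs'),
#                specs.TensorSpec([3, 4], tf.float32, 'obs')])  # -> shape [5, 4]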
def get_all_vars(ignore_scopes=None):
"""Get all tf variables in scope.
Args:
ignore_scopes: A list of scope names to ignore.
Returns:
A list of all tf variables in scope.
"""
all_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
all_vars = [var for var in all_vars if ignore_scopes is None or not
any(var.name.startswith(scope) for scope in ignore_scopes)]
return all_vars
def clip(tensor, range_=None):
"""Return a tf op which clips tensor according to range_.
Args:
tensor: A Tensor to be clipped.
range_: None, or a tuple representing (minval, maxval)
Returns:
A clipped Tensor.
"""
if range_ is None:
return tf.identity(tensor)
elif isinstance(range_, (tuple, list)):
assert len(range_) == 2
return tf.clip_by_value(tensor, range_[0], range_[1])
else:
raise NotImplementedError("Unacceptable range input: %r" % range_)
def clip_to_bounds(value, minimum, maximum):
"""Clips value to be between minimum and maximum.
Args:
value: (tensor) value to be clipped.
minimum: (numpy float array) minimum value to clip to.
maximum: (numpy float array) maximum value to clip to.
Returns:
clipped_value: (tensor) `value` clipped to between `minimum` and `maximum`.
"""
value = tf.minimum(value, maximum)
return tf.maximum(value, minimum)
clip_to_spec = common.clip_to_spec
def _clip_to_spec(value, spec):
"""Clips value to a given bounded tensor spec.
Args:
value: (tensor) value to be clipped.
spec: (BoundedTensorSpec) spec containing min. and max. values for clipping.
Returns:
clipped_value: (tensor) `value` clipped to be compatible with `spec`.
"""
return clip_to_bounds(value, spec.minimum, spec.maximum)
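# Example (a sketch; `action` and `action_spec` are placeholders): clip a
# tensor to a fixed range or to the bounds of a BoundedTensorSpec.
#   clipped = clip(action, range_=(-1.0, 1.0))
#   bounded = clip_to_bounds(action, -1.0, 1.0)
#   in_spec = clip_to_spec(action, action_spec)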
join_scope = common.join_scope
def _join_scope(parent_scope, child_scope):
"""Joins a parent and child scope using `/`, checking for empty/none.
Args:
parent_scope: (string) parent/prefix scope.
child_scope: (string) child/suffix scope.
Returns:
joined scope: (string) parent and child scopes joined by /.
"""
if not parent_scope:
return child_scope
if not child_scope:
return parent_scope
return '/'.join([parent_scope, child_scope])
def assign_vars(vars_, values):
"""Returns the update ops for assigning a list of vars.
Args:
vars_: A list of variables.
values: A list of tensors representing new values.
Returns:
A list of update ops for the variables.
"""
return [var.assign(value) for var, value in zip(vars_, values)]
def identity_vars(vars_):
"""Return the identity ops for a list of tensors.
Args:
vars_: A list of tensors.
Returns:
A list of identity ops.
"""
return [tf.identity(var) for var in vars_]
def tile(var, batch_size=1):
"""Return tiled tensor.
Args:
var: A tensor representing the state.
batch_size: Batch size.
Returns:
A tensor with shape [batch_size,] + var.shape.
"""
batch_var = tf.tile(
tf.expand_dims(var, 0),
(batch_size,) + (1,) * var.get_shape().ndims)
return batch_var
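# Example (a sketch): tile a single observation of shape [obs_dim] into a
# batch of shape [10, obs_dim], e.g. to score several candidate actions
# against the same state.
#   batched_state = tile(state, batch_size=10)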
def batch_list(vars_list):
"""Batch a list of variables.
Args:
vars_list: A list of tensor variables.
Returns:
A list of tensor variables with additional first dimension.
"""
return [tf.expand_dims(var, 0) for var in vars_list]
def tf_print(op,
tensors,
message="",
first_n=-1,
name=None,
sub_messages=None,
print_freq=-1,
include_count=True):
"""tf.Print, but to stdout."""
# TODO(shanegu): `name` is deprecated. Remove from the rest of codes.
global _tf_print_ids
_tf_print_ids += 1
name = _tf_print_ids
_tf_print_counts[name] = 0
if print_freq > 0:
_tf_print_running_sums[name] = [0 for _ in tensors]
_tf_print_running_counts[name] = 0
def print_message(*xs):
"""print message fn."""
_tf_print_counts[name] += 1
if print_freq > 0:
for i, x in enumerate(xs):
_tf_print_running_sums[name][i] += x
_tf_print_running_counts[name] += 1
if (print_freq <= 0 or _tf_print_running_counts[name] >= print_freq) and (
first_n < 0 or _tf_print_counts[name] <= first_n):
for i, x in enumerate(xs):
if print_freq > 0:
del x
x = _tf_print_running_sums[name][i]/_tf_print_running_counts[name]
if sub_messages is None:
sub_message = str(i)
else:
sub_message = sub_messages[i]
log_message = "%s, %s" % (message, sub_message)
if include_count:
log_message += ", count=%d" % _tf_print_counts[name]
tf.logging.info("[%s]: %s" % (log_message, x))
if print_freq > 0:
for i, x in enumerate(xs):
_tf_print_running_sums[name][i] = 0
_tf_print_running_counts[name] = 0
return xs[0]
print_op = tf.py_func(print_message, tensors, tensors[0].dtype)
with tf.control_dependencies([print_op]):
op = tf.identity(op)
return op
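# Example (a sketch; `train_op` and `critic_loss` are placeholders): wrap an op
# so that running it also logs a running mean of the given tensors every
# `print_freq` executions.
#   train_op = tf_print(train_op, [critic_loss], message='critic',
#                       sub_messages=['loss'], print_freq=100)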
periodically = common.periodically
def _periodically(body, period, name='periodically'):
"""Periodically performs a tensorflow op."""
if period is None or period == 0:
return tf.no_op()
if period < 0:
raise ValueError("period cannot be less than 0.")
if period == 1:
return body()
with tf.variable_scope(None, default_name=name):
counter = tf.get_variable(
"counter",
shape=[],
dtype=tf.int64,
trainable=False,
initializer=tf.constant_initializer(period, dtype=tf.int64))
def _wrapped_body():
with tf.control_dependencies([body()]):
return counter.assign(1)
update = tf.cond(
tf.equal(counter, period), _wrapped_body,
lambda: counter.assign_add(1))
return update
soft_variables_update = common.soft_variables_update
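# Example (a sketch; the variable lists and `target_update_period` are
# placeholders): the two helpers above can be combined to softly update
# target-network variables once every `target_update_period` training steps.
#   update_targets_op = periodically(
#       lambda: soft_variables_update(source_vars, target_vars, tau=0.005),
#       period=target_update_period, name='update_targets')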