# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import tensorflow as tf
import numpy as np
import nn

from learner import CoreModel

class DeterministicWorldModel(CoreModel):
  """
  A simple feed-forward neural network world model, with an option for an ensemble.
  """

  @property
  def saveid(self):
    return "worldmodel"

  def create_params(self, env_config, learner_config):
    self.obs_dim = np.prod(env_config["obs_dims"])
    self.action_dim = env_config["action_dim"]
    self.reward_scale = env_config["reward_scale"]
    self.discount = env_config["discount"]

    self.aux_hidden_dim = self.learner_config["aux_hidden_dim"]
    self.transition_hidden_dim = self.learner_config["transition_hidden_dim"]
    self.bayesian_config = self.learner_config["bayesian"]

    with tf.variable_scope(self.name):
      if self.bayesian_config:
        # Ensemble ("Bayesian") variant: ensemble sizes and sample counts come from
        # the "bayesian" section of the learner config. The done predictor shares the
        # transition predictor's ensemble settings.
        self.transition_predictor = nn.EnsembleFeedForwardNet(
            'transition_predictor', self.obs_dim + self.action_dim, [self.obs_dim],
            layers=8, hidden_dim=self.transition_hidden_dim, get_uncertainty=True,
            ensemble_size=self.bayesian_config["transition"]["ensemble_size"],
            train_sample_count=self.bayesian_config["transition"]["train_sample_count"],
            eval_sample_count=self.bayesian_config["transition"]["eval_sample_count"])
        self.done_predictor = nn.EnsembleFeedForwardNet(
            'done_predictor', self.obs_dim + self.obs_dim + self.action_dim, [],
            layers=4, hidden_dim=self.aux_hidden_dim, get_uncertainty=True,
            ensemble_size=self.bayesian_config["transition"]["ensemble_size"],
            train_sample_count=self.bayesian_config["transition"]["train_sample_count"],
            eval_sample_count=self.bayesian_config["transition"]["eval_sample_count"])
        self.reward_predictor = nn.EnsembleFeedForwardNet(
            'reward_predictor', self.obs_dim + self.obs_dim + self.action_dim, [],
            layers=4, hidden_dim=self.aux_hidden_dim, get_uncertainty=True,
            ensemble_size=self.bayesian_config["reward"]["ensemble_size"],
            train_sample_count=self.bayesian_config["reward"]["train_sample_count"],
            eval_sample_count=self.bayesian_config["reward"]["eval_sample_count"])
      else:
        # Single-network variant.
        self.transition_predictor = nn.FeedForwardNet(
            'transition_predictor', self.obs_dim + self.action_dim, [self.obs_dim],
            layers=8, hidden_dim=self.transition_hidden_dim, get_uncertainty=True)
        self.done_predictor = nn.FeedForwardNet(
            'done_predictor', self.obs_dim + self.obs_dim + self.action_dim, [],
            layers=4, hidden_dim=self.aux_hidden_dim, get_uncertainty=True)
        self.reward_predictor = nn.FeedForwardNet(
            'reward_predictor', self.obs_dim + self.obs_dim + self.action_dim, [],
            layers=4, hidden_dim=self.aux_hidden_dim, get_uncertainty=True)

  def get_ensemble_idx_info(self):
    if self.bayesian_config is not False:
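      # Randomly choose which ensemble members to use for this evaluation rollout.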
      ensemble_idxs = tf.random_shuffle(tf.range(self.transition_predictor.ensemble_size))
      transition_ensemble_sample_n = self.transition_predictor.eval_sample_count
      reward_ensemble_sample_n = self.reward_predictor.eval_sample_count
      ensemble_idxs = ensemble_idxs[:transition_ensemble_sample_n]
      return ensemble_idxs, transition_ensemble_sample_n, reward_ensemble_sample_n
    else:
      return None, 1, 1

  def build_training_graph(self, obs, next_obs, actions, rewards, dones, data_size):
    info = tf.concat([obs, actions], -1)
    # The transition net predicts a delta; adding it to obs gives the predicted next observation.
    predicted_next_obs = self.transition_predictor(info, is_eval=False, reduce_mode="random") + obs
    next_info = tf.concat([next_obs, info], -1)
    predicted_dones = self.done_predictor(next_info, is_eval=False, reduce_mode="random")
    predicted_rewards = self.reward_predictor(next_info, is_eval=False, reduce_mode="random")

    # Cross-entropy loss for termination flags, squared error for rewards and next observations.
    done_losses = tf.nn.sigmoid_cross_entropy_with_logits(labels=dones, logits=predicted_dones)
    reward_losses = .5 * tf.square(rewards - predicted_rewards)
    next_obs_losses = .5 * tf.reduce_sum(tf.square(next_obs - predicted_next_obs), -1)

    done_loss = tf.reduce_mean(done_losses)
    reward_loss = tf.reduce_mean(reward_losses)
    next_obs_loss = tf.reduce_mean(next_obs_losses)
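    # Small L2 regularization term over all three predictors.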
    reg_loss = .0001 * (self.done_predictor.l2_loss() +
                        self.reward_predictor.l2_loss() +
                        self.transition_predictor.l2_loss())

    total_loss = done_loss + reward_loss + next_obs_loss + reg_loss

    inspect = (total_loss, done_loss, reward_loss, next_obs_loss, reg_loss)

    return total_loss, inspect

  def init_extra_info(self, obs):
    # This model keeps no recurrent state, so extra_info is just a zero placeholder.
    return tf.zeros_like(obs)

  def transition(self, obs, action, extra_info, ensemble_idxs=None, pre_expanded=None):
    info = tf.concat([obs, action], -1)
    next_obs_delta = self.transition_predictor(info, reduce_mode="none", ensemble_idxs=ensemble_idxs, pre_expanded=pre_expanded)
    if ensemble_idxs is None:
      # No ensemble members were specified, so the predictor returns one delta per
      # member; broadcast obs and info across that extra ensemble axis.
      next_obs = tf.expand_dims(obs, -2) + next_obs_delta
      next_info = tf.concat([next_obs, tf.expand_dims(info, -2)], -1)
    else:
      next_obs = obs + next_obs_delta
      next_info = tf.concat([next_obs, info], -1)
    done = tf.nn.sigmoid(self.done_predictor(next_info, reduce_mode="none", ensemble_idxs=ensemble_idxs, pre_expanded=True))
    extra_info = tf.zeros_like(obs)
    return next_obs, done, extra_info

  def get_rewards(self, obs, action, next_obs):
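    # Reward is predicted from (next_obs, obs, action), the same ordering used in build_training_graph.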
    next_info = tf.concat([next_obs, obs, action], -1)
    reward = self.reward_predictor(next_info, reduce_mode="none")
    return reward
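

# ----------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the training pipeline). The config
# dictionaries below only show the keys that create_params reads; the numeric
# values are hypothetical, and the commented-out constructor call assumes the
# CoreModel base class is built as CoreModel(name, env_config, learner_config),
# which should be checked against learner.py.
# ----------------------------------------------------------------------------
if __name__ == "__main__":
  env_config = {
      "obs_dims": [17],    # flattened to obs_dim via np.prod
      "action_dim": 6,
      "reward_scale": 1.0,
      "discount": 0.99,
  }
  learner_config = {
      "aux_hidden_dim": 256,
      "transition_hidden_dim": 512,
      # Set "bayesian" to False for the plain feed-forward variant, or provide
      # per-predictor ensemble settings for the ensembled variant:
      "bayesian": {
          "transition": {"ensemble_size": 8, "train_sample_count": 4, "eval_sample_count": 4},
          "reward": {"ensemble_size": 8, "train_sample_count": 4, "eval_sample_count": 4},
      },
  }
  # model = DeterministicWorldModel("worldmodel", env_config, learner_config)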