envwrap.py
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from builtins import object

# roboschool is optional; importing it registers extra environments with gym.
try:
  import roboschool
except ImportError:
  pass

import gym
import numpy as np

from config import config

MAX_FRAMES = config["env"]["max_frames"]

# Quiet gym's logger (40 == ERROR).
gym.logger.set_level(40)

def get_env(env_name, *args, **kwargs):
  MAPPING = {
    "CartPole-v0": CartPoleWrapper,
  }
  if env_name in MAPPING:
    return MAPPING[env_name](env_name, *args, **kwargs)
  return NoTimeLimitMujocoWrapper(env_name, *args, **kwargs)
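
# Illustrative only (not in the original file): get_env dispatches on the
# environment name, assuming both ids below are registered with gym.
#
#   cartpole = get_env("CartPole-v0")      # -> CartPoleWrapper
#   cheetah = get_env("HalfCheetah-v2")    # -> NoTimeLimitMujocoWrapper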

class GymWrapper(object):
  """
  Generic wrapper for OpenAI gym environments.
  """
  def __init__(self, env_name):
    self.internal_env = gym.make(env_name)
    self.observation_space = self.internal_env.observation_space
    self.action_space = self.internal_env.action_space
    self.custom_init()

  def custom_init(self):
    pass

  def reset(self):
    self.clock = 0
    return self.preprocess_obs(self.internal_env.reset())

  # returns normalized actions
  def sample(self):
    return self.action_space.sample()

  # Used to convert continuous approximations back to the original action domain.
  def normalize_actions(self, actions):
    return actions

  # Puts actions into a form where they can be predicted. By default, called after sample().
  def unnormalize_actions(self, actions):
    return actions

  def preprocess_obs(self, obs):
    # return np.append(obs, [self.clock/float(MAX_FRAMES)])
    return obs

  def step(self, normalized_action):
    out = self.internal_env.step(normalized_action)
    self.clock += 1
    obs, reward, done = self.preprocess_obs(out[0]), out[1], float(out[2])
    # done marks true environment termination; reset also triggers at the
    # MAX_FRAMES episode boundary.
    reset = done == 1. or self.clock == MAX_FRAMES
    return obs, reward, done, reset

  def render_rollout(self, states):
    # states: numpy array of shape [timesteps, state_dim]
    self.internal_env.reset()
    for state in states:
      self.internal_env.env.state = state
      self.internal_env.render()
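
# Illustrative only (not in the original file): one episode against the
# interface above. Predictions live in the "unnormalized" continuous domain,
# the environment is stepped with "normalized" actions, and reset (not done)
# marks the episode boundary once MAX_FRAMES is reached.
#
#   env = get_env("CartPole-v0")
#   obs, reset = env.reset(), False
#   while not reset:
#     a = env.unnormalize_actions(env.sample())
#     obs, reward, done, reset = env.step(env.normalize_actions(a))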

class CartPoleWrapper(GymWrapper):
  """
  Wrap CartPole, exposing its discrete {0, 1} actions through a continuous [-1, 1] interface.
  """
  def sample(self):
    return np.array([np.random.uniform(0., 1.)])

  def normalize_actions(self, action):
    return 1 if action[0] >= 0 else 0

  def unnormalize_actions(self, action):
    return 2. * action - 1.
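
# Illustrative only (not in the original file): the CartPole action round trip.
# unnormalize_actions maps a sampled value in [0, 1] to [-1, 1] for prediction,
# and normalize_actions thresholds a continuous action at 0 to recover the
# discrete {0, 1} action CartPole expects.
#
#   w = CartPoleWrapper("CartPole-v0")
#   a = w.sample()                    # e.g. array([0.73]), in [0, 1]
#   u = w.unnormalize_actions(a)      # array([0.46]), in [-1, 1]
#   w.normalize_actions(u)            # 1 (would be 0 if u[0] < 0)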

class NoTimeLimitMujocoWrapper(GymWrapper):
  """
  Wrap MuJoCo-style environments, removing gym's time-limit termination.
  This is needed to keep the environment Markovian.
  """
  def __init__(self, env_name):
    # gym.make wraps environments in a TimeLimit wrapper; .env strips it so
    # episodes never terminate purely because time has run out.
    self.internal_env = gym.make(env_name).env
    self.observation_space = self.internal_env.observation_space
    self.action_space = self.internal_env.action_space
    self.custom_init()
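

if __name__ == "__main__":
  # Smoke test (not part of the original file): runs one CartPole episode,
  # assuming config["env"]["max_frames"] is set and gym is installed.
  env = get_env("CartPole-v0")
  obs, reset, total_reward = env.reset(), False, 0.
  while not reset:
    action = env.unnormalize_actions(env.sample())
    obs, reward, done, reset = env.step(env.normalize_actions(action))
    total_reward += reward
  print("episode return:", total_reward)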