"src/vscode:/vscode.git/clone" did not exist on "f6d77130d7b053c155686d20c1f4d2693acd623e"
Unverified Commit c785655e authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Merge pull request #207 from microsoft/master

merge master
parents 9fae194a d6b61e2f
import tensorflow as tf
import logging
from . import default_layers
_logger = logging.getLogger(__name__)
class LayerInfo:
    """Lightweight record describing one TensorFlow operation.

    Carries the fields (`name`, `type`) that compressors match configs
    against, plus the op itself for instrumentation.
    """

    def __init__(self, op):
        self.op, self.name, self.type = op, op.name, op.type
class Compressor:
    """Abstract base TensorFlow compressor.

    Subclasses implement `_instrument_layer` to edit the graph; config
    entries select which operations get instrumented.
    """

    def __init__(self, config_list):
        self._bound_model = None
        self._config_list = config_list

    def __call__(self, model):
        """Compress given graph with algorithm implemented by subclass.
        The graph will be edited and returned.
        """
        self.compress(model)
        return model

    def compress(self, model):
        """Compress given graph with algorithm implemented by subclass.
        This will edit the graph.
        """
        assert self._bound_model is None, "Each NNI compressor instance can only compress one model"
        self._bound_model = model
        self.bind_model(model)
        for op in model.get_operations():
            info = LayerInfo(op)
            matched = self._select_config(info)
            if matched is not None:
                self._instrument_layer(info, matched)

    def compress_default_graph(self):
        """Compress the default graph with algorithm implemented by subclass.
        This will edit the default graph.
        """
        self.compress(tf.get_default_graph())

    def bind_model(self, model):
        """Called once when a model is bound to the compressor.

        Subclasses may overload this for model-specific initialization; it is
        guaranteed that only one model will ever be bound per instance.
        """
        pass

    def update_epoch(self, epoch, sess):
        """Overload to update masks once per epoch."""
        pass

    def step(self, sess):
        """Overload to update masks once per training step."""
        pass

    def _instrument_layer(self, layer, config):
        raise NotImplementedError()

    def _select_config(self, layer):
        # last matching config wins; 'exclude' entries veto the layer
        matched = None
        for config in self._config_list:
            allowed_types = config.get('op_types')
            if allowed_types == 'default':
                allowed_types = default_layers.op_weight_index.keys()
            if allowed_types and layer.type not in allowed_types:
                continue
            allowed_names = config.get('op_names')
            if allowed_names and layer.name not in allowed_names:
                continue
            matched = config
        if matched is None or matched.get('exclude'):
            return None
        return matched
class Pruner(Compressor):
    """Abstract base TensorFlow pruner."""

    def __init__(self, config_list):
        super().__init__(config_list)

    def calc_mask(self, weight, config, op, op_type, op_name):
        """Pruners should overload this method to provide mask for weight tensors.
        The mask must have the same shape and type comparing to the weight.
        It will be applied with `multiply()` operation.
        This method works as a subgraph which will be inserted into the bound model.
        """
        raise NotImplementedError("Pruners must overload calc_mask()")

    def _instrument_layer(self, layer, config):
        # NOTE: the graph editor can only swap edges of nodes or remove all
        # edges from a node; it cannot detach one edge nor attach a new one.
        # We therefore assume a proxy operation sits between the weight and
        # the layer (true while the weight is a `tf.Value`); behavior with
        # weights computed from other operations is untested.
        idx = _detect_weight_index(layer)
        if idx is None:
            _logger.warning('Failed to detect weight for layer {}'.format(layer.name))
            return
        proxy_op = layer.op.inputs[idx].op
        weight_tensor = proxy_op.inputs[0]
        mask = self.calc_mask(weight_tensor, config,
                              op=layer.op, op_type=layer.type, op_name=layer.name)
        masked = weight_tensor * mask
        tf.contrib.graph_editor.swap_outputs(proxy_op, masked.op)
class Quantizer(Compressor):
    """Abstract base TensorFlow quantizer."""

    def __init__(self, config_list):
        super().__init__(config_list)

    def quantize_weight(self, weight, config, op, op_type, op_name):
        raise NotImplementedError("Quantizer must overload quantize_weight()")

    def _instrument_layer(self, layer, config):
        # mirror of Pruner._instrument_layer, but the subclass produces the
        # replacement weight tensor directly instead of a multiplicative mask
        idx = _detect_weight_index(layer)
        if idx is None:
            _logger.warning('Failed to detect weight for layer {}'.format(layer.name))
            return
        proxy_op = layer.op.inputs[idx].op
        weight_tensor = proxy_op.inputs[0]
        quantized = self.quantize_weight(weight_tensor, config,
                                         op=layer.op, op_type=layer.type, op_name=layer.name)
        tf.contrib.graph_editor.swap_outputs(proxy_op, quantized.op)
def _detect_weight_index(layer):
    """Best-effort guess at which input of `layer.op` is its weight tensor.

    Returns the known index for registered op types, otherwise the single
    input fed from a `Variable/read` op, or None when detection fails.
    """
    known = default_layers.op_weight_index.get(layer.type)
    if known is not None:
        return known
    candidates = [i for i, inp in enumerate(layer.op.inputs)
                  if inp.name.endswith('Variable/read')]
    return candidates[0] if len(candidates) == 1 else None
# Maps TensorFlow op types that are considered "weighted" layers to the index
# of their weight input.  `None` means the index is not fixed and must be
# auto-detected (presumably by `_detect_weight_index` — confirm against the
# compressor module).  These keys also form the 'default' op_types set.
op_weight_index = {
    'Conv2D': None,
    'Conv3D': None,
    'DepthwiseConv2dNative': None,
    'Mul': None,
    'MatMul': None,
}
from .compressor import LayerInfo, Compressor, Pruner, Quantizer
from .builtin_pruners import *
from .builtin_quantizers import *
import logging
import torch
from .compressor import Pruner
__all__ = [ 'LevelPruner', 'AGP_Pruner', 'SensitivityPruner' ]
logger = logging.getLogger('torch pruner')
class LevelPruner(Pruner):
    """Prune to an exact pruning level specification.

    Keeps the (1 - sparsity) fraction of weights with largest magnitude and
    zeroes out the rest.
    """

    def __init__(self, config_list):
        """
        config_list: supported keys:
            - sparsity: fraction of weights to prune, in [0, 1)
        """
        super().__init__(config_list)

    def calc_mask(self, weight, config, **kwargs):
        """Return a 0/1 mask with the same shape, dtype and device as `weight`."""
        w_abs = weight.abs()
        k = int(weight.numel() * config['sparsity'])
        if k == 0:
            # FIX: ones_like keeps the weight's dtype and device; a plain
            # torch.ones(weight.shape) is CPU float32 and breaks the later
            # weight.mul(mask) for GPU or non-float32 models
            return torch.ones_like(weight)
        # threshold = largest magnitude among the k smallest weights;
        # strictly-greater comparison prunes exactly those k (ties aside)
        threshold = torch.topk(w_abs.view(-1), k, largest=False).values.max()
        return torch.gt(w_abs, threshold).type(weight.type())
class AGP_Pruner(Pruner):
    """An automated gradual pruning algorithm that prunes the smallest magnitude
    weights to achieve a preset level of network sparsity.

    Michael Zhu and Suyog Gupta, "To prune, or not to prune: exploring the
    efficacy of pruning for model compression", 2017 NIPS Workshop on Machine
    Learning of Phones and other Consumer Devices,
    https://arxiv.org/pdf/1710.01878.pdf
    """

    def __init__(self, config_list):
        """
        config_list: supported keys:
            - initial_sparsity
            - final_sparsity: you should make sure initial_sparsity <= final_sparsity
            - start_epoch: start epoch number begin update mask
            - end_epoch: end epoch number stop update mask, you should make sure start_epoch <= end_epoch
            - frequency: if you want update every 2 epoch, you can set it 2
        """
        super().__init__(config_list)
        self.mask_list = {}   # op_name -> last mask produced
        self.now_epoch = 1    # advanced externally via update_epoch()

    def calc_mask(self, weight, config, op_name, **kwargs):
        # FIX: create the fallback mask with ones_like so it matches the
        # weight's device/dtype; torch.ones(weight.shape) is CPU float32 and
        # breaks the multiply below for GPU or non-float32 models
        mask = self.mask_list.get(op_name, torch.ones_like(weight))
        target_sparsity = self.compute_target_sparsity(config)
        k = int(weight.numel() * target_sparsity)
        if k == 0 or target_sparsity >= 1 or target_sparsity <= 0:
            return mask
        # apply the existing mask before ranking so already-pruned weights
        # (magnitude forced to 0) stay pruned
        w_abs = weight.abs() * mask
        threshold = torch.topk(w_abs.view(-1), k, largest=False).values.max()
        new_mask = torch.gt(w_abs, threshold).type(weight.type())
        self.mask_list[op_name] = new_mask
        return new_mask

    def compute_target_sparsity(self, config):
        """Cubic sparsity schedule from the AGP paper (their eq. 1)."""
        end_epoch = config.get('end_epoch', 1)
        start_epoch = config.get('start_epoch', 1)
        freq = config.get('frequency', 1)
        final_sparsity = config.get('final_sparsity', 0)
        initial_sparsity = config.get('initial_sparsity', 0)
        if end_epoch <= start_epoch or initial_sparsity >= final_sparsity:
            logger.warning('your end epoch <= start epoch or initial_sparsity >= final_sparsity')
            return final_sparsity
        if end_epoch <= self.now_epoch:
            return final_sparsity
        # span is the schedule length rounded down to a whole number of
        # update periods, so the last update lands exactly on the schedule
        span = ((end_epoch - start_epoch - 1) // freq) * freq
        assert span > 0
        target_sparsity = (final_sparsity +
                           (initial_sparsity - final_sparsity) *
                           (1.0 - ((self.now_epoch - start_epoch) / span)) ** 3)
        return target_sparsity

    def update_epoch(self, epoch):
        if epoch > 0:
            self.now_epoch = epoch
class SensitivityPruner(Pruner):
    """Use algorithm from "Learning both Weights and Connections for Efficient Neural Networks"
    https://arxiv.org/pdf/1506.02626v3.pdf

    I.e.: "The pruning threshold is chosen as a quality parameter multiplied
    by the standard deviation of a layers weights."
    """

    def __init__(self, config_list):
        """
        config_list: supported keys:
            - sparsity: chosen pruning sparsity (quality parameter)
        """
        super().__init__(config_list)
        self.mask_list = {}  # op_name -> last mask produced

    def calc_mask(self, weight, config, op_name, **kwargs):
        # FIX: build the fallback mask with ones_like so it matches the
        # weight's device/dtype; torch.ones(weight.shape) is CPU float32 and
        # breaks the multiply below for GPU or non-float32 models
        mask = self.mask_list.get(op_name, torch.ones_like(weight))
        # apply the current mask first so already-pruned weights neither
        # influence the std-based threshold nor get resurrected
        weight = weight * mask
        # effective sparsity scales with the layer's weight std (paper's rule)
        target_sparsity = config['sparsity'] * torch.std(weight).item()
        k = int(weight.numel() * target_sparsity)
        if k == 0:
            return mask
        w_abs = weight.abs()
        threshold = torch.topk(w_abs.view(-1), k, largest=False).values.max()
        new_mask = torch.gt(w_abs, threshold).type(weight.type())
        self.mask_list[op_name] = new_mask
        return new_mask
import logging
import torch
from .compressor import Quantizer
__all__ = [ 'NaiveQuantizer', 'QAT_Quantizer', 'DoReFaQuantizer' ]
logger = logging.getLogger(__name__)
class NaiveQuantizer(Quantizer):
    """quantize weight to 8 bits"""

    def __init__(self, config_list):
        super().__init__(config_list)
        self.layer_scale = {}  # op_name -> running maximum scale

    def quantize_weight(self, weight, config, op_name, **kwargs):
        # the per-layer scale only ever grows, so values quantized earlier
        # remain representable
        candidate = weight.abs().max() / 127
        scale = max(self.layer_scale.get(op_name, 0), candidate)
        self.layer_scale[op_name] = scale
        orig_type = weight.type()  # TODO: user layer
        quantized = weight.div(scale).type(torch.int8)
        return quantized.type(orig_type).mul(scale)
class QAT_Quantizer(Quantizer):
    """Quantizer using the QAT (quantization-aware training) scheme, as defined in:
    Jacob et al., "Quantization and Training of Neural Networks for Efficient
    Integer-Arithmetic-Only Inference"
    http://openaccess.thecvf.com/content_cvpr_2018/papers/Jacob_Quantization_and_Training_CVPR_2018_paper.pdf
    """
    # FIX: the original docstring was copy-pasted from DoReFaQuantizer and
    # wrongly described this class as "the DoReFa scheme"; it implements the
    # affine quantization of the linked QAT paper.

    def __init__(self, config_list):
        """
        config_list: supported keys:
            - q_bits: number of quantization bits; values <= 1 disable quantization
        """
        super().__init__(config_list)

    def quantize_weight(self, weight, config, **kwargs):
        if config['q_bits'] <= 1:
            return weight
        # affine (asymmetric) quantization over the observed weight range
        a = torch.min(weight)
        b = torch.max(weight)
        if a == b:
            # FIX: constant tensor has an empty range -> scale would be 0 and
            # the division below would produce NaN/inf; quantization is a
            # no-op in that case
            return weight
        n = pow(2, config['q_bits'])
        scale = (b - a) / (n - 1)
        zero_point = a
        out = torch.round((weight - zero_point) / scale)
        out = out * scale + zero_point
        orig_type = weight.dtype
        return out.type(orig_type)
class DoReFaQuantizer(Quantizer):
    """Quantizer using the DoReFa scheme, as defined in:
    Zhou et al., DoReFa-Net: Training Low Bitwidth Convolutional Neural Networks with Low Bitwidth Gradients
    (https://arxiv.org/abs/1606.06160)
    """

    def __init__(self, config_list):
        """
        config_list: supported keys:
            - q_bits
        """
        super().__init__(config_list)

    def quantize_weight(self, weight, config, **kwargs):
        # squash weights into [0, 1], quantize there, then map back to [-1, 1]
        normalized = weight.tanh()
        normalized = normalized / (2 * normalized.abs().max()) + 0.5
        quantized = self.quantize(normalized, config['q_bits'])
        return 2 * quantized - 1

    def quantize(self, input_ri, q_bits):
        """Uniformly quantize values in [0, 1] onto 2**q_bits levels."""
        levels = pow(2, q_bits) - 1
        return torch.round(input_ri * levels) / levels
import torch
import logging
from . import default_layers
_logger = logging.getLogger(__name__)
class LayerInfo:
    """Bookkeeping record for one named submodule of a PyTorch model."""

    def __init__(self, name, module):
        self.module = module
        self.name = name
        # class name of the module, matched against config 'op_types'
        self.type = type(module).__name__
        # original forward(), stashed here once the layer is instrumented
        self._forward = None
class Compressor:
    """Abstract base PyTorch compressor.

    Subclasses implement `_instrument_layer` to wrap selected submodules;
    config entries select which modules get instrumented.
    """

    def __init__(self, config_list):
        self._bound_model = None
        self._config_list = config_list

    def __call__(self, model):
        self.compress(model)
        return model

    def compress(self, model):
        """Compress the model with algorithm implemented by subclass.
        The model will be instrumented and user should never edit it after calling this method.
        """
        assert self._bound_model is None, "Each NNI compressor instance can only compress one model"
        self._bound_model = model
        self.bind_model(model)
        for name, module in model.named_modules():
            info = LayerInfo(name, module)
            matched = self._select_config(info)
            if matched is not None:
                self._instrument_layer(info, matched)

    def bind_model(self, model):
        """Called once when a model is bound to the compressor.

        Subclasses may overload this for model-specific initialization; it is
        guaranteed that only one model will ever be bound per instance.
        """
        pass

    def update_epoch(self, epoch):
        """Overload to update the model once per epoch."""
        pass

    def step(self):
        """Overload to update the model once per training step."""
        pass

    def _instrument_layer(self, layer, config):
        raise NotImplementedError()

    def _select_config(self, layer):
        # last matching config wins; 'exclude' entries veto the layer
        matched = None
        for config in self._config_list:
            allowed_types = config.get('op_types')
            if allowed_types == 'default':
                allowed_types = default_layers.weighted_modules
            if allowed_types and layer.type not in allowed_types:
                continue
            allowed_names = config.get('op_names')
            if allowed_names and layer.name not in allowed_names:
                continue
            matched = config
        if matched is None or matched.get('exclude'):
            return None
        return matched
class Pruner(Compressor):
    """Abstract base PyTorch pruner."""

    def __init__(self, config_list):
        super().__init__(config_list)

    def calc_mask(self, weight, config, op, op_type, op_name):
        """Pruners should overload this method to provide mask for weight tensors.
        The mask must have the same shape and type comparing to the weight.
        It will be applied with `mul()` operation.
        This method is effectively hooked to `forward()` method of the model.
        """
        raise NotImplementedError("Pruners must overload calc_mask()")

    def _instrument_layer(self, layer, config):
        # TODO: support multiple weight tensors
        # Wrap the module's forward() so the mask is recomputed and applied
        # on every call, then the untouched weight is restored.
        assert layer._forward is None, 'Each model can only be compressed once'
        if not _check_weight(layer.module):
            _logger.warning('Module {} does not have parameter "weight"'.format(layer.name))
            return
        layer._forward = layer.module.forward

        def masked_forward(*inputs):
            original = layer.module.weight.data
            mask = self.calc_mask(original, config, op=layer.module,
                                  op_type=layer.type, op_name=layer.name)
            layer.module.weight.data = original.mul(mask)
            result = layer._forward(*inputs)
            # restore the unmasked weight so the optimizer keeps updating it
            layer.module.weight.data = original
            return result

        layer.module.forward = masked_forward
class Quantizer(Compressor):
    """Base quantizer for pytorch quantizer"""

    def __init__(self, config_list):
        super().__init__(config_list)

    def __call__(self, model):
        self.compress(model)
        return model

    def quantize_weight(self, weight, config, op, op_type, op_name):
        """user should know where dequantize goes and implement it in quantize method
        we now do not provide dequantize method
        """
        raise NotImplementedError("Quantizer must overload quantize_weight()")

    def _instrument_layer(self, layer, config):
        # Wrap forward() so the stored weight is replaced by its quantized
        # version before every call (NOTE: unlike Pruner, the original weight
        # is not restored afterwards).
        assert layer._forward is None, 'Each model can only be compressed once'
        if not _check_weight(layer.module):
            _logger.warning('Module {} does not have parameter "weight"'.format(layer.name))
            return
        layer._forward = layer.module.forward

        def quantized_forward(*inputs):
            current = layer.module.weight.data
            layer.module.weight.data = self.quantize_weight(
                current, config, op=layer.module, op_type=layer.type, op_name=layer.name)
            return layer._forward(*inputs)

        layer.module.forward = quantized_forward
def _check_weight(module):
try:
return isinstance(module.weight, torch.nn.Parameter) and isinstance(module.weight.data, torch.Tensor)
except AttributeError:
return False
# Module class names treated as "weighted" layers when a config entry uses
# op_types == 'default'.  Only modules whose type appears here are selected
# for instrumentation by that shorthand; all of these carry a `weight`
# parameter that pruners/quantizers operate on.
weighted_modules = [
    'Conv1d', 'Conv2d', 'Conv3d', 'ConvTranspose1d', 'ConvTranspose2d', 'ConvTranspose3d',
    'Linear', 'Bilinear',
    'PReLU',
    'Embedding', 'EmbeddingBag',
]
......@@ -30,7 +30,8 @@ ModuleName = {
'NetworkMorphism': 'nni.networkmorphism_tuner.networkmorphism_tuner',
'Curvefitting': 'nni.curvefitting_assessor.curvefitting_assessor',
'MetisTuner': 'nni.metis_tuner.metis_tuner',
'GPTuner': 'nni.gp_tuner.gp_tuner'
'GPTuner': 'nni.gp_tuner.gp_tuner',
'PPOTuner': 'nni.ppo_tuner.ppo_tuner'
}
ClassName = {
......@@ -44,6 +45,7 @@ ClassName = {
'NetworkMorphism':'NetworkMorphismTuner',
'MetisTuner':'MetisTuner',
'GPTuner':'GPTuner',
'PPOTuner': 'PPOTuner',
'Medianstop': 'MedianstopAssessor',
'Curvefitting': 'CurvefittingAssessor'
......
......@@ -83,7 +83,7 @@ class GPTuner(Tuner):
"""
self._space = TargetSpace(search_space, self._random_state)
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""Generate next parameter for trial
If the number of trial result is lower than cold start number,
gp will first randomly generate some parameters.
......@@ -123,7 +123,7 @@ class GPTuner(Tuner):
logger.info("Generate paramageters:\n %s", results)
return results
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
"""Tuner receive result from trial.
Parameters
......
......@@ -55,6 +55,14 @@ class TargetSpace():
[item[1] for item in sorted(pbounds.items(), key=lambda x: x[0])]
)
# check values type
for _bound in self._bounds:
if _bound['_type'] == 'choice':
try:
[float(val) for val in _bound['_value']]
except ValueError:
raise ValueError("GP Tuner supports only numerical values")
# preallocated memory for X and Y points
self._params = np.empty(shape=(0, self.dim))
self._target = np.empty(shape=(0))
......
......@@ -21,16 +21,16 @@
hyperband_advisor.py
"""
import sys
import math
import copy
import logging
import numpy as np
import json_tricks
import math
import sys
from nni.protocol import CommandType, send
import json_tricks
import numpy as np
from nni.common import multi_phase_enabled
from nni.msg_dispatcher_base import MsgDispatcherBase
from nni.common import init_logger, multi_phase_enabled
from nni.protocol import CommandType, send
from nni.utils import NodeType, OptimizeMode, MetricType, extract_scalar_reward
import nni.parameter_expressions as parameter_expressions
......@@ -53,6 +53,7 @@ def create_parameter_id():
_next_parameter_id += 1
return _next_parameter_id - 1
def create_bracket_parameter_id(brackets_id, brackets_curr_decay, increased_id=-1):
"""Create a full id for a specific bracket's hyperparameter configuration
......@@ -77,6 +78,7 @@ def create_bracket_parameter_id(brackets_id, brackets_curr_decay, increased_id=-
increased_id])
return params_id
def json2parameter(ss_spec, random_state):
"""Randomly generate values for hyperparameters from hyperparameter space i.e., x.
......@@ -100,7 +102,7 @@ def json2parameter(ss_spec, random_state):
_index = random_state.randint(len(_value))
chosen_params = json2parameter(ss_spec[NodeType.VALUE][_index], random_state)
else:
chosen_params = eval('parameter_expressions.' + # pylint: disable=eval-used
chosen_params = eval('parameter_expressions.' + # pylint: disable=eval-used
_type)(*(_value + [random_state]))
else:
chosen_params = dict()
......@@ -114,6 +116,7 @@ def json2parameter(ss_spec, random_state):
chosen_params = copy.deepcopy(ss_spec)
return chosen_params
class Bracket():
"""A bracket in Hyperband, all the information of a bracket is managed by an instance of this class
......@@ -137,12 +140,12 @@ class Bracket():
self.bracket_id = s
self.s_max = s_max
self.eta = eta
self.n = math.ceil((s_max + 1) * (eta**s) / (s + 1) - _epsilon) # pylint: disable=invalid-name
self.r = R / eta**s # pylint: disable=invalid-name
self.n = math.ceil((s_max + 1) * (eta ** s) / (s + 1) - _epsilon) # pylint: disable=invalid-name
self.r = R / eta ** s # pylint: disable=invalid-name
self.i = 0
self.hyper_configs = [] # [ {id: params}, {}, ... ]
self.configs_perf = [] # [ {id: [seq, acc]}, {}, ... ]
self.num_configs_to_run = [] # [ n, n, n, ... ]
self.hyper_configs = [] # [ {id: params}, {}, ... ]
self.configs_perf = [] # [ {id: [seq, acc]}, {}, ... ]
self.num_configs_to_run = [] # [ n, n, n, ... ]
self.num_finished_configs = [] # [ n, n, n, ... ]
self.optimize_mode = OptimizeMode(optimize_mode)
self.no_more_trial = False
......@@ -153,7 +156,7 @@ class Bracket():
def get_n_r(self):
"""return the values of n and r for the next round"""
return math.floor(self.n / self.eta**self.i + _epsilon), math.floor(self.r * self.eta**self.i + _epsilon)
return math.floor(self.n / self.eta ** self.i + _epsilon), math.floor(self.r * self.eta ** self.i + _epsilon)
def increase_i(self):
"""i means the ith round. Increase i by 1"""
......@@ -185,7 +188,6 @@ class Bracket():
else:
self.configs_perf[i][parameter_id] = [seq, value]
def inform_trial_end(self, i):
"""If the trial is finished and the corresponding round (i.e., i) has all its trials finished,
it will choose the top k trials for the next round (i.e., i+1)
......@@ -195,16 +197,17 @@ class Bracket():
i: int
the ith round
"""
global _KEY # pylint: disable=global-statement
global _KEY # pylint: disable=global-statement
self.num_finished_configs[i] += 1
_logger.debug('bracket id: %d, round: %d %d, finished: %d, all: %d', self.bracket_id, self.i, i, self.num_finished_configs[i], self.num_configs_to_run[i])
_logger.debug('bracket id: %d, round: %d %d, finished: %d, all: %d', self.bracket_id, self.i, i,
self.num_finished_configs[i], self.num_configs_to_run[i])
if self.num_finished_configs[i] >= self.num_configs_to_run[i] \
and self.no_more_trial is False:
and self.no_more_trial is False:
# choose candidate configs from finished configs to run in the next round
assert self.i == i + 1
this_round_perf = self.configs_perf[i]
if self.optimize_mode is OptimizeMode.Maximize:
sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1], reverse=True) # reverse
sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1], reverse=True) # reverse
else:
sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1])
_logger.debug('bracket %s next round %s, sorted hyper configs: %s', self.bracket_id, self.i, sorted_perf)
......@@ -214,7 +217,7 @@ class Bracket():
for k in range(next_n):
params_id = sorted_perf[k][0]
params = self.hyper_configs[i][params_id]
params[_KEY] = next_r # modify r
params[_KEY] = next_r # modify r
# generate new id
increased_id = params_id.split('_')[-1]
new_id = create_bracket_parameter_id(self.bracket_id, self.i, increased_id)
......@@ -223,7 +226,7 @@ class Bracket():
return [[key, value] for key, value in hyper_configs.items()]
return None
def get_hyperparameter_configurations(self, num, r, searchspace_json, random_state): # pylint: disable=invalid-name
def get_hyperparameter_configurations(self, num, r, searchspace_json, random_state): # pylint: disable=invalid-name
"""Randomly generate num hyperparameter configurations from search space
Parameters
......@@ -236,7 +239,7 @@ class Bracket():
list
a list of hyperparameter configurations. Format: [[key1, value1], [key2, value2], ...]
"""
global _KEY # pylint: disable=global-statement
global _KEY # pylint: disable=global-statement
assert self.i == 0
hyperparameter_configs = dict()
for _ in range(num):
......@@ -263,6 +266,7 @@ class Bracket():
self.num_configs_to_run.append(len(hyper_configs))
self.increase_i()
class Hyperband(MsgDispatcherBase):
"""Hyperband inherit from MsgDispatcherBase rather than Tuner, because it integrates both tuner's functions and assessor's functions.
This is an implementation that could fully leverage available resources, i.e., high parallelism.
......@@ -277,14 +281,15 @@ class Hyperband(MsgDispatcherBase):
optimize_mode: str
optimize mode, 'maximize' or 'minimize'
"""
def __init__(self, R=60, eta=3, optimize_mode='maximize'):
"""B = (s_max + 1)R"""
super(Hyperband, self).__init__()
self.R = R # pylint: disable=invalid-name
self.R = R # pylint: disable=invalid-name
self.eta = eta
self.brackets = dict() # dict of Bracket
self.generated_hyper_configs = [] # all the configs waiting for run
self.completed_hyper_configs = [] # all the completed configs
self.brackets = dict() # dict of Bracket
self.generated_hyper_configs = [] # all the configs waiting for run
self.completed_hyper_configs = [] # all the completed configs
self.s_max = math.floor(math.log(self.R, self.eta) + _epsilon)
self.curr_s = self.s_max
......@@ -302,12 +307,11 @@ class Hyperband(MsgDispatcherBase):
self.job_id_para_id_map = dict()
def handle_initialize(self, data):
"""data is search space
"""callback for initializing the advisor
Parameters
----------
data: int
number of trial jobs
data: dict
search space
"""
self.handle_update_search_space(data)
send(CommandType.Initialized, '')
......@@ -348,14 +352,8 @@ class Hyperband(MsgDispatcherBase):
}
return ret
def handle_update_search_space(self, data):
"""data: JSON object, which is search space
Parameters
----------
data: int
number of trial jobs
"""
self.searchspace_json = data
self.random_state = np.random.RandomState()
......
......@@ -27,6 +27,7 @@ import logging
import hyperopt as hp
import numpy as np
from nni.tuner import Tuner
from nni.nas_utils import rewrite_nas_space
from nni.utils import NodeType, OptimizeMode, extract_scalar_reward, split_index
logger = logging.getLogger('hyperopt_AutoML')
......@@ -240,6 +241,7 @@ class HyperoptTuner(Tuner):
return hp.anneal.suggest
raise RuntimeError('Not support tuner algorithm in hyperopt.')
@rewrite_nas_space
def update_search_space(self, search_space):
"""
Update search space definition in tuner by search_space in parameters.
......
......@@ -79,7 +79,7 @@ class MedianstopAssessor(Assessor):
self.completed_avg_history[trial_job_id].append(history_sum / cnt)
self.running_history.pop(trial_job_id)
else:
logger.warning('trial_end: trial_job_id does not in running_history')
logger.warning('trial_end: trial_job_id does not exist in running_history')
def assess_trial(self, trial_job_id, trial_history):
"""assess_trial
......@@ -112,7 +112,7 @@ class MedianstopAssessor(Assessor):
logger.exception(error)
except Exception as error:
logger.warning('unrecognized exception in medianstop_assessor:')
logger.excpetion(error)
logger.exception(error)
self._update_data(trial_job_id, num_trial_history)
if self.high_better:
......
......@@ -22,6 +22,7 @@ import logging
from collections import defaultdict
import json_tricks
from nni import NoMoreTrialError
from .protocol import CommandType, send
from .msg_dispatcher_base import MsgDispatcherBase
from .assessor import AssessResult
......@@ -41,8 +42,9 @@ We need this because NNI manager may send metrics after reporting a trial ended.
TODO: move this logic to NNI manager
'''
def _sort_history(history):
ret = [ ]
ret = []
for i, _ in enumerate(history):
if i in history:
ret.append(history[i])
......@@ -50,17 +52,20 @@ def _sort_history(history):
break
return ret
# Tuner global variables
_next_parameter_id = 0
_trial_params = {}
'''key: trial job ID; value: parameters'''
_customized_parameter_ids = set()
def _create_parameter_id():
global _next_parameter_id # pylint: disable=global-statement
_next_parameter_id += 1
return _next_parameter_id - 1
def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, parameter_index=None):
_trial_params[parameter_id] = params
ret = {
......@@ -76,6 +81,7 @@ def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, p
ret['parameter_index'] = 0
return json_tricks.dumps(ret)
class MsgDispatcher(MsgDispatcherBase):
def __init__(self, tuner, assessor=None):
super(MsgDispatcher, self).__init__()
......@@ -100,11 +106,16 @@ class MsgDispatcher(MsgDispatcherBase):
self.tuner.update_search_space(data)
send(CommandType.Initialized, '')
def send_trial_callback(self, id, params):
"""For tuner to issue trial config when the config is generated
"""
send(CommandType.NewTrialJob, _pack_parameter(id, params))
def handle_request_trial_jobs(self, data):
# data: number or trial jobs
ids = [_create_parameter_id() for _ in range(data)]
_logger.debug("requesting for generating params of {}".format(ids))
params_list = self.tuner.generate_multiple_parameters(ids)
params_list = self.tuner.generate_multiple_parameters(ids, st_callback=self.send_trial_callback)
for i, _ in enumerate(params_list):
send(CommandType.NewTrialJob, _pack_parameter(ids[i], params_list[i]))
......@@ -117,7 +128,7 @@ class MsgDispatcher(MsgDispatcherBase):
def handle_import_data(self, data):
"""Import additional data for tuning
data: a list of dictionarys, each of which has at least two keys, 'parameter' and 'value'
data: a list of dictionaries, each of which has at least two keys, 'parameter' and 'value'
"""
self.tuner.import_data(data)
......@@ -144,8 +155,12 @@ class MsgDispatcher(MsgDispatcherBase):
assert data['trial_job_id'] is not None
assert data['parameter_index'] is not None
param_id = _create_parameter_id()
param = self.tuner.generate_parameters(param_id, trial_job_id=data['trial_job_id'])
send(CommandType.SendTrialJobParameter, _pack_parameter(param_id, param, trial_job_id=data['trial_job_id'], parameter_index=data['parameter_index']))
try:
param = self.tuner.generate_parameters(param_id, trial_job_id=data['trial_job_id'])
except NoMoreTrialError:
param = None
send(CommandType.SendTrialJobParameter, _pack_parameter(param_id, param, trial_job_id=data['trial_job_id'],
parameter_index=data['parameter_index']))
else:
raise ValueError('Data type not supported: {}'.format(data['type']))
......@@ -179,7 +194,8 @@ class MsgDispatcher(MsgDispatcherBase):
customized = True
else:
customized = False
self.tuner.receive_trial_result(id_, _trial_params[id_], value, customized=customized, trial_job_id=data.get('trial_job_id'))
self.tuner.receive_trial_result(id_, _trial_params[id_], value, customized=customized,
trial_job_id=data.get('trial_job_id'))
def _handle_intermediate_metric_data(self, data):
"""Call assessor to process intermediate results
......@@ -214,7 +230,8 @@ class MsgDispatcher(MsgDispatcherBase):
_logger.debug('BAD, kill %s', trial_job_id)
send(CommandType.KillTrialJob, json_tricks.dumps(trial_job_id))
# notify tuner
_logger.debug('env var: NNI_INCLUDE_INTERMEDIATE_RESULTS: [%s]', dispatcher_env_vars.NNI_INCLUDE_INTERMEDIATE_RESULTS)
_logger.debug('env var: NNI_INCLUDE_INTERMEDIATE_RESULTS: [%s]',
dispatcher_env_vars.NNI_INCLUDE_INTERMEDIATE_RESULTS)
if dispatcher_env_vars.NNI_INCLUDE_INTERMEDIATE_RESULTS == 'true':
self._earlystop_notify_tuner(data)
else:
......
......@@ -18,7 +18,6 @@
# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ==================================================================================================
#import json_tricks
import os
import threading
import logging
......@@ -39,7 +38,12 @@ _logger = logging.getLogger(__name__)
QUEUE_LEN_WARNING_MARK = 20
_worker_fast_exit_on_terminate = True
class MsgDispatcherBase(Recoverable):
"""This is where tuners and assessors are not defined yet.
Inherits this class to make your own advisor.
"""
def __init__(self):
if multi_thread_enabled():
self.pool = ThreadPool()
......@@ -49,7 +53,8 @@ class MsgDispatcherBase(Recoverable):
self.default_command_queue = Queue()
self.assessor_command_queue = Queue()
self.default_worker = threading.Thread(target=self.command_queue_worker, args=(self.default_command_queue,))
self.assessor_worker = threading.Thread(target=self.command_queue_worker, args=(self.assessor_command_queue,))
self.assessor_worker = threading.Thread(target=self.command_queue_worker,
args=(self.assessor_command_queue,))
self.default_worker.start()
self.assessor_worker.start()
self.worker_exceptions = []
......@@ -72,7 +77,8 @@ class MsgDispatcherBase(Recoverable):
if multi_thread_enabled():
result = self.pool.map_async(self.process_command_thread, [(command, data)])
self.thread_results.append(result)
if any([thread_result.ready() and not thread_result.successful() for thread_result in self.thread_results]):
if any([thread_result.ready() and not thread_result.successful() for thread_result in
self.thread_results]):
_logger.debug('Caught thread exception')
break
else:
......@@ -112,7 +118,8 @@ class MsgDispatcherBase(Recoverable):
def enqueue_command(self, command, data):
"""Enqueue command into command queues
"""
if command == CommandType.TrialEnd or (command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL'):
if command == CommandType.TrialEnd or (
command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL'):
self.assessor_command_queue.put((command, data))
else:
self.default_command_queue.put((command, data))
......@@ -142,14 +149,14 @@ class MsgDispatcherBase(Recoverable):
_logger.debug('process_command: command: [{}], data: [{}]'.format(command, data))
command_handlers = {
# Tunner commands:
# Tuner commands:
CommandType.Initialize: self.handle_initialize,
CommandType.RequestTrialJobs: self.handle_request_trial_jobs,
CommandType.UpdateSearchSpace: self.handle_update_search_space,
CommandType.ImportData: self.handle_import_data,
CommandType.AddCustomizedTrialJob: self.handle_add_customized_trial,
# Tunner/Assessor commands:
# Tuner/Assessor commands:
CommandType.ReportMetricData: self.handle_report_metric_data,
CommandType.TrialEnd: self.handle_trial_end,
......@@ -163,22 +170,88 @@ class MsgDispatcherBase(Recoverable):
pass
def handle_initialize(self, data):
"""Initialize search space and tuner, if any
This method is meant to be called only once for each experiment, after calling this method,
dispatcher should `send(CommandType.Initialized, '')`, to set the status of the experiment to be "INITIALIZED".
Parameters
----------
data: dict
search space
"""
raise NotImplementedError('handle_initialize not implemented')
def handle_request_trial_jobs(self, data):
"""The message dispatcher is demanded to generate `data` trial jobs.
These trial jobs should be sent via `send(CommandType.NewTrialJob, json_tricks.dumps(parameter))`,
where `parameter` will be received by NNI Manager and eventually accessible to trial jobs as "next parameter".
Semantically, message dispatcher should do this `send` exactly `data` times.
The JSON sent by this method should follow the format of
{
"parameter_id": 42
"parameters": {
// this will be received by trial
},
"parameter_source": "algorithm" // optional
}
Parameters
----------
data: int
number of trial jobs
"""
raise NotImplementedError('handle_request_trial_jobs not implemented')
def handle_update_search_space(self, data):
raise NotImplementedError('handle_update_search_space not implemented')
"""This method will be called when search space is updated.
It's recommended to call this method in `handle_initialize` to initialize search space.
*No need to* notify NNI Manager when this update is done.
Parameters
----------
data: dict
search space
"""
raise NotImplementedError('handle_update_search_space not implemented')
def handle_import_data(self, data):
"""Import previous data when experiment is resumed.
Parameters
----------
data: list
a list of dictionaries, each of which has at least two keys, 'parameter' and 'value'
"""
raise NotImplementedError('handle_import_data not implemented')
def handle_add_customized_trial(self, data):
"""Experimental API. Not recommended for usage.
"""
raise NotImplementedError('handle_add_customized_trial not implemented')
def handle_report_metric_data(self, data):
"""Called when metric data is reported or new parameters are requested (for multiphase).
When new parameters are requested, this method should send a new parameter.
Parameters
----------
data: dict
a dict which contains 'parameter_id', 'value', 'trial_job_id', 'type', 'sequence'.
type: can be `MetricType.REQUEST_PARAMETER`, `MetricType.FINAL` or `MetricType.PERIODICAL`.
`REQUEST_PARAMETER` is used to request new parameters for multiphase trial job. In this case,
the dict will contain additional keys: `trial_job_id`, `parameter_index`. Refer to `msg_dispatcher.py`
as an example.
Raises
------
ValueError
Data type is not supported
"""
raise NotImplementedError('handle_report_metric_data not implemented')
def handle_trial_end(self, data):
"""Called when the state of one of the trials is changed
Parameters
----------
data: dict
a dict with keys: trial_job_id, event, hyper_params.
trial_job_id: the id generated by training service.
event: the job’s state.
hyper_params: the string that is sent by message dispatcher during the creation of trials.
"""
raise NotImplementedError('handle_trial_end not implemented')
......@@ -17,10 +17,16 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ==================================================================================================
import functools
import logging
from . import trial
_logger = logging.getLogger(__name__)
_MUTABLE_LAYER_SPACE_PREFIX = "_mutable_layer"
def classic_mode(
mutable_id,
mutable_layer_id,
......@@ -34,13 +40,11 @@ def classic_mode(
without touching the full model graph.'''
if trial.get_current_parameter() is None:
trial.get_next_parameter()
mutable_block = trial.get_current_parameter(mutable_id)
chosen_layer = mutable_block[mutable_layer_id]["chosen_layer"]
chosen_inputs = mutable_block[mutable_layer_id]["chosen_inputs"]
real_chosen_inputs = [optional_inputs[input_name]
for input_name in chosen_inputs]
layer_out = funcs[chosen_layer](
[fixed_inputs, real_chosen_inputs], **funcs_args[chosen_layer])
chosen_layer, chosen_inputs = _get_layer_and_inputs_from_tuner(mutable_id, mutable_layer_id,
list(optional_inputs.keys()))
real_chosen_inputs = [optional_inputs[input_name] for input_name in chosen_inputs]
layer_out = funcs[chosen_layer]([fixed_inputs, real_chosen_inputs], **funcs_args[chosen_layer])
return layer_out
......@@ -173,20 +177,44 @@ def reload_tensorflow_variables(tf, session):
tf: tensorflow module
'''
subgraph_from_tuner = trial.get_next_parameter()
for mutable_id, mutable_block in subgraph_from_tuner.items():
mutable_layers = set()
for subgraph_key in subgraph_from_tuner:
if "/" in subgraph_key:
# has to remove the last, could be layer_choice or whatever
mutable_id, mutable_layer_id = _decompose_general_key(subgraph_key[:subgraph_key.rfind("/")])
if mutable_id is not None:
mutable_layers.add((mutable_id, mutable_layer_id))
mutable_layers = sorted(list(mutable_layers))
for mutable_id, mutable_layer_id in mutable_layers:
if mutable_id not in name_space:
_logger.warning("{} not found in name space".format(mutable_id))
continue
for mutable_layer_id, mutable_layer in mutable_block.items():
name_prefix = "{}_{}".format(mutable_id, mutable_layer_id)
# extract layer information from the subgraph sampled by tuner
chosen_layer = name_space[name_prefix]['funcs'].index(
mutable_layer["chosen_layer"])
chosen_inputs = [1 if inp in mutable_layer["chosen_inputs"]
else 0 for inp in name_space[name_prefix]['optional_inputs']]
# load these information into pre-defined tensorflow variables
tf_variables[name_prefix]['funcs'].load(chosen_layer, session)
tf_variables[name_prefix]['optional_inputs'].load(
chosen_inputs, session)
name_prefix = "{}_{}".format(mutable_id, mutable_layer_id)
# get optional inputs names
optional_inputs = name_space[name_prefix]['optional_inputs']
# extract layer information from the subgraph sampled by tuner
chosen_layer, chosen_inputs = _get_layer_and_inputs_from_tuner(mutable_id, mutable_layer_id, optional_inputs)
chosen_layer = name_space[name_prefix]['funcs'].index(chosen_layer)
chosen_inputs = [1 if inp in chosen_inputs else 0 for inp in optional_inputs]
# load these information into pre-defined tensorflow variables
tf_variables[name_prefix]['funcs'].load(chosen_layer, session)
tf_variables[name_prefix]['optional_inputs'].load(
chosen_inputs, session)
def _construct_general_key(mutable_id, mutable_layer_id):
    # Build the general (search-space) key for a mutable layer:
    # "<prefix>/<mutable_id>/<mutable_layer_id>".
    return "/".join((_MUTABLE_LAYER_SPACE_PREFIX, mutable_id, mutable_layer_id))
def _decompose_general_key(key):
    # Inverse of _construct_general_key: recover (mutable_id, mutable_layer_id)
    # from a general key, or (None, None) when the prefix does not match.
    if key.startswith(_MUTABLE_LAYER_SPACE_PREFIX):
        _prefix, mutable_id, mutable_layer_id = key.split("/", maxsplit=2)
        return mutable_id, mutable_layer_id
    return None, None
def darts_training(tf, session, loss, feed_dict):
......@@ -205,4 +233,107 @@ def training_update(nas_mode, tf=None, session=None, loss=None, feed_dict=None):
if nas_mode == 'darts_mode':
darts_training(tf, session, loss, feed_dict)
elif nas_mode == 'enas_mode':
reload_tensorflow_variables(tf, session)
\ No newline at end of file
reload_tensorflow_variables(tf, session)
def _get_layer_and_inputs_from_tuner(mutable_id, mutable_layer_id, optional_inputs):
    """Ask the tuner which layer and which optional inputs to use for a mutable layer.

    Parameters
    ----------
    mutable_id : str
        id of the mutable block
    mutable_layer_id : str
        id of the mutable layer inside the block
    optional_inputs : iterable
        names (keys) of the optional inputs of this layer

    Returns
    -------
    tuple
        (chosen_layer, chosen_inputs)
    """
    try:
        mutable_block = trial.get_current_parameter(mutable_id)
        # A NAS tuner produced native NAS parameters.
        chosen_layer = mutable_block[mutable_layer_id]["chosen_layer"]
        chosen_inputs = mutable_block[mutable_layer_id]["chosen_inputs"]
    except KeyError:
        # Fall back to converted NAS parameters (see convert_nas_search_space).
        params = trial.get_current_parameter()
        expected_prefix = _construct_general_key(mutable_id, mutable_layer_id)
        chosen_layer = params[expected_prefix + "/layer_choice"]
        # how many optional inputs to choose; uniform was converted to randint
        optional_input_size = int(params[expected_prefix + "/optional_input_size"])
        # a single integer encoding which inputs to choose; duplicates allowed
        optional_input_state = params[expected_prefix + "/optional_input_chosen_state"]
        chosen_inputs = []
        # make sure dict -> list produces a stable result by sorting
        optional_inputs_keys = sorted(optional_inputs)
        for _ in range(optional_input_size):
            # decode the state as base-len(optional_inputs) digits
            chosen_inputs.append(optional_inputs_keys[optional_input_state % len(optional_inputs)])
            optional_input_state //= len(optional_inputs)
    # lazy %-style args: formatting happens only if the record is emitted
    _logger.info("%s_%s: layer: %s, optional inputs: %s",
                 mutable_id, mutable_layer_id, chosen_layer, chosen_inputs)
    return chosen_layer, chosen_inputs
def convert_nas_search_space(search_space):
    """Convert NAS ``mutable_layer`` entries of a raw search space into plain HPO types.

    Parameters
    ----------
    search_space : dict
        raw search space; values are dicts with ``_type`` / ``_value``

    Returns
    -------
    dict
        new search space where each mutable_layer is expanded into
        ``choice`` / ``randint`` entries keyed with the general prefix
    """
    ret = {}
    for k, v in search_space.items():
        if "_type" not in v:
            # this should not happen
            _logger.warning("There is no _type in one of your search space values with key '%s'"
                            ". Please check your search space", k)
            ret[k] = v
        elif v["_type"] != "mutable_layer":
            # ordinary HPO entry: pass through unchanged
            ret[k] = v
        else:
            _logger.info("Converting mutable_layer search space with key '%s'", k)
            # v["_value"] looks like {'mutable_layer_1': {'layer_choice': ...} ...}
            values = v["_value"]
            for layer_name, layer_data in values.items():
                # there should be at most layer_choice, optional_inputs, optional_input_size in layer_data
                # add "_mutable_layer" as prefix so that they can be recovered later
                layer_key = _construct_general_key(k, layer_name)
                if layer_data.get("layer_choice"):  # filter out empty choice and no choice
                    layer_choice = layer_data["layer_choice"]
                else:
                    raise ValueError("No layer choice found in %s" % layer_key)
                if layer_data.get("optional_input_size"):
                    input_size = layer_data["optional_input_size"]
                    if isinstance(input_size, int):
                        # a single int means "exactly this many inputs"
                        input_size = [input_size, input_size]
                    if input_size[0] > input_size[1] or input_size[0] < 0:
                        _logger.error("Might not be able to handle optional_input_size < 0, please double check")
                    # randint upper bound is exclusive, so extend by one
                    input_size[1] += 1
                else:
                    _logger.info("Optional input choices are set to empty by default in %s", layer_key)
                    input_size = [0, 1]
                if layer_data.get("optional_inputs"):
                    # enough states to encode any multiset of inputs up to the max size
                    total_state_size = len(layer_data["optional_inputs"]) ** (input_size[1] - 1)
                else:
                    _logger.info("Optional inputs not found in %s", layer_key)
                    total_state_size = 1
                converted = {
                    layer_key + "/layer_choice": {
                        "_type": "choice", "_value": layer_choice
                    },
                    layer_key + "/optional_input_size": {
                        "_type": "randint", "_value": input_size
                    },
                    layer_key + "/optional_input_chosen_state": {
                        "_type": "randint", "_value": [0, total_state_size]
                    }
                }
                _logger.info(converted)
                ret.update(converted)
    return ret
def rewrite_nas_space(func):
    """Decorator for tuner methods taking ``(self, search_space)``: converts
    NAS ``mutable_layer`` entries via ``convert_nas_search_space`` before the
    wrapped method sees the search space."""
    @functools.wraps(func)
    def wrap(self, search_space):
        search_space = convert_nas_search_space(search_space)
        return func(self, search_space)
    return wrap
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
functions for sampling from hidden state
"""
import tensorflow as tf
from .util import fc
class Pd:
    """Base class for a parametrized probability distribution.

    Subclasses implement the abstract methods; the concrete helpers
    (``logp``, ``get_shape``, ``shape``, ``__getitem__``) are derived
    from them.
    """
    def flatparam(self):
        raise NotImplementedError

    def mode(self):
        raise NotImplementedError

    def neglogp(self, x):
        # Subclasses define the negative log-probability; logp negates it,
        # since the negative form is usually easier to express.
        raise NotImplementedError

    def kl(self, other):
        raise NotImplementedError

    def entropy(self):
        raise NotImplementedError

    def sample(self):
        raise NotImplementedError

    def logp(self, x):
        """Log-probability of ``x``, derived from ``neglogp``."""
        return -self.neglogp(x)

    def get_shape(self):
        """Shape of the flat parameter tensor."""
        return self.flatparam().shape

    @property
    def shape(self):
        return self.get_shape()

    def __getitem__(self, idx):
        # Slice the flat parameters and wrap them in the same distribution type.
        return type(self)(self.flatparam()[idx])
class PdType:
    """
    Parametrized family of probability distributions.
    Subclasses choose the concrete Pd class and its parameter/sample shapes.
    """
    def pdclass(self):
        # Concrete Pd subclass this family constructs.
        raise NotImplementedError
    def pdfromflat(self, flat, mask, nsteps, size, is_act_model):
        """Instantiate the distribution from flat parameters (logits)."""
        return self.pdclass()(flat, mask, nsteps, size, is_act_model)
    def pdfromlatent(self, latent_vector, init_scale, init_bias):
        # Build distribution parameters from a latent tensor (see subclasses).
        raise NotImplementedError
    def param_shape(self):
        raise NotImplementedError
    def sample_shape(self):
        raise NotImplementedError
    def sample_dtype(self):
        raise NotImplementedError
    def param_placeholder(self, prepend_shape, name=None):
        # Placeholder shaped prepend_shape + param_shape for distribution parameters.
        return tf.placeholder(dtype=tf.float32, shape=prepend_shape+self.param_shape(), name=name)
    def sample_placeholder(self, prepend_shape, name=None):
        # Placeholder shaped prepend_shape + sample_shape for drawn samples.
        return tf.placeholder(dtype=self.sample_dtype(), shape=prepend_shape+self.sample_shape(), name=name)
class CategoricalPd(Pd):
    """
    Categorical probability distribution over logits, with optional
    -inf masking of invalid actions when not in action mode.
    """
    def __init__(self, logits, mask_npinf, nsteps, size, is_act_model):
        # logits: unnormalized log-probabilities
        # mask_npinf: additive mask (presumably 0 / -inf entries) applied before sampling
        #             when not in action mode — TODO confirm against _matching_fc
        self.logits = logits
        self.mask_npinf = mask_npinf
        self.nsteps = nsteps
        self.size = size
        self.is_act_model = is_act_model
    def flatparam(self):
        return self.logits
    def mode(self):
        # Most likely category.
        return tf.argmax(self.logits, axis=-1)
    @property
    def mean(self):
        return tf.nn.softmax(self.logits)
    def neglogp(self, x):
        """
        Equivalent to tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits, labels=x).
        Note: we can't use sparse_softmax_cross_entropy_with_logits because
        the implementation does not allow second-order derivatives.
        """
        if x.dtype in {tf.uint8, tf.int32, tf.int64}:
            # one-hot encoding
            x_shape_list = x.shape.as_list()
            logits_shape_list = self.logits.get_shape().as_list()[:-1]
            for xs, ls in zip(x_shape_list, logits_shape_list):
                if xs is not None and ls is not None:
                    assert xs == ls, 'shape mismatch: {} in x vs {} in logits'.format(xs, ls)
            x = tf.one_hot(x, self.logits.get_shape().as_list()[-1])
        else:
            # already encoded
            assert x.shape.as_list() == self.logits.shape.as_list()
        return tf.nn.softmax_cross_entropy_with_logits_v2(
            logits=self.logits,
            labels=x)
    def kl(self, other):
        """KL divergence KL(self || other), computed with max-subtraction for stability."""
        a0 = self.logits - tf.reduce_max(self.logits, axis=-1, keepdims=True)
        a1 = other.logits - tf.reduce_max(other.logits, axis=-1, keepdims=True)
        ea0 = tf.exp(a0)
        ea1 = tf.exp(a1)
        z0 = tf.reduce_sum(ea0, axis=-1, keepdims=True)
        z1 = tf.reduce_sum(ea1, axis=-1, keepdims=True)
        p0 = ea0 / z0
        return tf.reduce_sum(p0 * (a0 - tf.log(z0) - a1 + tf.log(z1)), axis=-1)
    def entropy(self):
        """Entropy of the categorical distribution (max-subtracted softmax)."""
        a0 = self.logits - tf.reduce_max(self.logits, axis=-1, keepdims=True)
        ea0 = tf.exp(a0)
        z0 = tf.reduce_sum(ea0, axis=-1, keepdims=True)
        p0 = ea0 / z0
        return tf.reduce_sum(p0 * (tf.log(z0) - a0), axis=-1)
    def sample(self):
        """Sample a category via the Gumbel-max trick (argmax of logits plus Gumbel noise)."""
        if not self.is_act_model:
            # Apply the additive mask before sampling so masked entries cannot win.
            re_res = tf.reshape(self.logits, [-1, self.nsteps, self.size])
            masked_res = tf.math.add(re_res, self.mask_npinf)
            re_masked_res = tf.reshape(masked_res, [-1, self.size])
            u = tf.random_uniform(tf.shape(re_masked_res), dtype=self.logits.dtype)
            return tf.argmax(re_masked_res - tf.log(-tf.log(u)), axis=-1)
        else:
            u = tf.random_uniform(tf.shape(self.logits), dtype=self.logits.dtype)
            return tf.argmax(self.logits - tf.log(-tf.log(u)), axis=-1)
    @classmethod
    def fromflat(cls, flat):
        # NOTE(review): __init__ requires five arguments, so cls(flat) would fail
        # if called — looks like stale baselines code; confirm before relying on it.
        return cls(flat)
class CategoricalPdType(PdType):
    """
    Family of categorical distributions; builds CategoricalPd instances
    from a latent vector via a fully connected layer with masking.
    """
    def __init__(self, ncat, nsteps, np_mask, is_act_model):
        # ncat: number of categories (action space size)
        # np_mask: mask arrays passed through to _matching_fc
        self.ncat = ncat
        self.nsteps = nsteps
        self.np_mask = np_mask
        self.is_act_model = is_act_model
    def pdclass(self):
        return CategoricalPd
    def pdfromlatent(self, latent_vector, init_scale=1.0, init_bias=0.0):
        """Add an fc layer on the latent and create a CategoricalPd.

        Returns (pd, pdparam, mask, mask_npinf)."""
        pdparam, mask, mask_npinf = _matching_fc(latent_vector, 'pi', self.ncat, self.nsteps,
                                                 init_scale=init_scale, init_bias=init_bias,
                                                 np_mask=self.np_mask, is_act_model=self.is_act_model)
        return self.pdfromflat(pdparam, mask_npinf, self.nsteps, self.ncat, self.is_act_model), pdparam, mask, mask_npinf
    def param_shape(self):
        return [self.ncat]
    def sample_shape(self):
        # Samples are scalar category indices.
        return []
    def sample_dtype(self):
        return tf.int32
def _matching_fc(tensor, name, size, nsteps, init_scale, init_bias, np_mask, is_act_model):
    """
    Add an fc op projecting *tensor* to *size* logits, and a multiplicative
    mask op when not in action mode.

    Parameters
    ----------
    tensor : tf.Tensor
        input latent, shape (batch, nin)
    name : str
        variable name for the fc layer
    size : int
        output width (number of categories)
    nsteps : int
        rollout length used to reshape for per-step masking
    np_mask : sequence
        np_mask[0] is the multiplicative mask initializer,
        np_mask[1] the additive (-inf style) mask initializer
    is_act_model : bool
        if True, return unmasked fc output

    Returns
    -------
    tuple
        (output tensor, mask variable, mask_npinf variable)
    """
    # A latent already matching the output width is not supported here; the
    # original code had `assert False` followed by unreachable `return tensor`,
    # so fail loudly instead of keeping dead code.
    assert tensor.shape[-1] != size, "latent width equals output size; unsupported in _matching_fc"
    mask = tf.get_variable("act_mask", dtype=tf.float32, initializer=np_mask[0], trainable=False)
    mask_npinf = tf.get_variable("act_mask_npinf", dtype=tf.float32, initializer=np_mask[1], trainable=False)
    res = fc(tensor, name, size, init_scale=init_scale, init_bias=init_bias)
    if not is_act_model:
        # Mask logits step-wise: reshape to (batch, nsteps, size), multiply, flatten back.
        re_res = tf.reshape(res, [-1, nsteps, size])
        masked_res = tf.math.multiply(re_res, mask)
        re_masked_res = tf.reshape(masked_res, [-1, size])
        return re_masked_res, mask, mask_npinf
    else:
        return res, mask, mask_npinf
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
the main model of policy/value network
"""
import tensorflow as tf
from .util import initialize, get_session
class Model:
    """
    PPO2 policy/value model.

    __init__:
    - Creates the step_model (act_model, used for sampling actions)
    - Creates the train_model and the PPO loss / Adam training op
    train():
    - Runs one optimization step (feedforward and backpropagation of gradients)
    save/load():
    - Save / load the model (NOTE(review): not defined in this class — confirm elsewhere)
    """
    def __init__(self, *, policy, nbatch_act, nbatch_train,
                 nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size=None, np_mask=None):
        """
        Build the acting and training models, the clipped PPO loss, and the
        Adam training op.

        Parameters
        ----------
        policy : callable
            factory called as policy(nbatch, nsteps, sess, np_mask=..., is_act_model=...)
        nbatch_act : int
            batch size for the acting (sampling) model
        nbatch_train : int
            batch size for the training model
        nsteps : int
            number of timesteps per rollout segment
        ent_coef : float
            entropy bonus coefficient in the total loss
        vf_coef : float
            value-function loss coefficient in the total loss
        max_grad_norm : float or None
            if not None, clip gradients to this global norm
        microbatch_size : int or None
            if set, build the train model with this batch size instead of nbatch_train
        np_mask : optional
            mask arrays forwarded to the policy factory
        """
        self.sess = sess = get_session()
        with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE):
            # CREATE OUR TWO MODELS
            # act_model that is used for sampling
            act_model = policy(nbatch_act, 1, sess, np_mask=np_mask, is_act_model=True)
            # Train model for training
            if microbatch_size is None:
                train_model = policy(nbatch_train, nsteps, sess, np_mask=np_mask, is_act_model=False)
            else:
                train_model = policy(microbatch_size, nsteps, sess, np_mask=np_mask, is_act_model=False)
        # CREATE THE PLACEHOLDERS
        self.A = A = train_model.pdtype.sample_placeholder([None])
        self.ADV = ADV = tf.placeholder(tf.float32, [None])
        self.R = R = tf.placeholder(tf.float32, [None])
        # Keep track of old actor
        self.OLDNEGLOGPAC = OLDNEGLOGPAC = tf.placeholder(tf.float32, [None])
        # Keep track of old critic
        self.OLDVPRED = OLDVPRED = tf.placeholder(tf.float32, [None])
        self.LR = LR = tf.placeholder(tf.float32, [])
        # Cliprange
        self.CLIPRANGE = CLIPRANGE = tf.placeholder(tf.float32, [])
        neglogpac = train_model.pd.neglogp(A)
        # Calculate the entropy
        # Entropy is used to improve exploration by limiting the premature convergence to suboptimal policy.
        entropy = tf.reduce_mean(train_model.pd.entropy())
        # CALCULATE THE LOSS
        # Total loss = Policy gradient loss - entropy * entropy coefficient + Value coefficient * value loss
        # Clip the value to reduce variability during Critic training
        # Get the predicted value
        vpred = train_model.vf
        vpredclipped = OLDVPRED + tf.clip_by_value(train_model.vf - OLDVPRED, - CLIPRANGE, CLIPRANGE)
        # Unclipped value
        vf_losses1 = tf.square(vpred - R)
        # Clipped value
        vf_losses2 = tf.square(vpredclipped - R)
        vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
        # Calculate ratio (pi current policy / pi old policy)
        ratio = tf.exp(OLDNEGLOGPAC - neglogpac)
        # Defining Loss = - J is equivalent to max J
        pg_losses = -ADV * ratio
        pg_losses2 = -ADV * tf.clip_by_value(ratio, 1.0 - CLIPRANGE, 1.0 + CLIPRANGE)
        # Final PG loss
        pg_loss = tf.reduce_mean(tf.maximum(pg_losses, pg_losses2))
        approxkl = .5 * tf.reduce_mean(tf.square(neglogpac - OLDNEGLOGPAC))
        clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE)))
        # Total loss
        loss = pg_loss - entropy * ent_coef + vf_loss * vf_coef
        # UPDATE THE PARAMETERS USING LOSS
        # 1. Get the model parameters
        params = tf.trainable_variables('ppo2_model')
        # 2. Build our trainer
        self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
        # 3. Calculate the gradients
        grads_and_var = self.trainer.compute_gradients(loss, params)
        grads, var = zip(*grads_and_var)
        if max_grad_norm is not None:
            # Clip the gradients (normalize)
            grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm)
        grads_and_var = list(zip(grads, var))
        # zip aggregate each gradient with parameters associated
        # For instance zip(ABCD, xyza) => Ax, By, Cz, Da
        self.grads = grads
        self.var = var
        self._train_op = self.trainer.apply_gradients(grads_and_var)
        self.loss_names = ['policy_loss', 'value_loss', 'policy_entropy', 'approxkl', 'clipfrac']
        self.stats_list = [pg_loss, vf_loss, entropy, approxkl, clipfrac]
        self.train_model = train_model
        self.act_model = act_model
        self.step = act_model.step
        self.value = act_model.value
        self.initial_state = act_model.initial_state
        initialize()
    def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
        """
        Run one training step on the given minibatch and return the values of
        the `loss_names` statistics (the training-op result is dropped).

        Here we calculate advantage A(s,a) = R + yV(s') - V(s)
        Returns = R + yV(s')
        """
        advs = returns - values
        # Normalize the advantages
        advs = (advs - advs.mean()) / (advs.std() + 1e-8)
        td_map = {
            self.train_model.X : obs,
            self.A : actions,
            self.ADV : advs,
            self.R : returns,
            self.LR : lr,
            self.CLIPRANGE : cliprange,
            self.OLDNEGLOGPAC : neglogpacs,
            self.OLDVPRED : values
        }
        if states is not None:
            # recurrent policy: feed hidden states and episode-done masks
            td_map[self.train_model.S] = states
            td_map[self.train_model.M] = masks
        return self.sess.run(
            self.stats_list + [self._train_op],
            td_map
        )[:-1]
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
build policy/value network from model
"""
import tensorflow as tf
from .distri import CategoricalPdType
from .util import lstm_model, fc, observation_placeholder, adjust_shape
class PolicyWithValue:
    """
    Encapsulates fields and methods for RL policy and value function estimation with shared parameters
    """
    def __init__(self, env, observations, latent, estimate_q=False, vf_latent=None, sess=None, np_mask=None, is_act_model=False, **tensors):
        """
        Parameters:
        ----------
        env: RL environment (must expose action_space.n and nsteps)
        observations: tensorflow placeholder in which the observations will be fed
        latent: latent state from which policy distribution parameters should be inferred
        estimate_q: must be False (Q estimation is not supported here)
        vf_latent: latent state from which value function should be inferred (if None, then latent is used)
        sess: tensorflow session to run calculations in (if None, default session is used)
        np_mask: mask arrays forwarded to CategoricalPdType
        is_act_model: if True, additionally build the per-step acting graph
        **tensors: tensorflow tensors for additional attributes such as state or mask
        """
        self.X = observations
        self.state = tf.constant([])
        self.initial_state = None
        self.__dict__.update(tensors)
        vf_latent = vf_latent if vf_latent is not None else latent
        vf_latent = tf.layers.flatten(vf_latent)
        latent = tf.layers.flatten(latent)
        # Based on the action space, will select what probability distribution type
        self.np_mask = np_mask
        self.pdtype = CategoricalPdType(env.action_space.n, env.nsteps, np_mask, is_act_model)
        self.act_latent = latent
        self.nh = env.action_space.n
        self.pd, self.pi, self.mask, self.mask_npinf = self.pdtype.pdfromlatent(latent, init_scale=0.01)
        # Take an action
        self.action = self.pd.sample()
        # Calculate the neg log of our probability
        self.neglogp = self.pd.neglogp(self.action)
        self.sess = sess or tf.get_default_session()
        assert estimate_q is False
        # Value head: one fc unit on the value latent, squeezed to a vector.
        self.vf = fc(vf_latent, 'vf', 1)
        self.vf = self.vf[:, 0]
        if is_act_model:
            self._build_model_for_step()
    def _evaluate(self, variables, observation, **extra_feed):
        """Run `variables` in the session, feeding the observation and any
        extra placeholders matched by attribute name."""
        sess = self.sess
        feed_dict = {self.X: adjust_shape(self.X, observation)}
        for inpt_name, data in extra_feed.items():
            if inpt_name in self.__dict__.keys():
                inpt = self.__dict__[inpt_name]
                # Only feed actual placeholder tensors matched by name.
                if isinstance(inpt, tf.Tensor) and inpt._op.type == 'Placeholder':
                    feed_dict[inpt] = adjust_shape(inpt, data)
        return sess.run(variables, feed_dict)
    def _build_model_for_step(self):
        # Build the per-step acting graph: multiply the latent with the pi
        # weights and apply the step-indexed mask slices before sampling.
        self.act_step = step = tf.placeholder(shape=(), dtype=tf.int64, name='act_step')
        with tf.variable_scope('pi', reuse=tf.AUTO_REUSE):
            from .util import ortho_init
            nin = self.act_latent.get_shape()[1].value
            w = tf.get_variable("w", [nin, self.nh], initializer=ortho_init(0.01))
            b = tf.get_variable("b", [self.nh], initializer=tf.constant_initializer(0.0))
            logits = tf.matmul(self.act_latent, w)+b
            # Slice out the mask row for the current step.
            piece = tf.slice(self.mask, [step, 0], [1, self.nh])
            re_piece = tf.reshape(piece, [-1])
            masked_logits = tf.math.multiply(logits, re_piece)
            npinf_piece = tf.slice(self.mask_npinf, [step, 0], [1, self.nh])
            re_npinf_piece = tf.reshape(npinf_piece, [-1])
            def sample(logits, mask_npinf):
                # Gumbel-max sampling over additively-masked logits.
                new_logits = tf.math.add(logits, mask_npinf)
                u = tf.random_uniform(tf.shape(new_logits), dtype=logits.dtype)
                return tf.argmax(new_logits - tf.log(-tf.log(u)), axis=-1)
            def neglogp(logits, x):
                # Equivalent to tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=x).
                # Note: we can't use sparse_softmax_cross_entropy_with_logits because
                # the implementation does not allow second-order derivatives...
                if x.dtype in {tf.uint8, tf.int32, tf.int64}:
                    # one-hot encoding
                    x_shape_list = x.shape.as_list()
                    logits_shape_list = logits.get_shape().as_list()[:-1]
                    for xs, ls in zip(x_shape_list, logits_shape_list):
                        if xs is not None and ls is not None:
                            assert xs == ls, 'shape mismatch: {} in x vs {} in logits'.format(xs, ls)
                    x = tf.one_hot(x, logits.get_shape().as_list()[-1])
                else:
                    # already encoded
                    assert x.shape.as_list() == logits.shape.as_list()
                return tf.nn.softmax_cross_entropy_with_logits_v2(
                    logits=logits,
                    labels=x)
            self.act_action = sample(masked_logits, re_npinf_piece)
            self.act_neglogp = neglogp(masked_logits, self.act_action)
    def step(self, step, observation, **extra_feed):
        """
        Compute next action(s) given the observation(s)
        Parameters:
        ----------
        step: current step index, fed into the act_step placeholder
        observation: observation data (either single or a batch)
        **extra_feed: additional data such as state or mask (names of the arguments should match the ones in constructor, see __init__)
        Returns:
        -------
        (action, value estimate, next state, negative log likelihood of the action under current policy parameters) tuple
        """
        extra_feed['act_step'] = step
        a, v, state, neglogp = self._evaluate([self.act_action, self.vf, self.state, self.act_neglogp], observation, **extra_feed)
        if state.size == 0:
            state = None
        return a, v, state, neglogp
    def value(self, ob, *args, **kwargs):
        """
        Compute value estimate(s) given the observation(s)
        Parameters:
        ----------
        ob: observation data (either single or a batch)
        **kwargs: additional data such as state or mask (names of the arguments should match the ones in constructor, see __init__)
        Returns:
        -------
        value estimate
        """
        return self._evaluate(self.vf, ob, *args, **kwargs)
def build_lstm_policy(model_config, value_network=None, estimate_q=False, **policy_kwargs):
    """
    Build an LSTM policy and value network; they share the same LSTM network.
    Returns a policy factory `policy_fn(nbatch, nsteps, sess, ...)` producing
    PolicyWithValue instances. `value_network` must be None or 'shared'.
    """
    policy_network = lstm_model(**policy_kwargs)
    def policy_fn(nbatch=None, nsteps=None, sess=None, observ_placeholder=None, np_mask=None, is_act_model=False):
        # Create (or reuse) the observation placeholder.
        ob_space = model_config.observation_space
        X = observ_placeholder if observ_placeholder is not None else observation_placeholder(ob_space, batch_size=nbatch)
        extra_tensors = {}
        # encode_observation is not necessary anymore as we use embedding_lookup
        encoded_x = X
        with tf.variable_scope('pi', reuse=tf.AUTO_REUSE):
            policy_latent = policy_network(encoded_x, 1, model_config.observation_space.n)
            if isinstance(policy_latent, tuple):
                policy_latent, recurrent_tensors = policy_latent
                if recurrent_tensors is not None:
                    # recurrent architecture, need a few more steps
                    nenv = nbatch // nsteps
                    assert nenv > 0, 'Bad input for recurrent policy: batch size {} smaller than nsteps {}'.format(nbatch, nsteps)
                    policy_latent, recurrent_tensors = policy_network(encoded_x, nenv, model_config.observation_space.n)
                    extra_tensors.update(recurrent_tensors)
        _v_net = value_network
        # Only a shared value network is supported; value shares the policy latent.
        assert _v_net is None or _v_net == 'shared'
        vf_latent = policy_latent
        policy = PolicyWithValue(
            env=model_config,
            observations=X,
            latent=policy_latent,
            vf_latent=vf_latent,
            sess=sess,
            estimate_q=estimate_q,
            np_mask=np_mask,
            is_act_model=is_act_model,
            **extra_tensors
        )
        return policy
    return policy_fn
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment