Unverified Commit 19726d4d authored by liuzhe-lz's avatar liuzhe-lz Committed by GitHub
Browse files

retiarii graph and mutation (#3057)


Co-authored-by: default avatarliuzhe <zhe.liu@microsoft.com>
parent 45e82b3e
from .graph import *
from .mutator import *
from .operation import *
# these should be experiment config in release
# Selects which backend `Operation.new()` dispatches to: 'pytorch'/'torch' or 'tensorflow'/'tf'.
framework = 'pytorch'
"""
Classes related to Graph IR, except `Operation`.
"""
from __future__ import annotations
import copy
import json
from enum import Enum
from typing import *
from .operation import Cell, Operation, _PseudoOperation
__all__ = ['Model', 'ModelStatus', 'Graph', 'Node', 'Edge', 'IllegalGraphError', 'MetricData']
# Type alias for a training metric value (loss, accuracy, ...); see module docstring below.
MetricData = NewType('MetricData', Any)
"""
Graph metrics like loss, accuracy, etc.
Maybe we can assume this is a single float number for first iteration.
"""
class TrainingConfig:
    """
    Training configuration of a model.

    The module will be imported, then initialized with the generated model
    and the arguments in ``kwargs``.

    Attributes
    ----------
    module
        Trainer module
    kwargs
        Trainer keyword arguments
    """

    def __init__(self, module: str, kwargs: Dict[str, Any]):
        # Fix: annotation used builtin `any` instead of `typing.Any`.
        self.module = module
        self.kwargs = kwargs

    def __repr__(self):
        return f'TrainingConfig(module={self.module}, kwargs={self.kwargs})'

    @staticmethod
    def _load(ir: Any) -> TrainingConfig:
        # `kwargs` is optional in the IR; default to no arguments.
        return TrainingConfig(ir['module'], ir.get('kwargs', {}))

    def _dump(self) -> Any:
        return {
            'module': self.module,
            'kwargs': self.kwargs
        }
class Model:
    """
    Top-level structure of graph IR.

    In execution engine's perspective, this is a trainable neural network model.
    In mutator's perspective, this is a sandbox for a round of mutation.
    Once a round of mutation starts, a sandbox is created and all mutating operations will happen inside.
    When mutation is complete, the sandbox will be frozen to a trainable model.
    Then the strategy will submit model to execution engine for training.
    The model will record its metrics once trained.

    Attributes
    ----------
    status
        See `ModelStatus`.
    root_graph
        The outermost graph which usually takes dataset as input and feeds output to loss function.
    graphs
        All graphs (subgraphs) in this model, keyed by graph name.
    training_config
        Training config
    history
        Mutation history.
        `self` is directly mutated from `self.history[-1]`;
        `self.history[-1]` is mutated from `self.history[-2]`, and so on.
        `self.history[0]` is the base graph.
    metric
        Training result of the model, or `None` if it's not yet trained or has failed to train.
    intermediate_metrics
        Intermediate training metrics. If the model is not trained, it's an empty list.
    """

    # Process-wide monotonically increasing counter used to assign `model_id`.
    _cur_model_id = 0

    def __init__(self, _internal=False):
        assert _internal, '`Model()` is private, use `model.fork()` instead'
        Model._cur_model_id += 1
        self.model_id = Model._cur_model_id
        self.status: ModelStatus = ModelStatus.Mutating
        self._root_graph_name: str = '_model'
        self.graphs: Dict[str, Graph] = {}
        # NOTE(review): 'foo' is a placeholder module name; presumably replaced
        # before the model is actually trained — confirm.
        self.training_config: TrainingConfig = TrainingConfig('foo', {})
        self.history: List[Model] = []
        self.metric: Optional[MetricData] = None
        self.intermediate_metrics: List[MetricData] = []
        # Counter backing `_uid()`; graphs and nodes share this ID sequence.
        self._last_uid: int = 0

    def __repr__(self):
        return f'Model(model_id={self.model_id}, status={self.status}, graphs={list(self.graphs.keys())}, ' + \
            f'training_config={self.training_config}, metric={self.metric}, intermediate_metrics={self.intermediate_metrics})'

    @property
    def root_graph(self) -> Graph:
        return self.graphs[self._root_graph_name]

    def fork(self) -> Model:
        """
        Create a new model which has same topology, names, and IDs to current one.
        Can only be invoked on a frozen model.
        The new model will be in `Mutating` state.
        This API is used in mutator base class.
        """
        new_model = Model(_internal=True)
        new_model._root_graph_name = self._root_graph_name
        # `_fork_to` re-creates each graph with the new model as its owner.
        new_model.graphs = {name: graph._fork_to(new_model) for name, graph in self.graphs.items()}
        new_model.training_config = copy.deepcopy(self.training_config)
        # Record ancestry: `self` becomes the last history entry of the fork.
        new_model.history = self.history + [self]
        # Continue UID numbering so the fork never reuses IDs of this model.
        new_model._last_uid = self._last_uid
        return new_model

    def _uid(self) -> int:
        # Return the next unique ID within this model.
        self._last_uid += 1
        return self._last_uid

    @staticmethod
    def _load(ir: Any) -> Model:
        # Deserialize from an IR dict; each top-level key except
        # '_training_config' names a graph.
        model = Model(_internal=True)
        for graph_name, graph_data in ir.items():
            if graph_name != '_training_config':
                Graph._load(model, graph_name, graph_data)._register()
        #model.training_config = TrainingConfig._load(ir['_training_config'])
        return model

    def _dump(self) -> Any:
        # Serialize to an IR dict; inverse of `_load`.
        ret = {name: graph._dump() for name, graph in self.graphs.items()}
        #ret['_training_config'] = self.training_config._dump()
        return ret
class ModelStatus(Enum):
    """
    The status of model.

    A model is created in `Mutating` status.
    When the mutation is done and the model get ready to train, its status becomes `Frozen`.
    When training started, the model's status becomes `Training`.
    If training is successfully ended, model's `metric` attribute get set and its status becomes `Trained`.
    If training failed, the status becomes `Failed`.
    """
    Mutating = "mutating"
    Frozen = "frozen"
    Training = "training"
    Trained = "trained"
    Failed = "failed"
# Reserved IDs for the pseudo input/output nodes present in every graph;
# real nodes get positive IDs from `Model._uid()`.
_InputPseudoUid = -1
_OutputPseudoUid = -2
class Graph:
    """
    Graph topology.

    This class simply represents the topology, with no semantic meaning.
    All other information like metric, non-graph functions, mutation history, etc should go to `Model`.

    Each graph belongs to and only belongs to one `Model`.

    Attributes
    ----------
    model
        The model containing (and owning) this graph.
    id
        Unique ID in the model.
        If two models have graphs of identical ID, they are semantically the same graph.
        Typically this means one graph is mutated from another, or they are both mutated from one ancestor.
    name
        Mnemonic name of this graph. It should have an one-to-one mapping with ID.
    input_names
        Optional mnemonic names of input parameters.
    output_names
        Optional mnemonic names of output values.
    input_node
        Pseudo node representing the graph's inputs.
    output_node
        Pseudo node representing the graph's outputs.
    hidden_nodes
        All nodes except the input/output pseudo nodes.
    nodes
        All input/output/hidden nodes.
    edges
        All edges in this graph.
    """

    def __init__(self, model: Model, graph_id: int, name: str = None, _internal: bool = False):
        assert _internal, '`Graph()` is private'
        self.model: Model = model
        self.id: int = graph_id
        # Auto-generate a name from the ID when none is given.
        self.name: str = name or f'_generated_{graph_id}'
        self.input_names: Optional[List[str]] = None
        self.output_names: Optional[List[str]] = None
        # Pseudo I/O nodes use reserved negative IDs shared across all graphs.
        self.input_node: Node = Node(self, _InputPseudoUid, '_inputs', _PseudoOperation('_inputs'), _internal=True)
        self.output_node: Node = Node(self, _OutputPseudoUid, '_outputs', _PseudoOperation('_outputs'), _internal=True)
        self.hidden_nodes: List[Node] = []
        self.edges: List[Edge] = []

    def __repr__(self):
        return f'Graph(id={self.id}, name={self.name}, input_names={self.input_names}, ' + \
            f'output_names={self.output_names}, num_hidden_nodes={len(self.hidden_nodes)}, num_edges={len(self.edges)})'

    @property
    def nodes(self) -> List[Node]:
        return [self.input_node, self.output_node] + self.hidden_nodes

    # mutation
    def add_node(self, type: Union[Operation, str], **parameters) -> Node:
        # Accept either a ready-made Operation, or a type name plus parameters.
        if isinstance(type, Operation):
            assert not parameters
            op = type
        else:
            op = Operation.new(type, **parameters)
        return Node(self, self.model._uid(), None, op, _internal=True)._register()

    # mutation
    def add_edge(self, head: Tuple[Node, Optional[int]], tail: Tuple[Node, Optional[int]]) -> Edge:
        # Both endpoints must belong to this graph.
        assert head[0].graph is self and tail[0].graph is self
        # Fix: `Edge.__init__` asserts `_internal`; without it this call always failed.
        return Edge(head, tail, _internal=True)._register()

    def get_node_by_name(self, name: str) -> Optional[Node]:
        """
        Returns the node which has specified name; or returns `None` if no node has this name.
        """
        found = [node for node in self.nodes if node.name == name]
        return found[0] if found else None

    def get_nodes_by_type(self, operation_type: str) -> List[Node]:
        """
        Returns nodes whose operation is specified typed.
        """
        return [node for node in self.hidden_nodes if node.operation.type == operation_type]

    def topo_sort(self) -> List[Node]:  # TODO
        ...

    def fork(self) -> Graph:
        """
        Fork the model and returns corresponding graph in new model.
        This shortcut might be helpful because many algorithms only cares about "stem" subgraph instead of whole model.
        """
        return self.model.fork().graphs[self.name]

    def __eq__(self, other: object) -> bool:
        # Graphs are compared by identity, never by structure.
        return self is other

    # Defining `__eq__` alone would set `__hash__` to None; identity hashing is
    # consistent with identity equality and keeps graphs usable in sets/dicts.
    __hash__ = object.__hash__

    def _fork_to(self, model: Model) -> Graph:
        # Re-create this graph in another model, preserving node IDs and names.
        new_graph = Graph(model, self.id, self.name, _internal=True)._register()
        new_graph.input_names = self.input_names
        new_graph.output_names = self.output_names
        for node in self.hidden_nodes:
            Node(new_graph, node.id, node.name, node.operation, _internal=True)._register()
        # Map by ID over *all* nodes, so pseudo I/O nodes are re-wired too.
        id_to_new_node = {node.id: node for node in new_graph.nodes}
        for edge in self.edges:
            new_head = id_to_new_node[edge.head.id]
            new_tail = id_to_new_node[edge.tail.id]
            Edge((new_head, edge.head_slot), (new_tail, edge.tail_slot), _internal=True)._register()
        return new_graph

    def _copy(self) -> Graph:
        # Copy this graph inside the model.
        # The new graph will have identical topology, but its nodes' name and ID will be different.
        new_graph = Graph(self.model, self.model._uid(), _internal=True)._register()
        new_graph.input_names = self.input_names
        new_graph.output_names = self.output_names
        # Fix: seed the map with the pseudo I/O nodes (their IDs are shared across
        # graphs), otherwise edges touching `_inputs`/`_outputs` raised KeyError.
        id_to_new_node = {node.id: node for node in new_graph.nodes}  # old node ID -> new node object
        for old_node in self.hidden_nodes:
            new_node = Node(new_graph, self.model._uid(), None, old_node.operation, _internal=True)._register()
            id_to_new_node[old_node.id] = new_node
        for edge in self.edges:
            new_head = id_to_new_node[edge.head.id]
            new_tail = id_to_new_node[edge.tail.id]
            Edge((new_head, edge.head_slot), (new_tail, edge.tail_slot), _internal=True)._register()
        return new_graph

    def _register(self) -> Graph:
        # Add this graph into its model's graph table.
        self.model.graphs[self.name] = self
        return self

    @staticmethod
    def _load(model: Model, name: str, ir: Any) -> Graph:
        graph = Graph(model, model._uid(), name, _internal=True)
        graph.input_names = ir.get('inputs')
        graph.output_names = ir.get('outputs')
        for node_name, node_data in ir['nodes'].items():
            Node._load(graph, node_name, node_data)._register()
        # Edges must be loaded after nodes, since endpoints are resolved by node name.
        for edge_data in ir['edges']:
            Edge._load(graph, edge_data)._register()
        return graph

    def _dump(self) -> Any:
        return {
            'inputs': self.input_names,
            'outputs': self.output_names,
            'nodes': {node.name: node._dump() for node in self.hidden_nodes},
            'edges': [edge._dump() for edge in self.edges]
        }
class Node:
    """
    An operation or an opaque subgraph inside a graph.

    Each node belongs to and only belongs to one `Graph`.
    Nodes should never be created with constructor. Use `Graph.add_node()` instead.

    The node itself is for topology only.
    Information of tensor calculation should all go inside `operation` attribute.

    TODO: parameter of subgraph (cell)
    It's easy to assign parameters on cell node, but it's hard to "use" them.
    We need to design a way to reference stored cell parameters in inner node operations.
    e.g. `self.fc = Linear(self.units)` <- how to express `self.units` in IR?

    Attributes
    ----------
    graph
        The graph containing this node.
    id
        Unique ID in the model.
        If two models have nodes with same ID, they are semantically the same node.
    name
        Mnemonic name. It should have an one-to-one mapping with ID.
    operation
        The calculation logic attached to this node.
    cell
        Read only shortcut to get the referenced subgraph.
        If this node is not a subgraph (is a primitive operation), accessing `cell` will raise an error.
    predecessors
        Predecessor nodes of this node in the graph. This is an optional mutation helper.
    successors
        Successor nodes of this node in the graph. This is an optional mutation helper.
    incoming_edges
        Incoming edges of this node in the graph. This is an optional mutation helper.
    outgoing_edges
        Outgoing edges of this node in the graph. This is an optional mutation helper.
    """

    def __init__(self, graph, node_id, name, operation, _internal=False):
        # NOTE(review): `_internal` is accepted but never asserted here, unlike
        # `Graph` and `Edge` — confirm whether the privacy assert was intended.
        self.graph: Graph = graph
        self.id: int = node_id
        self.name: str = name
        self.operation: Operation = operation

    def __repr__(self):
        return f'Node(id={self.id}, name={self.name}, operation={self.operation})'

    @property
    def predecessors(self) -> List[Node]:
        # De-duplicate (one head may feed several slots) and order by ID for determinism.
        return sorted(set(edge.head for edge in self.incoming_edges), key=(lambda node: node.id))

    @property
    def successors(self) -> List[Node]:
        return sorted(set(edge.tail for edge in self.outgoing_edges), key=(lambda node: node.id))

    @property
    def incoming_edges(self) -> List[Edge]:
        return [edge for edge in self.graph.edges if edge.tail is self]

    @property
    def outgoing_edges(self) -> List[Edge]:
        return [edge for edge in self.graph.edges if edge.head is self]

    @property
    def cell(self) -> Graph:
        assert isinstance(self.operation, Cell)
        return self.graph.model.graphs[self.operation.parameters['cell']]

    # mutation
    def update_operation(self, type: Union[Operation, str], **parameters) -> None:
        # Accept either a ready-made Operation, or a type name plus parameters.
        if isinstance(type, Operation):
            assert not parameters
            self.operation = type
        else:
            self.operation = Operation.new(type, **parameters)

    # mutation
    def remove(self) -> None:
        # A node may only be removed after all of its edges have been removed.
        assert not self.incoming_edges and not self.outgoing_edges
        self.graph.hidden_nodes.remove(self)

    # mutation
    def specialize_cell(self) -> Graph:
        """
        Only available if the operation is a cell.
        Duplicate the cell template and let this node reference to newly created copy.
        """
        new_cell = self.cell._copy()._register()
        self.operation = Operation.new('_cell', cell=new_cell.name)
        return new_cell

    def __eq__(self, other: object) -> bool:
        # Nodes are compared by identity, never by content.
        return self is other

    # Fix: defining `__eq__` sets `__hash__` to None, making nodes unhashable and
    # breaking the `set(...)` calls in `predecessors`/`successors`. Identity
    # hashing is consistent with identity equality.
    __hash__ = object.__hash__

    def _register(self) -> Node:
        self.graph.hidden_nodes.append(self)
        return self

    @staticmethod
    def _load(graph: Graph, name: str, ir: Any) -> Node:
        # Copy the IR dict so the caller's data is not mutated.
        ir = dict(ir)
        # A node with a `cell` key but no `type` is an implicit cell reference.
        if 'type' not in ir and 'cell' in ir:
            ir['type'] = '_cell'
        op = Operation.new(**ir)
        return Node(graph, graph.model._uid(), name, op)

    def _dump(self) -> Any:
        return {'type': self.operation.type, **self.operation.parameters}
class Edge:
    """
    A tensor, or "data flow", between two nodes.

    Example forward code snippet:
    ```
    a, b, c = split(x)
    p = concat(a, c)
    q = sum(b, p)
    z = relu(q)
    ```
    Edges in above snippet:
      + head: (split, 0), tail: (concat, 0)        # a in concat
      + head: (split, 2), tail: (concat, 1)        # c in concat
      + head: (split, 1), tail: (sum, -1 or 0)     # b in sum
      + head: (concat, null), tail: (sum, -1 or 1) # p in sum
      + head: (sum, null), tail: (relu, null)      # q in relu

    Attributes
    ----------
    graph
        The graph owning this edge (taken from the head node).
    head
        Head (source) node.
    tail
        Tail (sink) node.
    head_slot
        Index of outputs in head node; `None` when the head has a single output.
    tail_slot
        Index of inputs in tail node; `None` when the tail has a single input,
        `-1` when the tail does not care about input order.
    """

    def __init__(
            self,
            head: Tuple[Node, Optional[int]],
            tail: Tuple[Node, Optional[int]],
            _internal: bool = False):
        assert _internal, '`Edge()` is private'
        head_node, head_slot = head
        tail_node, tail_slot = tail
        self.graph: Graph = head_node.graph
        self.head: Node = head_node
        self.tail: Node = tail_node
        self.head_slot: Optional[int] = head_slot
        self.tail_slot: Optional[int] = tail_slot

    def __repr__(self):
        return f'Edge(head=({self.head}, {self.head_slot}), tail=({self.tail}, {self.tail_slot}))'

    # mutation
    def remove(self) -> None:
        # Detach this edge from its graph.
        self.graph.edges.remove(self)

    def _register(self) -> Edge:
        # Attach this edge to its graph; returns self for chaining.
        self.graph.edges.append(self)
        return self

    @staticmethod
    def _load(graph: Graph, ir: Any) -> Edge:
        # IR format: {'head': [node_name, slot], 'tail': [node_name, slot]}.
        head_node = graph.get_node_by_name(ir['head'][0])
        tail_node = graph.get_node_by_name(ir['tail'][0])
        return Edge((head_node, ir['head'][1]), (tail_node, ir['tail'][1]), _internal=True)

    def _dump(self) -> Any:
        return {
            'head': [self.head.name, self.head_slot],
            'tail': [self.tail.name, self.tail_slot]
        }
class IllegalGraphError(ValueError):
    """
    Raised when a graph is malformed.
    As a debugging aid, the offending graph is dumped to `generated/debug.json`.
    """

    def __init__(self, graph, *args):
        self._debug_dump_graph(graph)
        super().__init__(*args)

    @staticmethod
    def _debug_dump_graph(graph):
        import os
        if isinstance(graph, Graph):
            # Fix: `Graph` has `_dump()`, not `dump()` — the old call raised AttributeError.
            graph = graph._dump()
        # Fix: create the output directory so the dump doesn't fail on a fresh checkout.
        os.makedirs('generated', exist_ok=True)
        with open('generated/debug.json', 'w') as dump_file:
            json.dump(graph, dump_file, indent=4)
from __future__ import annotations
from typing import *
from .graph import *
__all__ = ['Sampler', 'Mutator']
# Type alias for a value picked by `Sampler.choice()` from a candidate list.
Choice = NewType('Choice', Any)
class Sampler:
    """
    Base class for handling `Mutator.choice()` calls.

    Concrete samplers decide which candidate gets picked; the start/end hooks
    are optional notifications around each mutation round.
    """

    def choice(self, candidates: List[Choice], mutator: Mutator, model: Model, index: int) -> Choice:
        # Must be overridden by concrete samplers.
        raise NotImplementedError()

    def mutation_start(self, mutator: Mutator, model: Model) -> None:
        # Optional hook; default is a no-op.
        pass

    def mutation_end(self, mutator: Mutator, model: Model) -> None:
        # Optional hook; default is a no-op.
        pass
class Mutator:
    """
    Mutates graphs in model to generate new model.

    `Mutator` class will be used in two places:
    1. Inherit `Mutator` to implement graph mutation logic.
    2. Use `Mutator` subclass to implement NAS strategy.

    In scenario 1, the subclass should implement `Mutator.mutate()` interface with `Mutator.choice()`.
    In scenario 2, strategy should use constructor or `Mutator.bind_sampler()` to initialize subclass,
    and then use `Mutator.apply()` to mutate model.
    For certain mutator subclasses, strategy or sampler can use `Mutator.dry_run()` to predict choice candidates.

    # Method names are open for discussion.
    """

    def __init__(self, sampler: Optional[Sampler] = None):
        self.sampler: Optional[Sampler] = sampler
        # Model currently being mutated; only set while `apply()` is running.
        self._cur_model: Optional[Model] = None
        # Sequence number of the next `choice()` call in the current round.
        self._cur_choice_idx: Optional[int] = None

    def bind_sampler(self, sampler: Sampler) -> Mutator:
        """
        Set the sampler which will handle `Mutator.choice` calls.
        Returns self so that calls can be chained.
        """
        self.sampler = sampler
        # Fix: the signature promises `Mutator`, but nothing was returned.
        return self

    def apply(self, model: Model) -> Model:
        """
        Apply this mutator on a model.
        Returns mutated model.
        The model will be copied before mutation and the original model will not be modified.
        """
        assert self.sampler is not None, 'A sampler must be bound before `apply()`'
        copy = model.fork()
        self._cur_model = copy
        self._cur_choice_idx = 0
        try:
            self.sampler.mutation_start(self, copy)
            self.mutate(copy)
            self.sampler.mutation_end(self, copy)
        finally:
            # Always clear per-round state, even if `mutate()` raises.
            self._cur_model = None
            self._cur_choice_idx = None
        return copy

    def dry_run(self, model: Model) -> List[List[Choice]]:
        """
        Dry run mutator on a model to collect choice candidates.
        If you invoke this method multiple times on same or different models,
        it may or may not return identical results, depending on how the subclass implements `Mutator.mutate()`.
        """
        sampler_backup = self.sampler
        recorder = _RecorderSampler()
        self.sampler = recorder
        try:
            self.apply(model)
        finally:
            # Fix: restore the original sampler even when the dry run fails.
            self.sampler = sampler_backup
        return recorder.recorded_candidates

    def mutate(self, model: Model) -> None:
        """
        Abstract method to be implemented by subclass.
        Mutate a model in place.
        """
        raise NotImplementedError()

    def choice(self, candidates: Iterable[Choice]) -> Choice:
        """
        Ask sampler to make a choice.
        """
        ret = self.sampler.choice(list(candidates), self, self._cur_model, self._cur_choice_idx)
        self._cur_choice_idx += 1
        return ret
class _RecorderSampler(Sampler):
    """
    Internal sampler used by `Mutator.dry_run()`.
    It logs every candidate list it is offered and always picks the first option.
    """

    def __init__(self):
        # One entry per `choice()` call, in call order.
        self.recorded_candidates: List[List[Choice]] = []

    def choice(self, candidates: List[Choice], *args) -> Choice:
        self.recorded_candidates.append(candidates)
        return candidates[0]
from __future__ import annotations
from enum import Enum
from typing import *
from . import debug_configs
class Operation:
    """
    Calculation logic of a graph node.

    The constructor is private. Use `Operation.new()` to create operation object.

    `Operation` is a naive record.
    Do not "mutate" its attributes or store information relate to specific node.
    All complex logic should be implemented in `Node` class.

    Attributes
    ----------
    type
        Operation type name (e.g. Conv2D).
        If it starts with underscore, the "operation" is a special one (e.g. subgraph, input/output).
    parameters
        Arbitrary key-value parameters (e.g. kernel_size).
    """

    def __init__(
            self,
            type: str,
            parameters: Dict[str, Any],
            _internal_access: bool = False):
        assert _internal_access, '`Operation()` is private, use `Operation.new()` instead'
        self.type: str = type
        self.parameters: Dict[str, Any] = parameters

    def to_init_code(self, field: str) -> str:
        # Generate the `__init__` line instantiating this operation as `self.<field>`.
        params = ', '.join(f'{key}={repr(value)}' for key, value in self.parameters.items())
        return f'self.{field} = {self._to_class_name()}({params})'

    def to_forward_code(self, field: str, output: str, *inputs: str) -> str:
        # Generate the `forward` line applying `self.<field>` to the inputs.
        return f'{output} = self.{field}({", ".join(inputs)})'

    def _to_class_name(self) -> str:
        # Implemented by framework-specific subclasses.
        raise NotImplementedError()

    def __bool__(self) -> bool:
        # Real operations are truthy; `_PseudoOperation` overrides this to be falsy.
        return True

    @staticmethod
    def new(type: str, **parameters: Any) -> Operation:
        # Factory: dispatch on the configured framework and the operation type.
        if type == '_cell':
            return Cell(parameters['cell'])
        else:
            if debug_configs.framework.lower() in ('torch', 'pytorch'):
                # Importing the module registers the PyTorch op subclasses,
                # which `_find_subclass` discovers via `__subclasses__()`.
                from .operation_def import torch_op_def
                cls = PyTorchOperation._find_subclass(type)
            elif debug_configs.framework.lower() in ('tf', 'tensorflow'):
                # Same registration trick for the TensorFlow op subclasses.
                from .operation_def import tf_op_def
                cls = TensorFlowOperation._find_subclass(type)
            else:
                raise ValueError(f'Unsupported framework: {debug_configs.framework}')
            return cls(type, parameters, _internal_access=True)

    @classmethod
    def _find_subclass(cls, subclass_name):
        # Look up a direct subclass by name; fall back to the generic class itself.
        for subclass in cls.__subclasses__():
            if subclass.__name__ == subclass_name:
                return subclass
        return cls

    def __repr__(self):
        type_name = type(self).__name__
        args = [f'{key}={repr(value)}' for key, value in self.parameters.items()]
        if type_name != self.type:
            # Generic classes print their op type explicitly, e.g. Operation(type="ReLU").
            args = [f'type="{self.type}"'] + args
        return f'{type_name}({", ".join(args)})'

    def __eq__(self, other):
        # Value equality: same concrete class, same type name, same parameters.
        # NOTE(review): `__eq__` without `__hash__` makes instances unhashable — confirm intended.
        return type(other) is type(self) and other.type == self.type and other.parameters == self.parameters
class PyTorchOperation(Operation):
    """Operation whose generated module lives in the `nn` (torch.nn) namespace."""

    def _to_class_name(self) -> str:
        return f'nn.{self.type}'
class TensorFlowOperation(Operation):
    """Operation whose generated layer lives in the `K.layers` namespace."""

    def _to_class_name(self) -> str:
        return f'K.layers.{self.type}'
class Cell(Operation):
    """
    An operation reference to a subgraph.

    Example code:
    ```
    def __init__(...):
        ...
        self.cell = CustomCell(...)
        self.relu = K.layers.ReLU()
        ...

    def forward(...):
        ...
        x = self.cell(x)
        ...
    ```

    In above example, node `self.cell`'s operation is `Cell(cell_name='CustomCell')`.
    For comparison, `self.relu`'s operation is `Operation(type='ReLU')`.

    TODO: parameters of subgraph (see `Node` class)

    Attributes
    ----------
    type
        Always "_cell".
    parameters
        A dict with only one item; the key is "cell" and the value is cell's name.
    """

    def __init__(self, cell_name: str):
        # Deliberately skips `Operation.__init__`, bypassing its privacy assert.
        self.type = '_cell'
        self.parameters = {'cell': cell_name}

    def to_init_code(self, field: str) -> str:
        # The referenced cell's name doubles as the generated class name.
        return f'self.{field} = {self.parameters["cell"]}()'
class _PseudoOperation(Operation):
    """
    Pseudo operation attached to the input/output nodes of every graph.

    Having a (falsy) operation object on I/O nodes spares users from checking
    `Node.operation is not None`, especially under static type checking.
    """

    def __init__(self, type_name: str):
        # Pseudo operation names are always underscore-prefixed ('_inputs', '_outputs').
        assert type_name.startswith('_')
        self.type = type_name
        self.parameters = {}

    def _no_codegen(self) -> str:
        # Pseudo operations exist only in the IR; they never emit code.
        raise ValueError(f'Cannot generate code for pseudo operation "{self.type}"')

    def to_init_code(self, field: str) -> str:
        return self._no_codegen()

    def to_forward_code(self, field: str, output: str, *inputs: str) -> str:
        return self._no_codegen()

    def __bool__(self) -> bool:
        # Falsy, so `if node.operation:` skips I/O nodes.
        return False
"""
Definition of operation types.
These are currently examples for overriding codegen.
Feel free to propose better package name or hierarchy.
"""
from ..operation import TensorFlowOperation
class Conv2D(TensorFlowOperation):
    """TensorFlow Conv2D operation that defaults `padding` to 'same'."""

    # Fix: the original pasted a constructor body under `to_init_code(self, field)`,
    # referencing undefined names (`parameters`, `type`, `_internal_access`) — it
    # crashed with NameError on every call. This is the intended `__init__` override.
    def __init__(self, type, parameters, _internal_access=False):
        # Inject the default padding before delegating to the base constructor.
        parameters = {'padding': 'same', **parameters}
        super().__init__(type, parameters, _internal_access)
from ..operation import PyTorchOperation
class relu(PyTorchOperation):
    """Codegen for a functional ReLU; no module is instantiated."""

    def to_init_code(self, field):
        # Functional op: nothing to initialize.
        return ''

    def to_forward_code(self, field, output, *inputs) -> str:
        assert len(inputs) == 1
        (tensor,) = inputs
        return f'{output} = nn.functional.relu({tensor})'
class Flatten(PyTorchOperation):
    """Codegen that flattens a tensor to (batch, -1) via `view`; no module is instantiated."""

    def to_init_code(self, field):
        # Pure view operation: nothing to initialize.
        return ''

    def to_forward_code(self, field, output, *inputs) -> str:
        assert len(inputs) == 1
        (tensor,) = inputs
        return f'{output} = {tensor}.view({tensor}.size(0), -1)'
class Dense(PyTorchOperation):
    """Codegen for a fully-connected layer backed by `nn.Linear`."""

    def to_init_code(self, field):
        in_features = self.parameters['in_features']
        out_features = self.parameters['out_features']
        return f'self.{field} = nn.Linear({in_features}, {out_features})'

    def to_forward_code(self, field, output, *inputs) -> str:
        assert len(inputs) == 1
        (tensor,) = inputs
        return f'{output} = self.{field}({tensor})'
class Softmax(PyTorchOperation):
    """Codegen for a functional softmax over the last dimension."""

    def to_init_code(self, field):
        # Functional op: nothing to initialize.
        return ''

    def to_forward_code(self, field, output, *inputs) -> str:
        assert len(inputs) == 1
        (tensor,) = inputs
        return f'{output} = F.softmax({tensor}, -1)'
{
"_model": {
"inputs": ["image"],
"outputs": ["metric"],
"nodes": {
"stem": {"cell": "stem"},
"flatten": {"type": "Flatten"},
"fc1": {"type": "Dense", "units": 1024, "activation": "relu"},
"fc2": {"type": "Dense", "units": 10},
"softmax": {"type": "Softmax"}
},
"edges": [
{"head": ["_inputs", 0], "tail": ["stem", 0]},
{"head": ["stem", 0], "tail": ["flatten", null]},
{"head": ["flatten", null], "tail": ["fc1", null]},
{"head": ["fc1", null], "tail": ["fc2", null]},
{"head": ["fc2", null], "tail": ["softmax", null]},
{"head": ["softmax", null], "tail": ["_outputs", 0]}
]
},
"stem": {
"nodes": {
"conv1": {"type": "Conv2D", "filters": 32, "kernel_size": 5, "activation": "relu"},
"pool1": {"type": "MaxPool2D", "pool_size": 2},
"conv2": {"type": "Conv2D", "filters": 64, "kernel_size": 5, "activation": "relu"},
"pool2": {"type": "MaxPool2D", "pool_size": 2}
},
"edges": [
{"head": ["_inputs", 0], "tail": ["conv1", null]},
{"head": ["conv1", null], "tail": ["pool1", null]},
{"head": ["pool1", null], "tail": ["conv2", null]},
{"head": ["conv2", null], "tail": ["pool2", null]},
{"head": ["pool2", null], "tail": ["_outputs", 0]}
]
}
}
import json
from pathlib import Path
import sys
from nni.retiarii import *
# JSON IR fixtures to round-trip; paths are resolved relative to this test file.
json_files = [
    'mnist-tensorflow.json'
]
def test_model_load_dump():
    """Round-trip every fixture: load IR -> Model -> dump IR, expecting equality."""
    base_dir = Path(__file__).parent
    for json_file in json_files:
        _test_file(base_dir / json_file)
def _test_file(json_path):
    """Assert that loading and dumping `json_path` reproduces the original IR."""
    # Fix: `json.load(json_path.open())` leaked the file handle; read and close in one call.
    orig_ir = json.loads(json_path.read_text())
    model = Model._load(orig_ir)
    dump_ir = model._dump()
    # add default values to JSON, so we can compare with `==`
    for graph in orig_ir.values():
        graph.setdefault('inputs', None)
        graph.setdefault('outputs', None)
        for node in graph['nodes'].values():
            # `_load` normalizes implicit cell references to type '_cell'.
            if 'type' not in node and 'cell' in node:
                node['type'] = '_cell'
    assert orig_ir == dump_ir
# Allow running this test module directly, without pytest.
if __name__ == '__main__':
    test_model_load_dump()
import json
from pathlib import Path
import sys
from nni.retiarii import *
# FIXME
import nni.retiarii.debug_configs
nni.retiarii.debug_configs.framework = 'tensorflow'
class DebugSampler(Sampler):
def __init__(self):
self.iteration = 0
def choice(self, candidates, mutator, model, index):
idx = (self.iteration + index) % len(candidates)
return candidates[idx]
def mutation_start(self, mutator, model):
self.iteration += 1
class DebugMutator(Mutator):
    """Mutator that replaces both stem pooling layers with a sampled pooling op."""

    def mutate(self, model):
        # Candidate pooling operations offered at each choice point.
        ops = [
            Operation.new('MaxPool2D', pool_size=2),
            Operation.new('AveragePooling2D', pool_size=2),
            Operation.new('GlobalAveragePooling2D'),
        ]
        stem = model.graphs['stem']
        # pool1 first, then pool2 — choice order matters to the sampler.
        for pool_name in ('pool1', 'pool2'):
            stem.get_node_by_name(pool_name).update_operation(self.choice(ops))
# Module-level fixtures shared by the tests below.
sampler = DebugSampler()
mutator = DebugMutator()
mutator.bind_sampler(sampler)

# Base model loaded from the JSON IR next to this file.
# NOTE(review): the handle from `.open()` is never closed — relies on GC.
json_path = Path(__file__).parent / 'mnist-tensorflow.json'
ir = json.load(json_path.open())
model0 = Model._load(ir)
def test_dry_run():
    """A dry run records one candidate list per pooling node, in mutation order."""
    candidates = mutator.dry_run(model0)
    expected = [max_pool, avg_pool, global_pool]
    assert candidates == [expected, expected]
def test_mutation():
    # Round 1: iteration becomes 1, so pool1 gets candidates[1] and pool2 candidates[2].
    model1 = mutator.apply(model0)
    assert _get_pools(model1) == (avg_pool, global_pool)
    # Round 2: iteration becomes 2; indices shift by one more and wrap around.
    model2 = mutator.apply(model1)
    assert _get_pools(model2) == (global_pool, max_pool)
    # History chains back through every ancestor model.
    assert model2.history == [model0, model1]
    # Mutation happens on forks: earlier models must be untouched.
    assert _get_pools(model0) == (max_pool, max_pool)
    assert _get_pools(model1) == (avg_pool, global_pool)
def _get_pools(model):
pool1 = model.graphs['stem'].get_node_by_name('pool1').operation
pool2 = model.graphs['stem'].get_node_by_name('pool2').operation
return pool1, pool2
# Expected operations mirroring those built in `DebugMutator.mutate`; usable in
# equality asserts because `Operation.__eq__` compares class, type, and parameters.
max_pool = Operation.new(type='MaxPool2D', pool_size=2)
avg_pool = Operation.new(type='AveragePooling2D', pool_size=2)
global_pool = Operation.new(type='GlobalAveragePooling2D')

# Allow running this test module directly, without pytest.
if __name__ == '__main__':
    test_dry_run()
    test_mutation()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment