Unverified commit d165905d authored by QuanluZhang, committed by GitHub

[Retiarii] end2end (#3122)

parent 7d1acfbd
@@ -9,6 +9,8 @@ from .common_utils import print_error

 def rest_put(url, data, timeout, show_error=False):
     '''Call rest put method'''
+    print('## [nnictl] PUT', url)
+    print(data)
     try:
         response = requests.put(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\
             data=data, timeout=timeout)
@@ -20,6 +22,8 @@ def rest_put(url, data, timeout, show_error=False):

 def rest_post(url, data, timeout, show_error=False):
     '''Call rest post method'''
+    print('## [nnictl] POST', url)
+    print(data)
     try:
         response = requests.post(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\
             data=data, timeout=timeout)
......
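For context, both helpers wrap `requests` with JSON headers, and the two `print` calls added above are debug traces of what `nnictl` sends to the REST server. A minimal self-contained sketch of the same pattern (the URL, port, and payload below are made up for illustration, not taken from this commit):

import json
import requests

def rest_post(url, data, timeout, show_error=False):
    '''Call rest post method (editor's sketch mirroring the helper above).'''
    print('## [nnictl] POST', url)   # debug trace, as added in this commit
    print(data)
    try:
        return requests.post(url,
                             headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
                             data=data, timeout=timeout)
    except Exception as exception:
        if show_error:
            print(exception)
        return None

# Hypothetical call against a local NNI REST server:
response = rest_post('http://localhost:8080/api/v1/nni/experiment',
                     json.dumps({'maxTrialNum': 10}), timeout=20)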
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.

-import os
 import copy
 import functools
 from enum import Enum, unique
@@ -9,8 +8,6 @@ import json_tricks
 from schema import And

 from . import parameter_expressions
-from .runtime.common import init_logger
-from .runtime.env_vars import dispatcher_env_vars

 to_json = functools.partial(json_tricks.dumps, allow_nan=True)
@@ -120,16 +117,6 @@ def convert_dict2tuple(value):
     return value

-def init_dispatcher_logger():
-    """
-    Initialize dispatcher logging configuration
-    """
-    logger_file_path = 'dispatcher.log'
-    if dispatcher_env_vars.NNI_LOG_DIRECTORY is not None:
-        logger_file_path = os.path.join(dispatcher_env_vars.NNI_LOG_DIRECTORY, logger_file_path)
-    init_logger(logger_file_path, dispatcher_env_vars.NNI_LOG_LEVEL)

 def json2space(x, oldy=None, name=NodeType.ROOT):
     """
     Change search space from json format to hyperopt format
......
@@ -47,4 +47,4 @@ ignore-patterns=test*

 # List of members which are set dynamically and missed by pylint inference
 generated-members=numpy.*,torch.*,tensorflow.*
-ignored-modules=tensorflow
+ignored-modules=tensorflow,_win32,msvcrt
+import os
+import sys
+from nni.retiarii.integration import RetiariiAdvisor
@@ -8,7 +8,7 @@ import torch.nn.functional as F
 import sys
 from pathlib import Path
 sys.path.append(str(Path(__file__).resolve().parents[2]))
-from nni.retiarii import nn
+import nni.retiarii.nn.pytorch as nn

 # Paper suggests 0.9997 momentum, for TensorFlow. Equivalent PyTorch momentum is
 # 1.0 - tensorflow.
@@ -126,7 +126,8 @@ class MNASNet(nn.Module):
     def __init__(self, alpha, depths, convops, kernel_sizes, num_layers,
                  skips, num_classes=1000, dropout=0.2):
-        super(MNASNet, self).__init__()
+        super(MNASNet, self).__init__(alpha, depths, convops, kernel_sizes, num_layers,
+                                      skips, num_classes, dropout)
         assert alpha > 0.0
         assert len(depths) == len(convops) == len(kernel_sizes) == len(num_layers) == len(skips) == 7
         self.alpha = alpha
@@ -287,31 +288,5 @@ class MobileConv(nn.Module):
         out = out + x
         return out

-#====================Training approach
-'''
-import sdk
-from sdk.mutators.builtin_mutators import ModuleMutator
-import datasets
-class ModelTrain(sdk.Trainer):
-    def __init__(self, device='cuda'):
-        super(ModelTrain, self).__init__()
-        self.device = torch.device(device)
-        self.data_provider = datasets.ImagenetDataProvider(save_path="/data/v-yugzh/imagenet",
-                                                           train_batch_size=32,
-                                                           test_batch_size=32,
-                                                           valid_size=None,
-                                                           n_worker=4,
-                                                           resize_scale=0.08,
-                                                           distort_color='normal')
-    def train_dataloader(self):
-        return self.data_provider.train
-    def val_dataloader(self):
-        return self.data_provider.valid
-'''
-#====================Experiment config
 # mnasnet0_5
 ir_module = _InvertedResidual(16, 16, 3, 1, 1, True)
\ No newline at end of file
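A note on the `super().__init__(...)` change above: the retiarii base module evidently records each module's constructor arguments so the converter can rebuild it in graph IR form (the same job `TraceClassArguments` does in the experiment script further down). A toy sketch of the recording idea, my illustration rather than NNI's actual implementation:

import torch

class ArgRecordingModule(torch.nn.Module):
    '''Stand-in for a base class that remembers how each module was constructed.'''
    def __init__(self, *args, **kwargs):
        super().__init__()                 # torch.nn.Module itself takes no arguments
        self._init_args = (args, kwargs)   # later consumed by a graph converter

class Toy(ArgRecordingModule):
    def __init__(self, width, depth):
        super().__init__(width, depth)     # same pattern as the MNASNet change above
        self.body = torch.nn.Linear(width, width)

print(Toy(8, 3)._init_args)                # ((8, 3), {})

The full example file reproduced below still uses the older `from nni.retiarii import nn` import and the no-argument `super().__init__()` call: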
import warnings
import torch
import torch.nn as torch_nn
from torchvision.models.utils import load_state_dict_from_url
import torch.nn.functional as F
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[2]))
from nni.retiarii import nn
# Paper suggests 0.9997 momentum, for TensorFlow. Equivalent PyTorch momentum is
# 1.0 - tensorflow.
_BN_MOMENTUM = 1 - 0.9997
_FIRST_DEPTH = 32
_MOBILENET_V2_FILTERS = [16, 24, 32, 64, 96, 160, 320]
_MOBILENET_V2_NUM_LAYERS = [1, 2, 3, 4, 3, 3, 1]
class _ResidualBlock(nn.Module):
    def __init__(self, net):
        super().__init__()
        self.net = net

    def forward(self, x):
        return self.net(x) + x
class _InvertedResidual(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size, stride, expansion_factor, skip, bn_momentum=0.1):
        super(_InvertedResidual, self).__init__()
        assert stride in [1, 2]
        assert kernel_size in [3, 5]
        mid_ch = in_ch * expansion_factor
        self.apply_residual = skip and in_ch == out_ch and stride == 1
        self.layers = nn.Sequential(
            # Pointwise
            nn.Conv2d(in_ch, mid_ch, 1, bias=False),
            nn.BatchNorm2d(mid_ch, momentum=bn_momentum),
            nn.ReLU(inplace=True),
            # Depthwise
            nn.Conv2d(mid_ch, mid_ch, kernel_size, padding=kernel_size // 2,
                      stride=stride, groups=mid_ch, bias=False),
            nn.BatchNorm2d(mid_ch, momentum=bn_momentum),
            nn.ReLU(inplace=True),
            # Linear pointwise. Note that there's no activation.
            nn.Conv2d(mid_ch, out_ch, 1, bias=False),
            nn.BatchNorm2d(out_ch, momentum=bn_momentum))
        '''self.cells = nn.ModuleList()
        for i in range(5):
            self.cells.append(nn.Conv2d(2,2,2))'''
        #self.x = 10

    def forward(self, input):
        '''out = input
        x = 0
        for i, each in enumerate(self.cells):
            out = each(out)
            if i == 1:
                out = F.relu(out)
            x += i
        y = out
        for i in range(3):
            y = y + out'''
        '''out = self.cells[0](out)
        out = self.cells[1](out)
        out = self.cells[2](out)
        out = self.cells[3](out)
        out = self.cells[4](out)'''
        if self.apply_residual:
            ret = self.layers(input) + input
        else:
            ret = self.layers(input)
        return ret
def _stack_inverted_residual(in_ch, out_ch, kernel_size, skip, stride, exp_factor, repeats, bn_momentum):
    """ Creates a stack of inverted residuals. """
    assert repeats >= 1
    # First one has no skip, because feature map size changes.
    first = _InvertedResidual(in_ch, out_ch, kernel_size, stride, exp_factor, skip, bn_momentum=bn_momentum)
    remaining = []
    for _ in range(1, repeats):
        remaining.append(_InvertedResidual(out_ch, out_ch, kernel_size, 1, exp_factor, skip, bn_momentum=bn_momentum))
    return nn.Sequential(first, *remaining)
def _stack_normal_conv(in_ch, out_ch, kernel_size, skip, dconv, stride, repeats, bn_momentum):
    assert repeats >= 1
    stack = []
    for i in range(repeats):
        s = stride if i == 0 else 1
        if dconv:
            modules = [
                nn.Conv2d(in_ch, in_ch, kernel_size, padding=kernel_size // 2, stride=s, groups=in_ch, bias=False),
                nn.BatchNorm2d(in_ch, momentum=bn_momentum),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_ch, out_ch, 1, padding=0, stride=1, bias=False),
                nn.BatchNorm2d(out_ch, momentum=bn_momentum)
            ]
        else:
            modules = [
                nn.Conv2d(in_ch, out_ch, kernel_size, padding=kernel_size // 2, stride=s, bias=False),
                nn.ReLU(inplace=True),
                nn.BatchNorm2d(out_ch, momentum=bn_momentum)
            ]
        if skip and in_ch == out_ch and s == 1:
            # use different implementation for skip and noskip to align with pytorch
            stack.append(_ResidualBlock(nn.Sequential(*modules)))
        else:
            stack += modules
        in_ch = out_ch
    return stack
def _round_to_multiple_of(val, divisor, round_up_bias=0.9):
    """ Asymmetric rounding to make `val` divisible by `divisor`. With default
    bias, will round up, unless the number is no more than 10% greater than the
    smaller divisible value, i.e. (83, 8) -> 80, but (84, 8) -> 88. """
    assert 0.0 < round_up_bias < 1.0
    new_val = max(divisor, int(val + divisor / 2) // divisor * divisor)
    return new_val if new_val >= round_up_bias * val else new_val + divisor
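# A quick check of the asymmetric rounding above (editor's addition; values
# match the docstring's example):
#   _round_to_multiple_of(83, 8) == 80   # within 10% above 80, so round down
#   _round_to_multiple_of(84, 8) == 88   # beyond the bias threshold, so round up
#   _round_to_multiple_of(3, 8)  == 8    # result never falls below the divisor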
def _get_depths(depths, alpha):
    """ Scales tensor depths as in reference MobileNet code, preferring rounding
    up rather than down. """
    return [_round_to_multiple_of(depth * alpha, 8) for depth in depths]
class MNASNet(nn.Module):
    """ MNASNet, as described in https://arxiv.org/pdf/1807.11626.pdf. This
    implements the B1 variant of the model, configured by seven per-stage lists.
    >>> model = MNASNet(1.0, depths, convops, kernel_sizes, num_layers, skips)
    >>> x = torch.rand(1, 3, 224, 224)
    >>> y = model(x)
    >>> y.dim()
    2
    >>> y.nelement()
    1000
    """
    # Version 2 adds depth scaling in the initial stages of the network.
    _version = 2

    def __init__(self, alpha, depths, convops, kernel_sizes, num_layers,
                 skips, num_classes=1000, dropout=0.2):
        super(MNASNet, self).__init__()
        assert alpha > 0.0
        assert len(depths) == len(convops) == len(kernel_sizes) == len(num_layers) == len(skips) == 7
        self.alpha = alpha
        self.num_classes = num_classes
        depths = _get_depths([_FIRST_DEPTH] + depths, alpha)
        exp_ratios = [3, 3, 3, 6, 6, 6, 6]
        strides = [1, 2, 2, 2, 1, 2, 1]
        layers = [
            # First layer: regular conv.
            nn.Conv2d(3, depths[0], 3, padding=1, stride=2, bias=False),
            nn.BatchNorm2d(depths[0], momentum=_BN_MOMENTUM),
            nn.ReLU(inplace=True),
        ]
        count = 0
        for conv, prev_depth, depth, ks, skip, stride, repeat, exp_ratio in \
                zip(convops, depths[:-1], depths[1:], kernel_sizes, skips, strides, num_layers, exp_ratios):
            if conv == "mconv":
                # MNASNet blocks: stacks of inverted residuals.
                layers.append(_stack_inverted_residual(prev_depth, depth, ks, skip,
                                                       stride, exp_ratio, repeat, _BN_MOMENTUM))
            else:
                # Normal conv and depth-separated conv
                layers += _stack_normal_conv(prev_depth, depth, ks, skip, conv == "dconv",
                                             stride, repeat, _BN_MOMENTUM)
            count += 1
            if count >= 2:
                break
        layers += [
            # Final mapping to classifier input.
            nn.Conv2d(depths[7], 1280, 1, padding=0, stride=1, bias=False),
            nn.BatchNorm2d(1280, momentum=_BN_MOMENTUM),
            nn.ReLU(inplace=True),
        ]
        self.layers = nn.Sequential(*layers)
        self.classifier = nn.Sequential(nn.Dropout(p=dropout, inplace=True),
                                        nn.Linear(1280, num_classes))
        self._initialize_weights()
        #self.for_test = 10
    def forward(self, x):
        #if self.for_test == 10:
        x = self.layers(x)
        # Equivalent to global avgpool and removing H and W dimensions.
        x = x.mean([2, 3])
        x = F.relu(x)
        return self.classifier(x)

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                torch_nn.init.kaiming_normal_(m.weight, mode="fan_out",
                                              nonlinearity="relu")
                if m.bias is not None:
                    torch_nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                torch_nn.init.ones_(m.weight)
                torch_nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                torch_nn.init.kaiming_uniform_(m.weight, mode="fan_out",
                                               nonlinearity="sigmoid")
                torch_nn.init.zeros_(m.bias)


def test_model(model):
    model(torch.randn(2, 3, 224, 224))
#====================Training approach
'''
import sdk
from sdk.mutators.builtin_mutators import ModuleMutator
import datasets
class ModelTrain(sdk.Trainer):
    def __init__(self, device='cuda'):
        super(ModelTrain, self).__init__()
        self.device = torch.device(device)
        self.data_provider = datasets.ImagenetDataProvider(save_path="/data/v-yugzh/imagenet",
                                                           train_batch_size=32,
                                                           test_batch_size=32,
                                                           valid_size=None,
                                                           n_worker=4,
                                                           resize_scale=0.08,
                                                           distort_color='normal')
    def train_dataloader(self):
        return self.data_provider.train
    def val_dataloader(self):
        return self.data_provider.valid
'''
#====================Experiment config
# mnasnet0_5
ir_module = _InvertedResidual(16, 16, 3, 1, 1, True)
\ No newline at end of file
@@ -10,6 +10,7 @@ _logger = logging.getLogger(__name__)

 class BlockMutator(Mutator):
     def __init__(self, target: str):
+        super(BlockMutator, self).__init__()
         self.target = target

     def mutate(self, model):
......
@@ -4,39 +4,38 @@ import random
 import os

 from nni.retiarii import Model, submit_models, wait_models
-from nni.retiarii import get_base_model_ir, get_specified_mutators, get_trainer
+from nni.retiarii.strategy import BaseStrategy
 from nni.retiarii import Sampler

 _logger = logging.getLogger(__name__)

 class RandomSampler(Sampler):
     def choice(self, candidates, mutator, model, index):
         return random.choice(candidates)

-def simple_startegy():
-    try:
-        _logger.info('stargety start...')
-        while True:
-            model = get_base_model_ir()
-            _logger.info('apply mutators...')
-            applied_mutators = get_specified_mutators()
-            _logger.info('mutators: {}'.format(applied_mutators))
-            random_sampler = RandomSampler()
-            for mutator in applied_mutators:
-                _logger.info('mutate model...')
-                mutator.bind_sampler(random_sampler)
-                model = mutator.apply(model)
-            # get and apply training approach
-            _logger.info('apply training approach...')
-            trainer = get_trainer()
-            model.apply_trainer(trainer['modulename'], trainer['args'])
-            # run models
-            submit_models(model)
-            wait_models(model)
-            _logger.info('Strategy says:', model.metric)
-    except Exception as e:
-        _logger.error(logging.exception('message'))
-
-if __name__ == '__main__':
-    simple_startegy()
+class SimpleStrategy(BaseStrategy):
+    def __init__(self):
+        self.name = ''
+
+    def run(self, base_model, applied_mutators, trainer):
+        try:
+            _logger.info('strategy start...')
+            while True:
+                model = base_model
+                _logger.info('apply mutators...')
+                _logger.info('mutators: {}'.format(applied_mutators))
+                random_sampler = RandomSampler()
+                for mutator in applied_mutators:
+                    _logger.info('mutate model...')
+                    mutator.bind_sampler(random_sampler)
+                    model = mutator.apply(model)
+                # get and apply training approach
+                _logger.info('apply training approach...')
+                model.apply_trainer(trainer['modulename'], trainer['args'])
+                # run models
+                submit_models(model)
+                wait_models(model)
+                _logger.info('Strategy says: %s', model.metric)
+        except Exception:
+            _logger.exception('strategy failed')
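Read together with the experiment script below, the strategy is no longer launched as a standalone script; the experiment drives it by calling `run`. A hypothetical direct invocation, assuming `base_model` is a graph IR produced by `convert_to_graph` and reusing the mutator targets and trainer descriptor that appear elsewhere in this commit (note that `run` keeps submitting models until interrupted):

from mutator import BlockMutator

strategy = SimpleStrategy()
strategy.run(base_model,
             applied_mutators=[BlockMutator('mutable_0'), BlockMutator('mutable_1')],
             trainer={'modulename': 'nni.retiarii.trainer.PyTorchImageClassificationTrainer',
                      'args': {'dataset_cls': 'CIFAR10', 'trainer_kwargs': {'max_epochs': 1}}})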
@@ -6,11 +6,19 @@ from pathlib import Path
 sys.path.append(str(Path(__file__).resolve().parents[2]))

 from nni.retiarii.converter.graph_gen import convert_to_graph
 from nni.retiarii.converter.visualize import visualize_model
-from nni.retiarii import nn
 from nni.retiarii.codegen.pytorch import model_to_pytorch_script
+from nni.retiarii import nn
+from nni.retiarii.trainer import PyTorchImageClassificationTrainer
+from nni.retiarii.utils import TraceClassArguments
 from base_mnasnet import MNASNet
-from nni.experiment import Experiment
+from nni.experiment import RetiariiExperiment, RetiariiExpConfig
+#from simple_strategy import SimpleStrategy
+#from tpe_strategy import TPEStrategy
+from nni.retiarii.strategies import TPEStrategy
+from mutator import BlockMutator

 if __name__ == '__main__':
     _DEFAULT_DEPTHS = [16, 24, 40, 80, 96, 192, 320]
@@ -18,54 +26,37 @@ if __name__ == '__main__':
     _DEFAULT_SKIPS = [False, True, True, True, True, True, True]
     _DEFAULT_KERNEL_SIZES = [3, 3, 5, 5, 3, 5, 3]
     _DEFAULT_NUM_LAYERS = [1, 3, 3, 3, 2, 4, 1]

-    nn.enable_record_args()
-    base_model = MNASNet(0.5, _DEFAULT_DEPTHS, _DEFAULT_CONVOPS, _DEFAULT_KERNEL_SIZES,
-                         _DEFAULT_NUM_LAYERS, _DEFAULT_SKIPS)
-    recorded_module_args = nn.get_records()
-    nn.disable_record_args()
-    print(recorded_module_args)
-    script_module = torch.jit.script(base_model)
-    model = convert_to_graph(script_module, base_model, recorded_module_args)
-    #code_script = model_to_pytorch_script(model)
-    #print(code_script)
+    with TraceClassArguments() as tca:
+        base_model = MNASNet(0.5, _DEFAULT_DEPTHS, _DEFAULT_CONVOPS, _DEFAULT_KERNEL_SIZES,
+                             _DEFAULT_NUM_LAYERS, _DEFAULT_SKIPS)
+        trainer = PyTorchImageClassificationTrainer(base_model, dataset_cls="CIFAR10",
+                                                    dataset_kwargs={"root": "data/cifar10", "download": True},
+                                                    dataloader_kwargs={"batch_size": 32},
+                                                    optimizer_kwargs={"lr": 1e-3},
+                                                    trainer_kwargs={"max_epochs": 1})
+
+    '''script_module = torch.jit.script(base_model)
+    model = convert_to_graph(script_module, base_model, tca.recorded_arguments)
+    code_script = model_to_pytorch_script(model)
+    print(code_script)
     print("Model: ", model)
     graph_ir = model._dump()
     print(graph_ir)
-    #visualize_model(graph_ir)
+    visualize_model(graph_ir)'''

-    # TODO: new interface
-    #exp = Experiment()
-    #exp.start_retiarii_experiment(base_model, training_approach,
-    #                              applied_mutators, strategy,
-    #                              exp_config)
-    exp_config = {'authorName': 'nni',
-                  'experimentName': 'naive',
-                  'trialConcurrency': 3,
-                  'maxExecDuration': '1h',
-                  'maxTrialNum': 10,
-                  'trainingServicePlatform': 'local'
-                  }
-    applied_mutators = [{'filepath': os.path.join(os.getcwd(), 'mutator.py'), 'classname': 'BlockMutator', 'args': {'target': 'mutable_0'}},
-                        {'filepath': os.path.join(os.getcwd(), 'mutator.py'), 'classname': 'BlockMutator', 'args': {'target': 'mutable_1'}}]
-    training_approach = {'modulename': 'nni.retiarii.trainer.PyTorchImageClassificationTrainer', 'args': {
-        "dataset_cls": "CIFAR10",
-        "dataset_kwargs": {
-            "root": "data/cifar10",
-            "download": True
-        },
-        "dataloader_kwargs": {
-            "batch_size": 32
-        },
-        "optimizer_kwargs": {
-            "lr": 1e-3
-        },
-        "trainer_kwargs": {
-            "max_epochs": 1
-        }
-    }}
-    strategy = {'filename': 'simple_strategy', 'funcname': 'simple_startegy', 'args': {}}
-    exp = Experiment()
-    exp.tmp_start_retiarii(graph_ir, training_approach,
-                           applied_mutators, strategy,
-                           exp_config)
+    # new interface
+    applied_mutators = []
+    applied_mutators.append(BlockMutator('mutable_0'))
+    applied_mutators.append(BlockMutator('mutable_1'))
+
+    simple_strategy = TPEStrategy()
+
+    exp = RetiariiExperiment(base_model, trainer, applied_mutators, simple_strategy, tca)
+
+    exp_config = RetiariiExpConfig.create_template('local')
+    exp_config.experiment_name = 'mnasnet_search'
+    exp_config.trial_concurrency = 2
+    exp_config.max_trial_number = 10
+
+    exp.run(exp_config, 8081, debug=True)
\ No newline at end of file
@@ -17,9 +17,10 @@ class ExperimentStartupInfo {
     private logDir: string = '';
     private logLevel: string = '';
     private readonly: boolean = false;
+    private dispatcherPipe: string | null = null;
     private platform: string = '';

-    public setStartupInfo(newExperiment: boolean, experimentId: string, basePort: number, platform: string, logDir?: string, logLevel?: string, readonly?: boolean): void {
+    public setStartupInfo(newExperiment: boolean, experimentId: string, basePort: number, platform: string, logDir?: string, logLevel?: string, readonly?: boolean, dispatcherPipe?: string): void {
         assert(!this.initialized);
         assert(experimentId.trim().length > 0);
         this.newExperiment = newExperiment;
@@ -41,6 +42,10 @@ class ExperimentStartupInfo {
         if (readonly !== undefined) {
             this.readonly = readonly;
         }
+
+        if (dispatcherPipe != undefined && dispatcherPipe.length > 0) {
+            this.dispatcherPipe = dispatcherPipe;
+        }
     }

     public getExperimentId(): string {
@@ -84,6 +89,11 @@ class ExperimentStartupInfo {
         return this.readonly;
     }

+    public getDispatcherPipe(): string | null {
+        assert(this.initialized);
+        return this.dispatcherPipe;
+    }
 }

 function getExperimentId(): string {
@@ -107,16 +117,20 @@ function getExperimentStartupInfo(): ExperimentStartupInfo {
 }

 function setExperimentStartupInfo(
-    newExperiment: boolean, experimentId: string, basePort: number, platform: string, logDir?: string, logLevel?: string, readonly?: boolean): void {
+    newExperiment: boolean, experimentId: string, basePort: number, platform: string, logDir?: string, logLevel?: string, readonly?: boolean, dispatcherPipe?: string): void {
     component.get<ExperimentStartupInfo>(ExperimentStartupInfo)
-        .setStartupInfo(newExperiment, experimentId, basePort, platform, logDir, logLevel, readonly);
+        .setStartupInfo(newExperiment, experimentId, basePort, platform, logDir, logLevel, readonly, dispatcherPipe);
 }

 function isReadonly(): boolean {
     return component.get<ExperimentStartupInfo>(ExperimentStartupInfo).isReadonly();
 }

+function getDispatcherPipe(): string | null {
+    return component.get<ExperimentStartupInfo>(ExperimentStartupInfo).getDispatcherPipe();
+}
+
 export {
     ExperimentStartupInfo, getBasePort, getExperimentId, isNewExperiment, getPlatform, getExperimentStartupInfo,
-    setExperimentStartupInfo, isReadonly
+    setExperimentStartupInfo, isReadonly, getDispatcherPipe
 };
@@ -126,7 +126,10 @@ class Logger {
      */
     private log(level: string, param: any[]): void {
         if (!this.readonly) {
-            const logContent = `[${(new Date()).toLocaleString()}] ${level} ${format(param)}\n`;
+            const time = new Date();
+            const localTime = new Date(time.getTime() - time.getTimezoneOffset() * 60000);
+            const timeStr = localTime.toISOString().slice(0, -5).replace('T', ' ');
+            const logContent = `[${timeStr}] ${level} ${format(param)}\n`;
             if (this.writable && this.bufferSerialEmitter) {
                 const buffer: WritableStreamBuffer = new WritableStreamBuffer();
                 buffer.write(logContent);
......
@@ -6,6 +6,7 @@
 import * as assert from 'assert';
 import { ChildProcess } from 'child_process';
 import { EventEmitter } from 'events';
+import * as net from 'net';
 import { Readable, Writable } from 'stream';
 import { NNIError } from '../common/errors';
 import { getLogger, Logger } from '../common/log';
@@ -62,10 +63,10 @@ class IpcInterface {
      * @param proc the process to wrap
      * @param acceptCommandTypes set of accepted commands for this process
      */
-    constructor(proc: ChildProcess, acceptCommandTypes: Set<string>) {
+    constructor(outStream: Writable, inStream: Readable, acceptCommandTypes: Set<string>) {
         this.acceptCommandTypes = acceptCommandTypes;
-        this.outgoingStream = <Writable>proc.stdio[ipcOutgoingFd];
-        this.incomingStream = <Readable>proc.stdio[ipcIncomingFd];
+        this.outgoingStream = outStream;
+        this.incomingStream = inStream;
         this.eventEmitter = new EventEmitter();
         this.readBuffer = Buffer.alloc(0);
@@ -132,7 +133,14 @@ class IpcInterface {
  * @param process_ the tuner process
  */
 function createDispatcherInterface(process: ChildProcess): IpcInterface {
-    return new IpcInterface(process, new Set([...CommandType.TUNER_COMMANDS, ...CommandType.ASSESSOR_COMMANDS]));
+    const outStream = <Writable>process.stdio[ipcOutgoingFd];
+    const inStream = <Readable>process.stdio[ipcIncomingFd];
+    return new IpcInterface(outStream, inStream, new Set([...CommandType.TUNER_COMMANDS, ...CommandType.ASSESSOR_COMMANDS]));
 }

-export { IpcInterface, createDispatcherInterface, encodeCommand, decodeCommand };
+function createDispatcherPipeInterface(pipePath: string): IpcInterface {
+    const client = net.createConnection(pipePath);
+    return new IpcInterface(client, client, new Set([...CommandType.TUNER_COMMANDS, ...CommandType.ASSESSOR_COMMANDS]));
+}
+
+export { IpcInterface, createDispatcherInterface, createDispatcherPipeInterface, encodeCommand, decodeCommand };
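On the other end of `createDispatcherPipeInterface`, the dispatcher (tuner/strategy) process connects to the same path. A rough Python sketch for the POSIX case; the socket path is made up, and the framing (a two-byte command code plus a six-digit, zero-padded length prefix) is my reading of NNI's stdio protocol rather than something this diff shows:

import socket

PIPE_PATH = '/tmp/nni_dispatcher.sock'  # hypothetical path passed via --dispatcher_pipe

sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(PIPE_PATH)

def send_command(command_type: bytes, content: str) -> None:
    # Two-byte command code, zero-padded six-digit payload length, then the payload.
    payload = content.encode('utf8')
    sock.sendall(command_type + b'%06d' % len(payload) + payload)

send_command(b'IN', '{}')  # e.g. an INITIALIZE-style handshake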
@@ -9,7 +9,7 @@ import { Deferred } from 'ts-deferred';
 import * as component from '../common/component';
 import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
 import { NNIError } from '../common/errors';
-import { getExperimentId } from '../common/experimentStartupInfo';
+import { getExperimentId, getDispatcherPipe } from '../common/experimentStartupInfo';
 import { getLogger, Logger } from '../common/log';
 import {
     ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
@@ -24,7 +24,7 @@ import {
     INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
     REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
 } from './commands';
-import { createDispatcherInterface, IpcInterface } from './ipcInterface';
+import { createDispatcherInterface, createDispatcherPipeInterface, IpcInterface } from './ipcInterface';

 /**
  * NNIManager which implements Manager interface
@@ -71,6 +71,11 @@ class NNIManager implements Manager {
                 this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
             });
         };
+
+        const pipe = getDispatcherPipe();
+        if (pipe !== null) {
+            this.dispatcher = createDispatcherPipeInterface(pipe);
+        }
     }

     public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
@@ -694,7 +699,7 @@ class NNIManager implements Manager {
     }

     private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
-        this.log.debug(`NNIManager received trial job metrics: ${metric}`);
+        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
         if (this.trialJobs.has(metric.id)){
             await this.dataStore.storeMetricData(metric.id, metric.data);
             if (this.dispatcher === undefined) {
......
@@ -30,9 +30,9 @@ import { DLTSTrainingService } from './training_service/dlts/dltsTrainingService

 function initStartupInfo(
     startExpMode: string, experimentId: string, basePort: number, platform: string,
-    logDirectory: string, experimentLogLevel: string, readonly: boolean): void {
+    logDirectory: string, experimentLogLevel: string, readonly: boolean, dispatcherPipe: string): void {
     const createNew: boolean = (startExpMode === ExperimentStartUpMode.NEW);
-    setExperimentStartupInfo(createNew, experimentId, basePort, platform, logDirectory, experimentLogLevel, readonly);
+    setExperimentStartupInfo(createNew, experimentId, basePort, platform, logDirectory, experimentLogLevel, readonly, dispatcherPipe);
 }

 async function initContainer(foreground: boolean, platformMode: string, logFileName?: string): Promise<void> {
@@ -163,7 +163,9 @@ if (!('true' || 'false').includes(readonlyArg.toLowerCase())) {
 }
 const readonly = readonlyArg.toLowerCase() == 'true' ? true : false;

-initStartupInfo(startMode, experimentId, port, mode, logDir, logLevel, readonly);
+const dispatcherPipe: string = parseArg(['--dispatcher_pipe']);
+initStartupInfo(startMode, experimentId, port, mode, logDir, logLevel, readonly, dispatcherPipe);

 mkDirP(getLogDir())
     .then(async () => {
......
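Putting the pieces together: when `--dispatcher_pipe` is given, the manager attaches to an externally created pipe instead of spawning its own tuner process, which is what lets `RetiariiExperiment` run the strategy in the same Python process that launched the experiment. A hypothetical launch from Python (the flag name comes from this diff; the other arguments and paths are illustrative only):

import subprocess

proc = subprocess.Popen([
    'node', 'main.js',
    '--port', '8081',
    '--mode', 'local',
    '--start_mode', 'new',
    '--experiment_id', 'mnasnet_search',
    '--dispatcher_pipe', '/tmp/nni_dispatcher.sock',  # hypothetical pipe path
])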