Unverified commit 1338c512, authored by J-shang and committed by GitHub

support shared storage for reusable mode (#3354)

parent 715b1899
......@@ -823,6 +823,79 @@ Optional. Bool. default: ``false``. It's an experimental feature.
If true, NNI will reuse OpenPAI jobs to run as many trials as possible. This can save the time spent creating new jobs. Users need to make sure each trial can run independently in the same job; for example, avoid loading checkpoints from previous trials.
sharedStorage
^^^^^^^^^^^^^
storageType
^^^^^^^^^^^
Required. String.
The type of the storage. ``NFS`` and ``AzureBlob`` are supported.
localMountPoint
^^^^^^^^^^^^^^^
Required. String.
The absolute path at which the storage has been mounted, or will be mounted, on the local machine.
remoteMountPoint
^^^^^^^^^^^^^^^^
Required. String.
The absolute path at which the storage will be mounted on the remote machine.
localMounted
^^^^^^^^^^^^
Required. String.
One of ``usermount``, ``nnimount`` or ``nomount``. ``usermount`` means you have already mounted this storage at ``localMountPoint``. ``nnimount`` means NNI will try to mount this storage at ``localMountPoint``. ``nomount`` means the storage will not be mounted on the local machine; support for this with some storage types will come in the future.
nfsServer
^^^^^^^^^
Optional. String.
Required if using NFS storage. The NFS server host.
exportedDirectory
^^^^^^^^^^^^^^^^^
Optional. String.
Required if using NFS storage. The exported directory of NFS server.
storageAccountName
^^^^^^^^^^^^^^^^^^
Optional. String.
Required if using AzureBlob storage. The Azure storage account name.
storageAccountKey
^^^^^^^^^^^^^^^^^
Optional. String.
Required if using AzureBlob storage and ``resourceGroupName`` is not set. The Azure storage account key.
resourceGroupName
^^^^^^^^^^^^^^^^^
Optional. String.
Required if using AzureBlob storage and ``storageAccountKey`` is not set. The resource group to which the AzureBlob container belongs.
containerName
^^^^^^^^^^^^^
Optional. String.
Required if using AzureBlob storage. The AzureBlob container name.
Examples
--------
......
**How to Use Shared Storage**
=============================
If you want to use your own storage while working with NNI, shared storage can satisfy this need.
Compared with the training service's native storage, shared storage is more convenient.
All the information generated by the experiment will be stored under ``/nni`` folder in your shared storage.
All the output produced by the trial will be located under ``/nni/{EXPERIMENT_ID}/trials/{TRIAL_ID}/nnioutput`` folder in your shared storage.
This saves you from searching for experiment-related information in various places.
Remember that your trial working directory is ``/nni/{EXPERIMENT_ID}/trials/{TRIAL_ID}``, so if you upload your data to this shared storage, you can open it like a local file in your trial code without downloading it.
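For example, a trial can read a file that you uploaded to this shared storage as if it were local. Below is a minimal sketch; the file name ``my_data.csv`` is purely illustrative, not something NNI creates.

.. code-block:: python

    import os

    # The trial process starts inside /nni/{EXPERIMENT_ID}/trials/{TRIAL_ID},
    # which is backed by the shared storage, so relative paths resolve there.
    data_path = os.path.join(os.getcwd(), 'my_data.csv')  # illustrative file name
    with open(data_path) as f:
        first_line = f.readline()
    print('read from shared storage:', first_line)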
We will develop more practical features based on shared storage in the future.
.. note::
    Shared storage is currently in the experimental stage. We suggest using AzureBlob under Ubuntu/CentOS/RHEL, and NFS under Ubuntu/CentOS/RHEL/Fedora/Debian, on the remote side.
    Make sure your local machine can mount NFS or FUSE-mount AzureBlob, and that you have ``sudo`` permission on your remote runtime. For now, shared storage is only supported by training services in reuse mode.
Example
-------
If you want to use AzureBlob, add the settings below to your config. For the full config file, see :githublink:`mnist-sharedstorage/config_azureblob.yml <examples/trials/mnist-sharedstorage/config_azureblob.yml>`.
.. code-block:: yaml

    sharedStorage:
      storageType: AzureBlob
      localMountPoint: ${your/local/mount/point}
      remoteMountPoint: ${your/remote/mount/point}
      storageAccountName: ${replace_to_your_storageAccountName}
      storageAccountKey: ${replace_to_your_storageAccountKey}
      # If you do not set storageAccountKey, log in with the Azure CLI (`az login`) first and set resourceGroupName instead.
      # resourceGroupName: ${replace_to_your_resourceGroupName}
      containerName: ${replace_to_your_containerName}
      # usermount means you have already mounted this storage at localMountPoint
      # nnimount means NNI will try to mount this storage at localMountPoint
      # nomount means the storage will not be mounted on the local machine; support for some storage types will come in the future
      localMounted: nnimount
If you want to use NFS, add the settings below to your config. For the full config file, see :githublink:`mnist-sharedstorage/config_nfs.yml <examples/trials/mnist-sharedstorage/config_nfs.yml>`.
.. code-block:: yaml

    sharedStorage:
      storageType: NFS
      localMountPoint: ${your/local/mount/point}
      remoteMountPoint: ${your/remote/mount/point}
      nfsServer: ${nfs-server-ip}
      exportedDirectory: ${nfs/exported/directory}
      # usermount means you have already mounted this storage at localMountPoint
      # nnimount means NNI will try to mount this storage at localMountPoint
      # nomount means the storage will not be mounted on the local machine; support for some storage types will come in the future
      localMounted: nnimount
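If you choose ``localMounted: usermount`` instead, you are responsible for mounting the storage before launching the experiment. Below is a minimal sketch for sanity-checking the mount from Python; the path is a placeholder for your ``localMountPoint``.

.. code-block:: python

    import os

    local_mount_point = '/mnt/nfs'  # placeholder: your localMountPoint
    if not os.path.ismount(local_mount_point):
        raise RuntimeError(f'{local_mount_point} is not mounted; mount it before running nnictl create')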
......@@ -12,3 +12,4 @@ References
SDK API References <sdk_reference>
Supported Framework Library <SupportedFramework_Library>
Launch from python <Tutorial/HowToLaunchFromPython>
Shared Storage <Tutorial/HowToUseSharedStorage>
authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
trainingServicePlatform: aml
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
  #choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
  #SMAC (SMAC should be installed through nnictl)
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  image: msranni/nni
amlConfig:
  subscriptionId: ${replace_to_your_subscriptionId}
  resourceGroup: ${replace_to_your_resourceGroup}
  workspaceName: ${replace_to_your_workspaceName}
  computeTarget: ${replace_to_your_computeTarget}
sharedStorage:
  storageType: AzureBlob
  localMountPoint: ${your/local/mount/point}
  remoteMountPoint: ${your/remote/mount/point}
  storageAccountName: ${replace_to_your_storageAccountName}
  storageAccountKey: ${replace_to_your_storageAccountKey}
  # If you do not set storageAccountKey, log in with the Azure CLI (`az login`) first and set resourceGroupName instead.
  # resourceGroupName: ${replace_to_your_resourceGroupName}
  containerName: ${replace_to_your_containerName}
  # usermount means you have already mounted this storage at localMountPoint
  # nnimount means NNI will try to mount this storage at localMountPoint
  # nomount means the storage will not be mounted on the local machine; support for some storage types will come in the future
  localMounted: nnimount
\ No newline at end of file
authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
trainingServicePlatform: aml
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
  #choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
  #SMAC (SMAC should be installed through nnictl)
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  image: msranni/nni
amlConfig:
  subscriptionId: ${replace_to_your_subscriptionId}
  resourceGroup: ${replace_to_your_resourceGroup}
  workspaceName: ${replace_to_your_workspaceName}
  computeTarget: ${replace_to_your_computeTarget}
sharedStorage:
  storageType: NFS
  localMountPoint: ${your/local/mount/point}
  remoteMountPoint: ${your/remote/mount/point}
  nfsServer: ${nfs-server-ip}
  exportedDirectory: ${nfs/exported/directory}
  # usermount means you have already mounted this storage at localMountPoint
  # nnimount means NNI will try to mount this storage at localMountPoint
  # nomount means the storage will not be mounted on the local machine; support for some storage types will come in the future
  localMounted: nnimount
\ No newline at end of file
"""
A deep MNIST classifier using convolutional layers.
This file is a modification of the official pytorch mnist example:
https://github.com/pytorch/examples/blob/master/mnist/main.py
"""
import os
import argparse
import logging
import nni
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from nni.utils import merge_parameter
from torchvision import datasets, transforms
logger = logging.getLogger('mnist_AutoML')
class Net(nn.Module):
    def __init__(self, hidden_size):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4*4*50, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4*4*50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if (args['batch_num'] is not None) and batch_idx >= args['batch_num']:
            break
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args['log_interval'] == 0:
            logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(args, model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # get the index of the max log-probability
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)

    logger.info('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), accuracy))

    return accuracy


def main(args):
    use_cuda = not args['no_cuda'] and torch.cuda.is_available()
    torch.manual_seed(args['seed'])
    device = torch.device("cuda" if use_cuda else "cpu")
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

    data_dir = args['data_dir']

    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST(data_dir, train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args['batch_size'], shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(data_dir, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=1000, shuffle=True, **kwargs)

    hidden_size = args['hidden_size']

    model = Net(hidden_size=hidden_size).to(device)
    optimizer = optim.SGD(model.parameters(), lr=args['lr'],
                          momentum=args['momentum'])

    for epoch in range(1, args['epochs'] + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test_acc = test(args, model, device, test_loader)

        # report intermediate result
        nni.report_intermediate_result(test_acc)
        logger.debug('test accuracy %g', test_acc)
        logger.debug('Pipe send intermediate result done.')

    # report final result
    nni.report_final_result(test_acc)
    logger.debug('Final result is %g', test_acc)
    logger.debug('Send final result done.')


def get_params():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument("--data_dir", type=str,
                        default='./data', help="data directory")
    parser.add_argument('--batch_size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument("--batch_num", type=int, default=None)
    parser.add_argument("--hidden_size", type=int, default=512, metavar='N',
                        help='hidden layer size (default: 512)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--no_cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--log_interval', type=int, default=1000, metavar='N',
                        help='how many batches to wait before logging training status')
    args, _ = parser.parse_known_args()
    return args


if __name__ == '__main__':
    try:
        # get parameters from tuner
        tuner_params = nni.get_next_parameter()
        logger.debug(tuner_params)
        params = vars(merge_parameter(get_params(), tuner_params))
        print(params)
        main(params)
    except Exception as exception:
        logger.exception(exception)
        raise
{
    "batch_size": {"_type": "choice", "_value": [16, 32, 64, 128]},
    "hidden_size": {"_type": "choice", "_value": [128, 256, 512, 1024]},
    "lr": {"_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1]},
    "momentum": {"_type": "uniform", "_value": [0, 1]}
}
......@@ -142,6 +142,17 @@ common_schema = {
        Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!'),
        Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int),
        Optional('useActiveGpu'): setType('useActiveGpu', bool)
    },
    Optional('sharedStorage'): {
        'storageType': setChoice('storageType', 'NFS', 'AzureBlob'),
        Optional('localMountPoint'): setType('localMountPoint', str),
        Optional('remoteMountPoint'): setType('remoteMountPoint', str),
        Optional('nfsServer'): setType('nfsServer', str),
        Optional('storageAccountName'): setType('storageAccountName', str),
        Optional('storageAccountKey'): setType('storageAccountKey', str),
        Optional('containerName'): setType('containerName', str),
        Optional('resourceGroupName'): setType('resourceGroupName', str),
        Optional('localMounted'): setChoice('localMounted', 'usermount', 'nnimount', 'nomount')
    }
}
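
# Illustrative sketch, not part of the NNI source: the fragment above relies on
# NNI's setType/setChoice helpers over the `schema` package. A standalone
# equivalent for validating just the sharedStorage section could look like this.
from schema import Optional as SchemaOptional, Or as SchemaOr, Schema

_shared_storage_demo_schema = Schema({
    'storageType': SchemaOr('NFS', 'AzureBlob'),
    SchemaOptional('localMountPoint'): str,
    SchemaOptional('remoteMountPoint'): str,
    SchemaOptional('nfsServer'): str,
    SchemaOptional('storageAccountName'): str,
    SchemaOptional('storageAccountKey'): str,
    SchemaOptional('containerName'): str,
    SchemaOptional('resourceGroupName'): str,
    SchemaOptional('localMounted'): SchemaOr('usermount', 'nnimount', 'nomount'),
})
# Raises schema.SchemaError if the section is malformed:
_shared_storage_demo_schema.validate({'storageType': 'NFS', 'localMountPoint': '/mnt/nfs'})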
......
......@@ -314,6 +314,19 @@ def set_hybrid_config(experiment_config, port, config_file_name):
    #set trial_config
    return set_trial_config(experiment_config, port, config_file_name), err_message
def set_shared_storage(experiment_config, port, config_file_name):
    if 'sharedStorage' in experiment_config:
        response = rest_put(cluster_metadata_url(port), json.dumps({'shared_storage_config': experiment_config['sharedStorage']}), REST_TIME_OUT)
        err_message = None
        if not response or not response.status_code == 200:
            if response is not None:
                err_message = response.text
                _, stderr_full_path = get_log_path(config_file_name)
                with open(stderr_full_path, 'a+') as fout:
                    fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
            return False, err_message
    return True, None
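
def _demo_put_shared_storage_config(experiment_config, port):
    # Illustrative sketch, not part of the NNI source: the REST call issued by
    # set_shared_storage above, written with `requests`. The endpoint path is an
    # assumption about what cluster_metadata_url(port) builds; adjust if needed.
    import json
    import requests
    return requests.put(
        'http://localhost:{}/api/v1/nni/experiment/cluster-metadata'.format(port),
        data=json.dumps({'shared_storage_config': experiment_config['sharedStorage']}),
        timeout=20)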
def set_experiment(experiment_config, mode, port, config_file_name):
    '''Call startExperiment (rest POST /experiment) with yaml file content'''
    request_data = dict()
......@@ -442,6 +455,8 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res
    else:
        raise Exception(ERROR_INFO % 'Unsupported platform!')
        exit(1)
    if config_result:
        config_result, err_msg = set_shared_storage(experiment_config, port, config_file_name)
    if config_result:
        print_normal('Successfully set {0} config!'.format(platform))
    else:
......
......@@ -623,8 +623,9 @@ class NNIManager implements Manager {
        this.currSubmittedTrialNum++;
        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
        const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
        const Snapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
        await this.storeExperimentProfile();
        this.trialJobs.set(trialJobDetail.id, Snapshot);
        const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
        if (trialJobDetailSnapshot != undefined) {
            await this.dataStore.storeTrialJobEvent(
......
......@@ -191,6 +191,18 @@ export namespace ValidationSchemas {
            }),
            remote_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
                reuse: joi.boolean()
            }),
            shared_storage_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
                storageType: joi.string(),
                localMountPoint: joi.string(),
                remoteMountPoint: joi.string(),
                nfsServer: joi.string(),
                exportedDirectory: joi.string(),
                storageAccountName: joi.string(),
                storageAccountKey: joi.string(),
                containerName: joi.string(),
                resourceGroupName: joi.string(),
                localMounted: joi.string()
            })
        }
    };
......
......@@ -26,5 +26,6 @@ export enum TrialConfigMetadataKey {
    LOG_COLLECTION = 'log_collection',
    // Used to set platform for hybrid in reuse mode,
    // temporarily changed; the config schema will be refactored in the future
    PLATFORM_LIST = 'platform_list',
    SHARED_STORAGE_CONFIG = 'shared_storage_config'
}
......@@ -79,6 +79,8 @@ export class EnvironmentInformation {
    public environmentService?: EnvironmentService;
    public useSharedStorage?: boolean;

    constructor(id: string, name: string, envId?: string) {
        this.log = getLogger();
        this.id = id;
......
......@@ -16,6 +16,7 @@ import { AMLClusterConfig, AMLEnvironmentInformation, AMLTrialConfig } from '../
import { EnvironmentInformation, EnvironmentService } from '../environment';
import { EventEmitter } from "events";
import { AMLCommandChannel } from '../channels/amlCommandChannel';
import { SharedStorageService } from '../sharedStorage'
/**
......@@ -114,9 +115,20 @@ export class AMLEnvironmentService extends EnvironmentService {
        }
        const amlEnvironment: AMLEnvironmentInformation = environment as AMLEnvironmentInformation;
        const environmentLocalTempFolder = path.join(this.experimentRootDir, "environment-temp");
        if (!fs.existsSync(environmentLocalTempFolder)) {
            await fs.promises.mkdir(environmentLocalTempFolder, {recursive: true});
        }
        if (amlEnvironment.useSharedStorage) {
            const environmentRoot = component.get<SharedStorageService>(SharedStorageService).remoteWorkingRoot;
            const remoteMountCommand = component.get<SharedStorageService>(SharedStorageService).remoteMountCommand;
            amlEnvironment.command = `${remoteMountCommand} && cd ${environmentRoot} && ${amlEnvironment.command}`.replace(/"/g, `\\"`);
        } else {
            amlEnvironment.command = `mv envs outputs/envs && cd outputs && ${amlEnvironment.command}`;
        }
        amlEnvironment.command = `import os\nos.system('${amlEnvironment.command}')`;
        amlEnvironment.useActiveGpu = this.amlClusterConfig.useActiveGpu;
        amlEnvironment.maxTrialNumberPerGpu = this.amlClusterConfig.maxTrialNumPerGpu;
        await fs.promises.writeFile(path.join(environmentLocalTempFolder, 'nni_script.py'), amlEnvironment.command, { encoding: 'utf8' });
        const amlClient = new AMLClient(
            this.amlClusterConfig.subscriptionId,
......
......@@ -14,6 +14,7 @@ import { EnvironmentInformation, EnvironmentService } from '../environment';
import { TrialConfig } from '../../common/trialConfig';
import { getExperimentRootDir, isAlive, getNewLine } from '../../../common/utils';
import { execMkdir, runScript, getScriptName, execCopydir } from '../../common/util';
import { SharedStorageService } from '../sharedStorage'
@component.Singleton
export class LocalEnvironmentService extends EnvironmentService {
......@@ -118,11 +119,22 @@ export class LocalEnvironmentService extends EnvironmentService {
        if (this.localTrialConfig === undefined) {
            throw new Error('Local trial config is not initialized');
        }
        // Needs refactoring: this temp folder path is not appropriate; there are two experiment IDs in this path
        const sharedStorageService = component.get<SharedStorageService>(SharedStorageService);
        if (environment.useSharedStorage && sharedStorageService.canLocalMounted) {
            this.experimentRootDir = sharedStorageService.localWorkingRoot;
        } else {
            this.experimentRootDir = getExperimentRootDir();
        }
        const localEnvCodeFolder: string = path.join(this.experimentRootDir, "envs");
        if (environment.useSharedStorage && !sharedStorageService.canLocalMounted) {
            await sharedStorageService.storageService.copyDirectoryBack("envs", localEnvCodeFolder)
        } else if (!environment.useSharedStorage) {
            const localTempFolder: string = path.join(this.experimentRootDir, "environment-temp", "envs");
            await execCopydir(localTempFolder, localEnvCodeFolder);
        }
        environment.runnerWorkingFolder = path.join(localEnvCodeFolder, environment.id);
        await execMkdir(environment.runnerWorkingFolder);
        environment.command = this.getScript(environment).join(getNewLine());
        const scriptName: string = getScriptName('run');
        await fs.promises.writeFile(path.join(localEnvCodeFolder, scriptName),
......
......@@ -14,6 +14,7 @@ import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { PAIClusterConfig } from '../../pai/paiConfig';
import { NNIPAITrialConfig } from '../../pai/paiConfig';
import { EnvironmentInformation, EnvironmentService } from '../environment';
import { SharedStorageService } from '../sharedStorage';
import { StorageService } from '../storageService';
......@@ -178,9 +179,15 @@ export class OpenPaiEnvironmentService extends EnvironmentService {
        }
        // Step 1. Prepare PAI job configuration
        let environmentRoot: string;
        if (environment.useSharedStorage) {
            environmentRoot = component.get<SharedStorageService>(SharedStorageService).remoteWorkingRoot;
            environment.command = `${component.get<SharedStorageService>(SharedStorageService).remoteMountCommand} && cd ${environmentRoot} && ${environment.command}`;
        } else {
            environmentRoot = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}`;
            environment.command = `cd ${environmentRoot} && ${environment.command}`;
        }
        environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`;
        environment.trackingUrl = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${environment.envId}`;
        environment.useActiveGpu = this.paiClusterConfig.useActiveGpu;
        environment.maxTrialNumberPerGpu = this.paiClusterConfig.maxTrialNumPerGpu;
......
......@@ -20,6 +20,7 @@ import {
} from '../../remote_machine/remoteMachineData';
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
import { RemoteMachineEnvironmentInformation } from '../remote/remoteConfig';
import { SharedStorageService } from '../sharedStorage'
@component.Singleton
......@@ -247,13 +248,20 @@ export class RemoteEnvironmentService extends EnvironmentService {
        }
        this.environmentExecutorManagerMap.set(environment.id, executorManager);
        const executor = await this.getExecutor(environment.id);
        if (environment.useSharedStorage) {
            const environmentRoot = component.get<SharedStorageService>(SharedStorageService).remoteWorkingRoot;
            environment.runnerWorkingFolder = executor.joinPath(environmentRoot, 'envs', environment.id)
            const remoteMountCommand = component.get<SharedStorageService>(SharedStorageService).remoteMountCommand;
            await executor.executeScript(remoteMountCommand, false, false);
        } else {
            environment.runnerWorkingFolder =
                executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()),
                'envs', environment.id)
        }
        environment.command = `cd ${environment.runnerWorkingFolder} && \
${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \
1>${environment.runnerWorkingFolder}/trialrunner_stdout 2>${environment.runnerWorkingFolder}/trialrunner_stderr \
&& echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`;
        return Promise.resolve(true);
    }
}
......
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

import { StorageService } from './storageService'

export type SharedStorageType = 'NFS' | 'AzureBlob'
export type LocalMountedType = 'usermount' | 'nnimount' | 'nomount'

export interface SharedStorageConfig {
    readonly storageType: SharedStorageType;
    readonly localMountPoint?: string;
    readonly remoteMountPoint: string;
}

export abstract class SharedStorageService {
    public abstract config(key: string, value: string): Promise<void>;
    public abstract get canLocalMounted(): boolean;
    public abstract get storageService(): StorageService;
    public abstract get localMountCommand(): string;
    public abstract get remoteMountCommand(): string;
    public abstract get localWorkingRoot(): string;
    public abstract get remoteWorkingRoot(): string;
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

import * as cpp from 'child-process-promise';
import * as path from 'path';

import { SharedStorageService, SharedStorageConfig, SharedStorageType, LocalMountedType } from '../sharedStorage'
import { MountedStorageService } from '../storages/mountedStorageService';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { getLogger, Logger } from '../../../common/log';
import { getExperimentId } from '../../../common/experimentStartupInfo';

const INSTALL_BLOBFUSE = `
#!/bin/bash
if [ -n "$(command -v blobfuse)" ]
then
    exit 0
fi

if [ -n "$(command -v apt-get)" ]
then
    sudo apt-get update
    sudo apt-get install -y lsb-release
elif [ -n "$(command -v yum)" ]
then
    sudo yum install -y redhat-lsb
else
    echo "Unknown package manager."
    exit 1
fi

id=$(lsb_release -i | cut -c16- | sed s/[[:space:]]//g)
version=$(lsb_release -r | cut -c9- | sed s/[[:space:]]//g)

if [ $id = "Ubuntu" ]
then
    wget https://packages.microsoft.com/config/ubuntu/$version/packages-microsoft-prod.deb
    sudo dpkg -i packages-microsoft-prod.deb
    sudo apt-get update
    sudo apt-get install -y blobfuse fuse
elif [ $id = "CentOS" ] || [ $id = "RHEL" ]
then
    sudo rpm -Uvh https://packages.microsoft.com/config/rhel/$(echo $version | cut -c1)/packages-microsoft-prod.rpm
    sudo yum install -y blobfuse fuse
else
    echo "Unsupported distribution."
    exit 1
fi
`

class AzureBlobSharedStorageConfig implements SharedStorageConfig {
    public storageType: SharedStorageType;
    public localMountPoint?: string;
    public remoteMountPoint: string;
    public resourceGroupName?: string;
    public storageAccountName: string;
    public storageAccountKey?: string;
    public containerName: string;
    public localMounted: LocalMountedType;

    constructor(storageType: SharedStorageType, remoteMountPoint: string, storageAccountName: string, containerName: string,
                localMounted: LocalMountedType, localMountPoint?: string, resourceGroupName?: string, storageAccountKey?: string) {
        this.storageType = storageType;
        this.localMountPoint = localMountPoint;
        this.remoteMountPoint = remoteMountPoint;
        this.resourceGroupName = resourceGroupName;
        this.storageAccountName = storageAccountName;
        this.storageAccountKey = storageAccountKey;
        this.containerName = containerName;
        this.localMounted = localMounted;
    }
}

export class AzureBlobSharedStorageService extends SharedStorageService {
    private log: Logger;
    private internalStorageService: MountedStorageService;
    private experimentId: string;

    private storageType?: SharedStorageType;
    private storageAccountName?: string;
    private storageAccountKey?: string;
    private containerName?: string;
    private localMountPoint?: string;
    private remoteMountPoint?: string;

    constructor() {
        super();
        this.log = getLogger();
        this.internalStorageService = new MountedStorageService();
        this.experimentId = getExperimentId();
    }

    public async config(key: string, value: string): Promise<void> {
        if (key === TrialConfigMetadataKey.SHARED_STORAGE_CONFIG) {
            const azureblobConfig = <AzureBlobSharedStorageConfig>JSON.parse(value);
            this.localMountPoint = azureblobConfig.localMountPoint;
            this.remoteMountPoint = azureblobConfig.remoteMountPoint;

            this.storageType = azureblobConfig.storageType;
            this.storageAccountName = azureblobConfig.storageAccountName;
            this.containerName = azureblobConfig.containerName;
            if (azureblobConfig.storageAccountKey !== undefined) {
                this.storageAccountKey = azureblobConfig.storageAccountKey;
            } else if (azureblobConfig.resourceGroupName !== undefined) {
                await this.setAccountKey(azureblobConfig.resourceGroupName);
            } else {
                const errorMessage = `${this.storageType} Shared Storage: one of 'storageAccountKey' or 'resourceGroupName' must be set.`;
                this.log.error(errorMessage);
                return Promise.reject(errorMessage);
            }

            if (azureblobConfig.localMounted === 'nnimount') {
                await this.helpLocalMount();
            } else if (azureblobConfig.localMounted === 'nomount') {
                const errorMessage = `${this.storageType} Shared Storage: ${this.storageType} does not support 'nomount' yet.`;
                this.log.error(errorMessage);
                return Promise.reject(errorMessage);
            }

            if (this.canLocalMounted && this.localMountPoint) {
                this.internalStorageService.initialize(this.localMountPoint, path.join(this.localMountPoint, 'nni', this.experimentId));
            }
        }
    }

    public get canLocalMounted(): boolean {
        return true;
    }

    public get storageService(): MountedStorageService {
        return this.internalStorageService;
    }

    public get localMountCommand(): string {
        if (this.localMountPoint) {
            return this.getCommand(this.localMountPoint);
        } else {
            this.log.error(`${this.storageType} Shared Storage: localMountPoint is not initialized.`);
            return '';
        }
    }

    public get remoteMountCommand(): string {
        if (this.remoteMountPoint) {
            return this.getCommand(this.remoteMountPoint);
        } else {
            this.log.error(`${this.storageType} Shared Storage: remoteMountPoint is not initialized.`);
            return '';
        }
    }

    private getCommand(mountPoint: string): string {
        // Write the install script to a helper file, create the blobfuse connection
        // config, mount the container at mountPoint, then clean up the helper files.
        const install = `rm -f nni_install_fuseblob.sh && touch nni_install_fuseblob.sh && echo "${INSTALL_BLOBFUSE.replace(/\$/g, `\\$`).replace(/\n/g, `\\n`).replace(/"/g, `\\"`)}" >> nni_install_fuseblob.sh && bash nni_install_fuseblob.sh`;
        const prepare = `sudo mkdir /mnt/resource/nniblobfusetmp -p && rm -f nni_fuse_connection.cfg && touch nni_fuse_connection.cfg && echo "accountName ${this.storageAccountName}\\naccountKey ${this.storageAccountKey}\\ncontainerName ${this.containerName}" >> nni_fuse_connection.cfg`;
        const mount = `mkdir -p ${mountPoint} && sudo blobfuse ${mountPoint} --tmp-path=/mnt/resource/nniblobfusetmp --config-file=$(pwd)/nni_fuse_connection.cfg -o attr_timeout=240 -o entry_timeout=240 -o negative_timeout=120 -o allow_other`;
        const clean = `rm -f nni_install_fuseblob.sh nni_fuse_connection.cfg`;
        return `${install} && ${prepare} && ${mount} && ${clean}`;
    }

    public get localWorkingRoot(): string {
        return `${this.localMountPoint}/nni/${this.experimentId}`;
    }

    public get remoteWorkingRoot(): string {
        return `${this.remoteMountPoint}/nni/${this.experimentId}`;
    }

    private async helpLocalMount(): Promise<void> {
        if (process.platform === 'win32') {
            const errorMessage = `${this.storageType} Shared Storage: ${this.storageType} does not support mounting on Windows yet.`;
            this.log.error(errorMessage);
            return Promise.reject(errorMessage);
        }

        try {
            this.log.debug(`Local mount command is: ${this.localMountCommand}`);
            const result = await cpp.exec(this.localMountCommand);
            if (result.stderr) {
                throw new Error(result.stderr);
            }
        } catch (error) {
            const errorMessage: string = `${this.storageType} Shared Storage: mounting ${this.storageAccountName}/${this.containerName} to ${this.localMountPoint} failed, error is ${error}`;
            this.log.error(errorMessage);
            return Promise.reject(errorMessage);
        }

        return Promise.resolve();
    }

    private async setAccountKey(resourceGroupName: string): Promise<void> {
        try {
            const result = await cpp.exec(`az storage account keys list --resource-group ${resourceGroupName} --account-name ${this.storageAccountName} --query "[0].value" | tr -d '"'`);
            if (result.stderr) {
                throw Error(result.stderr);
            } else {
                this.storageAccountKey = result.stdout.trim();
            }
        } catch (error) {
            const errorMessage: string = `${this.storageType} Shared Storage: getting the account key failed, error is ${error}`;
            this.log.error(errorMessage);
            return Promise.reject(errorMessage);
        }
    }
}
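
// Illustrative usage sketch, not part of the NNI source: how the environment
// services above are expected to drive this class. The payload mirrors the
// shared_storage_config schema; all values are placeholders.
//
// const service = new AzureBlobSharedStorageService();
// await service.config(TrialConfigMetadataKey.SHARED_STORAGE_CONFIG, JSON.stringify({
//     storageType: 'AzureBlob',
//     localMountPoint: '/mnt/nnishared',
//     remoteMountPoint: '/mnt/nnishared',
//     storageAccountName: 'mystorageaccount',   // placeholder
//     storageAccountKey: '<key>',               // or set resourceGroupName instead
//     containerName: 'nni',
//     localMounted: 'nnimount'
// }));
// const remoteRoot = service.remoteWorkingRoot;  // `${remoteMountPoint}/nni/${experimentId}`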