Unverified Commit 6330df2f authored by liuzhe-lz's avatar liuzhe-lz Committed by GitHub
Browse files

Fix launch from Python log (#3263)

parent 098d89af
import atexit import atexit
import logging import logging
from pathlib import Path
import socket import socket
from subprocess import Popen from subprocess import Popen
from threading import Thread from threading import Thread
...@@ -15,6 +16,7 @@ from nni.tuner import Tuner ...@@ -15,6 +16,7 @@ from nni.tuner import Tuner
from .config import ExperimentConfig from .config import ExperimentConfig
from . import launcher from . import launcher
from . import management
from .pipe import Pipe from .pipe import Pipe
from . import rest from . import rest
from ..tools.nnictl.command_utils import kill_command from ..tools.nnictl.command_utils import kill_command
...@@ -76,6 +78,7 @@ class Experiment: ...@@ -76,6 +78,7 @@ class Experiment:
def __init__(self, tuner: Tuner, config=None, training_service=None): def __init__(self, tuner: Tuner, config=None, training_service=None):
self.config: ExperimentConfig self.config: ExperimentConfig
self.id: Optional[str] = None
self.port: Optional[int] = None self.port: Optional[int] = None
self.tuner: Tuner = tuner self.tuner: Tuner = tuner
self._proc: Optional[Popen] = None self._proc: Optional[Popen] = None
...@@ -108,10 +111,15 @@ class Experiment: ...@@ -108,10 +111,15 @@ class Experiment:
""" """
atexit.register(self.stop) atexit.register(self.stop)
if debug: self.id = management.generate_experiment_id()
logging.getLogger('nni').setLevel(logging.DEBUG)
self._proc, self._pipe = launcher.start_experiment(self.config, port, debug) if self.config.experiment_working_directory is not None:
log_dir = Path(self.config.experiment_working_directory, self.id, 'log')
else:
log_dir = Path.home() / f'nni-experiments/{self.id}/log'
nni.runtime.log.start_experiment_log(self.id, log_dir, debug)
self._proc, self._pipe = launcher.start_experiment(self.id, self.config, port, debug)
assert self._proc is not None assert self._proc is not None
assert self._pipe is not None assert self._pipe is not None
...@@ -119,7 +127,7 @@ class Experiment: ...@@ -119,7 +127,7 @@ class Experiment:
# dispatcher must be launched after pipe initialized # dispatcher must be launched after pipe initialized
# the logic to launch dispatcher in background should be refactored into dispatcher api # the logic to launch dispatcher in background should be refactored into dispatcher api
self._dispatcher = MsgDispatcher(self.tuner, None) self._dispatcher = self._create_dispatcher()
self._dispatcher_thread = Thread(target=self._dispatcher.run) self._dispatcher_thread = Thread(target=self._dispatcher.run)
self._dispatcher_thread.start() self._dispatcher_thread.start()
...@@ -129,10 +137,11 @@ class Experiment: ...@@ -129,10 +137,11 @@ class Experiment:
if interface.family == socket.AF_INET: if interface.family == socket.AF_INET:
ips.append(interface.address) ips.append(interface.address)
ips = [f'http://{ip}:{port}' for ip in ips if ip] ips = [f'http://{ip}:{port}' for ip in ips if ip]
msg = 'Web UI URLs: ' + colorama.Fore.CYAN + ' '.join(ips) msg = 'Web UI URLs: ' + colorama.Fore.CYAN + ' '.join(ips) + colorama.Style.RESET_ALL
_logger.info(msg) _logger.info(msg)
# TODO: register experiment management metadata def _create_dispatcher(self): # overrided by retiarii, temporary solution
return MsgDispatcher(self.tuner, None)
def stop(self) -> None: def stop(self) -> None:
...@@ -142,6 +151,8 @@ class Experiment: ...@@ -142,6 +151,8 @@ class Experiment:
_logger.info('Stopping experiment, please wait...') _logger.info('Stopping experiment, please wait...')
atexit.unregister(self.stop) atexit.unregister(self.stop)
if self.id is not None:
nni.runtime.log.stop_experiment_log(self.id)
if self._proc is not None: if self._proc is not None:
kill_command(self._proc.pid) kill_command(self._proc.pid)
if self._pipe is not None: if self._pipe is not None:
...@@ -150,6 +161,7 @@ class Experiment: ...@@ -150,6 +161,7 @@ class Experiment:
self._dispatcher.stopping = True self._dispatcher.stopping = True
self._dispatcher_thread.join(timeout=1) self._dispatcher_thread.join(timeout=1)
self.id = None
self.port = None self.port = None
self._proc = None self._proc = None
self._pipe = None self._pipe = None
......
...@@ -14,7 +14,6 @@ import nni_node ...@@ -14,7 +14,6 @@ import nni_node
from .config import ExperimentConfig from .config import ExperimentConfig
from .config import convert from .config import convert
from . import management
from .pipe import Pipe from .pipe import Pipe
from . import rest from . import rest
from ..tools.nnictl.config_utils import Experiments from ..tools.nnictl.config_utils import Experiments
...@@ -22,7 +21,7 @@ from ..tools.nnictl.config_utils import Experiments ...@@ -22,7 +21,7 @@ from ..tools.nnictl.config_utils import Experiments
_logger = logging.getLogger('nni.experiment') _logger = logging.getLogger('nni.experiment')
def start_experiment(config: ExperimentConfig, port: int, debug: bool) -> Tuple[Popen, Pipe]: def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bool) -> Tuple[Popen, Pipe]:
pipe = None pipe = None
proc = None proc = None
...@@ -30,10 +29,9 @@ def start_experiment(config: ExperimentConfig, port: int, debug: bool) -> Tuple[ ...@@ -30,10 +29,9 @@ def start_experiment(config: ExperimentConfig, port: int, debug: bool) -> Tuple[
_ensure_port_idle(port) _ensure_port_idle(port)
if config.training_service.platform == 'openpai': if config.training_service.platform == 'openpai':
_ensure_port_idle(port + 1, 'OpenPAI requires an additional port') _ensure_port_idle(port + 1, 'OpenPAI requires an additional port')
exp_id = management.generate_experiment_id()
try: try:
_logger.info('Creating experiment %s%s', colorama.Fore.CYAN, exp_id) _logger.info('Creating experiment %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
pipe = Pipe(exp_id) pipe = Pipe(exp_id)
start_time, proc = _start_rest_server(config, port, debug, exp_id, pipe.path) start_time, proc = _start_rest_server(config, port, debug, exp_id, pipe.path)
_logger.info('Connecting IPC pipe...') _logger.info('Connecting IPC pipe...')
......
import atexit
import logging import logging
import socket
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
...@@ -8,10 +6,7 @@ from subprocess import Popen ...@@ -8,10 +6,7 @@ from subprocess import Popen
from threading import Thread from threading import Thread
from typing import Any, Optional from typing import Any, Optional
import colorama from ..experiment import Experiment, TrainingServiceConfig
import psutil
from ..experiment import Experiment, TrainingServiceConfig, launcher
from ..experiment.config.base import ConfigBase, PathLike from ..experiment.config.base import ConfigBase, PathLike
from ..experiment.config import util from ..experiment.config import util
from ..experiment.pipe import Pipe from ..experiment.pipe import Pipe
...@@ -159,34 +154,11 @@ class RetiariiExperiment(Experiment): ...@@ -159,34 +154,11 @@ class RetiariiExperiment(Experiment):
debug debug
Whether to start in debug mode. Whether to start in debug mode.
""" """
atexit.register(self.stop) super().start(port, debug)
if debug:
logging.getLogger('nni').setLevel(logging.DEBUG)
self._proc, self._pipe = launcher.start_experiment(self.config, port, debug)
assert self._proc is not None
assert self._pipe is not None
self.port = port # port will be None if start up failed
# dispatcher must be created after pipe initialized
# the logic to launch dispatcher in background should be refactored into dispatcher api
self._dispatcher_thread = Thread(target=self._dispatcher.run)
self._dispatcher_thread.start()
self._start_strategy() self._start_strategy()
ips = [self.config.nni_manager_ip] def _create_dispatcher(self):
for interfaces in psutil.net_if_addrs().values(): return self._dispatcher
for interface in interfaces:
if interface.family == socket.AF_INET:
ips.append(interface.address)
ips = [f'http://{ip}:{port}' for ip in ips if ip]
msg = 'Web UI URLs: ' + colorama.Fore.CYAN + ' '.join(ips)
_logger.info(msg)
# TODO: register experiment management metadata
def run(self, config: RetiariiExeConfig = None, port: int = 8080, debug: bool = False) -> str: def run(self, config: RetiariiExeConfig = None, port: int = 8080, debug: bool = False) -> str:
""" """
......
...@@ -12,6 +12,13 @@ import colorama ...@@ -12,6 +12,13 @@ import colorama
from .env_vars import dispatcher_env_vars, trial_env_vars from .env_vars import dispatcher_env_vars, trial_env_vars
handlers = {}
log_format = '[%(asctime)s] %(levelname)s (%(name)s/%(threadName)s) %(message)s'
time_format = '%Y-%m-%d %H:%M:%S'
formatter = Formatter(log_format, time_format)
def init_logger() -> None: def init_logger() -> None:
""" """
This function will (and should only) get invoked on the first time of importing nni (no matter which submodule). This function will (and should only) get invoked on the first time of importing nni (no matter which submodule).
...@@ -37,6 +44,8 @@ def init_logger() -> None: ...@@ -37,6 +44,8 @@ def init_logger() -> None:
_init_logger_standalone() _init_logger_standalone()
logging.getLogger('filelock').setLevel(logging.WARNING)
def init_logger_experiment() -> None: def init_logger_experiment() -> None:
""" """
...@@ -44,18 +53,19 @@ def init_logger_experiment() -> None: ...@@ -44,18 +53,19 @@ def init_logger_experiment() -> None:
This function will get invoked after `init_logger()`. This function will get invoked after `init_logger()`.
""" """
formatter.format = _colorful_format colorful_formatter = Formatter(log_format, time_format)
colorful_formatter.format = _colorful_format
handlers['_default_'].setFormatter(colorful_formatter)
log_path = _prepare_log_dir(dispatcher_env_vars.NNI_LOG_DIRECTORY) / 'dispatcher.log' def start_experiment_log(experiment_id: str, log_directory: Path, debug: bool) -> None:
_setup_root_logger(FileHandler(log_path), logging.DEBUG) log_path = _prepare_log_dir(log_directory) / 'dispatcher.log'
log_level = logging.DEBUG if debug else logging.INFO
_register_handler(FileHandler(log_path), log_level, experiment_id)
def stop_experiment_log(experiment_id: str) -> None:
if experiment_id in handlers:
logging.getLogger().removeHandler(handlers.pop(experiment_id))
time_format = '%Y-%m-%d %H:%M:%S'
formatter = Formatter(
'[%(asctime)s] %(levelname)s (%(name)s/%(threadName)s) %(message)s',
time_format
)
def _init_logger_dispatcher() -> None: def _init_logger_dispatcher() -> None:
log_level_map = { log_level_map = {
...@@ -69,26 +79,20 @@ def _init_logger_dispatcher() -> None: ...@@ -69,26 +79,20 @@ def _init_logger_dispatcher() -> None:
log_path = _prepare_log_dir(dispatcher_env_vars.NNI_LOG_DIRECTORY) / 'dispatcher.log' log_path = _prepare_log_dir(dispatcher_env_vars.NNI_LOG_DIRECTORY) / 'dispatcher.log'
log_level = log_level_map.get(dispatcher_env_vars.NNI_LOG_LEVEL, logging.INFO) log_level = log_level_map.get(dispatcher_env_vars.NNI_LOG_LEVEL, logging.INFO)
_setup_root_logger(FileHandler(log_path), log_level) _register_handler(FileHandler(log_path), log_level)
def _init_logger_trial() -> None: def _init_logger_trial() -> None:
log_path = _prepare_log_dir(trial_env_vars.NNI_OUTPUT_DIR) / 'trial.log' log_path = _prepare_log_dir(trial_env_vars.NNI_OUTPUT_DIR) / 'trial.log'
log_file = open(log_path, 'w') log_file = open(log_path, 'w')
_setup_root_logger(StreamHandler(log_file), logging.INFO) _register_handler(StreamHandler(log_file), logging.INFO)
if trial_env_vars.NNI_PLATFORM == 'local': if trial_env_vars.NNI_PLATFORM == 'local':
sys.stdout = _LogFileWrapper(log_file) sys.stdout = _LogFileWrapper(log_file)
def _init_logger_standalone() -> None: def _init_logger_standalone() -> None:
_setup_nni_logger(StreamHandler(sys.stdout), logging.INFO) _register_handler(StreamHandler(sys.stdout), logging.INFO)
# Following line does not affect NNI loggers, but without this user's logger won't
# print log even it's level is set to INFO, so we do it for user's convenience.
# If this causes any issue in future, remove it and use `logging.info()` instead of
# `logging.getLogger('xxx').info()` in all examples.
logging.basicConfig()
def _prepare_log_dir(path: Optional[str]) -> Path: def _prepare_log_dir(path: Optional[str]) -> Path:
...@@ -98,20 +102,18 @@ def _prepare_log_dir(path: Optional[str]) -> Path: ...@@ -98,20 +102,18 @@ def _prepare_log_dir(path: Optional[str]) -> Path:
ret.mkdir(parents=True, exist_ok=True) ret.mkdir(parents=True, exist_ok=True)
return ret return ret
def _setup_root_logger(handler: Handler, level: int) -> None: def _register_handler(handler: Handler, level: int, tag: str = '_default_') -> None:
_setup_logger('', handler, level) assert tag not in handlers
handlers[tag] = handler
def _setup_nni_logger(handler: Handler, level: int) -> None:
_setup_logger('nni', handler, level)
def _setup_logger(name: str, handler: Handler, level: int) -> None:
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger = logging.getLogger(name) logger = logging.getLogger()
logger.addHandler(handler) logger.addHandler(handler)
logger.setLevel(level) logger.setLevel(level)
logger.propagate = False
def _colorful_format(record): def _colorful_format(record):
time = formatter.formatTime(record, time_format)
if not record.name.startswith('nni.'):
return '[{}] ({}) {}'.format(time, record.name, record.msg % record.args)
if record.levelno >= logging.ERROR: if record.levelno >= logging.ERROR:
color = colorama.Fore.RED color = colorama.Fore.RED
elif record.levelno >= logging.WARNING: elif record.levelno >= logging.WARNING:
...@@ -121,7 +123,6 @@ def _colorful_format(record): ...@@ -121,7 +123,6 @@ def _colorful_format(record):
else: else:
color = colorama.Fore.BLUE color = colorama.Fore.BLUE
msg = color + (record.msg % record.args) + colorama.Style.RESET_ALL msg = color + (record.msg % record.args) + colorama.Style.RESET_ALL
time = formatter.formatTime(record, time_format)
if record.levelno < logging.INFO: if record.levelno < logging.INFO:
return '[{}] {}:{} {}'.format(time, record.threadName, record.name, msg) return '[{}] {}:{} {}'.format(time, record.threadName, record.name, msg)
else: else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment