Unverified Commit 4074f12c authored by guoshzhao's avatar guoshzhao Committed by GitHub
Browse files

Monitor: Initialization - Add Monitor and MonitorRecord class (#240)

**Description**
Add the initial version of Monitor.

**Major Revision**
- Add `Monitor` class to launch background process for monitoring.
- Add `MonitorRecord` class to store the data captured in a single sampling round.
parent cc70f9c1
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Exposes interfaces of SuperBench Monitor."""
from superbench.monitor.monitor import Monitor
from superbench.monitor.record import MonitorRecord
__all__ = ['Monitor', 'MonitorRecord']
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""Module of the Monitor class."""
import os
import time
import glob
import sched
import multiprocessing
from superbench.common.utils import logger, run_command
from superbench.common.utils import device_manager as dm
from superbench.monitor.record import MonitorRecord
class Monitor(multiprocessing.Process):
    """The monitor class to collect system metrics periodically.

    The monitor runs as a separate process. Every `sample_freq` seconds it captures
    one MonitorRecord (host CPU/network metrics plus GPU metrics) and appends it to
    the output file as one JSON line.
    """
    def __init__(self, container_name, sample_duration, sample_freq, output_file):
        """Constructor.

        Args:
            container_name (str): name of the container to monitor, None means the current env.
            sample_duration (int): calculate the average metric during sample_duration seconds.
            sample_freq (int): do sampling every sample_freq seconds.
            output_file (str): output file in jsonline format.
        """
        multiprocessing.Process.__init__(self)
        self.__container_name = container_name
        self.__sample_duration = sample_duration
        self.__sample_freq = sample_freq
        self.__output_file = output_file

        self.__scheduler = sched.scheduler(time.time, time.sleep)
        # Shared flag (1: running, 0: stopped) so that stop() called in the parent
        # is visible to the sampling loop running in the child process.
        self.__running = multiprocessing.Value('i', 0)

        self.__online_cpus = os.sysconf(os.sysconf_names['SC_NPROCESSORS_ONLN'])
        self.__unit_MiByte = 1024 * 1024 * 1.0

        # Opened in append mode so that existing records are preserved.
        self.__output_handler = open(self.__output_file, 'a')

    def __preprocess(self):
        """Preprocess/preparation operations before the monitoring.

        Resolves the files that the CPU, memory and network statistics are read from,
        either for the given docker container or for the current environment.

        Return:
            True if __preprocess() succeed.
        """
        if self.__container_name is not None:
            output = run_command('docker ps -qf name={}'.format(self.__container_name))
            if output.returncode != 0:
                logger.error(
                    'Failed to get the container id - container name: {}, error message: {}'.format(
                        self.__container_name, output.stderr
                    )
                )
                return False
            # The command output is whitespace/newline terminated and may list several ids
            # (the name filter can match more than one container); use the first one.
            container_ids = output.stdout.split()
            if not container_ids:
                logger.error(
                    'Failed to get the container id - container name: {}, error message: {}'.format(
                        self.__container_name, 'no matching container is running'
                    )
                )
                return False
            container_id = container_ids[0]

            # The braces are doubled twice so that str.format() emits the literal
            # Go template '{{.State.Pid}}' expected by 'docker inspect'.
            output = run_command('docker inspect -f {{{{.State.Pid}}}} {}'.format(container_id))
            if output.returncode != 0:
                logger.error(
                    'Failed to get the container pid - container id: {}, error message: {}'.format(
                        container_id, output.stderr
                    )
                )
                return False
            container_pid = output.stdout.strip()

            try:
                # glob() returns an empty list when the cgroup directory is missing,
                # in which case the [0] lookup raises IndexError.
                # NOTE(review): these look like cgroup v1 paths - confirm for cgroup v2 hosts.
                self._cpu_file = glob.glob('/sys/fs/cgroup/cpuacct/docker/{}*/cpuacct.stat'.format(container_id))[0]
                self._mem_file = glob.glob(
                    '/sys/fs/cgroup/memory/docker/{}*/memory.usage_in_bytes'.format(container_id)
                )[0]
                self._net_file = '/proc/{}/net/dev'.format(container_pid)
            except Exception as e:
                logger.error(
                    'Faild to get the cpu/mem/net file - container: {}, error message: {}'.format(
                        self.__container_name, str(e)
                    )
                )
                return False
        else:
            self._cpu_file = '/sys/fs/cgroup/cpuacct/cpuacct.stat'
            self._mem_file = '/sys/fs/cgroup/memory/memory.usage_in_bytes'
            self._net_file = '/proc/net/dev'

        return True

    def run(self):
        """Method representing the process's activity.

        Return:
            True if launching the process succeed.
        """
        if self.__running.value == 0:
            if not self.__preprocess():
                return False

            try:
                logger.info('Start monitoring.')
                self.__running.value = 1
                # The first call samples immediately and schedules the following ones;
                # scheduler.run() then blocks until no event is pending any more.
                self.__sample()
                self.__scheduler.run()
            except BaseException as e:
                logger.error('Failed to launch the monitor process - error message: {}'.format(str(e)))
                self.stop()
                return False
        else:
            logger.error('Monitor is still running')

        return True

    def stop(self):
        """Method stopping the process's activity.

        Clears the running flag, cancels the pending events of the local scheduler,
        waits for the monitor process when called from another process, and closes
        the output file handle.
        """
        self.__running.value = 0
        for event in self.__scheduler.queue:
            self.__scheduler.cancel(event)
        # join() is only valid for a started process and from outside of it: run() calls
        # stop() from inside the monitor process on failure, where join() would assert.
        if self.pid is not None and self.pid != os.getpid():
            self.join()
        self.__output_handler.close()

    def __sample(self):
        """Method sampling system metrics.

        Does nothing once the running flag has been cleared; otherwise schedules the
        next sampling round and writes one record to the output file.
        """
        if self.__running.value == 1:
            # Schedule the next round first so that the time spent on sampling
            # does not delay it.
            self.__scheduler.enter(self.__sample_freq, 1, self.__sample, ())
            # Sampling
            record = MonitorRecord()
            self.__sample_host_metrics(record)
            self.__sample_gpu_metrics(record)
            self.__output_handler.write('{}\n'.format(record.to_string()))
            self.__output_handler.flush()

    def __sample_host_metrics(self, record):
        """Method sampling the host metrics.

        Takes two snapshots that are sample_duration seconds apart and stores the averaged
        CPU usage (in percent, 100 corresponds to one fully used CPU) and the per-device
        network bandwidth (in MiB/s) into the record. A metric that cannot be computed
        is left at its default value in the record.

        Args:
            record (MonitorRecord): record instance to save the metrics.
        """
        # First round of capturing.
        system_ticks_s = self.__get_total_cpu_ticks()
        container_ticks_s = self.__get_process_cpu_ticks()
        start_time = time.time()
        net_bytes_s = self.__get_network_bytes()

        time.sleep(self.__sample_duration)

        # Second round of capturing.
        system_ticks_e = self.__get_total_cpu_ticks()
        container_ticks_e = self.__get_process_cpu_ticks()
        end_time = time.time()
        net_bytes_e = self.__get_network_bytes()

        # Calculate CPU usage. Skipped when a snapshot failed (already logged by the reader)
        # or when the system ticks did not advance, which would divide by zero.
        ticks = (system_ticks_s, system_ticks_e, container_ticks_s, container_ticks_e)
        if all(tick is not None for tick in ticks) and system_ticks_e != system_ticks_s:
            cpu_usage = (container_ticks_e -
                         container_ticks_s) * 1.0 / (system_ticks_e - system_ticks_s) * self.__online_cpus * 100
            record.cpu_usage = cpu_usage

        # Calculate network bandwidth.
        elapsed = end_time - start_time
        if net_bytes_s is None or net_bytes_e is None or elapsed <= 0:
            return

        net_receive = dict()
        net_transmit = dict()
        for device, (receive_s, transmit_s) in net_bytes_s.items():
            # A device may disappear between the two snapshots.
            if device not in net_bytes_e:
                continue
            receive_e, transmit_e = net_bytes_e[device]
            net_receive['{}_receive_bw'.format(device)] = (receive_e - receive_s) / elapsed / self.__unit_MiByte
            net_transmit['{}_transmit_bw'.format(device)] = (transmit_e - transmit_s) / elapsed / self.__unit_MiByte
        record.net_receive = net_receive
        record.net_transmit = net_transmit

    def __sample_gpu_metrics(self, record):
        """Method sampling the gpu metrics.

        Appends one value per device to each of the gpu_* lists of the record.

        Args:
            record (MonitorRecord): record instance to save the metrics.
        """
        device_count = dm.device_manager.get_device_count()
        for i in range(device_count):
            record.gpu_usage.append(dm.device_manager.get_device_utilization(i))
            record.gpu_temperature.append(dm.device_manager.get_device_temperature(i))
            record.gpu_power_limit.append(dm.device_manager.get_device_power_limit(i))
            mem_used, mem_total = dm.device_manager.get_device_memory(i)
            record.gpu_mem_used.append(mem_used)
            record.gpu_mem_total.append(mem_total)
            corrected_ecc, uncorrected_ecc = dm.device_manager.get_device_ecc_error(i)
            record.gpu_corrected_ecc.append(corrected_ecc)
            record.gpu_uncorrected_ecc.append(uncorrected_ecc)
            record.gpu_remap_info.append(dm.device_manager.get_device_row_remapped_info(i))

    def __get_total_cpu_ticks(self):
        """Method to get the total cpu ticks.

        Sums the first seven counters of the aggregated 'cpu' line of /proc/stat.

        Return:
            The total cpu ticks, None means fail to get the data.
        """
        try:
            with open('/proc/stat', 'r') as f:
                for line in f:
                    if line.startswith('cpu '):
                        return sum(int(item) for item in line.split()[1:8])
        except Exception as e:
            logger.error('Failed to read total cpu ticks information - error message: {}'.format(str(e)))

        return None

    def __get_process_cpu_ticks(self):
        """Method to get the process cpu ticks.

        Reads the 'user' and 'system' counters from the cpu accounting file
        resolved by __preprocess() and returns their sum.

        Return:
            The process cpu ticks, None means fail to get the data.
        """
        user_time = 0
        system_time = 0
        try:
            with open(self._cpu_file, 'r') as f:
                for line in f:
                    items = line.split()
                    if len(items) < 2:
                        continue
                    # Each line is '<name> <value>', so the name is always the first item.
                    if items[0] == 'user':
                        user_time = int(items[1])
                    elif items[0] == 'system':
                        system_time = int(items[1])
            return user_time + system_time
        except Exception as e:
            logger.error('Failed to read process cpu ticks information - error message: {}'.format(str(e)))

        return None

    def __get_network_bytes(self):
        """Method to get the network traffic information, unit: bytes.

        Return:
            The bytes transferred on the network as {device: [receive_bytes, transmit_bytes]},
            None means fail to get the data.
        """
        net_info = dict()
        try:
            with open(self._net_file, 'r') as f:
                for line in f:
                    # Device lines look like '<name>: <16 counters>'. Splitting at the colon also
                    # handles a counter that directly follows the colon without a space.
                    # Lines without a colon or with another layout (e.g. the headers) are skipped.
                    device, separator, counters = line.rpartition(':')
                    items = counters.split()
                    if not separator or len(items) != 16:
                        continue
                    receive_bytes = int(items[0])
                    transmit_bytes = int(items[8])
                    net_info[device.strip()] = [receive_bytes, transmit_bytes]
            return net_info
        except Exception as e:
            logger.error('Failed to read network traffic information - error message: {}'.format(str(e)))

        return None
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""A module for data of monitor."""
import json
import numbers
from datetime import datetime
class MonitorRecord:
    """Record class to save all monitoring data.

    One instance holds the metrics of a single sampling round. Scalar metrics default to
    None, per-GPU metrics are lists indexed by device, and network metrics are dicts
    keyed by metric name.
    """
    def __init__(self):
        """Constructor."""
        # Imported locally so that this class does not rely on a change of the module imports.
        from datetime import timezone

        # Creation time in UTC. An aware 'now' replaces the deprecated datetime.utcnow();
        # the formatted string is the same.
        self.__time = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
        self.__cpu_usage = None
        self.__mem_used = None
        self.__mem_total = None
        self.__gpu_usage = list()
        self.__gpu_temperature = list()
        self.__gpu_power_limit = list()
        self.__gpu_mem_used = list()
        self.__gpu_mem_total = list()
        self.__gpu_corrected_ecc = list()
        self.__gpu_uncorrected_ecc = list()
        self.__gpu_remap_info = list()
        self.__net_receive = dict()
        self.__net_transmit = dict()

    @property
    def time(self):
        """Decoration function to access __time."""
        return self.__time

    @property
    def cpu_usage(self):
        """Decoration function to access __cpu_usage."""
        return self.__cpu_usage

    @cpu_usage.setter
    def cpu_usage(self, usage):
        """Set the cpu usage.

        Args:
            usage (float): cpu usage.
        """
        self.__cpu_usage = usage

    @property
    def mem_used(self):
        """Decoration function to access __mem_used."""
        return self.__mem_used

    @mem_used.setter
    def mem_used(self, mem_used):
        """Set the used host memory, unit: MB.

        Args:
            mem_used (float): used host memory.
        """
        self.__mem_used = mem_used

    @property
    def mem_total(self):
        """Decoration function to access __mem_total."""
        return self.__mem_total

    @mem_total.setter
    def mem_total(self, mem_total):
        """Set the total host memory, unit: MB.

        Args:
            mem_total (float): total host memory.
        """
        self.__mem_total = mem_total

    @property
    def gpu_usage(self):
        """Decoration function to access __gpu_usage."""
        return self.__gpu_usage

    @gpu_usage.setter
    def gpu_usage(self, gpu_usage):
        """Set the gpu usage.

        Args:
            gpu_usage (list): list of gpu usage.
        """
        self.__gpu_usage = gpu_usage

    @property
    def gpu_temperature(self):
        """Decoration function to access __gpu_temperature."""
        return self.__gpu_temperature

    @gpu_temperature.setter
    def gpu_temperature(self, gpu_temperature):
        """Set the gpu temperature, unit: Celsius.

        Args:
            gpu_temperature (list): list of gpu temperature.
        """
        self.__gpu_temperature = gpu_temperature

    @property
    def gpu_power_limit(self):
        """Decoration function to access __gpu_power_limit."""
        return self.__gpu_power_limit

    @gpu_power_limit.setter
    def gpu_power_limit(self, gpu_power_limit):
        """Set the gpu power limit, unit: Watt.

        Args:
            gpu_power_limit (list): list of gpu power limit.
        """
        self.__gpu_power_limit = gpu_power_limit

    @property
    def gpu_mem_used(self):
        """Decoration function to access __gpu_mem_used."""
        return self.__gpu_mem_used

    @gpu_mem_used.setter
    def gpu_mem_used(self, gpu_mem_used):
        """Set the used gpu memory, unit: MB.

        Args:
            gpu_mem_used (list): list of used gpu memory.
        """
        self.__gpu_mem_used = gpu_mem_used

    @property
    def gpu_mem_total(self):
        """Decoration function to access __gpu_mem_total."""
        return self.__gpu_mem_total

    @gpu_mem_total.setter
    def gpu_mem_total(self, gpu_mem_total):
        """Set the total gpu memory, unit: MB.

        Args:
            gpu_mem_total (list): list of total gpu memory.
        """
        self.__gpu_mem_total = gpu_mem_total

    @property
    def gpu_corrected_ecc(self):
        """Decoration function to access __gpu_corrected_ecc."""
        return self.__gpu_corrected_ecc

    @gpu_corrected_ecc.setter
    def gpu_corrected_ecc(self, gpu_corrected_ecc):
        """Set the count of corrected (single bit) ecc error.

        Args:
            gpu_corrected_ecc (list): list of gpu corrected ecc error.
        """
        self.__gpu_corrected_ecc = gpu_corrected_ecc

    @property
    def gpu_uncorrected_ecc(self):
        """Decoration function to access __gpu_uncorrected_ecc."""
        return self.__gpu_uncorrected_ecc

    @gpu_uncorrected_ecc.setter
    def gpu_uncorrected_ecc(self, gpu_uncorrected_ecc):
        """Set the count of uncorrected (double bit) ecc error.

        Args:
            gpu_uncorrected_ecc (list): list of gpu uncorrected ecc error.
        """
        self.__gpu_uncorrected_ecc = gpu_uncorrected_ecc

    @property
    def gpu_remap_info(self):
        """Decoration function to access __gpu_remap_info."""
        return self.__gpu_remap_info

    @gpu_remap_info.setter
    def gpu_remap_info(self, gpu_remap_info):
        """Set the gpu remap_info.

        Args:
            gpu_remap_info (list): list of gpu remap_info.
        """
        self.__gpu_remap_info = gpu_remap_info

    @property
    def net_receive(self):
        """Decoration function to access __net_receive."""
        return self.__net_receive

    @net_receive.setter
    def net_receive(self, net_receive):
        """Set the network receive bandwidth.

        The values are stored as given; the Monitor class provides them in MiB/s.

        Args:
            net_receive (dict): receive bandwidth for all devices.
        """
        self.__net_receive = net_receive

    @property
    def net_transmit(self):
        """Decoration function to access __net_transmit."""
        return self.__net_transmit

    @net_transmit.setter
    def net_transmit(self, net_transmit):
        """Set the network transmit bandwidth.

        The values are stored as given; the Monitor class provides them in MiB/s.

        Args:
            net_transmit (dict): transmit bandwidth for all devices.
        """
        self.__net_transmit = net_transmit

    def to_string(self):
        """Serialize the MonitorRecord object to string.

        The record is flattened into one JSON object:
        numbers and strings are stored as 'name', list items as 'name:index',
        dict items inside a list as 'key:index', and dict items under their own key.
        Values of any other type (e.g. None for an unset metric) are omitted.

        Return:
            The serialized string of MonitorRecord object.
        """
        # The name of internal member is like '_MonitorRecord__name'.
        # For the result object return to caller, need to reformat the 'name' as key.
        prefix = '_MonitorRecord__'
        formatted_obj = dict()
        for key, value in self.__dict__.items():
            # Attributes without the mangled prefix keep their name instead of failing.
            formatted_key = key[len(prefix):] if key.startswith(prefix) else key
            if isinstance(value, (numbers.Number, str)):
                formatted_obj[formatted_key] = value
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, numbers.Number):
                        formatted_obj['{}:{}'.format(formatted_key, i)] = item
                    elif isinstance(item, dict):
                        for k, v in item.items():
                            formatted_obj['{}:{}'.format(k, i)] = v
            elif isinstance(value, dict):
                formatted_obj.update(value)

        return json.dumps(formatted_obj)
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for Monitor module."""
import numbers
from tests.helper import decorator
from superbench.monitor import Monitor
from superbench.monitor import MonitorRecord
@decorator.cuda_test
def test_monitor():
    """Test the module Monitor.

    Samples the host and GPU metrics once for the current environment and checks
    that every collected value is present and numeric.
    """
    # Imported locally so that this test does not rely on a change of the module imports.
    import os
    import tempfile

    # Monitor opens its output file in the constructor; keep it in a temporary
    # directory so that the test leaves nothing behind in the working directory.
    with tempfile.TemporaryDirectory() as tmp_dir:
        monitor = Monitor(None, 1, 10, os.path.join(tmp_dir, 'file'))
        try:
            assert (monitor._Monitor__preprocess())

            record = MonitorRecord()
            monitor._Monitor__sample_host_metrics(record)
            assert (isinstance(record.cpu_usage, numbers.Number))
            assert (record.net_receive)
            assert (record.net_transmit)
            for key, value in record.net_receive.items():
                assert ('_receive_bw' in key)
                assert (isinstance(value, numbers.Number))
            for key, value in record.net_transmit.items():
                assert ('_transmit_bw' in key)
                assert (isinstance(value, numbers.Number))

            monitor._Monitor__sample_gpu_metrics(record)
            gpu_list_metrics = [
                record.gpu_usage, record.gpu_temperature, record.gpu_power_limit, record.gpu_mem_used,
                record.gpu_mem_total, record.gpu_corrected_ecc, record.gpu_uncorrected_ecc
            ]
            for metric in gpu_list_metrics:
                assert (metric)
                for value in metric:
                    assert (isinstance(value, numbers.Number))
        finally:
            # The monitor process is never started here, so only the file handle needs cleanup.
            monitor._Monitor__output_handler.close()
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for MonitorRecord module."""
import re
import json
from superbench.monitor import MonitorRecord
def test_monitor_record():
    """Test the module MonitorRecord.

    Fills a record with metrics of 8 GPUs and 2 network devices and checks
    the flattened JSON produced by to_string().
    """
    gpu_count = 8
    remap_info_per_gpu = {
        'gpu_remap_correctable_error': 0,
        'gpu_remap_uncorrectable_error': 0,
        'gpu_remap_max': 640,
        'gpu_remap_high': 0,
        'gpu_remap_partial': 0,
        'gpu_remap_low': 0,
        'gpu_remap_none': 0
    }

    record = MonitorRecord()
    record.cpu_usage = 80
    record.mem_used = 100
    record.mem_total = 1024
    record.gpu_usage = [90, 80, 86, 72, 79, 81, 94, 85]
    record.gpu_temperature = [62, 75, 69, 63, 72, 77, 80, 71]
    record.gpu_power_limit = [400, 400, 400, 350, 400, 400, 400, 400]
    record.gpu_mem_used = [2550, 2680, 2543, 2588, 2612, 2603, 2515, 2593]
    record.gpu_mem_total = [16777216, 16777216, 16777216, 16777216, 16777216, 16777216, 16777216, 16777216]
    record.gpu_corrected_ecc = [0, 0, 0, 0, 0, 0, 0, 0]
    record.gpu_uncorrected_ecc = [0, 0, 0, 0, 0, 0, 0, 0]
    # Every GPU reports the same remap information.
    record.gpu_remap_info = [remap_info_per_gpu for _ in range(gpu_count)]
    record.net_receive = {'eth0_receive_bw': 100, 'ib0_receive_bw': 1000}
    record.net_transmit = {'eth0_transmit_bw': 80, 'ib0_transmit_bw': 800}

    expected_record = {
        'time': '2021-11-18 06:24:00',
        'cpu_usage': 80, 'mem_used': 100, 'mem_total': 1024,
        'gpu_usage:0': 90, 'gpu_usage:1': 80, 'gpu_usage:2': 86, 'gpu_usage:3': 72,
        'gpu_usage:4': 79, 'gpu_usage:5': 81, 'gpu_usage:6': 94, 'gpu_usage:7': 85,
        'gpu_temperature:0': 62, 'gpu_temperature:1': 75, 'gpu_temperature:2': 69, 'gpu_temperature:3': 63,
        'gpu_temperature:4': 72, 'gpu_temperature:5': 77, 'gpu_temperature:6': 80, 'gpu_temperature:7': 71,
        'gpu_power_limit:0': 400, 'gpu_power_limit:1': 400, 'gpu_power_limit:2': 400, 'gpu_power_limit:3': 350,
        'gpu_power_limit:4': 400, 'gpu_power_limit:5': 400, 'gpu_power_limit:6': 400, 'gpu_power_limit:7': 400,
        'gpu_mem_used:0': 2550, 'gpu_mem_used:1': 2680, 'gpu_mem_used:2': 2543, 'gpu_mem_used:3': 2588,
        'gpu_mem_used:4': 2612, 'gpu_mem_used:5': 2603, 'gpu_mem_used:6': 2515, 'gpu_mem_used:7': 2593,
        'gpu_mem_total:0': 16777216, 'gpu_mem_total:1': 16777216,
        'gpu_mem_total:2': 16777216, 'gpu_mem_total:3': 16777216,
        'gpu_mem_total:4': 16777216, 'gpu_mem_total:5': 16777216,
        'gpu_mem_total:6': 16777216, 'gpu_mem_total:7': 16777216,
        'gpu_corrected_ecc:0': 0, 'gpu_corrected_ecc:1': 0, 'gpu_corrected_ecc:2': 0, 'gpu_corrected_ecc:3': 0,
        'gpu_corrected_ecc:4': 0, 'gpu_corrected_ecc:5': 0, 'gpu_corrected_ecc:6': 0, 'gpu_corrected_ecc:7': 0,
        'gpu_uncorrected_ecc:0': 0, 'gpu_uncorrected_ecc:1': 0,
        'gpu_uncorrected_ecc:2': 0, 'gpu_uncorrected_ecc:3': 0,
        'gpu_uncorrected_ecc:4': 0, 'gpu_uncorrected_ecc:5': 0,
        'gpu_uncorrected_ecc:6': 0, 'gpu_uncorrected_ecc:7': 0,
        'gpu_remap_correctable_error:0': 0, 'gpu_remap_uncorrectable_error:0': 0, 'gpu_remap_max:0': 640,
        'gpu_remap_high:0': 0, 'gpu_remap_partial:0': 0, 'gpu_remap_low:0': 0, 'gpu_remap_none:0': 0,
        'gpu_remap_correctable_error:1': 0, 'gpu_remap_uncorrectable_error:1': 0, 'gpu_remap_max:1': 640,
        'gpu_remap_high:1': 0, 'gpu_remap_partial:1': 0, 'gpu_remap_low:1': 0, 'gpu_remap_none:1': 0,
        'gpu_remap_correctable_error:2': 0, 'gpu_remap_uncorrectable_error:2': 0, 'gpu_remap_max:2': 640,
        'gpu_remap_high:2': 0, 'gpu_remap_partial:2': 0, 'gpu_remap_low:2': 0, 'gpu_remap_none:2': 0,
        'gpu_remap_correctable_error:3': 0, 'gpu_remap_uncorrectable_error:3': 0, 'gpu_remap_max:3': 640,
        'gpu_remap_high:3': 0, 'gpu_remap_partial:3': 0, 'gpu_remap_low:3': 0, 'gpu_remap_none:3': 0,
        'gpu_remap_correctable_error:4': 0, 'gpu_remap_uncorrectable_error:4': 0, 'gpu_remap_max:4': 640,
        'gpu_remap_high:4': 0, 'gpu_remap_partial:4': 0, 'gpu_remap_low:4': 0, 'gpu_remap_none:4': 0,
        'gpu_remap_correctable_error:5': 0, 'gpu_remap_uncorrectable_error:5': 0, 'gpu_remap_max:5': 640,
        'gpu_remap_high:5': 0, 'gpu_remap_partial:5': 0, 'gpu_remap_low:5': 0, 'gpu_remap_none:5': 0,
        'gpu_remap_correctable_error:6': 0, 'gpu_remap_uncorrectable_error:6': 0, 'gpu_remap_max:6': 640,
        'gpu_remap_high:6': 0, 'gpu_remap_partial:6': 0, 'gpu_remap_low:6': 0, 'gpu_remap_none:6': 0,
        'gpu_remap_correctable_error:7': 0, 'gpu_remap_uncorrectable_error:7': 0, 'gpu_remap_max:7': 640,
        'gpu_remap_high:7': 0, 'gpu_remap_partial:7': 0, 'gpu_remap_low:7': 0, 'gpu_remap_none:7': 0,
        'eth0_receive_bw': 100, 'ib0_receive_bw': 1000,
        'eth0_transmit_bw': 80, 'ib0_transmit_bw': 800
    }

    # The creation time differs on every run, so it is replaced by a fixed value before comparing.
    serialized = re.sub(r'\"\d+-\d+-\d+ \d+:\d+:\d+\"', '\"2021-11-18 06:24:00\"', record.to_string())
    actual_record = json.loads(serialized)
    assert (actual_record == expected_record)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment