Unverified Commit bfaa1c83 authored by Yifan Xiong, committed by GitHub

Support multiple IB/GPU in ib validation (#363)

**Description**

Support running multiple IB/GPU devices simultaneously in the ib validation benchmark.

**Major Revisions**
- Revise ib_validation_performance.cc so that multiple processes per node can launch multiple perftest commands simultaneously. For each node pair in the config, the configured number of processes per node runs in parallel.
- Revise ib_validation_performance.py to correct file paths and adjust parameters to specify different NICs/GPUs/NUMA nodes.
- Fix env issues in Dockerfile for end-to-end test.
- Update ib-traffic configuration examples in config files.
- Update unit tests and docs accordingly.

Closes #326.
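For reference, the one-to-one pattern pairs every node with every other node exactly once across rounds. Below is a minimal sketch of such a schedule in round-robin tournament style, emitting the config format `ib_validation` consumes (pairs `server,client` joined by `;`, one round per line); this is illustrative only, and the actual `gen_traffic_pattern` implementation may differ:

```python
def gen_one_to_one(n):
    """Round-robin schedule for n nodes (n even): n-1 rounds in which
    every node is paired with every other node exactly once.
    Each returned line is one round: 'server,client;server,client;...'."""
    nodes = list(range(n))
    lines = []
    for _ in range(n - 1):
        # pair the i-th node from the front with the i-th from the back
        pairs = [f"{nodes[i]},{nodes[n - 1 - i]}" for i in range(n // 2)]
        lines.append(";".join(pairs))
        # circle method: keep nodes[0] fixed, rotate the rest by one
        nodes = [nodes[0]] + [nodes[-1]] + nodes[1:-1]
    return lines
```

All pairs on one line run in parallel; successive lines run in sequence, matching the config semantics described in `ib_validation_performance.cc`.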
parent 0f7b057a
......@@ -115,6 +115,10 @@ ENV PATH="${PATH}" \
ANSIBLE_DEPRECATION_WARNINGS=FALSE \
ANSIBLE_COLLECTIONS_PATH=/usr/share/ansible/collections
RUN echo PATH="$PATH" > /etc/environment && \
echo LD_LIBRARY_PATH="$LD_LIBRARY_PATH" >> /etc/environment && \
echo SB_MICRO_PATH="$SB_MICRO_PATH" >> /etc/environment
WORKDIR ${SB_HOME}
ADD third_party third_party
......
......@@ -35,16 +35,17 @@ RUN apt-get update && \
libaio-dev \
libboost-program-options-dev \
libcap2 \
libnuma-dev \
libpci-dev \
libtinfo5 \
libtool \
lshw \
net-tools \
libnuma-dev \
numactl \
openssh-client \
openssh-server \
pciutils \
rsync \
util-linux \
vim \
wget \
......@@ -113,6 +114,10 @@ ENV PATH="${PATH}:/opt/rocm/hip/bin/" \
ANSIBLE_DEPRECATION_WARNINGS=FALSE \
ANSIBLE_COLLECTIONS_PATH=/usr/share/ansible/collections
RUN echo PATH="$PATH" > /etc/environment && \
echo LD_LIBRARY_PATH="$LD_LIBRARY_PATH" >> /etc/environment && \
echo SB_MICRO_PATH="$SB_MICRO_PATH" >> /etc/environment
WORKDIR ${SB_HOME}
ADD third_party third_party
......
......@@ -34,17 +34,18 @@ RUN apt-get update && \
libaio-dev \
libboost-program-options-dev \
libcap2 \
libnuma-dev \
libpci-dev \
libssl-dev \
libtinfo5 \
libtool \
lshw \
net-tools \
libnuma-dev \
libssl-dev \
numactl \
openssh-client \
openssh-server \
pciutils \
rsync \
util-linux \
vim \
wget \
......@@ -129,6 +130,10 @@ ENV PATH="${PATH}:/opt/rocm/hip/bin/" \
ANSIBLE_DEPRECATION_WARNINGS=FALSE \
ANSIBLE_COLLECTIONS_PATH=/usr/share/ansible/collections
RUN echo PATH="$PATH" > /etc/environment && \
echo LD_LIBRARY_PATH="$LD_LIBRARY_PATH" >> /etc/environment && \
echo SB_MICRO_PATH="$SB_MICRO_PATH" >> /etc/environment
WORKDIR ${SB_HOME}
ADD third_party third_party
......
......@@ -138,7 +138,7 @@ The supported percentiles are 50, 90, 95, 99, and 99.9.
#### Metrics
| Name | Unit | Description |
|---------------------------------------------------------|-----------|-----------------------------------------------------------------------------|
|-----------------------------------------------------|-----------|--------------------------------------------------------------------------|
| ort-inference/{precision}_{model}_time | time (ms) | The mean latency to execute one batch of inference. |
| ort-inference/{precision}_{model}_time_{percentile} | time (ms) | The {percentile}th percentile latency to execute one batch of inference. |
......@@ -152,7 +152,7 @@ Supports the use of double unit types and the use of tensor cores.
#### Metrics
| Name | Unit | Description |
|--------------------------|------------|-------------------------------------------------------------------------------------|
|-------------------------|----------|------------------------------------------------------------------------------------|
| gpu-burn/time | time (s) | The runtime for gpu-burn test. |
| gpu-burn/gpu_[0-9]_pass | yes/no | The result of the gpu-burn test for each GPU (1: yes, 0: no). |
| gpu-burn/abort | yes/no | Whether or not GPU-burn test aborted before returning GPU results (1: yes, 0: no). |
......@@ -305,9 +305,9 @@ Each row in the config is one round, and all pairs of nodes in a row run ib comm
#### Metrics
| Metrics | Unit | Description |
|---------------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ib-traffic/${command}_${line}_${pair}_${server}_${client}_bw | bandwidth (GB/s) | The max bandwidth of ib command (ib_write_bw, ib_send_bw, ib_read_bw) run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client |
| ib-traffic/${command}_${line}_${pair}_${server}_${client}_lat | time (us) | The max latency of ib command (ib_write_lat, ib_send_lat, ib_read_lat) run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client |
|------------------------------------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ib-traffic/ib\_write\_bw\_${line}\_${pair}:${server}\_${client} | bandwidth (GB/s) | The max bandwidth of perftest (ib_write_bw, ib_send_bw, ib_read_bw) run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client. |
| ib-traffic/ib\_write\_lat\_${line}\_${pair}:${server}\_${client} | time (us) | The max latency of perftest (ib_write_lat, ib_send_lat, ib_read_lat) run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client. |
## Computation-communication Benchmarks
......
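The revised metric naming above can be sketched as follows (illustrative; host names are hypothetical):

```python
def metric_name(command, line, pair, server, client):
    """Mirrors the documented format:
    ib-traffic/${command}_${line}_${pair}:${server}_${client}"""
    return f"ib-traffic/{command}_{line}_{pair}:{server}_{client}"
```

In the result parser itself, each per-rank result additionally carries a rank suffix, since multiple processes per node now produce one result per rank.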
......@@ -6,7 +6,6 @@
import os
from superbench.common.utils import logger
from superbench.common.utils import network
from superbench.benchmarks import BenchmarkRegistry, ReturnCode
from superbench.common.devices import GPU
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke
......@@ -28,7 +27,7 @@ def __init__(self, name, parameters=''):
'ib_write_bw', 'ib_read_bw', 'ib_send_bw', 'ib_write_lat', 'ib_read_lat', 'ib_send_lat'
]
self.__patterns = ['one-to-one', 'one-to-many', 'many-to-one']
self.__config_path = os.getcwd() + '/config.txt'
self.__config_path = os.path.join(os.getcwd(), 'config.txt')
self.__config = []
def add_parser_arguments(self):
......@@ -36,59 +35,71 @@ def add_parser_arguments(self):
super().add_parser_arguments()
self._parser.add_argument(
'--ib_index',
type=int,
default=0,
'--ib_dev',
type=str,
default='mlx5_0',
required=False,
help='The IB device, e.g., mlx5_0, mlx5_$LOCAL_RANK, mlx5_$((LOCAL_RANK/2)), etc.',
)
self._parser.add_argument(
'--gpu_dev',
type=str,
default=None,
required=False,
help='The GPU device, e.g., 0, $LOCAL_RANK, $((LOCAL_RANK/2)), etc.',
)
self._parser.add_argument(
'--numa_dev',
type=str,
default=None,
required=False,
help='The index of ib device.',
help='The NUMA node to bind, e.g., 0, $LOCAL_RANK, $((LOCAL_RANK/2)), etc.',
)
# perftest configurations
self._parser.add_argument(
'--iters',
type=int,
default=5000,
required=False,
help='The iterations of running ib command',
help='The iterations of perftest command',
)
self._parser.add_argument(
'--msg_size',
type=int,
default=None,
required=False,
help='The message size of running ib command, e.g., 8388608.',
help='The message size of perftest command, e.g., 8388608.',
)
self._parser.add_argument(
'--bidirectional', action='store_true', default=False, help='Measure bidirectional bandwidth.'
)
self._parser.add_argument(
'--commands',
'--command',
type=str,
nargs='+',
default=['ib_write_bw'],
help='The ib command used to run, e.g., {}.'.format(' '.join(self.__support_ib_commands)),
default='ib_write_bw',
required=False,
help='The perftest command to use, e.g., {}.'.format(' '.join(self.__support_ib_commands)),
)
# customized configurations
self._parser.add_argument(
'--pattern',
type=str,
default='one-to-one',
required=False,
help='Test IB traffic pattern type, e.g., {}.'.format(''.join(self.__patterns)),
help='IB traffic pattern type, e.g., {}.'.format(''.join(self.__patterns)),
)
self._parser.add_argument(
'--config',
type=str,
default=None,
required=False,
help='The path of config file on the target machines',
)
self._parser.add_argument(
'--bidirectional', action='store_true', default=False, help='Measure bidirectional bandwidth.'
)
self._parser.add_argument(
'--gpu_index', type=int, default=None, required=False, help='Test Use GPUDirect with the gpu index.'
help='The path of config file on the target machines.',
)
self._parser.add_argument(
'--hostfile',
type=str,
default='/root/hostfile',
default=None,
required=False,
help='The path of hostfile on the target machines',
help='The path of hostfile on the target machines.',
)
def __one_to_many(self, n):
......@@ -191,25 +202,24 @@ def gen_traffic_pattern(self, n, mode, config_file_path):
for line in config:
f.write(line + '\n')
def __prepare_config(self, node_num):
def __prepare_config(self):
"""Prepare and read config file.
Args:
node_num (int): the number of nodes.
Returns:
True if the config is not empty and valid.
"""
try:
# Read the hostfile
if not self._args.hostfile:
self._args.hostfile = os.path.join(os.environ.get('SB_WORKSPACE', '.'), 'hostfile')
with open(self._args.hostfile, 'r') as f:
hosts = f.readlines()
# Generate the config file if not defined
if self._args.config is None:
self.gen_traffic_pattern(node_num, self._args.pattern, self.__config_path)
self.gen_traffic_pattern(len(hosts), self._args.pattern, self.__config_path)
# Use the config file defined in args
else:
self.__config_path = self._args.config
# Read the hostfile
with open(self._args.hostfile, 'r') as f:
hosts = f.readlines()
# Read the config file and check if it's empty and valid
with open(self.__config_path, 'r') as f:
lines = f.readlines()
......@@ -240,7 +250,7 @@ def __prepare_general_ib_command_params(self):
Str of ib command params if arguments are valid, otherwise False.
"""
# Format the ib command type
self._args.commands = [command.lower() for command in self._args.commands]
self._args.command = self._args.command.lower()
# Add message size for ib command
msg_size = ''
if self._args.msg_size is None:
......@@ -248,29 +258,20 @@ def __prepare_general_ib_command_params(self):
else:
msg_size = '-s ' + str(self._args.msg_size)
# Add GPUDirect for ib command
gpu_enable = ''
if self._args.gpu_index is not None:
gpu_dev = ''
if self._args.gpu_dev is not None:
gpu = GPU()
if gpu.vendor == 'nvidia':
gpu_enable = ' --use_cuda={gpu_index}'.format(gpu_index=str(self._args.gpu_index))
gpu_dev = f'--use_cuda={self._args.gpu_dev}'
elif gpu.vendor == 'amd':
gpu_enable = ' --use_rocm={gpu_index}'.format(gpu_index=str(self._args.gpu_index))
gpu_dev = f'--use_rocm={self._args.gpu_dev}'
else:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error('No GPU found - benchmark: {}'.format(self._name))
return False
# Generate ib command params
try:
command_params = '-F --iters={iter} -d {device} {size}{gpu}'.format(
iter=str(self._args.iters),
device=network.get_ib_devices()[self._args.ib_index].split(':')[0],
size=msg_size,
gpu=gpu_enable
)
except BaseException as e:
self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)
logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False
command_params = f'-F -n {self._args.iters} -d {self._args.ib_dev} {msg_size} {gpu_dev}'
command_params = f'{command_params.strip()} --report_gbits'
return command_params
def _preprocess(self):
......@@ -282,17 +283,8 @@ def _preprocess(self):
if not super()._preprocess():
return False
# Check MPI environment
self._args.pattern = self._args.pattern.lower()
if os.getenv('OMPI_COMM_WORLD_SIZE'):
node_num = int(os.getenv('OMPI_COMM_WORLD_SIZE'))
else:
self._result.set_return_code(ReturnCode.MICROBENCHMARK_MPI_INIT_FAILURE)
logger.error('No MPI environment - benchmark: {}.'.format(self._name))
return False
# Generate and check config
if not self.__prepare_config(node_num):
if not self.__prepare_config():
return False
# Prepare general params for ib commands
......@@ -300,25 +292,24 @@ def _preprocess(self):
if not command_params:
return False
# Generate commands
for ib_command in self._args.commands:
if ib_command not in self.__support_ib_commands:
if self._args.command not in self.__support_ib_commands:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error(
'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format(
self._name, ib_command, ' '.join(self.__support_ib_commands)
self._name, self._args.command, ' '.join(self.__support_ib_commands)
)
)
return False
else:
ib_command_prefix = '{command} {command_params}'.format(
command=ib_command, command_params=command_params
)
if 'bw' in ib_command and self._args.bidirectional:
ib_command_prefix = f'{os.path.join(self._args.bin_dir, self._args.command)} {command_params}'
if self._args.numa_dev is not None:
ib_command_prefix = f'numactl -N {self._args.numa_dev} {ib_command_prefix}'
if 'bw' in self._args.command and self._args.bidirectional:
ib_command_prefix += ' -b'
command = os.path.join(self._args.bin_dir, self._bin_name)
command += ' --hostfile ' + self._args.hostfile
command += ' --cmd_prefix ' + '\"' + ib_command_prefix + '\"'
command += ' --cmd_prefix ' + "'" + ib_command_prefix + "'"
command += ' --input_config ' + self.__config_path
self._commands.append(command)
......@@ -336,7 +327,7 @@ def _process_raw_result(self, cmd_idx, raw_output): # noqa: C901
Return:
True if the raw output string is valid and result can be extracted.
"""
self._result.add_raw_data('raw_output_' + self._args.commands[cmd_idx], raw_output, self._args.log_raw_data)
self._result.add_raw_data('raw_output_' + self._args.command, raw_output, self._args.log_raw_data)
# If it's invoked by MPI and rank is not 0, no result is expected
if os.getenv('OMPI_COMM_WORLD_RANK'):
......@@ -346,10 +337,8 @@ def _process_raw_result(self, cmd_idx, raw_output): # noqa: C901
valid = False
content = raw_output.splitlines()
line_index = 0
config_index = 0
command = self._args.commands[cmd_idx]
suffix = command.split('_')[-1]
command = self._args.command
try:
result_index = -1
for index, line in enumerate(content):
......@@ -360,25 +349,18 @@ def _process_raw_result(self, cmd_idx, raw_output): # noqa: C901
valid = False
else:
content = content[result_index:]
for line in content:
line = list(filter(None, line.strip().split(',')))
pair_index = 0
for item in line:
metric = '{command}_{line}_{pair}_{host}_{suffix}'.format(
command=command,
line=str(line_index),
pair=pair_index,
host=self.__config[config_index],
suffix=suffix
)
value = float(item)
for line_index, line in enumerate(content):
line_result = list(filter(None, line.strip().split(',')))
for pair_index, pair_result in enumerate(line_result):
rank_results = list(filter(None, pair_result.strip().split(' ')))
for rank_index, rank_result in enumerate(rank_results):
metric = f'{command}_{line_index}_{pair_index}:{self.__config[config_index]}:{rank_index}'
value = float(rank_result)
if 'bw' in command:
value = value / 1000
value = value / 8.0
self._result.add_result(metric, value)
valid = True
config_index += 1
pair_index += 1
line_index += 1
except Exception:
valid = False
if valid is False or config_index != len(self.__config):
......
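The revised parsing loop above (comma-separated pair results, space-separated per-rank results within a pair, and Gbit/s divided by 8 into GB/s for bandwidth commands since `--report_gbits` is now passed) can be sketched standalone; values here are hypothetical:

```python
def parse_result_line(line, command):
    """Parse one line of ib_validation output.
    Pair results are comma-separated; per-rank results within a pair
    are space-separated. Returns (pair_index, rank_index, value) tuples."""
    results = []
    for pair_idx, pair in enumerate(filter(None, line.strip().split(','))):
        for rank_idx, item in enumerate(filter(None, pair.strip().split(' '))):
            value = float(item)
            if 'bw' in command:
                value /= 8.0  # --report_gbits prints Gbit/s; store GB/s
            results.append((pair_idx, rank_idx, value))
    return results
```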
......@@ -2,7 +2,7 @@
// Licensed under the MIT License.
// IB validation tool is a tool to flexibly validate IB traffic of different patterns across multiple nodes
// input
// input:
// cmd_prefix: the prefix of command to run
// input_config: the path of input config file, the format of config file is as the following,
// each row will run in parallel, different rows will run in sequence, each pair is (server index, client index)
......@@ -46,6 +46,8 @@ int g_world_size;
int g_world_rank;
char g_processor_name[MPI_MAX_PROCESSOR_NAME];
int local_size;
// The struct to store command line arguments
struct Args {
// The prefix of command to run
......@@ -68,9 +70,9 @@ void load_args(int argc, char *argv[], Args &args) {
"ib command prefix")(
"input_config,i", boost::program_options::value<std::string>(&args.input_config)->default_value("config.txt"),
"the path of input config file")(
"hostfile,h", boost::program_options::value<std::string>(&args.hostfile)->default_value("/root/hostfile"),
"the path of hostfile")("output_path,o",
boost::program_options::value<std::string>(&args.output_path)->default_value(""),
"hostfile,h", boost::program_options::value<std::string>(&args.hostfile)->default_value("hostfile"),
"the path of hostfile")(
"output_path,o", boost::program_options::value<std::string>(&args.output_path)->default_value("output.csv"),
"custom the path of the output csv file")("help", "print help info");
boost::program_options::variables_map vm;
......@@ -82,7 +84,7 @@ void load_args(int argc, char *argv[], Args &args) {
return;
}
if (g_world_rank == ROOT_RANK) {
printf("The predix of cmd to run is: %s\n", args.cmd_prefix.c_str());
printf("The prefix of cmd to run is: %s\n", args.cmd_prefix.c_str());
printf("Load the config file from: %s\n", args.input_config.c_str());
printf("Output will be saved to: %s\n", args.output_path.c_str());
}
......@@ -113,10 +115,10 @@ vector<vector<std::pair<int, int>>> load_config(string filename = "config.txt")
vector<std::pair<int, int>> run_pairs_in_parallel;
// split line to pair by ";"
boost::split(run_in_parallel, single_line, boost::is_any_of(";"), boost::token_compress_on);
vector<int> s_occurrence(g_world_size, 0), occurrence(g_world_size, 0);
vector<int> s_occurrence(g_world_size / local_size, 0), occurrence(g_world_size / local_size, 0);
for (const auto &pair : run_in_parallel) {
// split pair by ","
int quote = pair.find(',');
size_t quote = pair.find(',');
if (quote == pair.npos) {
throw std::runtime_error("Error: Invalid config format.");
}
......@@ -126,16 +128,16 @@ vector<vector<std::pair<int, int>>> load_config(string filename = "config.txt")
occurrence[first]++;
occurrence[second]++;
s_occurrence[first]++;
// limit the maximum threads of each rank no more than 65535 and server threads no more than 25000 at
// the same time because by default a server can use (32768-60999) ports
if (s_occurrence[first] == SERVER_MAX_THREADS || occurrence[second] == MAX_THREADS ||
occurrence[first] == MAX_THREADS) {
// limit the maximum number of threads on each node to no more than 65535, and server threads to no more
// than 25000 at the same time, because by default a node can use ports in the range 32768-60999
if (s_occurrence[first] * local_size >= SERVER_MAX_THREADS ||
occurrence[second] * local_size >= MAX_THREADS || occurrence[first] * local_size >= MAX_THREADS) {
if (g_world_rank == ROOT_RANK)
std::cout << "Warning: split the line due to the limit of maximum threads nums" << std::endl;
run_in_total.emplace_back(run_pairs_in_parallel);
run_pairs_in_parallel.clear();
occurrence.assign(g_world_size, 0);
s_occurrence.assign(g_world_size, 0);
occurrence.assign(g_world_size / local_size, 0);
s_occurrence.assign(g_world_size / local_size, 0);
}
run_pairs_in_parallel.emplace_back(first, second);
}
......@@ -213,25 +215,27 @@ int get_available_port() {
// Get and broadcast ports for each run
vector<int> prepare_ports(const vector<std::pair<int, int>> &run_pairs_in_parallel) {
int pair_count = run_pairs_in_parallel.size();
vector<int> ports(pair_count);
vector<int> ports(run_pairs_in_parallel.size() * local_size);
for (int index = 0; index < run_pairs_in_parallel.size(); index++) {
for (size_t index = 0; index < run_pairs_in_parallel.size(); index++) {
int server_index = run_pairs_in_parallel[index].first;
for (int rank = 0; rank < local_size; rank++) {
// server get a free port and send to rank ROOT_RANK
if (server_index == g_world_rank) {
if (server_index * local_size + rank == g_world_rank) {
int port = get_available_port();
MPI_Send(&port, 1, MPI_INT, ROOT_RANK, index, MPI_COMM_WORLD);
MPI_Send(&port, 1, MPI_INT, ROOT_RANK, index * local_size + rank, MPI_COMM_WORLD);
}
// rank ROOT_RANK recv port from server
if (g_world_rank == ROOT_RANK) {
int port;
MPI_Recv(&port, 1, MPI_INT, server_index, index, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
ports[index] = port;
MPI_Recv(&port, 1, MPI_INT, server_index * local_size + rank, index * local_size + rank, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
ports[index * local_size + rank] = port;
}
}
}
// rank ROOT_RANK broadcast ports to all ranks
MPI_Bcast(ports.data(), pair_count, MPI_INT, ROOT_RANK, MPI_COMM_WORLD);
MPI_Bcast(ports.data(), ports.size(), MPI_INT, ROOT_RANK, MPI_COMM_WORLD);
return ports;
}
......@@ -248,12 +252,13 @@ void gather_hostnames(vector<string> &hostnames, string filename) {
} else {
throw std::runtime_error("Error: Failed to open hostfile.");
}
if (hostnames.size() != g_world_size) {
if (int(hostnames.size()) < g_world_size / local_size) {
throw std::runtime_error("Error: Invalid hostfile.");
}
}
// Parse raw output of ib command
// TODO: does not work for latency tests
float process_raw_output(string output) {
float res = -1.0;
try {
......@@ -298,30 +303,29 @@ float run_cmd(string cmd_prefix, int port, bool server, string hostname) {
// The ranks in vector of (server, client) run commands parallel
vector<float> run_cmd_parallel(string cmd_prefix, const vector<std::pair<int, int>> &run_pairs_in_parallel,
const vector<int> &ports, const vector<string> &hostnames) {
int size = run_pairs_in_parallel.size();
// invoke function to run cmd in multi threads mode for each rank in the pairs
unordered_map<int, std::future<float>> threads;
int flag;
for (int index = 0; index < size; index++) {
int server_index = run_pairs_in_parallel[index].first;
int client_index = run_pairs_in_parallel[index].second;
for (size_t index = 0; index < run_pairs_in_parallel.size(); index++) {
for (int rank = 0; rank < local_size; rank++) {
int rank_index = index * local_size + rank,
server_index = run_pairs_in_parallel[index].first * local_size + rank,
client_index = run_pairs_in_parallel[index].second * local_size + rank;
if (server_index == g_world_rank) {
flag = index;
MPI_Send(&flag, 1, MPI_INT, client_index, index + 2 * size, MPI_COMM_WORLD);
threads[2 * index] =
(std::async(std::launch::async, run_cmd, cmd_prefix, ports[index], true, hostnames[server_index]));
}
MPI_Send(&flag, 1, MPI_INT, client_index, rank_index, MPI_COMM_WORLD);
threads[2 * rank_index] = (std::async(std::launch::async, run_cmd, cmd_prefix, ports[rank_index], true,
hostnames[server_index / local_size]));
}
for (int index = 0; index < size; index++) {
int server_index = run_pairs_in_parallel[index].first;
int client_index = run_pairs_in_parallel[index].second;
if (client_index == g_world_rank) {
// in case that client starts before server
MPI_Recv(&flag, 1, MPI_INT, server_index, index + 2 * size, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
threads[2 * index + 1] =
(std::async(std::launch::async, run_cmd, cmd_prefix, ports[index], false, hostnames[server_index]));
MPI_Recv(&flag, 1, MPI_INT, server_index, rank_index, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
threads[2 * rank_index + 1] = (std::async(std::launch::async, run_cmd, cmd_prefix, ports[rank_index],
false, hostnames[server_index / local_size]));
}
}
}
// send the result of client to rank ROOT_RANK
for (auto &thread : threads) {
std::future_status status;
......@@ -341,10 +345,14 @@ vector<float> run_cmd_parallel(string cmd_prefix, const vector<std::pair<int, in
// rank ROOT_RANK recv results
vector<float> results;
if (g_world_rank == ROOT_RANK) {
results.resize(size);
for (int index = 0; index < size; index++) {
int client_index = run_pairs_in_parallel[index].second;
MPI_Recv(&results[index], 1, MPI_INT, client_index, index * 2 + 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
results.resize(run_pairs_in_parallel.size() * local_size);
for (size_t index = 0; index < run_pairs_in_parallel.size(); index++) {
for (int rank = 0; rank < local_size; rank++) {
int rank_index = index * local_size + rank,
client_index = run_pairs_in_parallel[index].second * local_size + rank;
MPI_Recv(&results[rank_index], 1, MPI_FLOAT, client_index, 2 * rank_index + 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}
}
return results;
......@@ -366,9 +374,9 @@ vector<vector<float>> run_benchmark(const Args &args, vector<vector<std::pair<in
// output the results to stdout of ROOT_RANK by default
if (g_world_rank == ROOT_RANK) {
std::cout << "results from rank ROOT_RANK: " << std::endl;
for (vector<float> results_single_line : results) {
for (float res : results_single_line) {
std::cout << res << ",";
for (vector<float> line : results) {
for (size_t i = 0; i < line.size(); i++) {
std::cout << line[i] << ((i + 1) % local_size ? " " : ",");
}
std::cout << endl;
}
......@@ -383,7 +391,7 @@ void output_to_file(const std::string cmd_prefix, const vector<vector<std::pair<
if (!out) {
throw std::runtime_error("Error: failed to open output file.");
}
// output command predix
// output command prefix
out << "command prefix: " << cmd_prefix << std::endl;
// output config file contents
out << "config:" << std::endl;
......@@ -396,8 +404,8 @@ void output_to_file(const std::string cmd_prefix, const vector<vector<std::pair<
// output results
out << "results:" << std::endl;
for (auto &line : results) {
for (int i = 0; i < line.size(); i++) {
out << line[i] << ",";
for (size_t i = 0; i < line.size(); i++) {
out << line[i] << ((i + 1) % local_size ? " " : ",");
}
out << std::endl;
}
......@@ -423,6 +431,18 @@ int main(int argc, char **argv) {
Args args;
load_args(argc, argv, args);
// Handle local size and rank
#if defined(OPEN_MPI)
local_size = atoi(getenv("OMPI_COMM_WORLD_LOCAL_SIZE"));
boost::replace_all(args.cmd_prefix, "LOCAL_RANK", "OMPI_COMM_WORLD_LOCAL_RANK");
#elif defined(MPICH)
local_size = atoi(getenv("MPI_LOCALNRANKS"));
boost::replace_all(args.cmd_prefix, "LOCAL_RANK", "MPI_LOCALRANKID");
#else
local_size = atoi(getenv("LOCAL_SIZE"));
std::cout << "Warning: unknown mpi used." << std::endl;
#endif
// Load and parse running config from file
vector<vector<std::pair<int, int>>> config = load_config(args.input_config);
......
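The key mapping introduced in this file: with `local_size` processes per node, a node index from the config expands to a contiguous block of global MPI ranks. A minimal sketch of that mapping:

```python
def node_to_ranks(node_index, local_size):
    """With local_size MPI processes per node, node i owns the contiguous
    global rank block [i * local_size, (i + 1) * local_size)."""
    return list(range(node_index * local_size, (node_index + 1) * local_size))
```

This is also why the occurrence vectors shrink to `g_world_size / local_size` entries (one per node) while the `ports` and `results` vectors grow to `pairs * local_size` entries (one per rank per pair).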
......@@ -163,7 +163,12 @@ superbench:
enable: false
modes:
- name: mpi
proc_num: 1
proc_num: 8
parameters:
msg_size: 8388608
ib_dev: mlx5_$LOCAL_RANK
gpu_dev: $LOCAL_RANK
numa_dev: $((LOCAL_RANK/2))
gpcnet-network-test:
enable: false
modes:
......
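The example parameters above bind each local rank to its own NIC, GPU, and NUMA node. A sketch of the resulting per-rank binding, assuming `proc_num: 8` on a machine with 8 NICs, 8 GPUs, and 4 NUMA nodes as in the example config:

```python
def devices_for_rank(local_rank):
    """Per-rank binding implied by the example parameters:
    ib_dev mlx5_$LOCAL_RANK, gpu_dev $LOCAL_RANK, numa_dev $((LOCAL_RANK/2)).
    Returns (IB device, GPU index, NUMA node) for one local rank."""
    return (f"mlx5_{local_rank}", local_rank, local_rank // 2)
```

The `$LOCAL_RANK` placeholders are substituted at runtime with the MPI implementation's local-rank environment variable (e.g. `OMPI_COMM_WORLD_LOCAL_RANK` for Open MPI), as handled in `ib_validation_performance.cc`.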
......@@ -169,7 +169,12 @@ superbench:
enable: false
modes:
- name: mpi
proc_num: 1
proc_num: 8
parameters:
msg_size: 8388608
ib_dev: mlx5_$LOCAL_RANK
gpu_dev: $LOCAL_RANK
numa_dev: $((LOCAL_RANK/2))
gpcnet-network-test:
enable: false
modes:
......
......@@ -154,7 +154,12 @@ superbench:
enable: false
modes:
- name: mpi
proc_num: 1
proc_num: 8
parameters:
msg_size: 8388608
ib_dev: mlx5_$LOCAL_RANK
gpu_dev: $LOCAL_RANK
numa_dev: $((LOCAL_RANK/2))
gpcnet-network-test:
enable: false
modes:
......
......@@ -22,10 +22,11 @@
vars:
workspace: '{{ ansible_user_dir }}/sb-workspace'
container: sb-workspace
skip_docker: '{{ no_docker | default(false) }}'
sb_nodes: '{{ hostvars.values() | map(attribute="ansible_hostname") | sort }}'
sb_env: |
# sb env
HOST_WS={{ ansible_user_dir }}/sb-workspace
SB_WORKSPACE='{{ workspace if skip_docker else "/root" }}'
# pytorch env
NNODES={{ sb_nodes | length }}
NODE_RANK={{ lookup('ansible.utils.index_of', sb_nodes, 'eq', ansible_hostname) }}
......
......@@ -398,7 +398,7 @@ def _run_proc(self, benchmark_name, mode, vars):
fcmd = "docker exec {env_list} sb-workspace bash -c '{command}'"
if self._docker_config.skip:
fcmd = "bash -c '{env_list} && cd $HOST_WS && {command}'"
fcmd = "bash -c '{env_list} && cd $SB_WORKSPACE && {command}'"
ansible_runner_config = self._ansible_client.get_shell_config(
fcmd.format(env_list=env_list, command=self.__get_mode_command(benchmark_name, mode, timeout))
)
......
......@@ -26,7 +26,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
"""Hook method for deconstructing the class fixture after running all tests in the class."""
p = Path('hostfile')
for p in [Path('hostfile'), Path('config.txt')]:
if p.is_file():
p.unlink()
super().tearDownClass()
......@@ -118,8 +118,7 @@ def read_config(filename):
Path(test_config_file).unlink()
@mock.patch('superbench.common.devices.GPU.vendor', new_callable=mock.PropertyMock)
@mock.patch('superbench.common.utils.network.get_ib_devices')
def test_ib_traffic_performance(self, mock_ib_devices, mock_gpu):
def test_ib_traffic_performance(self, mock_gpu):
"""Test ib-traffic benchmark."""
# Test without ib devices
# Check registry.
......@@ -130,58 +129,50 @@ def test_ib_traffic_performance(self, mock_ib_devices, mock_gpu):
# Check preprocess
# Negative cases
parameters = '--ib_index 0 --iters 2000 --pattern one-to-one --hostfile hostfile'
parameters = '--ib_dev 0 --iters 2000 --pattern one-to-one --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
mock_ib_devices.return_value = None
ret = benchmark._preprocess()
assert (ret is False)
assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_MPI_INIT_FAILURE)
# no hostfile
assert (benchmark.return_code == ReturnCode.INVALID_ARGUMENT)
hosts = ['node0\n', 'node1\n', 'node2\n', 'node3\n']
with open('hostfile', 'w') as f:
f.writelines(hosts)
os.environ['OMPI_COMM_WORLD_SIZE'] = '4'
parameters = '--ib_index 0 --iters 2000 --pattern one-to-one --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
mock_ib_devices.return_value = None
ret = benchmark._preprocess()
assert (ret is False)
assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)
# Positive cases
os.environ['OMPI_COMM_WORLD_SIZE'] = '3'
parameters = '--ib_index 0 --iters 2000 --pattern one-to-one --hostfile hostfile'
parameters = '--ib_dev 0 --iters 2000 --pattern one-to-one --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
mock_ib_devices.return_value = ['mlx5_0']
ret = benchmark._preprocess()
assert (ret is True)
# Generate config
parameters = '--ib_index 0 --iters 2000 --msg_size 33554432 --hostfile hostfile'
parameters = '--ib_dev mlx5_0 --iters 2000 --msg_size 33554432 --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
os.environ['OMPI_COMM_WORLD_SIZE'] = '4'
mock_ib_devices.return_value = ['mlx5_0']
ret = benchmark._preprocess()
Path('config.txt').unlink()
assert (ret)
expect_command = 'ib_validation --hostfile hostfile --cmd_prefix "ib_write_bw -F ' + \
'--iters=2000 -d mlx5_0 -s 33554432" --input_config ' + os.getcwd() + '/config.txt'
expect_command = "ib_validation --hostfile hostfile --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' --input_config " + \
os.getcwd() + '/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
parameters = '--ib_index 0 --iters 2000 --pattern one-to-one --hostfile hostfile --gpu_index 0'
parameters = '--ib_dev mlx5_0 --iters 2000 --pattern one-to-one --hostfile hostfile --gpu_dev 0'
mock_gpu.return_value = 'nvidia'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
ret = benchmark._preprocess()
expect_command = 'ib_validation --hostfile hostfile --cmd_prefix "ib_write_bw -F ' + \
'--iters=2000 -d mlx5_0 -a --use_cuda=0" --input_config ' + os.getcwd() + '/config.txt'
expect_command = "ib_validation --hostfile hostfile --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -a --use_cuda=0 --report_gbits' --input_config " + \
os.getcwd() + '/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
mock_gpu.return_value = 'amd'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
ret = benchmark._preprocess()
expect_command = 'ib_validation --hostfile hostfile --cmd_prefix "ib_write_bw -F ' + \
'--iters=2000 -d mlx5_0 -a --use_rocm=0" --input_config ' + os.getcwd() + '/config.txt'
expect_command = expect_command.replace('cuda', 'rocm')
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
......@@ -190,20 +181,19 @@ def test_ib_traffic_performance(self, mock_ib_devices, mock_gpu):
with open('test_config.txt', 'w') as f:
for line in config:
f.write(line + '\n')
parameters = '--ib_index 0 --iters 2000 --msg_size 33554432 --config test_config.txt --hostfile hostfile'
parameters = '--ib_dev mlx5_0 --iters 2000 --msg_size 33554432 --config test_config.txt --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
os.environ['OMPI_COMM_WORLD_SIZE'] = '2'
mock_ib_devices.return_value = ['mlx5_0']
ret = benchmark._preprocess()
Path('test_config.txt').unlink()
assert (ret)
expect_command = 'ib_validation --hostfile hostfile --cmd_prefix "ib_write_bw -F ' + \
'--iters=2000 -d mlx5_0 -s 33554432" --input_config test_config.txt'
expect_command = "ib_validation --hostfile hostfile --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' --input_config test_config.txt"
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
raw_output_0 = """
The predix of cmd to run is: ib_write_bw -a -d ibP257p0s0
The prefix of cmd to run is: ib_write_bw -a -d ibP257p0s0
Load the config file from: config.txt
Output will be saved to:
config:
......@@ -219,7 +209,7 @@ def test_ib_traffic_performance(self, mock_ib_devices, mock_gpu):
23435.3,22766.5
"""
raw_output_1 = """
The predix of cmd to run is: ib_write_bw -F --iters=2000 -d mlx5_0 -s 33554432
The prefix of cmd to run is: ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432
Load the config file from: config.txt
Output will be saved to:
config:
......@@ -263,7 +253,7 @@ def test_ib_traffic_performance(self, mock_ib_devices, mock_gpu):
assert (benchmark._bin_name == 'ib_validation')
# Check parameters specified in BenchmarkContext.
assert (benchmark._args.ib_index == 0)
assert (benchmark._args.ib_dev == 'mlx5_0')
assert (benchmark._args.iters == 2000)
assert (benchmark._args.msg_size == 33554432)
assert (benchmark._args.commands == ['ib_write_bw'])
assert (benchmark._args.command == 'ib_write_bw')