Unverified Commit b2875179 authored by Yifan Xiong's avatar Yifan Xiong Committed by GitHub
Browse files

Fix issues in ib validation benchmark (#370)

Fix several issues in ib validation benchmark:
* continue running when a timeout occurs in the middle, instead of aborting the whole MPI process
* make timeout parameter configurable, set default to 120 seconds
* avoid mixture of stdio and iostream when print to stdout
* set default message size to 8M which will saturate ib in most cases
* fix hostfile path issue so that it can be auto found in different cases
parent e00a8180
......@@ -55,6 +55,13 @@ def add_parser_arguments(self):
required=False,
help='The NUMA node to bind, e.g., 0, $LOCAL_RANK, $((LOCAL_RANK/2)), etc.',
)
self._parser.add_argument(
'--timeout',
type=int,
default=120,
required=False,
help='Timeout in seconds for each perftest command in case ib is too slow.',
)
# perftest configurations
self._parser.add_argument(
'--iters',
......@@ -66,7 +73,7 @@ def add_parser_arguments(self):
self._parser.add_argument(
'--msg_size',
type=int,
default=None,
default=8388608,
required=False,
help='The message size of perftest command, e.g., 8388608.',
)
......@@ -252,11 +259,7 @@ def __prepare_general_ib_command_params(self):
# Format the ib command type
self._args.command = self._args.command.lower()
# Add message size for ib command
msg_size = ''
if self._args.msg_size is None:
msg_size = '-a'
else:
msg_size = '-s ' + str(self._args.msg_size)
msg_size = f'-s {self._args.msg_size}' if self._args.msg_size > 0 else '-a'
# Add GPUDirect for ib command
gpu_dev = ''
if self._args.gpu_dev is not None:
......@@ -308,9 +311,9 @@ def _preprocess(self):
ib_command_prefix += ' -b'
command = os.path.join(self._args.bin_dir, self._bin_name)
command += ' --hostfile ' + self._args.hostfile
command += ' --cmd_prefix ' + "'" + ib_command_prefix + "'"
command += ' --input_config ' + self.__config_path
command += f' --timeout {self._args.timeout} ' + \
f'--hostfile {self._args.hostfile} --input_config {self.__config_path}'
self._commands.append(command)
return True
......
......@@ -20,8 +20,6 @@
#include <iostream>
#include <regex>
#include <stdexcept>
#include <stdio.h>
#include <string.h>
#include <string>
#include <thread>
#include <unordered_map>
......@@ -50,6 +48,8 @@ int local_size;
// The struct to store command line arguments
struct Args {
// Timeout for each command
int timeout;
// The prefix of command to run
std::string cmd_prefix;
// The path of input config file
......@@ -64,7 +64,8 @@ struct Args {
void load_args(int argc, char *argv[], Args &args) {
// Get and parse command line arguments
boost::program_options::options_description opt("all options");
opt.add_options()(
opt.add_options()("timeout,t", boost::program_options::value<int>(&args.timeout)->default_value(120),
"timeout of each command")(
"cmd_prefix,c",
boost::program_options::value<std::string>(&args.cmd_prefix)->default_value("ib_write_bw -s 33554432 -d ib0"),
"ib command prefix")(
......@@ -84,9 +85,10 @@ void load_args(int argc, char *argv[], Args &args) {
return;
}
if (g_world_rank == ROOT_RANK) {
printf("The prefix of cmd to run is: %s\n", args.cmd_prefix.c_str());
printf("Load the config file from: %s\n", args.input_config.c_str());
printf("Output will be saved to: %s\n", args.output_path.c_str());
std::cout << "Timeout for each command is: " << args.timeout << std::endl;
std::cout << "The prefix of cmd to run is: " << args.cmd_prefix << std::endl;
std::cout << "Load the config file from: " << args.input_config << std::endl;
std::cout << "Output will be saved to: " << args.output_path << std::endl;
}
}
......@@ -278,7 +280,7 @@ float process_raw_output(string output) {
}
// Run ib command on server/client with server hostname
float run_cmd(string cmd_prefix, int port, bool server, string hostname) {
float run_cmd(string cmd_prefix, int timeout, int port, bool server, string hostname) {
// client sleeps 500 ms in case the client starts before the server
if (!server) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
......@@ -287,7 +289,7 @@ float run_cmd(string cmd_prefix, int port, bool server, string hostname) {
string command, output;
try {
// exec command in terminal
command = cmd_prefix + " -p " + to_string(port);
command = "timeout " + to_string(timeout) + " " + cmd_prefix + " -p " + to_string(port);
command = server ? command : command + " " + hostname;
output = exec(command.c_str());
} catch (const std::exception &e) {
......@@ -301,7 +303,7 @@ float run_cmd(string cmd_prefix, int port, bool server, string hostname) {
}
// The ranks in the vector of (server, client) pairs run commands in parallel
vector<float> run_cmd_parallel(string cmd_prefix, const vector<std::pair<int, int>> &run_pairs_in_parallel,
vector<float> run_cmd_parallel(string cmd_prefix, int timeout, const vector<std::pair<int, int>> &run_pairs_in_parallel,
const vector<int> &ports, const vector<string> &hostnames) {
// invoke function to run cmd in multi threads mode for each rank in the pairs
unordered_map<int, std::future<float>> threads;
......@@ -314,14 +316,15 @@ vector<float> run_cmd_parallel(string cmd_prefix, const vector<std::pair<int, in
if (server_index == g_world_rank) {
flag = index;
MPI_Send(&flag, 1, MPI_INT, client_index, rank_index, MPI_COMM_WORLD);
threads[2 * rank_index] = (std::async(std::launch::async, run_cmd, cmd_prefix, ports[rank_index], true,
hostnames[server_index / local_size]));
threads[2 * rank_index] = (std::async(std::launch::async, run_cmd, cmd_prefix, timeout,
ports[rank_index], true, hostnames[server_index / local_size]));
}
if (client_index == g_world_rank) {
// in case that client starts before server
MPI_Recv(&flag, 1, MPI_INT, server_index, rank_index, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
threads[2 * rank_index + 1] = (std::async(std::launch::async, run_cmd, cmd_prefix, ports[rank_index],
false, hostnames[server_index / local_size]));
threads[2 * rank_index + 1] =
(std::async(std::launch::async, run_cmd, cmd_prefix, timeout, ports[rank_index], false,
hostnames[server_index / local_size]));
}
}
}
......@@ -330,10 +333,9 @@ vector<float> run_cmd_parallel(string cmd_prefix, const vector<std::pair<int, in
for (auto &thread : threads) {
std::future_status status;
float client_result = -1.0;
status = thread.second.wait_for(std::chrono::seconds(300));
status = thread.second.wait_for(std::chrono::seconds(timeout));
if (status == std::future_status::timeout) {
cout << "Error: thread timeout" << endl;
throw "Error: thread timeout";
std::cout << "Error: thread timeout" << std::endl;
} else if (status == std::future_status::ready) {
client_result = thread.second.get();
}
......@@ -367,7 +369,7 @@ vector<vector<float>> run_benchmark(const Args &args, vector<vector<std::pair<in
// Insert barrier to sync before each run
MPI_Barrier(MPI_COMM_WORLD);
// run commands parallel for single line of config
vector<float> results_single_line = run_cmd_parallel(args.cmd_prefix, line, ports, hostnames);
vector<float> results_single_line = run_cmd_parallel(args.cmd_prefix, args.timeout, line, ports, hostnames);
// collect results for each run
results.push_back(results_single_line);
}
......
......@@ -26,7 +26,7 @@
sb_nodes: '{{ hostvars.values() | map(attribute="ansible_hostname") | sort }}'
sb_env: |
# sb env
SB_WORKSPACE='{{ workspace if skip_docker else "/root" }}'
SB_WORKSPACE={{ workspace if skip_docker else '/root' }}
# pytorch env
NNODES={{ sb_nodes | length }}
NODE_RANK={{ lookup('ansible.utils.index_of', sb_nodes, 'eq', ansible_hostname) }}
......
......@@ -86,7 +86,7 @@ def __validate_sb_config(self): # noqa: C901
'btl_tcp_if_exclude': 'lo,docker0',
'coll_hcoll_enable': 0,
}
for key in ['PATH', 'LD_LIBRARY_PATH', 'SB_MICRO_PATH']:
for key in ['PATH', 'LD_LIBRARY_PATH', 'SB_MICRO_PATH', 'SB_WORKSPACE']:
self._sb_benchmarks[name].modes[idx].env.setdefault(key, None)
def __get_enabled_benchmarks(self):
......
......@@ -140,6 +140,12 @@ def test_ib_traffic_performance(self, mock_gpu):
with open('hostfile', 'w') as f:
f.writelines(hosts)
parameters = '--ib_dev 0 --msg_size invalid --pattern one-to-one --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
ret = benchmark._preprocess()
assert (ret is False)
assert (benchmark.return_code == ReturnCode.INVALID_ARGUMENT)
# Positive cases
os.environ['OMPI_COMM_WORLD_SIZE'] = '3'
parameters = '--ib_dev 0 --iters 2000 --pattern one-to-one --hostfile hostfile'
......@@ -154,19 +160,19 @@ def test_ib_traffic_performance(self, mock_gpu):
ret = benchmark._preprocess()
Path('config.txt').unlink()
assert (ret)
expect_command = "ib_validation --hostfile hostfile --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' --input_config " + \
os.getcwd() + '/config.txt'
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' " + \
f'--timeout 120 --hostfile hostfile --input_config {os.getcwd()}/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
parameters = '--ib_dev mlx5_0 --iters 2000 --pattern one-to-one --hostfile hostfile --gpu_dev 0'
parameters = '--ib_dev mlx5_0 --msg_size 0 --iters 2000 --pattern one-to-one --hostfile hostfile --gpu_dev 0'
mock_gpu.return_value = 'nvidia'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
ret = benchmark._preprocess()
expect_command = "ib_validation --hostfile hostfile --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -a --use_cuda=0 --report_gbits' --input_config " + \
os.getcwd() + '/config.txt'
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -a --use_cuda=0 --report_gbits' " + \
f'--timeout 120 --hostfile hostfile --input_config {os.getcwd()}/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
mock_gpu.return_value = 'amd'
......@@ -181,14 +187,16 @@ def test_ib_traffic_performance(self, mock_gpu):
with open('test_config.txt', 'w') as f:
for line in config:
f.write(line + '\n')
parameters = '--ib_dev mlx5_0 --iters 2000 --msg_size 33554432 --config test_config.txt --hostfile hostfile'
parameters = '--ib_dev mlx5_0 --timeout 180 --iters 2000 --msg_size 33554432 ' + \
'--config test_config.txt --hostfile hostfile'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
os.environ['OMPI_COMM_WORLD_SIZE'] = '2'
ret = benchmark._preprocess()
Path('test_config.txt').unlink()
assert (ret)
expect_command = "ib_validation --hostfile hostfile --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' --input_config test_config.txt"
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' " + \
'--timeout 180 --hostfile hostfile --input_config test_config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment