Unverified commit 352ae0c9, authored by Yifan Xiong, committed by GitHub
Browse files

Fix port conflict in ib loopback (#375)

Fix a potential port conflict caused by a time-of-check to time-of-use
(TOCTOU) race condition, by keeping the port bound throughout.

Refactor the function to resolve flake8 C901 (function is too complex) while keeping the logic the same.
parent 16b6385d
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
"""Module of the IB loopback benchmarks.""" """Module of the IB loopback benchmarks."""
import os import os
import socket
from pathlib import Path from pathlib import Path
from superbench.common.utils import logger from superbench.common.utils import logger
...@@ -47,8 +48,14 @@ def __init__(self, name, parameters=''): ...@@ -47,8 +48,14 @@ def __init__(self, name, parameters=''):
super().__init__(name, parameters) super().__init__(name, parameters)
self._bin_name = 'run_perftest_loopback' self._bin_name = 'run_perftest_loopback'
self.__sock_fds = []
self.__support_ib_commands = {'write': 'ib_write_bw', 'read': 'ib_read_bw', 'send': 'ib_send_bw'} self.__support_ib_commands = {'write': 'ib_write_bw', 'read': 'ib_read_bw', 'send': 'ib_send_bw'}
def __del__(self):
"""Destructor."""
for fd in self.__sock_fds:
fd.close()
def add_parser_arguments(self): def add_parser_arguments(self):
"""Add the specified arguments.""" """Add the specified arguments."""
super().add_parser_arguments() super().add_parser_arguments()
...@@ -121,10 +128,7 @@ def _preprocess(self): ...@@ -121,10 +128,7 @@ def _preprocess(self):
Return: Return:
True if _preprocess() succeed. True if _preprocess() succeed.
""" """
if not super()._preprocess(): if not super()._preprocess() or not self.__get_arguments_from_env():
return False
if not self.__get_arguments_from_env():
return False return False
# Format the arguments # Format the arguments
...@@ -146,33 +150,39 @@ def _preprocess(self): ...@@ -146,33 +150,39 @@ def _preprocess(self):
) )
) )
return False return False
else:
try: try:
command = os.path.join(self._args.bin_dir, self._bin_name) self.__sock_fds.append(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
numa_cores = get_numa_cores(self._args.numa) # grep SO_REUSE /usr/include/asm-generic/socket.h
if len(numa_cores) < 2: self.__sock_fds[-1].setsockopt(socket.SOL_SOCKET, getattr(socket, 'SO_REUSEADDR', 2), 1)
self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) self.__sock_fds[-1].setsockopt(socket.SOL_SOCKET, getattr(socket, 'SO_REUSEPORT', 15), 1)
logger.error('Getting numa core devices failure - benchmark: {}.'.format(self._name)) self.__sock_fds[-1].bind(('127.0.0.1', 0))
return False except OSError as e:
if len(numa_cores) >= 4: self._result.set_return_code(ReturnCode.RUNTIME_EXCEPTION_ERROR)
server_core = int(numa_cores[-1]) logger.error('Error when binding port - benchmark: %s, message: %s.', self._name, e)
client_core = int(numa_cores[-3]) return False
else: try:
server_core = int(numa_cores[-1]) ib_devices = network.get_ib_devices()
client_core = int(numa_cores[-2]) except BaseException as e:
command += ' ' + str(server_core) + ' ' + str(client_core) self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)
command += ' ' + os.path.join(self._args.bin_dir, self.__support_ib_commands[ib_command]) logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e)))
command += command_mode + ' -F' return False
command += ' --iters=' + str(self._args.iters) numa_cores = get_numa_cores(self._args.numa)
command += ' -d ' + network.get_ib_devices()[self._args.ib_index].split(':')[0] if not numa_cores or len(numa_cores) < 2:
command += ' -p ' + str(network.get_free_port()) self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)
command += ' -x ' + str(self._args.gid_index) logger.error('Getting numa core devices failure - benchmark: {}.'.format(self._name))
command += ' --report_gbits' return False
self._commands.append(command) command = os.path.join(self._args.bin_dir, self._bin_name)
except BaseException as e: command += ' ' + str(numa_cores[-1]) + ' ' + str(numa_cores[-3 + int((len(numa_cores) < 4))])
self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) command += ' ' + os.path.join(self._args.bin_dir, self.__support_ib_commands[ib_command])
logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e))) command += command_mode + ' -F'
return False command += ' --iters=' + str(self._args.iters)
command += ' -d ' + ib_devices[self._args.ib_index].split(':')[0]
command += ' -p ' + str(self.__sock_fds[-1].getsockname()[1])
command += ' -x ' + str(self._args.gid_index)
command += ' --report_gbits'
self._commands.append(command)
return True return True
def _process_raw_result(self, cmd_idx, raw_output): def _process_raw_result(self, cmd_idx, raw_output):
......
...@@ -37,10 +37,9 @@ def test_ib_loopback_util(self): ...@@ -37,10 +37,9 @@ def test_ib_loopback_util(self):
assert (isinstance(numa_cores[i], numbers.Number)) assert (isinstance(numa_cores[i], numbers.Number))
@decorator.load_data('tests/data/ib_loopback_all_sizes.log') @decorator.load_data('tests/data/ib_loopback_all_sizes.log')
@mock.patch('superbench.common.utils.network.get_free_port')
@mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores') @mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores')
@mock.patch('superbench.common.utils.network.get_ib_devices') @mock.patch('superbench.common.utils.network.get_ib_devices')
def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_cores, mock_port): def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_cores):
"""Test ib-loopback benchmark for all sizes.""" """Test ib-loopback benchmark for all sizes."""
# Test without ib devices # Test without ib devices
# Check registry. # Check registry.
...@@ -69,15 +68,15 @@ def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_core ...@@ -69,15 +68,15 @@ def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_core
mock_ib_devices.return_value = ['mlx5_0'] mock_ib_devices.return_value = ['mlx5_0']
mock_numa_cores.return_value = [0, 1, 2, 3] mock_numa_cores.return_value = [0, 1, 2, 3]
mock_port.return_value = 10000
os.environ['PROC_RANK'] = '0' os.environ['PROC_RANK'] = '0'
os.environ['IB_DEVICES'] = '0,2,4,6' os.environ['IB_DEVICES'] = '0,2,4,6'
os.environ['NUMA_NODES'] = '1,0,3,2' os.environ['NUMA_NODES'] = '1,0,3,2'
ret = benchmark._preprocess() ret = benchmark._preprocess()
assert (ret) assert (ret)
port = benchmark._IBLoopbackBenchmark__sock_fds[-1].getsockname()[1]
expect_command = 'run_perftest_loopback 3 1 ' + benchmark._args.bin_dir + \ expect_command = 'run_perftest_loopback 3 1 ' + benchmark._args.bin_dir + \
'/ib_write_bw -a -F --iters=2000 -d mlx5_0 -p 10000 -x 0 --report_gbits' f'/ib_write_bw -a -F --iters=2000 -d mlx5_0 -p {port} -x 0 --report_gbits'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1] command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command) assert (command == expect_command)
...@@ -110,10 +109,9 @@ def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_core ...@@ -110,10 +109,9 @@ def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_core
assert (benchmark._args.commands == ['write']) assert (benchmark._args.commands == ['write'])
@decorator.load_data('tests/data/ib_loopback_8M_size.log') @decorator.load_data('tests/data/ib_loopback_8M_size.log')
@mock.patch('superbench.common.utils.network.get_free_port')
@mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores') @mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores')
@mock.patch('superbench.common.utils.network.get_ib_devices') @mock.patch('superbench.common.utils.network.get_ib_devices')
def test_ib_loopback_8M_size(self, raw_output, mock_ib_devices, mock_numa_cores, mock_port): def test_ib_loopback_8M_size(self, raw_output, mock_ib_devices, mock_numa_cores):
"""Test ib-loopback benchmark for 8M size.""" """Test ib-loopback benchmark for 8M size."""
# Test without ib devices # Test without ib devices
# Check registry. # Check registry.
...@@ -142,12 +140,12 @@ def test_ib_loopback_8M_size(self, raw_output, mock_ib_devices, mock_numa_cores, ...@@ -142,12 +140,12 @@ def test_ib_loopback_8M_size(self, raw_output, mock_ib_devices, mock_numa_cores,
mock_ib_devices.return_value = ['mlx5_0'] mock_ib_devices.return_value = ['mlx5_0']
mock_numa_cores.return_value = [0, 1, 2, 3] mock_numa_cores.return_value = [0, 1, 2, 3]
mock_port.return_value = 10000
ret = benchmark._preprocess() ret = benchmark._preprocess()
assert (ret) assert (ret)
port = benchmark._IBLoopbackBenchmark__sock_fds[-1].getsockname()[1]
expect_command = 'run_perftest_loopback 3 1 ' + benchmark._args.bin_dir + \ expect_command = 'run_perftest_loopback 3 1 ' + benchmark._args.bin_dir + \
'/ib_write_bw -s 8388608 -F --iters=2000 -d mlx5_0 -p 10000 -x 0 --report_gbits' f'/ib_write_bw -s 8388608 -F --iters=2000 -d mlx5_0 -p {port} -x 0 --report_gbits'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1] command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command) assert (command == expect_command)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment