Unverified commit 719a427f authored by Ziyue Yang, committed by GitHub

Benchmarks: Microbenchmark - Add distributed inference benchmark cpp implementation (#586)

**Description**
Add a C++ implementation of the distributed inference benchmark.
parent 1f5031bd
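
For context, a minimal sketch of how the two implementations are selected from the wrapper, mirroring the tests added in this commit (the `--use_pytorch` flag is the only switch; the parameter values below are illustrative, not defaults from this diff):

```python
from superbench.benchmarks import BenchmarkRegistry, Framework

# PyTorch implementation: select it explicitly with --use_pytorch.
ctx_pytorch = BenchmarkRegistry.create_benchmark_context(
    'dist-inference', parameters='--use_pytorch', framework=Framework.PYTORCH
)

# C++ implementation: without --use_pytorch, _preprocess assembles a
# `dist_inference` command line instead of building a PyTorch model.
ctx_cpp = BenchmarkRegistry.create_benchmark_context(
    'dist-inference',
    parameters='--batch_size 64 --input_size 1024 --hidden_size 1024 --use_cuda_graph',
    framework=Framework.PYTORCH,
)
```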
@@ -418,7 +418,7 @@ Test the performance of large scale matmul operation with multiple GPUs:
#### Introduction
-Test the performance of distributed model inference.
+Test the performance of distributed model inference. Supports both a PyTorch implementation and a C++ implementation.
#### Metrics
......
@@ -12,7 +12,7 @@
from superbench.common.utils import logger
from superbench.benchmarks import DistributedImpl, DistributedBackend, BenchmarkRegistry, ReturnCode, Precision
-from superbench.benchmarks.micro_benchmarks import MicroBenchmark
+from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke
from superbench.benchmarks.context import Enum
from superbench.benchmarks.reducer import ReduceType
@@ -168,7 +168,7 @@ def forward(self, x):
return activation_out
-class DistInference(MicroBenchmark):
+class DistInference(MicroBenchmarkWithInvoke):
"""The DistInference benchmark class."""
def __init__(self, name, parameters=''):
"""Constructor.
@@ -182,7 +182,9 @@ def __init__(self, name, parameters=''):
self.__local_rank = 0
torch.backends.cudnn.benchmark = True
self.__device = None
self.__cuda_available = False
# For cpp impl path
self._bin_name = 'dist_inference'
def __timer(self):
"""Returns the current time which ensures all previous CUDA events have been finished.
@@ -193,7 +195,6 @@ def __timer(self):
Return:
Current time in second.
"""
if self.__cuda_available:
torch.cuda.synchronize()
return time.time()
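
The synchronize-before-reading-the-clock pattern above matters because CUDA kernels launch asynchronously: `time.time()` alone would measure launch overhead, not execution time. A minimal illustration (assumes a CUDA-capable device; not part of this diff):

```python
import time

import torch

x = torch.randn(4096, 4096, device='cuda')
torch.cuda.synchronize()    # drain pending work before starting the clock
start = time.time()
y = x @ x                   # the matmul kernel is only enqueued here
torch.cuda.synchronize()    # wait until the matmul has actually finished
elapsed = time.time() - start
print('matmul took %.3f ms' % (elapsed * 1e3))
```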
@@ -201,6 +202,12 @@ def add_parser_arguments(self):
"""Add the specified arguments."""
super().add_parser_arguments()
self._parser.add_argument(
'--use_pytorch',
action='store_true',
required=False,
help='Whether to use the PyTorch implementation. If not set, the C++ implementation will be used.',
)
self._parser.add_argument(
'--batch_size',
type=int,
@@ -222,6 +229,20 @@ def add_parser_arguments(self):
required=False,
help='Hidden size.',
)
self._parser.add_argument(
'--alpha',
type=float,
default=1.0,
required=False,
help='Coefficient alpha in D = alpha*(A*B) + beta*(C).',
)
self._parser.add_argument(
'--beta',
type=float,
default=1.0,
required=False,
help='Coefficient beta in D = alpha*(A*B) + beta*(C).',
)
self._parser.add_argument(
'--num_layers',
type=int,
@@ -285,6 +306,12 @@ def add_parser_arguments(self):
required=False,
help='Distributed backends. E.g. {}.'.format(' '.join(DistributedBackend.get_values())),
)
self._parser.add_argument(
'--use_cuda_graph',
action='store_true',
required=False,
help='Whether to launch kernels in CUDA graph mode.',
)
def _preprocess(self):
"""Preprocess/preparation operations before the benchmarking.
@@ -295,32 +322,41 @@ def _preprocess(self):
if not super()._preprocess():
return False
if self._args.use_pytorch:
# Initialize PyTorch if pytorch impl path
if self._args.distributed_impl != DistributedImpl.DDP:
-self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
-logger.error(
+return self._set_error_code_and_print_error_msg(
+ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE,
'Unsupported distributed implementation - model: {}, distributed implementation: {}.'.format(
self._name, self._args.distributed_impl
)
)
-return False
try:
torch.distributed.init_process_group(backend=self._args.distributed_backend.value)
self.__world_size = int(os.environ['WORLD_SIZE'])
self.__local_rank = int(os.environ['LOCAL_RANK'])
assert (torch.cuda.is_available())
except BaseException as e:
-self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
torch.distributed.destroy_process_group()
-logger.error('Initialize distributed env failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
-return False
+return self._set_error_code_and_print_error_msg(
+ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE,
+'Initialize distributed env failed - benchmark: {}, message: {}.'.format(self._name, str(e))
+)
if torch.cuda.is_available():
torch.cuda.set_device(self.__local_rank)
self.__device = torch.device('cuda:{}'.format(self.__local_rank))
self.__cuda_available = True
else:
self.__device = torch.device('cpu:{}'.format(self.__local_rank))
self.__cuda_available = False
# Assemble commands if cpp impl path
self.__bin_path = os.path.join(self._args.bin_dir, self._bin_name)
args = '-m %d -n %d -k %d' % (self._args.hidden_size, self._args.batch_size, self._args.input_size)
args += ' --alpha %g --beta %g' % (self._args.alpha, self._args.beta)
args += ' --num_layers %d --num_warmups %d --num_iters %d' % \
(self._args.num_layers, self._args.num_warmup, self._args.num_steps)
if self._args.use_cuda_graph:
args += ' --use_cuda_graph'
self._commands = ['%s %s' % (self.__bin_path, args)]
return True
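
To make the argument mapping above concrete: the wrapper's `hidden_size`, `batch_size`, and `input_size` become the binary's `-m`, `-n`, and `-k` GEMM dimensions, and `num_warmup`/`num_steps` are renamed to `--num_warmups`/`--num_iters`. A standalone sketch of the assembly logic, with illustrative values and an illustrative bin path (the command-generation test later in this diff checks the same shape):

```python
def assemble_command(bin_path, hidden_size, batch_size, input_size, alpha, beta,
                     num_layers, num_warmup, num_steps, use_cuda_graph):
    """Illustrative restatement of the command assembly in _preprocess."""
    args = '-m %d -n %d -k %d' % (hidden_size, batch_size, input_size)
    args += ' --alpha %g --beta %g' % (alpha, beta)
    args += ' --num_layers %d --num_warmups %d --num_iters %d' % (num_layers, num_warmup, num_steps)
    if use_cuda_graph:
        args += ' --use_cuda_graph'
    return '%s %s' % (bin_path, args)

assert assemble_command('bin/dist_inference', 3, 1, 2, 4.0, 5.0, 6, 7, 8, True) == \
    'bin/dist_inference -m 3 -n 1 -k 2 --alpha 4 --beta 5 --num_layers 6 --num_warmups 7 --num_iters 8 --use_cuda_graph'
```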
@@ -347,7 +383,6 @@ def _prepare_model(
self.__device
)
model = model.to(dtype=getattr(torch, precision.value))
if self.__cuda_available:
model = model.cuda()
return model
@@ -401,6 +436,8 @@ def _benchmark(self):
Return:
True if _benchmark succeeds.
"""
if self._args.use_pytorch:
# Execute PyTorch model if pytorch impl path
batch_size = self._args.batch_size
input_size = self._args.input_size
hidden_size = self._args.hidden_size
@@ -425,7 +462,8 @@ def _benchmark(self):
# Prepare model
model = self._prepare_model(
-input_size, hidden_size, num_layers, computation, communication, activation, precision, self.__world_size
+input_size, hidden_size, num_layers, computation, communication, activation, precision,
+self.__world_size
)
# Run model
@@ -433,6 +471,43 @@ def _benchmark(self):
# Process data and return
return self._process_data(step_times)
else:
# Execute commands if cpp impl path
if not super()._benchmark():
return False
return True
def _process_raw_result(self, cmd_idx, raw_output):
"""Function to parse raw results and save the summarized results.
self._result.add_raw_data() and self._result.add_result() need to be called to save the results.
Args:
cmd_idx (int): the index of command corresponding with the raw_output.
raw_output (str): raw output string of the micro-benchmark.
Return:
True if the raw output string is valid and result can be extracted.
"""
self._result.add_raw_data('raw_output_' + str(cmd_idx), raw_output, self._args.log_raw_data)
try:
output_lines = [x.strip() for x in raw_output.strip().splitlines()]
step_time = None
for output_line in output_lines:
if ' ms per iteration' in output_line:
step_time = float(output_line.split(' ms per iteration')[0].split()[-1])
break
return self._process_numeric_result(
'step_times', [step_time], reduce_type=ReduceType.MAX, cal_percentile=True
)
except BaseException as e:
return self._set_error_code_and_print_error_msg(
ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE,
'The result format is invalid - round: {}, benchmark: {}, raw output: {}, message: {}.'.format(
self._curr_run_index, self._name, raw_output, str(e)
)
)
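
The parsing rule above keys on the literal substring ' ms per iteration' and takes the token immediately before it. Applied to the sample output shipped as tests/data/dist_inference.log (reproduced at the end of this diff), it extracts 1.73; a minimal sketch:

```python
raw_output = (
    'Parameters: m=80, n=128, k=128, alpha=1.000000, beta=1.000000, '
    'num_layers=50, num_warmups=20, num_iters=100, use_cuda_graph=0\n'
    'Time: 173 ms in total, 1.73 ms per iteration, 0.0346 ms per layer\n'
)

step_time = None
for line in raw_output.strip().splitlines():
    if ' ms per iteration' in line:
        # The token immediately preceding ' ms per iteration' is the latency.
        step_time = float(line.split(' ms per iteration')[0].split()[-1])
        break

assert step_time == 1.73
```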
def _postprocess(self):
"""Postprocess/cleanup operations after the benchmarking.
@@ -443,14 +518,26 @@ def _postprocess(self):
if not super()._postprocess():
return False
if self._args.use_pytorch:
try:
torch.distributed.destroy_process_group()
except BaseException as e:
-self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_DESTROY_FAILURE)
-logger.error('Post process failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
-return False
+return self._set_error_code_and_print_error_msg(
+ReturnCode.DISTRIBUTED_SETTING_DESTROY_FAILURE,
+'Post process failed - benchmark: {}, message: {}.'.format(self._name, str(e))
+)
return True
def _set_error_code_and_print_error_msg(self, error_code, error_msg):
"""Set error code and print error log upon error.
Return:
False, representing error.
"""
self._result.set_return_code(error_code)
logger.error(error_msg)
return False
BenchmarkRegistry.register_benchmark('pytorch-dist-inference', DistInference, parameters='')
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
cmake_minimum_required(VERSION 3.18)
project(dist_inference LANGUAGES CXX)
find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_INCLUDE_PATH})
find_package(CUDAToolkit QUIET)
# CUDA environment
if(CUDAToolkit_FOUND)
message(STATUS "Found CUDA: " ${CUDAToolkit_VERSION})
include(../cuda_common.cmake)
add_executable(dist_inference dist_inference.cu)
set_property(TARGET dist_inference PROPERTY CUDA_ARCHITECTURES ${NVCC_ARCHS_SUPPORTED})
target_link_libraries(dist_inference MPI::MPI_CXX nccl cublasLt)
else()
# ROCm environment
include(../rocm_common.cmake)
find_package(hip QUIET)
if(hip_FOUND)
message(STATUS "Found ROCm: " ${HIP_VERSION})
# Convert CUDA code to HIP code with hipify-perl
execute_process(COMMAND hipify-perl -print-stats -o dist_inference.cpp dist_inference.cu WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/)
# Link the HIP device library
add_executable(dist_inference dist_inference.cpp)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -DROCM_USE_FLOAT16=1")
target_link_libraries(dist_inference MPI::MPI_CXX rccl hipblaslt hip::device)
else()
message(FATAL_ERROR "No CUDA or ROCm environment found.")
endif()
endif()
install(TARGETS dist_inference RUNTIME DESTINATION bin)
@@ -33,6 +33,8 @@ superbench:
node_num: 1
env:
NCCL_ASYNC_ERROR_HANDLING: '0'
frameworks:
- pytorch
common_model_config: &common_model_config
duration: 0
num_warmup: 64
......
@@ -29,6 +29,8 @@ superbench:
node_num: 1
env:
NCCL_ASYNC_ERROR_HANDLING: '0'
frameworks:
- pytorch
common_model_config: &common_model_config
duration: 0
num_warmup: 64
......
@@ -3,12 +3,15 @@
"""Tests for distributed inference benchmark."""
import numbers
import unittest
from tests.helper import decorator
from tests.helper.testcase import BenchmarkTestCase
import tests.benchmarks.utils as utils
from superbench.benchmarks \
-import BenchmarkRegistry, Framework, BenchmarkType, ReturnCode, Precision, DistributedImpl, DistributedBackend
+import BenchmarkRegistry, Framework, BenchmarkType, ReturnCode, Precision, DistributedImpl, DistributedBackend, \
+Platform
from superbench.benchmarks.micro_benchmarks.dist_inference \
import DistInference, ComputationKernelType, CommunicationKernelType, ActivationKernelType
from superbench.common.utils import network
@@ -20,7 +23,9 @@
@decorator.pytorch_test
def test_pytorch_dist_inference_normal():
"""Test pytorch-dist-inference benchmark on distributed normal case."""
-context = BenchmarkRegistry.create_benchmark_context('dist-inference', parameters='', framework=Framework.PYTORCH)
+context = BenchmarkRegistry.create_benchmark_context(
+'dist-inference', parameters='--use_pytorch', framework=Framework.PYTORCH
+)
world_size = 2
assert (BenchmarkRegistry.is_benchmark_context_valid(context))
results = utils.simulated_ddp_distributed_benchmark(context, world_size)
@@ -33,9 +38,12 @@ def test_pytorch_dist_inference_normal():
assert (benchmark.type == BenchmarkType.MICRO)
# Check predefined parameters of dist-inference benchmark.
assert (benchmark._args.use_pytorch is True)
assert (benchmark._args.batch_size == 64)
assert (benchmark._args.input_size == 1024)
assert (benchmark._args.hidden_size == 1024)
assert (benchmark._args.alpha == 1.0)
assert (benchmark._args.beta == 1.0)
assert (benchmark._args.num_layers == 1)
assert (benchmark._args.computation_kernel == ComputationKernelType.MATMUL)
assert (benchmark._args.communication_kernel == CommunicationKernelType.ALLREDUCE)
@@ -45,6 +53,7 @@ def test_pytorch_dist_inference_normal():
assert (benchmark._args.num_steps == 10000)
assert (benchmark._args.distributed_impl == DistributedImpl.DDP)
assert (benchmark._args.distributed_backend == DistributedBackend.NCCL)
assert (benchmark._args.use_cuda_graph is False)
# Check results and metrics.
assert (benchmark.run_count == 1)
@@ -52,14 +61,16 @@ def test_pytorch_dist_inference_normal():
# step_times
assert (len(benchmark.raw_data) == 1)
# return code + (avg, 50th, 90th, 95th, 99th, 99.9th)
-assert (len(benchmark.result) == 7)
+assert (7 == len(benchmark.result))
@decorator.cuda_test
@decorator.pytorch_test
def test_pytorch_dist_inference_fake_distributed():
"""Test pytorch-dist-inference benchmark on single gpu."""
-context = BenchmarkRegistry.create_benchmark_context('dist-inference', parameters='', framework=Framework.PYTORCH)
+context = BenchmarkRegistry.create_benchmark_context(
+'dist-inference', parameters='--use_pytorch', framework=Framework.PYTORCH
+)
port = network.get_free_port()
assert (port)
utils.setup_simulated_ddp_distributed_env(1, 0, port)
@@ -72,9 +83,12 @@ def test_pytorch_dist_inference_fake_distributed():
assert (benchmark.type == BenchmarkType.MICRO)
# Check predefined parameters of dist-inference benchmark.
assert (benchmark._args.use_pytorch is True)
assert (benchmark._args.batch_size == 64)
assert (benchmark._args.input_size == 1024)
assert (benchmark._args.hidden_size == 1024)
assert (benchmark._args.alpha == 1.0)
assert (benchmark._args.beta == 1.0)
assert (benchmark._args.num_layers == 1)
assert (benchmark._args.computation_kernel == ComputationKernelType.MATMUL)
assert (benchmark._args.communication_kernel == CommunicationKernelType.ALLREDUCE)
@@ -84,6 +98,7 @@ def test_pytorch_dist_inference_fake_distributed():
assert (benchmark._args.num_steps == 10000)
assert (benchmark._args.distributed_impl == DistributedImpl.DDP)
assert (benchmark._args.distributed_backend == DistributedBackend.NCCL)
assert (benchmark._args.use_cuda_graph is False)
# Check results and metrics.
assert (benchmark.run_count == 1)
@@ -94,3 +109,127 @@ def test_pytorch_dist_inference_fake_distributed():
assert (len(benchmark.result) == 7)
utils.clean_simulated_ddp_distributed_env()
class DistInferenceCppImplTest(BenchmarkTestCase, unittest.TestCase):
"""Test class for pytorch-dist-inference benchmark."""
@classmethod
def setUpClass(cls):
"""Hook method for setting up class fixture before running tests in the class."""
super().setUpClass()
cls.createMockEnvs(cls)
cls.createMockFiles(cls, ['bin/dist_inference'])
def _test_dist_inference_command_generation(self, platform):
"""Test pytorch-dist-inference cpp impl benchmark command generation."""
benchmark_name = 'pytorch-dist-inference'
(benchmark_class,
predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, platform)
assert (benchmark_class)
batch_size = 1
input_size = 2
hidden_size = 3
alpha = 4.0
beta = 5.0
num_layers = 6
num_warmup = 7
num_steps = 8
wrapper_params_format_str = \
'--batch_size %d --input_size %d --hidden_size %d ' \
'--alpha %g --beta %g --num_layers %d --num_warmup %d --num_steps %d --use_cuda_graph'
parameters = wrapper_params_format_str % (
batch_size, input_size, hidden_size, alpha, beta, num_layers, num_warmup, num_steps
)
benchmark = benchmark_class(benchmark_name, parameters=parameters)
# Check basic information
assert (benchmark)
ret = benchmark._preprocess()
assert (ret is True)
assert (benchmark.return_code == ReturnCode.SUCCESS)
assert (benchmark.name == benchmark_name)
assert (benchmark.type == BenchmarkType.MICRO)
# Check parameters specified in BenchmarkContext.
assert (benchmark._args.use_pytorch is False)
assert (benchmark._args.batch_size == batch_size)
assert (benchmark._args.input_size == input_size)
assert (benchmark._args.hidden_size == hidden_size)
assert (benchmark._args.alpha == alpha)
assert (benchmark._args.beta == beta)
assert (benchmark._args.num_layers == num_layers)
assert (benchmark._args.num_warmup == num_warmup)
assert (benchmark._args.num_steps == num_steps)
assert (benchmark._args.use_cuda_graph is True)
# Check command
assert (1 == len(benchmark._commands))
for cmd in benchmark._commands:
m, n, k = hidden_size, batch_size, input_size
bench_params_format_str = \
'%s -m %d -n %d -k %d --alpha %g --beta %g ' + \
'--num_layers %d --num_warmups %d --num_iters %d --use_cuda_graph'
assert (
cmd == (
bench_params_format_str %
(benchmark._DistInference__bin_path, m, n, k, alpha, beta, num_layers, num_warmup, num_steps)
)
)
@decorator.cuda_test
def test_dist_inference_command_generation_cuda(self):
"""Test pytorch-dist-inference cpp impl benchmark command generation, CUDA case."""
self._test_dist_inference_command_generation(Platform.CUDA)
@decorator.rocm_test
def test_dist_inference_command_generation_rocm(self):
"""Test pytorch-dist-inference cpp impl benchmark command generation, ROCm case."""
self._test_dist_inference_command_generation(Platform.ROCM)
@decorator.load_data('tests/data/dist_inference.log')
def _test_dist_inference_result_parsing(self, platform, test_raw_output):
"""Test pytorch-dist-inference cpp impl benchmark result parsing."""
benchmark_name = 'pytorch-dist-inference'
(benchmark_class,
predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, platform)
assert (benchmark_class)
benchmark = benchmark_class(benchmark_name, parameters='')
assert (benchmark)
ret = benchmark._preprocess()
assert (ret is True)
assert (benchmark.return_code == ReturnCode.SUCCESS)
assert (benchmark.name == 'pytorch-dist-inference')
assert (benchmark.type == BenchmarkType.MICRO)
# Positive case - valid raw output.
assert (benchmark._process_raw_result(0, test_raw_output))
assert (benchmark.return_code == ReturnCode.SUCCESS)
# step_times
assert (len(benchmark.raw_data) == 2)
# return code + (avg, 50th, 90th, 95th, 99th, 99.9th)
test_latency = float(test_raw_output.splitlines()[-1].split(' ms per iteration')[0].split()[-1])
assert (7 == len(benchmark.result))
for output_key in benchmark.result:
if output_key == 'return_code':
assert (benchmark.result[output_key] == [0])
else:
assert (output_key.startswith('step_times'))
assert (len(benchmark.result[output_key]) == 1)
assert (isinstance(benchmark.result[output_key][0], numbers.Number))
assert (test_latency == benchmark.result[output_key][0])
# Negative case - invalid raw output.
assert (benchmark._process_raw_result(1, 'Invalid raw output') is False)
assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE)
@decorator.cuda_test
def test_dist_inference_result_parsing_cuda(self):
"""Test pytorch-dist-inference cpp impl benchmark result parsing, CUDA case."""
self._test_dist_inference_result_parsing(Platform.CUDA)
@decorator.rocm_test
def test_dist_inference_result_parsing_rocm(self):
"""Test pytorch-dist-inference cpp impl benchmark result parsing, ROCm case."""
self._test_dist_inference_result_parsing(Platform.ROCM)
Parameters: m=80, n=128, k=128, alpha=1.000000, beta=1.000000, num_layers=50, num_warmups=20, num_iters=100, use_cuda_graph=0
Time: 173 ms in total, 1.73 ms per iteration, 0.0346 ms per layer