Commit e129194a authored by Sugon_ldc

add new model resnet50v1.5
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pathlib import Path
import numpy as np
from PIL import Image
LOGGER = logging.getLogger(__name__)
def get_dataloader_fn(
*, data_dir: str, batch_size: int = 1, width: int = 224, height: int = 224, images_num: int = None,
precision: str = "fp32", classes: int = 1000
):
def _dataloader():
image_extensions = [".gif", ".png", ".jpeg", ".jpg"]
image_paths = sorted([p for p in Path(data_dir).rglob("*") if p.suffix.lower() in image_extensions])
if images_num is not None:
image_paths = image_paths[:images_num]
LOGGER.info(
f"Creating PIL dataloader on data_dir={data_dir} #images={len(image_paths)} "
f"image_size=({width}, {height}) batch_size={batch_size}"
)
onehot = np.eye(classes)
batch = []
for image_path in image_paths:
img = Image.open(image_path.as_posix()).convert("RGB")
img = img.resize((width, height))
img = (np.array(img).astype(np.float32) / 255) - np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(1, 1, 3)
img = img / np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(1, 1, 3)
true_class = np.array([int(image_path.parent.name)])
assert tuple(img.shape) == (height, width, 3)
img = img[np.newaxis, ...]
batch.append((img, image_path.as_posix(), true_class))
if len(batch) >= batch_size:
ids = [image_path for _, image_path, *_ in batch]
x = {"INPUT__0": np.ascontiguousarray(
np.transpose(np.concatenate([img for img, *_ in batch]),
(0, 3, 1, 2)).astype(np.float32 if precision == "fp32" else np.float16))}
y_real = {"OUTPUT__0": onehot[np.concatenate([class_ for *_, class_ in batch])].astype(
np.float32 if precision == "fp32" else np.float16
)}
batch = []
yield ids, x, y_real
return _dataloader
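# Illustrative usage sketch (not part of the upstream file). It assumes the
# ImageNet-style layout implied by `image_path.parent.name` above, i.e.
# <data_dir>/<class_id>/<image>.jpg, and a hypothetical ./imagenet/val path:
#
#     dataloader_fn = get_dataloader_fn(data_dir="./imagenet/val", batch_size=4, images_num=8)
#     for ids, x, y_real in dataloader_fn():
#         # x["INPUT__0"] is an NCHW float array, y_real["OUTPUT__0"] is one-hot labels
#         print(ids[0], x["INPUT__0"].shape, y_real["OUTPUT__0"].shape)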
#!/usr/bin/python
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import torch
import argparse
import triton.deployer_lib as deployer_lib
def get_model_args(model_args):
""" the arguments initialize_model will receive """
parser = argparse.ArgumentParser()
    ## Required parameters for the model.
parser.add_argument(
"--config",
default="resnet50",
type=str,
required=True,
help="Network to deploy",
)
parser.add_argument(
"--checkpoint", default=None, type=str, help="The checkpoint of the model. "
)
parser.add_argument(
"--batch_size", default=1000, type=int, help="Batch size for inference"
)
parser.add_argument(
"--fp16", default=False, action="store_true", help="FP16 inference"
)
parser.add_argument(
"--dump_perf_data",
type=str,
default=None,
help="Directory to dump perf data sample for testing",
)
return parser.parse_args(model_args)
def initialize_model(args):
""" return model, ready to trace """
from image_classification.resnet import build_resnet
model = build_resnet(args.config, "fanin", 1000, fused_se=False)
    if args.checkpoint:
        state_dict = torch.load(args.checkpoint, map_location="cpu")
        # strip the DistributedDataParallel "module." prefix before loading
        model.load_state_dict(
            {k.replace("module.", ""): v for k, v in state_dict.items()}
        )
return model.half() if args.fp16 else model
def get_dataloader(args):
""" return dataloader for inference """
from image_classification.dataloaders import get_synthetic_loader
def data_loader():
loader, _ = get_synthetic_loader(None, 128, 1000, True, fp16=args.fp16)
processed = 0
for inp, _ in loader:
yield inp
processed += 1
if processed > 10:
break
return data_loader()
if __name__ == "__main__":
# don't touch this!
deployer, model_argv = deployer_lib.create_deployer(
sys.argv[1:]
    )  # returns the deployer object and the remaining model-specific arguments
model_args = get_model_args(model_argv)
model = initialize_model(model_args)
dataloader = get_dataloader(model_args)
if model_args.dump_perf_data:
input_0 = next(iter(dataloader))
if model_args.fp16:
input_0 = input_0.half()
os.makedirs(model_args.dump_perf_data, exist_ok=True)
input_0.detach().cpu().numpy()[0].tofile(
os.path.join(model_args.dump_perf_data, "input__0")
)
deployer.deploy(dataloader, model)
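# Illustrative invocation sketch (flags and paths are examples, not a verified
# command for this repository): deployer_lib consumes its own flags, and
# everything after "--" is forwarded to get_model_args() above.
#
#     python deployer.py --ts-trace --triton-model-name resnet50 \
#         --triton-max-batch-size 64 --save-dir ./triton_models \
#         -- --config resnet50 --checkpoint ./checkpoint.pth --batch_size 64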
#!/usr/bin/python
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import json
import torch
import argparse
import statistics
from collections import Counter
torch_type_to_triton_type = {
torch.bool: "TYPE_BOOL",
torch.int8: "TYPE_INT8",
torch.int16: "TYPE_INT16",
torch.int32: "TYPE_INT32",
torch.int64: "TYPE_INT64",
torch.uint8: "TYPE_UINT8",
torch.float16: "TYPE_FP16",
torch.float32: "TYPE_FP32",
torch.float64: "TYPE_FP64",
}
CONFIG_TEMPLATE = r"""
name: "{model_name}"
platform: "{platform}"
max_batch_size: {max_batch_size}
input [
{spec_inputs}
]
output [
{spec_outputs}
]
{dynamic_batching}
{model_optimizations}
instance_group [
{{
count: {engine_count}
kind: KIND_GPU
gpus: [ {gpu_list} ]
}}
]"""
INPUT_TEMPLATE = r"""
{{
name: "input__{num}"
data_type: {type}
dims: {dims}
{reshape}
}},"""
OUTPUT_TEMPLATE = r"""
{{
name: "output__{num}"
data_type: {type}
dims: {dims}
{reshape}
}},"""
MODEL_OPTIMIZATION_TEMPLATE = r"""
optimization {{
{execution_accelerator}
cuda {{
graphs: {capture_cuda_graph}
}}
}}"""
EXECUTION_ACCELERATOR_TEMPLATE = r"""
execution_accelerators {{
gpu_execution_accelerator: [
{{
name: "tensorrt"
}}
]
}},"""
def remove_empty_lines(text):
""" removes empty lines from text, returns the result """
ret = "".join([s for s in text.strip().splitlines(True) if s.strip()])
return ret
def create_deployer(argv):
""" takes a list of arguments, returns a deployer object and the list of unused arguments """
parser = argparse.ArgumentParser()
# required args
method = parser.add_mutually_exclusive_group(required=True)
method.add_argument(
"--ts-script",
action="store_true",
help="convert to torchscript using torch.jit.script",
)
method.add_argument(
"--ts-trace",
action="store_true",
help="convert to torchscript using torch.jit.trace",
)
method.add_argument(
"--onnx", action="store_true", help="convert to onnx using torch.onnx.export"
)
method.add_argument(
"--trt", action="store_true", help="convert to trt using tensorrt"
)
# triton related args
arguments = parser.add_argument_group("triton related flags")
arguments.add_argument(
"--triton-no-cuda", action="store_true", help="Use the CPU for tracing."
)
arguments.add_argument(
"--triton-model-name",
type=str,
default="model",
help="exports to appropriate directory structure for TRITON",
)
arguments.add_argument(
"--triton-model-version",
type=int,
default=1,
help="exports to appropriate directory structure for TRITON",
)
arguments.add_argument(
"--triton-max-batch-size",
type=int,
default=8,
help="Specifies the 'max_batch_size' in the TRITON model config.\
See the TRITON documentation for more info.",
)
arguments.add_argument(
"--triton-dyn-batching-delay",
type=float,
default=0,
help="Determines the dynamic_batching queue delay in milliseconds(ms) for\
the TRITON model config. Use '0' or '-1' to specify static batching.\
See the TRITON documentation for more info.",
)
arguments.add_argument(
"--triton-engine-count",
type=int,
default=1,
help="Specifies the 'instance_group' count value in the TRITON model config.\
See the TRITON documentation for more info.",
)
arguments.add_argument(
"--save-dir", type=str, default="./triton_models", help="Saved model directory"
)
# optimization args
arguments = parser.add_argument_group("optimization flags")
arguments.add_argument(
"--max_workspace_size",
type=int,
default=512 * 1024 * 1024,
help="set the size of the workspace for trt export",
)
arguments.add_argument(
"--trt-fp16",
action="store_true",
help="trt flag ---- export model in mixed precision mode",
)
arguments.add_argument(
"--capture-cuda-graph",
type=int,
default=1,
help="capture cuda graph for obtaining speedup. possible values: 0, 1. default: 1. ",
)
# remainder args
arguments.add_argument(
"model_arguments",
nargs=argparse.REMAINDER,
help="arguments that will be ignored by deployer lib and will be forwarded to your deployer script",
)
#
args = parser.parse_args(argv)
deployer = Deployer(args)
#
return deployer, args.model_arguments[1:]
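# Illustrative sketch of the argv split performed above (not part of the
# upstream file). create_deployer() keeps its own flags and forwards the
# remainder, with args.model_arguments[1:] dropping the first remainder element
# (typically the "--" separator):
#
#     deployer, model_argv = create_deployer(
#         ["--ts-trace", "--triton-model-name", "resnet50", "--", "--config", "resnet50"]
#     )
#     # model_argv is then handed to the model script's own argument parser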
class DeployerLibrary:
def __init__(self, args):
self.args = args
self.platform = None
def set_platform(self, platform):
""" sets the platform
:: platform :: "pytorch_libtorch" or "onnxruntime_onnx" or "tensorrt_plan"
"""
self.platform = platform
def build_trt_engine(self, model_file, shapes):
""" takes a path to an onnx file, and shape information, returns a trt engine
:: model_file :: path to an onnx model
:: shapes :: dictionary containing min shape, max shape, opt shape for the trt engine
"""
import tensorrt as trt
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(TRT_LOGGER)
builder.fp16_mode = self.args.trt_fp16
builder.max_batch_size = self.args.triton_max_batch_size
#
config = builder.create_builder_config()
config.max_workspace_size = self.args.max_workspace_size
if self.args.trt_fp16:
config.flags |= 1 << int(trt.BuilderFlag.FP16)
profile = builder.create_optimization_profile()
for s in shapes:
profile.set_shape(s["name"], min=s["min"], opt=s["opt"], max=s["max"])
config.add_optimization_profile(profile)
explicit_batch = 1 << (int)(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
network = builder.create_network(explicit_batch)
#
with trt.OnnxParser(network, TRT_LOGGER) as parser:
with open(model_file, "rb") as model:
parser.parse(model.read())
for i in range(parser.num_errors):
e = parser.get_error(i)
print("||||e", e)
engine = builder.build_engine(network, config=config)
return engine
def load_engine(self, engine_filepath):
""" loads a trt engine from engine_filepath, returns it """
import tensorrt as trt
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(engine_filepath, "rb") as f, trt.Runtime(TRT_LOGGER) as runtime:
engine = runtime.deserialize_cuda_engine(f.read())
return engine
def prepare_inputs(self, dataloader, device):
""" load sample inputs to device """
inputs = []
for batch in dataloader:
if type(batch) is torch.Tensor:
batch_d = batch.to(device)
batch_d = (batch_d,)
inputs.append(batch_d)
else:
batch_d = []
for x in batch:
assert type(x) is torch.Tensor, "input is not a tensor"
batch_d.append(x.to(device))
batch_d = tuple(batch_d)
inputs.append(batch_d)
return inputs
def get_list_of_shapes(self, l, fun):
""" returns the list of min/max shapes, depending on fun
:: l :: list of tuples of tensors
:: fun :: min or max
"""
tensor_tuple = l[0]
shapes = [list(x.shape) for x in tensor_tuple]
for tensor_tuple in l:
assert len(tensor_tuple) == len(
shapes
), "tensors with varying shape lengths are not supported"
for i, x in enumerate(tensor_tuple):
for j in range(len(x.shape)):
shapes[i][j] = fun(shapes[i][j], x.shape[j])
return shapes # a list of shapes
def get_tuple_of_min_shapes(self, l):
""" returns the tuple of min shapes
:: l :: list of tuples of tensors """
shapes = self.get_list_of_shapes(l, min)
min_batch = 1
shapes = [[min_batch, *shape[1:]] for shape in shapes]
shapes = tuple(shapes)
return shapes # tuple of min shapes
def get_tuple_of_max_shapes(self, l):
""" returns the tuple of max shapes
:: l :: list of tuples of tensors """
shapes = self.get_list_of_shapes(l, max)
max_batch = max(2, shapes[0][0])
shapes = [[max_batch, *shape[1:]] for shape in shapes]
shapes = tuple(shapes)
return shapes # tuple of max shapes
def get_tuple_of_opt_shapes(self, l):
""" returns the tuple of opt shapes
:: l :: list of tuples of tensors """
counter = Counter()
for tensor_tuple in l:
shapes = [tuple(x.shape) for x in tensor_tuple]
shapes = tuple(shapes)
counter[shapes] += 1
shapes = counter.most_common(1)[0][0]
        return shapes  # tuple of the most commonly occurring shapes
def get_tuple_of_dynamic_shapes(self, l):
""" returns a tuple of dynamic shapes: variable tensor dimensions
(for ex. batch size) occur as -1 in the tuple
:: l :: list of tuples of tensors """
tensor_tuple = l[0]
shapes = [list(x.shape) for x in tensor_tuple]
for tensor_tuple in l:
err_msg = "tensors with varying shape lengths are not supported"
assert len(tensor_tuple) == len(shapes), err_msg
for i, x in enumerate(tensor_tuple):
for j in range(len(x.shape)):
if shapes[i][j] != x.shape[j] or j == 0:
shapes[i][j] = -1
shapes = tuple(shapes)
return shapes # tuple of dynamic shapes
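    # Illustrative example (not part of the upstream file): for two image batches
    # of shape (8, 3, 224, 224) and (4, 3, 224, 224), get_tuple_of_dynamic_shapes()
    # returns a single-entry tuple whose shape is [-1, 3, 224, 224] - the batch
    # axis, and any axis that varies across batches, is marked as -1.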
def run_models(self, models, inputs):
""" run the models on inputs, return the outputs and execution times """
ret = []
for model in models:
torch.cuda.synchronize()
time_start = time.time()
outputs = []
for input in inputs:
with torch.no_grad():
output = model(*input)
if type(output) is torch.Tensor:
output = [output]
outputs.append(output)
torch.cuda.synchronize()
time_end = time.time()
t = time_end - time_start
ret.append(outputs)
ret.append(t)
return ret
def compute_tensor_stats(self, tensor):
return {
"std": tensor.std().item(),
"mean": tensor.mean().item(),
"max": tensor.max().item(),
"min": tensor.min().item(),
}
def compute_errors(self, outputs_A, outputs_B):
""" returns dictionary with errors statistics """
device = outputs_A[0][0][0].device
dtype = outputs_A[0][0][0].dtype
x_values = torch.zeros(0, device=device, dtype=dtype)
y_values = torch.zeros(0, device=device, dtype=dtype)
d_values = torch.zeros(0, device=device, dtype=dtype)
for output_A, output_B in zip(outputs_A, outputs_B):
for x, y in zip(output_A, output_B):
d = abs(x - y)
x_values = torch.cat((x_values, x), 0)
y_values = torch.cat((y_values, y), 0)
d_values = torch.cat((d_values, d), 0)
Error_stats = {
"Original": self.compute_tensor_stats(x_values),
"Converted": self.compute_tensor_stats(y_values),
"Absolute difference": self.compute_tensor_stats(d_values),
}
return Error_stats
def print_errors(self, Error_stats):
""" print various statistcs of Linf errors """
print()
print("conversion correctness test results")
print("-----------------------------------")
import pandas as pd
print(pd.DataFrame(Error_stats))
def write_config(
self, config_filename, input_shapes, input_types, output_shapes, output_types
):
""" writes TRTIS config file
:: config_filename :: the file to write the config file into
:: input_shapes :: tuple of dynamic shapes of the input tensors
:: input_types :: tuple of torch types of the input tensors
:: output_shapes :: tuple of dynamic shapes of the output tensors
:: output_types :: tuple of torch types of the output tensors
"""
assert self.platform is not None, "error - platform is not set"
config_template = CONFIG_TEMPLATE
input_template = INPUT_TEMPLATE
optimization_template = MODEL_OPTIMIZATION_TEMPLATE
accelerator_template = EXECUTION_ACCELERATOR_TEMPLATE
spec_inputs = r""""""
for i, (shape, typ) in enumerate(zip(input_shapes, input_types)):
d = {
"num": str(i),
"type": torch_type_to_triton_type[typ],
"dims": str([1])
if len(shape) == 1
else str(list(shape)[1:]), # first dimension is the batch size
}
d["reshape"] = "reshape: { shape: [ ] }" if len(shape) == 1 else ""
spec_inputs += input_template.format_map(d)
spec_inputs = spec_inputs[:-1]
output_template = OUTPUT_TEMPLATE
spec_outputs = r""""""
for i, (shape, typ) in enumerate(zip(output_shapes, output_types)):
d = {
"num": str(i),
"type": torch_type_to_triton_type[typ],
"dims": str([1])
if len(shape) == 1
else str(list(shape)[1:]), # first dimension is the batch size
}
d["reshape"] = "reshape: { shape: [ ] }" if len(shape) == 1 else ""
spec_outputs += output_template.format_map(d)
spec_outputs = spec_outputs[:-1]
batching_str = ""
max_batch_size = self.args.triton_max_batch_size
if self.args.triton_dyn_batching_delay >= 0:
# Use only full and half full batches
pref_batch_size = [int(max_batch_size / 2.0), max_batch_size]
if self.args.triton_dyn_batching_delay > 0:
dyn_batch_delay_str = f"max_queue_delay_microseconds: {int(self.args.triton_dyn_batching_delay * 1000.0)}"
else:
dyn_batch_delay_str = ""
batching_str = r"""
dynamic_batching {{
preferred_batch_size: [{0}]
{1}
}}""".format(
", ".join([str(x) for x in pref_batch_size]), dyn_batch_delay_str
)
accelerator_str = ""
d = {
"execution_accelerator": accelerator_str,
"capture_cuda_graph": str(self.args.capture_cuda_graph),
}
optimization_str = optimization_template.format_map(d)
config_values = {
"model_name": self.args.triton_model_name,
"platform": self.platform,
"max_batch_size": max_batch_size,
"spec_inputs": spec_inputs,
"spec_outputs": spec_outputs,
"dynamic_batching": batching_str,
"model_optimizations": optimization_str,
"gpu_list": ", ".join([str(x) for x in range(torch.cuda.device_count())]),
"engine_count": self.args.triton_engine_count,
}
# write config
with open(config_filename, "w") as file:
final_config_str = config_template.format_map(config_values)
final_config_str = remove_empty_lines(final_config_str)
file.write(final_config_str)
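# Illustrative sketch (not verbatim output of this code) of the kind of
# config.pbtxt that write_config() renders from CONFIG_TEMPLATE for a ResNet50
# TorchScript deployment with max batch size 64:
#
#     name: "resnet50"
#     platform: "pytorch_libtorch"
#     max_batch_size: 64
#     input [ { name: "input__0"  data_type: TYPE_FP32  dims: [3, 224, 224] } ]
#     output [ { name: "output__0" data_type: TYPE_FP32 dims: [1000] } ]
#     instance_group [ { count: 1 kind: KIND_GPU gpus: [ 0 ] } ]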
class Deployer:
def __init__(self, args):
self.args = args
self.lib = DeployerLibrary(args)
def deploy(self, dataloader, model):
""" deploy the model and test for correctness with dataloader """
if self.args.ts_script or self.args.ts_trace:
self.lib.set_platform("pytorch_libtorch")
print(
"deploying model "
+ self.args.triton_model_name
+ " in format "
+ self.lib.platform
)
self.to_triton_torchscript(dataloader, model)
elif self.args.onnx:
self.lib.set_platform("onnxruntime_onnx")
print(
"deploying model "
+ self.args.triton_model_name
+ " in format "
+ self.lib.platform
)
self.to_triton_onnx(dataloader, model)
elif self.args.trt:
self.lib.set_platform("tensorrt_plan")
print(
"deploying model "
+ self.args.triton_model_name
+ " in format "
+ self.lib.platform
)
self.to_triton_trt(dataloader, model)
else:
assert False, "error"
print("done")
def to_triton_trt(self, dataloader, model):
""" export the model to trt and test correctness on dataloader """
import tensorrt as trt
# setup device
if self.args.triton_no_cuda:
device = torch.device("cpu")
else:
device = torch.device("cuda")
# prepare model
model.to(device)
model.eval()
assert not model.training, "internal error - model should be in eval() mode! "
# prepare inputs
inputs = self.lib.prepare_inputs(dataloader, device)
# generate outputs
outputs = []
for input in inputs:
with torch.no_grad():
output = model(*input)
if type(output) is torch.Tensor:
output = [output]
outputs.append(output)
# generate input shapes - dynamic tensor shape support
input_shapes = self.lib.get_tuple_of_dynamic_shapes(inputs)
# generate output shapes - dynamic tensor shape support
output_shapes = self.lib.get_tuple_of_dynamic_shapes(outputs)
# generate input types
input_types = [x.dtype for x in inputs[0]]
# generate output types
output_types = [x.dtype for x in outputs[0]]
# get input names
rng = range(len(input_types))
input_names = ["input__" + str(num) for num in rng]
# get output names
rng = range(len(output_types))
output_names = ["output__" + str(num) for num in rng]
# prepare save path
model_folder = os.path.join(self.args.save_dir, self.args.triton_model_name)
version_folder = os.path.join(model_folder, str(self.args.triton_model_version))
if not os.path.exists(version_folder):
os.makedirs(version_folder)
final_model_path = os.path.join(version_folder, "model.plan")
# get indices of dynamic input and output shapes
dynamic_axes = {}
for input_name, shape in zip(input_names, input_shapes):
dynamic_axes[input_name] = [i for i, x in enumerate(shape) if x == -1]
for output_name, shape in zip(output_names, output_shapes):
dynamic_axes[output_name] = [i for i, x in enumerate(shape) if x == -1]
# export the model to onnx first
with torch.no_grad():
torch.onnx.export(
model,
inputs[0],
final_model_path,
verbose=False,
input_names=input_names,
output_names=output_names,
dynamic_axes=dynamic_axes,
opset_version=11,
)
# get shapes
min_shapes = self.lib.get_tuple_of_min_shapes(inputs)
opt_shapes = self.lib.get_tuple_of_opt_shapes(inputs)
max_shapes = self.lib.get_tuple_of_max_shapes(inputs)
zipped = zip(input_names, min_shapes, opt_shapes, max_shapes)
shapes = []
for name, min_shape, opt_shape, max_shape in zipped:
d = {"name": name, "min": min_shape, "opt": opt_shape, "max": max_shape}
shapes.append(d)
# build trt engine
engine = self.lib.build_trt_engine(final_model_path, shapes)
assert engine is not None, " trt export failure "
# write trt engine
with open(final_model_path, "wb") as f:
f.write(engine.serialize())
# load the model
engine = self.lib.load_engine(final_model_path)
class TRT_model:
def __init__(self, engine, input_names, output_names, output_types, device):
self.engine = engine
self.context = self.engine.create_execution_context()
self.input_names = input_names
self.output_names = output_names
self.output_types = output_types
self.device = device
def is_dimension_dynamic(self, dim):
return dim is None or dim <= 0
def is_shape_dynamic(self, shape):
return any([self.is_dimension_dynamic(dim) for dim in shape])
def __call__(self, *inputs):
# get input shapes
input_shapes = [x.shape for x in inputs]
# bindings
bindings = [None] * self.engine.num_bindings
# set input shapes, bind input tensors
zipped = zip(self.input_names, inputs)
for key, input in zipped:
idx = self.engine.get_binding_index(key)
bindings[idx] = input.data_ptr()
if self.engine.is_shape_binding(idx) and self.is_shape_dynamic(
self.context.get_shape(idx)
):
self.context.set_shape_input(idx, input)
elif self.is_shape_dynamic(self.engine.get_binding_shape(idx)):
self.context.set_binding_shape(idx, input.shape)
assert self.context.all_binding_shapes_specified, "trt error"
assert self.context.all_shape_inputs_specified, "trt error"
# calculate output shapes, allocate output tensors and bind them
outputs = []
zipped = zip(self.output_names, self.output_types)
for key, dtype in zipped:
idx = self.engine.get_binding_index(key)
shape = self.context.get_binding_shape(idx)
shape = tuple(shape)
assert -1 not in shape, "trt error"
tensor = torch.zeros(shape, dtype=dtype, device=self.device)
outputs.append(tensor)
bindings[idx] = outputs[-1].data_ptr()
# run inference
self.context.execute_v2(bindings=bindings)
# return the result
if len(outputs) == 1:
outputs = outputs[0]
return outputs
model_trt = TRT_model(engine, input_names, output_names, output_types, device)
# run both models on inputs
assert not model.training, "internal error - model should be in eval() mode! "
models = (model, model_trt)
outputs, time_model, outputs_trt, time_model_trt = self.lib.run_models(
models, inputs
)
# check for errors
Error_stats = self.lib.compute_errors(outputs, outputs_trt)
self.lib.print_errors(Error_stats)
print("time of error check of native model: ", time_model, "seconds")
print("time of error check of trt model: ", time_model_trt, "seconds")
print()
# write TRTIS config
config_filename = os.path.join(model_folder, "config.pbtxt")
self.lib.write_config(
config_filename, input_shapes, input_types, output_shapes, output_types
)
def name_onnx_nodes(self, model_path):
"""
Name all unnamed nodes in ONNX model
        parameter model_path: path to the ONNX model
return: none
"""
model = onnx.load(model_path)
node_id = 0
for node in model.graph.node:
if len(node.name) == 0:
node.name = "unnamed_node_%d" % node_id
node_id += 1
# This check partially validates model
onnx.checker.check_model(model)
onnx.save(model, model_path)
# Only inference really checks ONNX model for some issues
# like duplicated node names
onnxruntime.InferenceSession(model_path, None)
def to_triton_onnx(self, dataloader, model):
""" export the model to onnx and test correctness on dataloader """
import onnx as local_onnx
global onnx
onnx = local_onnx
import onnxruntime as local_onnxruntime
global onnxruntime
onnxruntime = local_onnxruntime
# setup device
if self.args.triton_no_cuda:
device = torch.device("cpu")
else:
device = torch.device("cuda")
# prepare model
model.to(device)
model.eval()
assert not model.training, "internal error - model should be in eval() mode! "
# prepare inputs
inputs = self.lib.prepare_inputs(dataloader, device)
# generate outputs
outputs = []
for input in inputs:
with torch.no_grad():
output = model(*input)
if type(output) is torch.Tensor:
output = [output]
outputs.append(output)
# generate input shapes - dynamic tensor shape support
input_shapes = self.lib.get_tuple_of_dynamic_shapes(inputs)
# generate output shapes - dynamic tensor shape support
output_shapes = self.lib.get_tuple_of_dynamic_shapes(outputs)
# generate input types
input_types = [x.dtype for x in inputs[0]]
# generate output types
output_types = [x.dtype for x in outputs[0]]
# get input names
rng = range(len(input_types))
input_names = ["input__" + str(num) for num in rng]
# get output names
rng = range(len(output_types))
output_names = ["output__" + str(num) for num in rng]
# prepare save path
model_folder = os.path.join(self.args.save_dir, self.args.triton_model_name)
version_folder = os.path.join(model_folder, str(self.args.triton_model_version))
if not os.path.exists(version_folder):
os.makedirs(version_folder)
final_model_path = os.path.join(version_folder, "model.onnx")
# get indices of dynamic input and output shapes
dynamic_axes = {}
for input_name, input_shape in zip(input_names, input_shapes):
dynamic_axes[input_name] = [i for i, x in enumerate(input_shape) if x == -1]
for output_name, output_shape in zip(output_names, output_shapes):
dynamic_axes[output_name] = [
i for i, x in enumerate(output_shape) if x == -1
]
# export the model
assert not model.training, "internal error - model should be in eval() mode! "
with torch.no_grad():
torch.onnx.export(
model,
inputs[0],
final_model_path,
verbose=True,
input_names=input_names,
output_names=output_names,
dynamic_axes=dynamic_axes,
opset_version=11,
)
# syntactic error check
converted_model = onnx.load(final_model_path)
# check that the IR is well formed
onnx.checker.check_model(converted_model)
# Name unnamed nodes - it helps for some other processing tools
self.name_onnx_nodes(final_model_path)
converted_model = onnx.load(final_model_path)
# load the model
session = onnxruntime.InferenceSession(final_model_path, None)
class ONNX_model:
def __init__(self, session, input_names, device):
self.session = session
self.input_names = input_names
def to_numpy(self, tensor):
return (
tensor.detach().cpu().numpy()
if tensor.requires_grad
else tensor.cpu().numpy()
)
def __call__(self, *inputs):
inp = [
(input_name, inputs[i])
for i, input_name in enumerate(self.input_names)
]
inp = {input_name: self.to_numpy(x) for input_name, x in inp}
outputs = self.session.run(None, inp)
outputs = [torch.from_numpy(output) for output in outputs]
outputs = [output.to(device) for output in outputs]
if len(outputs) == 1:
outputs = outputs[0]
return outputs
        # wrap the onnxruntime session so it can be called like the torch model
model_onnx = ONNX_model(session, input_names, device)
# run both models on inputs
assert not model.training, "internal error - model should be in eval() mode! "
models = (model, model_onnx)
outputs, time_model, outputs_onnx, time_model_onnx = self.lib.run_models(
models, inputs
)
# check for errors
Error_stats = self.lib.compute_errors(outputs, outputs_onnx)
self.lib.print_errors(Error_stats)
print("time of error check of native model: ", time_model, "seconds")
print("time of error check of onnx model: ", time_model_onnx, "seconds")
print()
# write TRTIS config
config_filename = os.path.join(model_folder, "config.pbtxt")
self.lib.write_config(
config_filename, input_shapes, input_types, output_shapes, output_types
)
def to_triton_torchscript(self, dataloader, model):
""" export the model to torchscript and test correctness on dataloader """
# setup device
if self.args.triton_no_cuda:
device = torch.device("cpu")
else:
device = torch.device("cuda")
# prepare model
model.to(device)
model.eval()
assert not model.training, "internal error - model should be in eval() mode! "
# prepare inputs
inputs = self.lib.prepare_inputs(dataloader, device)
# generate input shapes - dynamic tensor shape support
input_shapes = self.lib.get_tuple_of_dynamic_shapes(inputs)
# generate input types
input_types = [x.dtype for x in inputs[0]]
# prepare save path
model_folder = os.path.join(self.args.save_dir, self.args.triton_model_name)
version_folder = os.path.join(model_folder, str(self.args.triton_model_version))
if not os.path.exists(version_folder):
os.makedirs(version_folder)
final_model_path = os.path.join(version_folder, "model.pt")
# convert the model
with torch.no_grad():
if self.args.ts_trace: # trace it
model_ts = torch.jit.trace(model, inputs[0])
if self.args.ts_script: # script it
model_ts = torch.jit.script(model)
# save the model
torch.jit.save(model_ts, final_model_path)
# load the model
model_ts = torch.jit.load(final_model_path)
model_ts.eval() # WAR for bug : by default, model_ts gets loaded in training mode
# run both models on inputs
assert not model.training, "internal error - model should be in eval() mode! "
assert (
not model_ts.training
), "internal error - converted model should be in eval() mode! "
models = (model, model_ts)
outputs, time_model, outputs_ts, time_model_ts = self.lib.run_models(
models, inputs
)
# check for errors
Error_stats = self.lib.compute_errors(outputs, outputs_ts)
self.lib.print_errors(Error_stats)
print("time of error check of native model: ", time_model, "seconds")
print("time of error check of ts model: ", time_model_ts, "seconds")
print()
# generate output shapes - dynamic tensor shape support
output_shapes = self.lib.get_tuple_of_dynamic_shapes(outputs)
# generate output types
output_types = [x.dtype for x in outputs[0]]
# now we build the config for TRTIS
config_filename = os.path.join(model_folder, "config.pbtxt")
self.lib.write_config(
config_filename, input_shapes, input_types, output_shapes, output_types
)
0.5.0-2-gd556907
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import inspect
import logging
from typing import Any, Callable, Dict, Optional, Union
from .core import GET_ARGPARSER_FN_NAME, load_from_file
LOGGER = logging.getLogger(__name__)
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")
def filter_fn_args(args: Union[dict, argparse.Namespace], fn: Callable) -> dict:
signature = inspect.signature(fn)
parameters_names = list(signature.parameters)
if isinstance(args, argparse.Namespace):
args = vars(args)
args = {k: v for k, v in args.items() if k in parameters_names}
return args
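# Illustrative example (not part of the upstream file): filter_fn_args() keeps
# only the namespace entries that match the callable's signature, e.g.
#
#     def init(data_dir: str, batch_size: int = 1): ...
#     ns = argparse.Namespace(data_dir="/data", batch_size=8, unrelated_flag=True)
#     filter_fn_args(ns, init)  # -> {"data_dir": "/data", "batch_size": 8}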
def add_args_for_fn_signature(parser, fn) -> argparse.ArgumentParser:
parser.conflict_handler = "resolve"
signature = inspect.signature(fn)
for parameter in signature.parameters.values():
if parameter.name in ["self", "args", "kwargs"]:
continue
argument_kwargs = {}
if parameter.annotation != inspect.Parameter.empty:
if parameter.annotation == bool:
argument_kwargs["type"] = str2bool
argument_kwargs["choices"] = [0, 1]
elif isinstance(parameter.annotation, type(Optional[Any])):
types = [type_ for type_ in parameter.annotation.__args__ if not isinstance(None, type_)]
if len(types) != 1:
raise RuntimeError(
f"Could not prepare argument parser for {parameter.name}: {parameter.annotation} in {fn}"
)
argument_kwargs["type"] = types[0]
else:
argument_kwargs["type"] = parameter.annotation
if parameter.default != inspect.Parameter.empty:
if parameter.annotation == bool:
argument_kwargs["default"] = str2bool(parameter.default)
else:
argument_kwargs["default"] = parameter.default
else:
argument_kwargs["required"] = True
name = parameter.name.replace("_", "-")
LOGGER.debug(f"Adding argument {name} with {argument_kwargs}")
parser.add_argument(f"--{name}", **argument_kwargs)
return parser
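# Illustrative example (not part of the upstream file): for a function such as
#
#     def get_dataloader_fn(*, data_dir: str, batch_size: int = 1, precision: str = "fp32"): ...
#
# add_args_for_fn_signature() registers --data-dir (required), --batch-size
# (default 1) and --precision (default "fp32") on the given parser, using the
# type annotations to pick the argument types.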
class ArgParserGenerator:
def __init__(self, cls_or_fn, module_path: Optional[str] = None):
self._cls_or_fn = cls_or_fn
self._handle = cls_or_fn if inspect.isfunction(cls_or_fn) else getattr(cls_or_fn, "__init__")
input_is_python_file = module_path and module_path.endswith(".py")
self._input_path = module_path if input_is_python_file else None
self._required_fn_name_for_signature_parsing = getattr(
cls_or_fn, "required_fn_name_for_signature_parsing", None
)
def update_argparser(self, parser):
name = self._handle.__name__
group_parser = parser.add_argument_group(name)
add_args_for_fn_signature(group_parser, fn=self._handle)
self._update_argparser(group_parser)
def get_args(self, args: argparse.Namespace):
filtered_args = filter_fn_args(args, fn=self._handle)
tmp_parser = argparse.ArgumentParser(allow_abbrev=False)
self._update_argparser(tmp_parser)
custom_names = [
p.dest.replace("-", "_") for p in tmp_parser._actions if not isinstance(p, argparse._HelpAction)
]
custom_params = {n: getattr(args, n) for n in custom_names}
filtered_args = {**filtered_args, **custom_params}
return filtered_args
def from_args(self, args: Union[argparse.Namespace, Dict]):
args = self.get_args(args)
LOGGER.info(f"Initializing {self._cls_or_fn.__name__}({args})")
return self._cls_or_fn(**args)
def _update_argparser(self, parser):
label = "argparser_update"
if self._input_path:
update_argparser_handle = load_from_file(self._input_path, label=label, target=GET_ARGPARSER_FN_NAME)
if update_argparser_handle:
update_argparser_handle(parser)
elif self._required_fn_name_for_signature_parsing:
fn_handle = load_from_file(
self._input_path, label=label, target=self._required_fn_name_for_signature_parsing
)
if fn_handle:
add_args_for_fn_signature(parser, fn_handle)
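# Illustrative usage sketch (not part of the upstream file), assuming a module
# that exposes get_dataloader_fn as in the dataloader shown earlier:
#
#     parser = argparse.ArgumentParser()
#     gen = ArgParserGenerator(get_dataloader_fn, module_path="dataloader.py")
#     gen.update_argparser(parser)             # adds --data-dir, --batch-size, ...
#     args = parser.parse_args(["--data-dir", "./imagenet/val"])
#     dataloader_fn = gen.from_args(args)      # calls get_dataloader_fn(**filtered_args)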
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import importlib
import logging
import os
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
import numpy as np
LOGGER = logging.getLogger(__name__)
DATALOADER_FN_NAME = "get_dataloader_fn"
GET_MODEL_FN_NAME = "get_model"
GET_SERVING_INPUT_RECEIVER_FN = "get_serving_input_receiver_fn"
GET_ARGPARSER_FN_NAME = "update_argparser"
class TensorSpec(NamedTuple):
name: str
dtype: str
shape: Tuple
class Parameter(Enum):
def __lt__(self, other: "Parameter") -> bool:
return self.value < other.value
class Accelerator(Parameter):
AMP = "amp"
CUDA = "cuda"
TRT = "trt"
class Precision(Parameter):
FP16 = "fp16"
FP32 = "fp32"
TF32 = "tf32" # Deprecated
class Format(Parameter):
TF_GRAPHDEF = "tf-graphdef"
TF_SAVEDMODEL = "tf-savedmodel"
TF_TRT = "tf-trt"
TF_ESTIMATOR = "tf-estimator"
TF_KERAS = "tf-keras"
ONNX = "onnx"
TRT = "trt"
TS_SCRIPT = "ts-script"
TS_TRACE = "ts-trace"
PYT = "pyt"
class Model(NamedTuple):
handle: object
precision: Optional[Precision]
inputs: Dict[str, TensorSpec]
outputs: Dict[str, TensorSpec]
def load_from_file(file_path, label, target):
spec = importlib.util.spec_from_file_location(name=label, location=file_path)
my_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(my_module) # pytype: disable=attribute-error
return getattr(my_module, target, None)
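# Illustrative example (not part of the upstream file): load_from_file() is how
# the toolkit pulls user-provided entry points out of a script, e.g.
#
#     get_dataloader_fn = load_from_file("dataloader.py", label="dataloader",
#                                        target=DATALOADER_FN_NAME)
#     if get_dataloader_fn is None:
#         raise RuntimeError("dataloader.py does not define get_dataloader_fn")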
class BaseLoader(abc.ABC):
required_fn_name_for_signature_parsing: Optional[str] = None
@abc.abstractmethod
def load(self, model_path: Union[str, Path], **kwargs) -> Model:
"""
Loads and process model from file based on given set of args
"""
pass
class BaseSaver(abc.ABC):
required_fn_name_for_signature_parsing: Optional[str] = None
@abc.abstractmethod
def save(self, model: Model, model_path: Union[str, Path]) -> None:
"""
Save model to file
"""
pass
class BaseRunner(abc.ABC):
required_fn_name_for_signature_parsing: Optional[str] = None
@abc.abstractmethod
def init_inference(self, model: Model):
raise NotImplementedError
class BaseRunnerSession(abc.ABC):
def __init__(self, model: Model):
self._model = model
@abc.abstractmethod
def __enter__(self):
raise NotImplementedError()
@abc.abstractmethod
def __exit__(self, exc_type, exc_value, traceback):
raise NotImplementedError()
@abc.abstractmethod
def __call__(self, x: Dict[str, object]):
raise NotImplementedError()
def _set_env_variables(self) -> Dict[str, object]:
"""this method not remove values; fix it if needed"""
to_set = {}
old_values = {k: os.environ.pop(k, None) for k in to_set}
os.environ.update(to_set)
return old_values
def _recover_env_variables(self, old_envs: Dict[str, object]):
for name, value in old_envs.items():
if value is None:
del os.environ[name]
else:
os.environ[name] = str(value)
class BaseConverter(abc.ABC):
required_fn_name_for_signature_parsing: Optional[str] = None
@abc.abstractmethod
def convert(self, model: Model, dataloader_fn) -> Model:
raise NotImplementedError()
@staticmethod
def required_source_model_precision(requested_model_precision: Precision) -> Precision:
return requested_model_precision
class BaseMetricsCalculator(abc.ABC):
required_fn_name_for_signature_parsing: Optional[str] = None
@abc.abstractmethod
def calc(
self,
*,
ids: List[Any],
y_pred: Dict[str, np.ndarray],
x: Optional[Dict[str, np.ndarray]],
y_real: Optional[Dict[str, np.ndarray]],
) -> Dict[str, float]:
"""
Calculates error/accuracy metrics
Args:
ids: List of ids identifying each sample in the batch
y_pred: model output as dict where key is output name and value is output value
x: model input as dict where key is input name and value is input value
y_real: input ground truth as dict where key is output name and value is output value
Returns:
dictionary where key is metric name and value is its value
"""
pass
class ShapeSpec(NamedTuple):
min: Tuple
opt: Tuple
max: Tuple
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
from typing import Dict, Iterable
import numpy as np
MB2B = 2 ** 20
B2MB = 1 / MB2B
FLUSH_THRESHOLD_B = 256 * MB2B
def pad_except_batch_axis(data: np.ndarray, target_shape_with_batch_axis: Iterable[int]):
assert all(
[current_size <= target_size for target_size, current_size in zip(target_shape_with_batch_axis, data.shape)]
), "target_shape should have equal or greater all dimensions comparing to data.shape"
padding = [(0, 0)] + [ # (0, 0) - do not pad on batch_axis (with index 0)
(0, target_size - current_size)
for target_size, current_size in zip(target_shape_with_batch_axis[1:], data.shape[1:])
]
return np.pad(data, padding, "constant", constant_values=np.nan)
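# Illustrative example (not part of the upstream file): padding leaves the batch
# axis untouched and pads the remaining axes up to the target shape with np.nan:
#
#     a = np.ones((2, 3), dtype=np.float32)
#     pad_except_batch_axis(a, (2, 5)).shape  # -> (2, 5); columns 3..4 are np.nan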
class NpzWriter:
"""
Dumps dicts of numpy arrays into npz files
It can/shall be used as context manager:
```
with OutputWriter('mydir') as writer:
writer.write(outputs={'classes': np.zeros(8), 'probs': np.zeros((8, 4))},
labels={'classes': np.zeros(8)},
inputs={'input': np.zeros((8, 240, 240, 3)})
```
## Variable size data
    Only a dynamic last axis is handled. Data is padded with np.nan values.
    Each generated file may also have a different size along the dynamic axis.
"""
def __init__(self, output_dir, compress=False):
self._output_dir = Path(output_dir)
self._items_cache: Dict[str, Dict[str, np.ndarray]] = {}
self._items_counters: Dict[str, int] = {}
self._flush_threshold_b = FLUSH_THRESHOLD_B
self._compress = compress
@property
def cache_size(self):
return {name: sum([a.nbytes for a in data.values()]) for name, data in self._items_cache.items()}
def _append_to_cache(self, prefix, data):
if data is None:
return
if not isinstance(data, dict):
raise ValueError(f"{prefix} data to store shall be dict")
cached_data = self._items_cache.get(prefix, {})
for name, value in data.items():
assert isinstance(
value, (list, np.ndarray)
), f"Values shall be lists or np.ndarrays; current type {type(value)}"
if not isinstance(value, np.ndarray):
value = np.array(value)
assert value.dtype.kind in ["S", "U"] or not np.any(
np.isnan(value)
), f"Values with np.nan is not supported; {name}={value}"
cached_value = cached_data.get(name, None)
if cached_value is not None:
target_shape = np.max([cached_value.shape, value.shape], axis=0)
cached_value = pad_except_batch_axis(cached_value, target_shape)
value = pad_except_batch_axis(value, target_shape)
value = np.concatenate((cached_value, value))
cached_data[name] = value
self._items_cache[prefix] = cached_data
def write(self, **kwargs):
"""
        Writes named dictionaries of np.ndarrays.
        The keyword names become the filename prefixes of the npz files in which those dictionaries are stored.
ex. writer.write(inputs={'input': np.zeros((2, 10))},
outputs={'classes': np.zeros((2,)), 'probabilities': np.zeros((2, 32))},
labels={'classes': np.zeros((2,))})
Args:
**kwargs: named list of dictionaries of np.ndarrays to store
"""
for prefix, data in kwargs.items():
self._append_to_cache(prefix, data)
biggest_item_size = max(self.cache_size.values())
if biggest_item_size > self._flush_threshold_b:
self.flush()
def flush(self):
for prefix, data in self._items_cache.items():
self._dump(prefix, data)
self._items_cache = {}
def _dump(self, prefix, data):
idx = self._items_counters.setdefault(prefix, 0)
filename = f"{prefix}-{idx:012d}.npz"
output_path = self._output_dir / filename
if self._compress:
np.savez_compressed(output_path, **data)
else:
np.savez(output_path, **data)
nitems = len(list(data.values())[0])
msg_for_labels = (
"If these are correct shapes - consider moving loading of them into metrics.py."
if prefix == "labels"
else ""
)
shapes = {name: value.shape if isinstance(value, np.ndarray) else (len(value),) for name, value in data.items()}
assert all(len(v) == nitems for v in data.values()), (
f'All items in "{prefix}" shall have same size on 0 axis equal to batch size. {msg_for_labels}'
f'{", ".join(f"{name}: {shape}" for name, shape in shapes.items())}'
)
self._items_counters[prefix] += nitems
def __enter__(self):
if self._output_dir.exists() and len(list(self._output_dir.iterdir())):
raise ValueError(f"{self._output_dir.as_posix()} is not empty")
self._output_dir.mkdir(parents=True, exist_ok=True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.flush()
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import logging
import os
import re
from pathlib import Path
from typing import List
LOGGER = logging.getLogger(__name__)
class ExtensionManager:
def __init__(self, name: str):
self._name = name
self._registry = {}
def register_extension(self, extension: str, clazz):
already_registered_class = self._registry.get(extension, None)
if already_registered_class and already_registered_class.__module__ != clazz.__module__:
raise RuntimeError(
f"Conflicting extension {self._name}/{extension}; "
f"{already_registered_class.__module__}.{already_registered_class.__name} "
f"and "
f"{clazz.__module__}.{clazz.__name__}"
)
elif already_registered_class is None:
clazz_full_name = f"{clazz.__module__}.{clazz.__name__}" if clazz is not None else "None"
LOGGER.debug(f"Registering extension {self._name}/{extension}: {clazz_full_name}")
self._registry[extension] = clazz
def get(self, extension):
if extension not in self._registry:
raise RuntimeError(f"Missing extension {self._name}/{extension}")
return self._registry[extension]
@property
def supported_extensions(self):
return list(self._registry)
@staticmethod
def scan_for_extensions(extension_dirs: List[Path]):
register_pattern = r".*\.register_extension\(.*"
for extension_dir in extension_dirs:
for python_path in extension_dir.rglob("*.py"):
if not python_path.is_file():
continue
payload = python_path.read_text()
if re.findall(register_pattern, payload):
import_path = python_path.relative_to(toolkit_root_dir.parent)
package = import_path.parent.as_posix().replace(os.sep, ".")
package_with_module = f"{package}.{import_path.stem}"
spec = importlib.util.spec_from_file_location(name=package_with_module, location=python_path)
my_module = importlib.util.module_from_spec(spec)
my_module.__package__ = package
try:
spec.loader.exec_module(my_module) # pytype: disable=attribute-error
except ModuleNotFoundError as e:
LOGGER.error(
f"Could not load extensions from {import_path} due to missing python packages; {e}"
)
runners = ExtensionManager("runners")
loaders = ExtensionManager("loaders")
savers = ExtensionManager("savers")
converters = ExtensionManager("converters")
toolkit_root_dir = (Path(__file__).parent / "..").resolve()
ExtensionManager.scan_for_extensions([toolkit_root_dir])
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pathlib import Path
from typing import Dict, Optional, Union
import numpy as np
# pytype: disable=import-error
import onnx
import onnx.optimizer
import onnx.shape_inference
import onnxruntime
from google.protobuf import text_format
from onnx.mapping import TENSOR_TYPE_TO_NP_TYPE
# pytype: enable=import-error
from ..core import BaseLoader, BaseRunner, BaseRunnerSession, BaseSaver, Format, Model, Precision, TensorSpec
from ..extensions import loaders, runners, savers
from .utils import infer_precision
LOGGER = logging.getLogger(__name__)
def _value_info2tensor_spec(value_info: onnx.ValueInfoProto):
onnx_data_type_map = {"float": "float32", "double": "float64"}
elem_type_name = onnx.TensorProto.DataType.Name(value_info.type.tensor_type.elem_type).lower()
dtype = onnx_data_type_map.get(elem_type_name, elem_type_name)
def _get_dim(dim):
which = dim.WhichOneof("value")
if which is not None: # which is None when dim is None
dim = getattr(dim, which)
return None if isinstance(dim, (str, bytes)) else dim
shape = value_info.type.tensor_type.shape
shape = tuple([_get_dim(d) for d in shape.dim])
return TensorSpec(value_info.name, dtype=dtype, shape=shape)
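# Illustrative example (not part of the upstream file): for a hypothetical ONNX
# input named "input__0" with elem_type FLOAT and shape [batch_size, 3, 224, 224]
# (symbolic batch dimension), _value_info2tensor_spec() returns
# TensorSpec(name="input__0", dtype="float32", shape=(None, 3, 224, 224)).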
def _infer_graph_precision(onnx_graph: onnx.GraphProto) -> Optional[Precision]:
import networkx as nx
# build directed graph
nx_graph = nx.DiGraph()
def _get_dtype(vi):
t = vi.type
if hasattr(t, "tensor_type"):
type_id = t.tensor_type.elem_type
else:
raise NotImplementedError("Not implemented yet")
return TENSOR_TYPE_TO_NP_TYPE[type_id]
node_output2type = {vi.name: _get_dtype(vi) for vi in onnx_graph.value_info}
node_outputs2node = {output_name: node for node in onnx_graph.node for output_name in node.output}
node_inputs2node = {input_name: node for node in onnx_graph.node for input_name in node.input}
for node in onnx_graph.node:
node_dtype = node_output2type.get("+".join(node.output), None)
nx_graph.add_node(
node.name,
op=node.op_type,
attr={a.name: a for a in node.attribute},
dtype=node_dtype,
)
for input_name in node.input:
prev_node = node_outputs2node.get(input_name, None)
if prev_node:
nx_graph.add_edge(prev_node.name, node.name)
for input_node in onnx_graph.input:
input_name = input_node.name
nx_graph.add_node(input_name, op="input", dtype=_get_dtype(input_node))
next_node = node_inputs2node.get(input_name, None)
if next_node:
nx_graph.add_edge(input_name, next_node.name)
for output in onnx_graph.output:
output_name = output.name
nx_graph.add_node(output_name, op="output", dtype=_get_dtype(output))
prev_node = node_outputs2node.get(output_name, None)
if prev_node:
nx_graph.add_edge(prev_node.name, output_name)
else:
LOGGER.warning(f"Could not find previous node for {output_name}")
input_names = [n.name for n in onnx_graph.input]
output_names = [n.name for n in onnx_graph.output]
most_common_dtype = infer_precision(nx_graph, input_names, output_names, lambda node: node.get("dtype", None))
if most_common_dtype is not None:
precision = {np.dtype("float32"): Precision.FP32, np.dtype("float16"): Precision.FP16}[most_common_dtype]
else:
precision = None
return precision
class OnnxLoader(BaseLoader):
def load(self, model_path: Union[str, Path], **_) -> Model:
if isinstance(model_path, Path):
model_path = model_path.as_posix()
model = onnx.load(model_path)
onnx.checker.check_model(model)
onnx.helper.strip_doc_string(model)
model = onnx.shape_inference.infer_shapes(model)
        # TODO: modifying the ONNX model's inputs/outputs probably causes the error on optimize
# from onnx.utils import polish_model
# model = polish_model(model) # run checker, docs strip, optimizer and shape inference
inputs = {vi.name: _value_info2tensor_spec(vi) for vi in model.graph.input}
outputs = {vi.name: _value_info2tensor_spec(vi) for vi in model.graph.output}
precision = _infer_graph_precision(model.graph)
return Model(model, precision, inputs, outputs)
class OnnxSaver(BaseSaver):
def __init__(self, as_text: bool = False):
self._as_text = as_text
def save(self, model: Model, model_path: Union[str, Path]) -> None:
model_path = Path(model_path)
LOGGER.debug(f"Saving ONNX model to {model_path.as_posix()}")
model_path.parent.mkdir(parents=True, exist_ok=True)
onnx_model: onnx.ModelProto = model.handle
if self._as_text:
with model_path.open("w") as f:
f.write(text_format.MessageToString(onnx_model))
else:
with model_path.open("wb") as f:
f.write(onnx_model.SerializeToString())
"""
ExecutionProviders on onnxruntime 1.4.0
['TensorrtExecutionProvider',
'CUDAExecutionProvider',
'MIGraphXExecutionProvider',
'NGRAPHExecutionProvider',
'OpenVINOExecutionProvider',
'DnnlExecutionProvider',
'NupharExecutionProvider',
'VitisAIExecutionProvider',
'ArmNNExecutionProvider',
'ACLExecutionProvider',
'CPUExecutionProvider']
"""
def _check_providers(providers):
providers = providers or []
if not isinstance(providers, (list, tuple)):
providers = [providers]
available_providers = onnxruntime.get_available_providers()
unavailable = set(providers) - set(available_providers)
if unavailable:
raise RuntimeError(f"Unavailable providers {unavailable}")
return providers
class OnnxRunner(BaseRunner):
def __init__(self, verbose_runtime_logs: bool = False):
self._providers = None
self._verbose_runtime_logs = verbose_runtime_logs
def init_inference(self, model: Model):
assert isinstance(model.handle, onnx.ModelProto)
return OnnxRunnerSession(
model=model, providers=self._providers, verbose_runtime_logs=self._verbose_runtime_logs
)
class OnnxRunnerSession(BaseRunnerSession):
def __init__(self, model: Model, providers, verbose_runtime_logs: bool = False):
super().__init__(model)
self._input_names = None
self._output_names = None
self._session = None
self._providers = providers
self._verbose_runtime_logs = verbose_runtime_logs
self._old_env_values = {}
def __enter__(self):
self._old_env_values = self._set_env_variables()
sess_options = onnxruntime.SessionOptions() # default session options
if self._verbose_runtime_logs:
sess_options.log_severity_level = 0
sess_options.log_verbosity_level = 1
LOGGER.info(
f"Starting inference session for onnx model providers={self._providers} sess_options={sess_options}"
)
self._input_names = list(self._model.inputs)
self._output_names = list(self._model.outputs)
model_payload = self._model.handle.SerializeToString()
self._session = onnxruntime.InferenceSession(
model_payload, providers=self._providers, sess_options=sess_options
)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._input_names = None
self._output_names = None
self._session = None
self._recover_env_variables(self._old_env_values)
def __call__(self, x: Dict[str, object]):
feed_dict = {k: x[k] for k in self._input_names}
y_pred = self._session.run(self._output_names, feed_dict)
y_pred = dict(zip(self._output_names, y_pred))
return y_pred
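# Illustrative usage sketch (not part of the upstream file), assuming a model
# loaded with OnnxLoader and a feed dict keyed by the model's input names
# ("input__0" is hypothetical):
#
#     model = OnnxLoader().load("model.onnx")
#     runner = OnnxRunner()
#     with runner.init_inference(model) as session:
#         y_pred = session({"input__0": np.zeros((1, 3, 224, 224), dtype=np.float32)})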
loaders.register_extension(Format.ONNX.value, OnnxLoader)
runners.register_extension(Format.ONNX.value, OnnxRunner)
savers.register_extension(Format.ONNX.value, OnnxSaver)
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, Iterable, Optional
# pytype: disable=import-error
import onnx
import tensorrt as trt
from ..core import BaseConverter, Format, Model, Precision, ShapeSpec
from ..extensions import converters
from .utils import get_input_shapes
# pytype: enable=import-error
LOGGER = logging.getLogger(__name__)
TRT_LOGGER = trt.Logger(trt.Logger.INFO)
class Onnx2TRTConverter(BaseConverter):
def __init__(self, *, max_batch_size: int, max_workspace_size: int, precision: str):
self._max_batch_size = max_batch_size
self._max_workspace_size = max_workspace_size
self._precision = Precision(precision)
def convert(self, model: Model, dataloader_fn) -> Model:
input_shapes = get_input_shapes(dataloader_fn(), self._max_batch_size)
cuda_engine = onnx2trt(
model.handle,
shapes=input_shapes,
max_workspace_size=self._max_workspace_size,
max_batch_size=self._max_batch_size,
model_precision=self._precision.value,
)
return model._replace(handle=cuda_engine)
@staticmethod
def required_source_model_precision(requested_model_precision: Precision) -> Precision:
# TensorRT requires source models to be in FP32 precision
return Precision.FP32
def onnx2trt(
onnx_model: onnx.ModelProto,
*,
shapes: Dict[str, ShapeSpec],
max_workspace_size: int,
max_batch_size: int,
model_precision: str,
) -> "trt.ICudaEngine":
"""
Converts onnx model to TensorRT ICudaEngine
Args:
onnx_model: onnx.Model to convert
shapes: dictionary containing min shape, max shape, opt shape for each input name
max_workspace_size: The maximum GPU temporary memory which the CudaEngine can use at execution time.
max_batch_size: The maximum batch size which can be used at execution time,
and also the batch size for which the CudaEngine will be optimized.
model_precision: precision of kernels (possible values: fp16, fp32)
Returns: TensorRT ICudaEngine
"""
# Whether or not 16-bit kernels are permitted.
# During :class:`ICudaEngine` build fp16 kernels will also be tried when this mode is enabled.
fp16_mode = "16" in model_precision
builder = trt.Builder(TRT_LOGGER)
builder.fp16_mode = fp16_mode
builder.max_batch_size = max_batch_size
builder.max_workspace_size = max_workspace_size
# In TensorRT 7.0, the ONNX parser only supports full-dimensions mode,
# meaning that your network definition must be created with the explicitBatch flag set.
# For more information, see
# https://docs.nvidia.com/deeplearning/tensorrt/developer-guide/index.html#work_dynamic_shapes
flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
network = builder.create_network(flags)
with trt.OnnxParser(network, TRT_LOGGER) as parser:
# onnx model parsing
if not parser.parse(onnx_model.SerializeToString()):
for i in range(parser.num_errors):
LOGGER.error(f"OnnxParser error {i}/{parser.num_errors}: {parser.get_error(i)}")
raise RuntimeError("Error during parsing ONNX model (see logs for details)")
        # The OnnxParser would otherwise produce an FP32 engine for an FP16 network,
        # so force FP16 on the first input and output here.
if fp16_mode:
network.get_input(0).dtype = trt.DataType.HALF
network.get_output(0).dtype = trt.DataType.HALF
# optimization
config = builder.create_builder_config()
config.flags |= bool(fp16_mode) << int(trt.BuilderFlag.FP16)
config.max_workspace_size = max_workspace_size
profile = builder.create_optimization_profile()
for name, spec in shapes.items():
profile.set_shape(name, **spec._asdict())
config.add_optimization_profile(profile)
engine = builder.build_engine(network, config=config)
return engine
converters.register_extension(f"{Format.ONNX.value}--{Format.TRT.value}", Onnx2TRTConverter)
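# A minimal sketch, not part of the toolkit, of calling onnx2trt directly: the ONNX path,
# input name and shape range below are hypothetical, and building the engine requires a GPU
# with a matching TensorRT installation.
if __name__ == "__main__":
    onnx_model = onnx.load("model.onnx")  # hypothetical path
    shapes = {
        "INPUT__0": ShapeSpec(min=(1, 3, 224, 224), opt=(32, 3, 224, 224), max=(64, 3, 224, 224))
    }
    engine = onnx2trt(
        onnx_model,
        shapes=shapes,
        max_workspace_size=4 << 30,  # 4 GiB workspace for tactic selection
        max_batch_size=64,
        model_precision="fp16",
    )
    print(engine is not None)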
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from collections import Counter
from pathlib import Path
from typing import Dict, Iterable, NamedTuple, Optional, Union
import torch # pytype: disable=import-error
import yaml
from ..core import (
GET_MODEL_FN_NAME,
BaseConverter,
BaseLoader,
BaseRunner,
BaseRunnerSession,
BaseSaver,
Format,
Model,
Precision,
TensorSpec,
load_from_file,
)
from ..extensions import converters, loaders, runners, savers
from .utils import get_dynamic_axes, get_input_shapes, get_shapes_with_dynamic_axes
LOGGER = logging.getLogger(__name__)
class InputOutputSpec(NamedTuple):
inputs: Dict[str, TensorSpec]
outputs: Dict[str, TensorSpec]
def get_sample_input(dataloader, device):
for batch in dataloader:
_, x, _ = batch
break
if isinstance(x, dict):
sample_input = list(x.values())
elif isinstance(x, list):
sample_input = x
else:
raise TypeError("The first element (x) of batch returned by dataloader must be a list or a dict")
for idx, s in enumerate(sample_input):
sample_input[idx] = torch.from_numpy(s).to(device)
return tuple(sample_input)
def get_model_device(torch_model):
if next(torch_model.parameters()).is_cuda:
return "cuda"
else:
return "cpu"
def infer_model_precision(model):
counter = Counter()
for param in model.parameters():
counter[param.dtype] += 1
if counter[torch.float16] > 0:
return Precision.FP16
else:
return Precision.FP32
def _get_tensor_dtypes(dataloader, precision):
def _get_dtypes(t):
dtypes = {}
for k, v in t.items():
dtype = str(v.dtype)
if dtype == "float64":
dtype = "float32"
if precision == Precision.FP16 and dtype == "float32":
dtype = "float16"
dtypes[k] = dtype
return dtypes
input_dtypes = {}
output_dtypes = {}
for batch in dataloader:
_, x, y = batch
input_dtypes = _get_dtypes(x)
output_dtypes = _get_dtypes(y)
break
return input_dtypes, output_dtypes
### TODO: assumption - the floating point input type has the same precision as the model
def _get_io_spec(model, dataloader_fn):
precision = model.precision
dataloader = dataloader_fn()
input_dtypes, output_dtypes = _get_tensor_dtypes(dataloader, precision)
input_shapes, output_shapes = get_shapes_with_dynamic_axes(dataloader)
inputs = {
name: TensorSpec(name=name, dtype=input_dtypes[name], shape=tuple(input_shapes[name])) for name in model.inputs
}
outputs = {
name: TensorSpec(name=name, dtype=output_dtypes[name], shape=tuple(output_shapes[name]))
for name in model.outputs
}
return InputOutputSpec(inputs, outputs)
class PyTorchModelLoader(BaseLoader):
required_fn_name_for_signature_parsing: Optional[str] = GET_MODEL_FN_NAME
def __init__(self, **kwargs):
self._model_args = kwargs
def load(self, model_path: Union[str, Path], **_) -> Model:
if isinstance(model_path, Path):
model_path = model_path.as_posix()
get_model = load_from_file(model_path, "model", GET_MODEL_FN_NAME)
model, tensor_infos = get_model(**self._model_args)
io_spec = InputOutputSpec(tensor_infos["inputs"], tensor_infos["outputs"])
precision = infer_model_precision(model)
return Model(handle=model, precision=precision, inputs=io_spec.inputs, outputs=io_spec.outputs)
class TorchScriptLoader(BaseLoader):
def __init__(self, tensor_names_path: str = None, **kwargs):
self._model_args = kwargs
self._io_spec = None
if tensor_names_path is not None:
with Path(tensor_names_path).open("r") as fh:
tensor_infos = yaml.load(fh, Loader=yaml.SafeLoader)
self._io_spec = InputOutputSpec(tensor_infos["inputs"], tensor_infos["outputs"])
def load(self, model_path: Union[str, Path], **_) -> Model:
if not isinstance(model_path, Path):
model_path = Path(model_path)
model = torch.jit.load(model_path.as_posix())
precision = infer_model_precision(model)
io_spec = self._io_spec
if not io_spec:
yaml_path = model_path.parent / f"{model_path.stem}.yaml"
if not yaml_path.is_file():
raise ValueError(
                    f"If `--tensor-names-path` is not provided, "
f"TorchScript model loader expects file {yaml_path} with tensor information."
)
with yaml_path.open("r") as fh:
tensor_info = yaml.load(fh, Loader=yaml.SafeLoader)
io_spec = InputOutputSpec(tensor_info["inputs"], tensor_info["outputs"])
return Model(handle=model, precision=precision, inputs=io_spec.inputs, outputs=io_spec.outputs)
class TorchScriptTraceConverter(BaseConverter):
def __init__(self):
pass
def convert(self, model: Model, dataloader_fn) -> Model:
device = get_model_device(model.handle)
dummy_input = get_sample_input(dataloader_fn(), device)
converted_model = torch.jit.trace_module(model.handle, {"forward": dummy_input})
io_spec = _get_io_spec(model, dataloader_fn)
return Model(converted_model, precision=model.precision, inputs=io_spec.inputs, outputs=io_spec.outputs)
class TorchScriptScriptConverter(BaseConverter):
def __init__(self):
pass
def convert(self, model: Model, dataloader_fn) -> Model:
converted_model = torch.jit.script(model.handle)
io_spec = _get_io_spec(model, dataloader_fn)
return Model(converted_model, precision=model.precision, inputs=io_spec.inputs, outputs=io_spec.outputs)
class PYT2ONNXConverter(BaseConverter):
def __init__(self, onnx_opset: int = None):
self._onnx_opset = onnx_opset
def convert(self, model: Model, dataloader_fn) -> Model:
import tempfile
import onnx # pytype: disable=import-error
assert isinstance(model.handle, torch.jit.ScriptModule) or isinstance(
model.handle, torch.nn.Module
), "The model must be of type 'torch.jit.ScriptModule' or 'torch.nn.Module'. Converter aborted."
dynamic_axes = get_dynamic_axes(dataloader_fn())
device = get_model_device(model.handle)
dummy_input = get_sample_input(dataloader_fn(), device)
with tempfile.TemporaryDirectory() as tmpdirname:
export_path = os.path.join(tmpdirname, "model.onnx")
with torch.no_grad():
torch.onnx.export(
model.handle,
dummy_input,
export_path,
do_constant_folding=True,
input_names=list(model.inputs),
output_names=list(model.outputs),
dynamic_axes=dynamic_axes,
opset_version=self._onnx_opset,
enable_onnx_checker=True,
)
onnx_model = onnx.load(export_path)
onnx.checker.check_model(onnx_model)
onnx.helper.strip_doc_string(onnx_model)
onnx_model = onnx.shape_inference.infer_shapes(onnx_model)
return Model(
handle=onnx_model,
precision=model.precision,
inputs=model.inputs,
outputs=model.outputs,
)
class PYT2TensorRTConverter(BaseConverter):
def __init__(self, max_batch_size: int, max_workspace_size: int, onnx_opset: int, precision: str):
self._max_batch_size = max_batch_size
self._max_workspace_size = max_workspace_size
self._onnx_opset = onnx_opset
self._precision = Precision(precision)
def convert(self, model: Model, dataloader_fn) -> Model:
from .onnx import _infer_graph_precision
from .onnx2trt_conv import onnx2trt
pyt2onnx_converter = PYT2ONNXConverter(self._onnx_opset)
onnx_model = pyt2onnx_converter.convert(model, dataloader_fn).handle
precision = _infer_graph_precision(onnx_model.graph)
input_shapes = get_input_shapes(dataloader_fn(), self._max_batch_size)
cuda_engine = onnx2trt(
onnx_model,
shapes=input_shapes,
max_workspace_size=self._max_workspace_size,
max_batch_size=self._max_batch_size,
model_precision=self._precision.value,
)
return Model(
handle=cuda_engine,
precision=model.precision,
inputs=model.inputs,
outputs=model.outputs,
)
@staticmethod
def required_source_model_precision(requested_model_precision: Precision) -> Precision:
# TensorRT requires source models to be in FP32 precision
return Precision.FP32
class TorchScriptSaver(BaseSaver):
def save(self, model: Model, model_path: Union[str, Path]) -> None:
if not isinstance(model_path, Path):
model_path = Path(model_path)
if isinstance(model.handle, torch.jit.ScriptModule):
torch.jit.save(model.handle, model_path.as_posix())
else:
            raise RuntimeError("The model must be of type 'torch.jit.ScriptModule'. Saving aborted.")
def _format_tensor_spec(tensor_spec):
# wrapping shape with list and whole tensor_spec with dict() is required for correct yaml dump
tensor_spec = tensor_spec._replace(shape=list(tensor_spec.shape))
tensor_spec = dict(tensor_spec._asdict())
return tensor_spec
# store TensorSpecs from inputs and outputs in a yaml file
tensor_specs = {
"inputs": {k: _format_tensor_spec(v) for k, v in model.inputs.items()},
"outputs": {k: _format_tensor_spec(v) for k, v in model.outputs.items()},
}
yaml_path = model_path.parent / f"{model_path.stem}.yaml"
with Path(yaml_path).open("w") as fh:
yaml.dump(tensor_specs, fh, indent=4)
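# For reference, the side-car YAML written by TorchScriptSaver stores the TensorSpec fields
# under top-level "inputs" and "outputs" keys, roughly as below (illustrative tensor names
# and shapes; the exact layout depends on yaml.dump defaults):
#
# inputs:
#     INPUT__0: {name: INPUT__0, dtype: float32, shape: [-1, 3, 224, 224]}
# outputs:
#     OUTPUT__0: {name: OUTPUT__0, dtype: float32, shape: [-1, 1000]}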
class PyTorchRunner(BaseRunner):
def __init__(self):
pass
def init_inference(self, model: Model):
return PyTorchRunnerSession(model=model)
class PyTorchRunnerSession(BaseRunnerSession):
def __init__(self, model: Model):
super().__init__(model)
assert isinstance(model.handle, torch.jit.ScriptModule) or isinstance(
model.handle, torch.nn.Module
), "The model must be of type 'torch.jit.ScriptModule' or 'torch.nn.Module'. Runner aborted."
self._model = model
self._output_names = None
def __enter__(self):
self._output_names = list(self._model.outputs)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._output_names = None
self._model = None
def __call__(self, x: Dict[str, object]):
with torch.no_grad():
feed_list = [torch.from_numpy(v).cuda() for k, v in x.items()]
y_pred = self._model.handle(*feed_list)
if isinstance(y_pred, torch.Tensor):
y_pred = (y_pred,)
y_pred = [t.cpu().numpy() for t in y_pred]
y_pred = dict(zip(self._output_names, y_pred))
return y_pred
loaders.register_extension(Format.PYT.value, PyTorchModelLoader)
loaders.register_extension(Format.TS_TRACE.value, TorchScriptLoader)
loaders.register_extension(Format.TS_SCRIPT.value, TorchScriptLoader)
converters.register_extension(f"{Format.PYT.value}--{Format.TS_SCRIPT.value}", TorchScriptScriptConverter)
converters.register_extension(f"{Format.PYT.value}--{Format.TS_TRACE.value}", TorchScriptTraceConverter)
converters.register_extension(f"{Format.PYT.value}--{Format.ONNX.value}", PYT2ONNXConverter)
converters.register_extension(f"{Format.PYT.value}--{Format.TRT.value}", PYT2TensorRTConverter)
savers.register_extension(Format.TS_SCRIPT.value, TorchScriptSaver)
savers.register_extension(Format.TS_TRACE.value, TorchScriptSaver)
runners.register_extension(Format.PYT.value, PyTorchRunner)
runners.register_extension(Format.TS_SCRIPT.value, PyTorchRunner)
runners.register_extension(Format.TS_TRACE.value, PyTorchRunner)
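# A minimal end-to-end sketch, not part of the toolkit: converting a toy torch.nn.Module to
# ONNX through PYT2ONNXConverter. The tensor names, shapes and the one-batch dataloader are
# hypothetical, and the export assumes the torch/onnx versions this toolkit targets.
if __name__ == "__main__":
    import numpy as np

    def _toy_dataloader():
        ids = ["sample-0"]
        x = {"INPUT__0": np.zeros((4, 8), dtype=np.float32)}
        y = {"OUTPUT__0": np.zeros((4, 2), dtype=np.float32)}
        yield ids, x, y

    toy_model = Model(
        handle=torch.nn.Linear(8, 2).eval(),
        precision=Precision.FP32,
        inputs={"INPUT__0": TensorSpec("INPUT__0", "float32", (-1, 8))},
        outputs={"OUTPUT__0": TensorSpec("OUTPUT__0", "float32", (-1, 2))},
    )
    converted = PYT2ONNXConverter(onnx_opset=13).convert(toy_model, _toy_dataloader)
    print(type(converted.handle))  # expected: onnx.ModelProto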
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
from pathlib import Path
from typing import Dict, NamedTuple, Optional, Union
import numpy as np
# pytype: disable=import-error
try:
import pycuda.autoinit
import pycuda.driver as cuda
except Exception as e:  # pycuda is optional; the TensorRT extensions are skipped without it
    logging.getLogger(__name__).warning(f"Problems with importing pycuda package; {e}")
# pytype: enable=import-error
import tensorrt as trt # pytype: disable=import-error
from ..core import BaseLoader, BaseRunner, BaseRunnerSession, BaseSaver, Format, Model, Precision, TensorSpec
from ..extensions import loaders, runners, savers
LOGGER = logging.getLogger(__name__)
TRT_LOGGER = trt.Logger(trt.Logger.INFO)
"""
documentation:
https://docs.nvidia.com/deeplearning/tensorrt/api/python_api/index.html
https://docs.nvidia.com/deeplearning/tensorrt/developer-guide/index.html#python_samples_section
"""
class TensorRTLoader(BaseLoader):
def load(self, model_path: Union[str, Path], **_) -> Model:
model_path = Path(model_path)
LOGGER.debug(f"Loading TensorRT engine from {model_path}")
with model_path.open("rb") as fh, trt.Runtime(TRT_LOGGER) as runtime:
engine = runtime.deserialize_cuda_engine(fh.read())
if engine is None:
raise RuntimeError(f"Could not load ICudaEngine from {model_path}")
inputs = {}
outputs = {}
for binding_idx in range(engine.num_bindings):
name = engine.get_binding_name(binding_idx)
is_input = engine.binding_is_input(binding_idx)
dtype = engine.get_binding_dtype(binding_idx)
shape = engine.get_binding_shape(binding_idx)
if is_input:
inputs[name] = TensorSpec(name, dtype, shape)
else:
outputs[name] = TensorSpec(name, dtype, shape)
return Model(engine, None, inputs, outputs)
class TensorRTSaver(BaseSaver):
def __init__(self):
pass
def save(self, model: Model, model_path: Union[str, Path]) -> None:
model_path = Path(model_path)
LOGGER.debug(f"Saving TensorRT engine to {model_path.as_posix()}")
model_path.parent.mkdir(parents=True, exist_ok=True)
engine: "trt.ICudaEngine" = model.handle
with model_path.open("wb") as fh:
fh.write(engine.serialize())
class TRTBuffers(NamedTuple):
x_host: Optional[Dict[str, object]]
x_dev: Dict[str, object]
y_pred_host: Dict[str, object]
y_pred_dev: Dict[str, object]
class TensorRTRunner(BaseRunner):
def __init__(self):
pass
def init_inference(self, model: Model):
return TensorRTRunnerSession(model=model)
class TensorRTRunnerSession(BaseRunnerSession):
def __init__(self, model: Model):
super().__init__(model)
assert isinstance(model.handle, trt.ICudaEngine)
self._model = model
self._has_dynamic_shapes = None
self._context = None
self._engine: trt.ICudaEngine = self._model.handle
self._cuda_context = pycuda.autoinit.context
self._input_names = None
self._output_names = None
self._buffers = None
def __enter__(self):
self._context = self._engine.create_execution_context()
self._context.__enter__()
self._input_names = [
self._engine[idx] for idx in range(self._engine.num_bindings) if self._engine.binding_is_input(idx)
]
self._output_names = [
self._engine[idx] for idx in range(self._engine.num_bindings) if not self._engine.binding_is_input(idx)
]
# all_binding_shapes_specified is True for models without dynamic shapes
# so initially this variable is False for models with dynamic shapes
self._has_dynamic_shapes = not self._context.all_binding_shapes_specified
return self
def __exit__(self, exc_type, exc_value, traceback):
self._context.__exit__(exc_type, exc_value, traceback)
self._input_names = None
self._output_names = None
        # TODO: are CUDA buffers deallocated automatically?
self._buffers = None
def __call__(self, x):
buffers = self._prepare_buffers_if_needed(x)
bindings = self._update_bindings(buffers)
for name in self._input_names:
cuda.memcpy_htod(buffers.x_dev[name], buffers.x_host[name])
self._cuda_context.push()
self._context.execute_v2(bindings=bindings)
self._cuda_context.pop()
for name in self._output_names:
cuda.memcpy_dtoh(buffers.y_pred_host[name], buffers.y_pred_dev[name])
return buffers.y_pred_host
def _update_bindings(self, buffers: TRTBuffers):
bindings = [None] * self._engine.num_bindings
for name in buffers.y_pred_dev:
binding_idx: int = self._engine[name]
bindings[binding_idx] = buffers.y_pred_dev[name]
for name in buffers.x_dev:
binding_idx: int = self._engine[name]
bindings[binding_idx] = buffers.x_dev[name]
return bindings
def _set_dynamic_input_shapes(self, x_host):
def _is_shape_dynamic(input_shape):
return any([dim is None or dim == -1 for dim in input_shape])
for name in self._input_names:
bindings_idx = self._engine[name]
data_shape = x_host[name].shape # pytype: disable=attribute-error
if self._engine.is_shape_binding(bindings_idx):
input_shape = self._context.get_shape(bindings_idx)
if _is_shape_dynamic(input_shape):
self._context.set_shape_input(bindings_idx, data_shape)
else:
input_shape = self._engine.get_binding_shape(bindings_idx)
if _is_shape_dynamic(input_shape):
self._context.set_binding_shape(bindings_idx, data_shape)
assert self._context.all_binding_shapes_specified and self._context.all_shape_inputs_specified
def _prepare_buffers_if_needed(self, x_host: Dict[str, object]):
# pytype: disable=attribute-error
new_batch_size = list(x_host.values())[0].shape[0]
current_batch_size = list(self._buffers.y_pred_host.values())[0].shape[0] if self._buffers else 0
# pytype: enable=attribute-error
if self._has_dynamic_shapes or new_batch_size != current_batch_size:
# TODO: are CUDA buffers dealloc automatically?
self._set_dynamic_input_shapes(x_host)
y_pred_host = {}
for name in self._output_names:
shape = self._context.get_binding_shape(self._engine[name])
y_pred_host[name] = np.zeros(shape, dtype=trt.nptype(self._model.outputs[name].dtype))
y_pred_dev = {name: cuda.mem_alloc(data.nbytes) for name, data in y_pred_host.items()}
x_dev = {
name: cuda.mem_alloc(host_input.nbytes)
for name, host_input in x_host.items()
if name in self._input_names # pytype: disable=attribute-error
}
self._buffers = TRTBuffers(None, x_dev, y_pred_host, y_pred_dev)
return self._buffers._replace(x_host=x_host)
if "pycuda.driver" in sys.modules:
loaders.register_extension(Format.TRT.value, TensorRTLoader)
runners.register_extension(Format.TRT.value, TensorRTRunner)
savers.register_extension(Format.TRT.value, TensorRTSaver)
else:
    LOGGER.warning("TensorRT extension is not registered due to problems with importing the pycuda.driver package.")
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import Counter
from typing import Callable, Dict, List
import networkx as nx
from ..core import ShapeSpec
def infer_precision(
nx_graph: nx.Graph,
input_names: List[str],
output_names: List[str],
get_node_dtype_fn: Callable,
):
node_dtypes = [nx_graph.nodes[node_name].get("dtype", None) for node_name in nx_graph.nodes]
node_dtypes = [dt for dt in node_dtypes if dt is None or dt.kind not in ["i", "b"]]
dtypes_counter = Counter(node_dtypes)
return dtypes_counter.most_common()[0][0]
def get_shapes_with_dynamic_axes(dataloader, batch_size_dim=0):
def _set_dynamic_shapes(t, shapes):
for k, v in t.items():
shape = list(v.shape)
for dim, s in enumerate(shape):
if shapes[k][dim] != -1 and shapes[k][dim] != s:
shapes[k][dim] = -1
## get all shapes from input and output tensors
input_shapes = {}
output_shapes = {}
for batch in dataloader:
_, x, y = batch
for k, v in x.items():
input_shapes[k] = list(v.shape)
for k, v in y.items():
output_shapes[k] = list(v.shape)
break
# based on max <max_num_iters> iterations, check which
# dimensions differ to determine dynamic_axes
max_num_iters = 100
for idx, batch in enumerate(dataloader):
if idx >= max_num_iters:
break
_, x, y = batch
_set_dynamic_shapes(x, input_shapes)
_set_dynamic_shapes(y, output_shapes)
return input_shapes, output_shapes
def get_dynamic_axes(dataloader, batch_size_dim=0):
input_shapes, output_shapes = get_shapes_with_dynamic_axes(dataloader, batch_size_dim)
all_shapes = {**input_shapes, **output_shapes}
dynamic_axes = {}
for k, shape in all_shapes.items():
for idx, s in enumerate(shape):
if s == -1:
dynamic_axes[k] = {idx: k + "_" + str(idx)}
for k, v in all_shapes.items():
if k in dynamic_axes:
dynamic_axes[k].update({batch_size_dim: "batch_size_" + str(batch_size_dim)})
else:
dynamic_axes[k] = {batch_size_dim: "batch_size_" + str(batch_size_dim)}
return dynamic_axes
def get_input_shapes(dataloader, max_batch_size=1) -> Dict[str, ShapeSpec]:
def init_counters_and_shapes(x, counters, min_shapes, max_shapes):
for k, v in x.items():
counters[k] = Counter()
min_shapes[k] = [float("inf")] * v.ndim
max_shapes[k] = [float("-inf")] * v.ndim
counters = {}
min_shapes: Dict[str, tuple] = {}
max_shapes: Dict[str, tuple] = {}
for idx, batch in enumerate(dataloader):
ids, x, y = batch
if idx == 0:
init_counters_and_shapes(x, counters, min_shapes, max_shapes)
for k, v in x.items():
shape = v.shape
counters[k][shape] += 1
min_shapes[k] = tuple([min(a, b) for a, b in zip(min_shapes[k], shape)])
max_shapes[k] = tuple([max(a, b) for a, b in zip(max_shapes[k], shape)])
opt_shapes: Dict[str, tuple] = {}
for k, v in counters.items():
opt_shapes[k] = v.most_common(1)[0][0]
shapes = {}
for k in opt_shapes.keys(): # same keys in min_shapes and max_shapes
shapes[k] = ShapeSpec(
min=(1,) + min_shapes[k][1:],
max=(max_batch_size,) + max_shapes[k][1:],
opt=(max_batch_size,) + opt_shapes[k][1:],
)
return shapes
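# A minimal sketch, not part of the toolkit, of what these shape helpers return for a
# dataloader yielding (ids, x, y) batches of constant shape; the tensor names and sizes
# are hypothetical. With constant shapes only the batch dimension is reported as dynamic.
if __name__ == "__main__":
    import numpy as np

    def _toy_dataloader(batch_size=2, iterations=4):
        for idx in range(iterations):
            ids = list(range(idx * batch_size, (idx + 1) * batch_size))
            x = {"INPUT__0": np.zeros((batch_size, 3, 224, 224), dtype=np.float32)}
            y = {"OUTPUT__0": np.zeros((batch_size, 1000), dtype=np.float32)}
            yield ids, x, y

    print(get_dynamic_axes(_toy_dataloader()))  # {'INPUT__0': {0: 'batch_size_0'}, 'OUTPUT__0': {0: 'batch_size_0'}}
    print(get_input_shapes(_toy_dataloader(), max_batch_size=32))  # per-input ShapeSpec with min/opt/max batch of 1/32/32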
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import re
from typing import Dict, List
from natsort import natsorted
from tabulate import tabulate
def sort_results(results: List):
results = natsorted(results, key=lambda item: [item[key] for key in item.keys()])
return results
def save_results(filename: str, data: List, formatted: bool = False):
data = format_data(data=data) if formatted else data
with open(filename, "a") as csvfile:
fieldnames = data[0].keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in data:
writer.writerow(row)
def format_data(data: List[Dict]) -> List[Dict]:
formatted_data = list()
for item in data:
formatted_item = format_keys(data=item)
formatted_data.append(formatted_item)
return formatted_data
def format_keys(data: Dict) -> Dict:
keys = {format_key(key=key): value for key, value in data.items()}
return keys
def format_key(key: str) -> str:
key = " ".join([k.capitalize() for k in re.split("_| ", key)])
return key
def show_results(results: List[Dict]):
headers = list(results[0].keys())
    summary = [list(result.values()) for result in results]
print(tabulate(summary, headers=headers))
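# A minimal sketch with a hypothetical file name and result fields: sorting, saving and
# printing two result rows with the helpers above.
if __name__ == "__main__":
    rows = [
        {"batch_size": 8, "latency_ms": 9.7},
        {"batch_size": 1, "latency_ms": 3.2},
    ]
    rows = sort_results(rows)
    save_results("results.csv", rows, formatted=True)  # header becomes "Batch Size,Latency Ms"
    show_results(format_data(rows))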
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from typing import List, Optional
def warmup(
model_name: str,
batch_sizes: List[int],
triton_gpu_engine_count: int = 1,
triton_instances: int = 1,
profiling_data: str = "random",
input_shapes: Optional[List[str]] = None,
server_url: str = "localhost",
measurement_window: int = 10000,
shared_memory: bool = False
):
print("\n")
print(f"==== Warmup start ====")
print("\n")
input_shapes = " ".join(map(lambda shape: f" --shape {shape}", input_shapes)) if input_shapes else ""
measurement_window = 6 * measurement_window
max_batch_size = max(batch_sizes)
max_total_requests = 2 * max_batch_size * triton_instances * triton_gpu_engine_count
max_concurrency = min(256, max_total_requests)
batch_size = max(1, max_total_requests // 256)
step = max(1, max_concurrency // 2)
min_concurrency = step
exec_args = f"""-m {model_name} \
-x 1 \
-p {measurement_window} \
-v \
-i http \
-u {server_url}:8000 \
-b {batch_size} \
--concurrency-range {min_concurrency}:{max_concurrency}:{step} \
--input-data {profiling_data} {input_shapes}"""
if shared_memory:
exec_args += " --shared-memory=cuda"
result = os.system(f"perf_client {exec_args}")
if result != 0:
print(f"Failed running performance tests. Perf client failed with exit code {result}")
sys.exit(1)
print("\n")
print(f"==== Warmup done ====")
print("\n")
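# A minimal sketch with a hypothetical model name and default settings: warming up a deployed
# model before the real measurement sweep. It requires a running Triton server and perf_client
# available on PATH.
if __name__ == "__main__":
    warmup(
        model_name="resnet50",
        batch_sizes=[1, 8, 32],
        triton_gpu_engine_count=1,
        triton_instances=1,
        server_url="localhost",
    )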
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, NamedTuple, Optional
import numpy as np
from deployment_toolkit.core import BaseMetricsCalculator
class MetricsCalculator(BaseMetricsCalculator):
def __init__(self):
pass
def calc(
self,
*,
ids: List[Any],
y_pred: Dict[str, np.ndarray],
x: Optional[Dict[str, np.ndarray]],
y_real: Optional[Dict[str, np.ndarray]],
) -> Dict[str, float]:
        categories = np.argmax(y_pred["OUTPUT__0"], axis=-1)
        return {
            "accuracy": np.mean(categories == np.argmax(y_real["OUTPUT__0"], axis=-1))
        }
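# A minimal sketch with hypothetical data: accuracy over a toy batch of four one-hot vectors
# where the prediction matches the label for three of the four samples.
if __name__ == "__main__":
    toy_pred = {"OUTPUT__0": np.eye(4, dtype=np.float32)}
    toy_real = {"OUTPUT__0": np.eye(4, dtype=np.float32)}
    toy_real["OUTPUT__0"][3] = np.roll(toy_real["OUTPUT__0"][3], 1)  # introduce one mismatch
    metrics = MetricsCalculator().calc(ids=list(range(4)), y_pred=toy_pred, x=None, y_real=toy_real)
    print(metrics)  # expected: {'accuracy': 0.75}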
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
def update_argparser(parser):
parser.add_argument(
"--config", default="resnet50", type=str, required=True, help="Network to deploy")
parser.add_argument(
"--checkpoint", default=None, type=str, help="The checkpoint of the model. ")
parser.add_argument("--classes", type=int, default=1000, help="Number of classes")
parser.add_argument("--precision", type=str, default="fp32",
choices=["fp32", "fp16"], help="Inference precision")
def get_model(**model_args):
from image_classification import models
model = models.resnet50(pretrained=False)
    if model_args.get("checkpoint") is not None:
print(f"loading checkpoint {model_args['checkpoint']}")
state_dict = torch.load(model_args["checkpoint"], map_location="cpu")
try:
model.load_state_dict(
{
k.replace("module.", ""): v
for k, v in state_dict.items()
}
)
except RuntimeError as RE:
if not hasattr(model, "ngc_checkpoint_remap"):
raise RE
remap_old = model.ngc_checkpoint_remap(version="20.06.0")
remap_dist = lambda k: k.replace("module.", "")
model.load_state_dict(
{
remap_old(remap_dist(k)): v
for k, v in state_dict.items()
}
)
if model_args["precision"] == "fp16":
model = model.half()
model = model.cuda()
model.eval()
tensor_names = {"inputs": ["INPUT__0"],
"outputs": ["OUTPUT__0"]}
return model, tensor_names
#!/usr/bin/env python3
# Copyright (c) 2021 NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tarfile
from pathlib import Path
from typing import Tuple, Dict, List
from PIL import Image
from tqdm import tqdm
DATASETS_DIR = os.environ.get("DATASETS_DIR", None)
IMAGENET_DIRNAME = "imagenet"
IMAGE_ARCHIVE_FILENAME = "ILSVRC2012_img_val.tar"
DEVKIT_ARCHIVE_FILENAME = "ILSVRC2012_devkit_t12.tar.gz"
LABELS_REL_PATH = "ILSVRC2012_devkit_t12/data/ILSVRC2012_validation_ground_truth.txt"
META_REL_PATH = "ILSVRC2012_devkit_t12/data/meta.mat"
TARGET_SIZE = (224, 224) # (width, height)
_RESIZE_MIN = 256  # resize preserving aspect ratio so that the shorter side equals this size
def parse_meta_mat(metafile) -> Dict[int, str]:
import scipy.io
meta = scipy.io.loadmat(metafile, squeeze_me=True)["synsets"]
nums_children = list(zip(*meta))[4]
meta = [meta[idx] for idx, num_children in enumerate(nums_children) if num_children == 0]
idcs, wnids = list(zip(*meta))[:2]
idx_to_wnid = {idx: wnid for idx, wnid in zip(idcs, wnids)}
return idx_to_wnid
def _process_image(image_file, target_size):
image = Image.open(image_file)
original_size = image.size
# scale image to size where minimal size is _RESIZE_MIN
scale_factor = max(_RESIZE_MIN / original_size[0], _RESIZE_MIN / original_size[1])
resize_to = int(original_size[0] * scale_factor), int(original_size[1] * scale_factor)
resized_image = image.resize(resize_to)
# central crop of image to target_size
left, upper = (resize_to[0] - target_size[0]) // 2, (resize_to[1] - target_size[1]) // 2
cropped_image = resized_image.crop((left, upper, left + target_size[0], upper + target_size[1]))
return cropped_image
def main():
import argparse
    parser = argparse.ArgumentParser(
        description="Preprocess the ImageNet validation archive into per-class directories of resized images."
    )
parser.add_argument(
"--dataset-dir",
help="Path to dataset directory where imagenet archives are stored and processed files will be saved.",
required=False,
default=DATASETS_DIR,
)
parser.add_argument(
"--target-size",
help="Size of target image. Format it as <width>,<height>.",
required=False,
default=",".join(map(str, TARGET_SIZE)),
)
args = parser.parse_args()
if args.dataset_dir is None:
        raise ValueError(
            "Please set the $DATASETS_DIR env variable to point to the directory with the original dataset "
            "archives and where processed files should be stored, or provide the --dataset-dir CLI argument."
        )
datasets_dir = Path(args.dataset_dir)
target_size = tuple(map(int, args.target_size.split(",")))
image_archive_path = datasets_dir / IMAGE_ARCHIVE_FILENAME
if not image_archive_path.exists():
        raise RuntimeError(
            f"There should be a {IMAGE_ARCHIVE_FILENAME} file in {datasets_dir}. "
            f"You need to download the dataset from http://www.image-net.org/download."
        )
devkit_archive_path = datasets_dir / DEVKIT_ARCHIVE_FILENAME
if not devkit_archive_path.exists():
        raise RuntimeError(
            f"There should be a {DEVKIT_ARCHIVE_FILENAME} file in {datasets_dir}. "
            f"You need to download the dataset from http://www.image-net.org/download."
        )
with tarfile.open(devkit_archive_path, mode="r") as devkit_archive_file:
labels_file = devkit_archive_file.extractfile(LABELS_REL_PATH)
labels = list(map(int, labels_file.readlines()))
# map validation labels (idxes from LABELS_REL_PATH) into WNID compatible with training set
meta_file = devkit_archive_file.extractfile(META_REL_PATH)
idx_to_wnid = parse_meta_mat(meta_file)
labels_wnid = [idx_to_wnid[idx] for idx in labels]
# remap WNID into index in sorted list of all WNIDs - this is how network outputs class
available_wnids = sorted(set(labels_wnid))
wnid_to_newidx = {wnid: new_cls for new_cls, wnid in enumerate(available_wnids)}
labels = [wnid_to_newidx[wnid] for wnid in labels_wnid]
output_dir = datasets_dir / IMAGENET_DIRNAME
with tarfile.open(image_archive_path, mode="r") as image_archive_file:
image_rel_paths = sorted(image_archive_file.getnames())
for cls, image_rel_path in tqdm(zip(labels, image_rel_paths), total=len(image_rel_paths)):
output_path = output_dir / str(cls) / image_rel_path
original_image_file = image_archive_file.extractfile(image_rel_path)
processed_image = _process_image(original_image_file, target_size)
output_path.parent.mkdir(parents=True, exist_ok=True)
processed_image.save(output_path.as_posix())
if __name__ == "__main__":
main()