// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "profile_data_collector.h"
#include <memory>
#include "perf_utils.h"
namespace triton { namespace perfanalyzer {
cb::Error
ProfileDataCollector::Create(std::shared_ptr<ProfileDataCollector>* collector)
{
std::shared_ptr<ProfileDataCollector> local_collector{
new ProfileDataCollector()};
*collector = std::move(local_collector);
return cb::Error::Success;
}
void
ProfileDataCollector::AddWindow(
InferenceLoadMode& id, uint64_t window_start_ns, uint64_t window_end_ns)
{
auto it = FindExperiment(id);
if (it == experiments_.end()) {
Experiment new_experiment{};
new_experiment.mode = id;
new_experiment.window_boundaries.push_back(window_start_ns);
new_experiment.window_boundaries.push_back(window_end_ns);
experiments_.push_back(new_experiment);
} else {
// Window timestamps are always increasing so it is safe to check only the
// last element
if (it->window_boundaries.back() != window_start_ns) {
it->window_boundaries.push_back(window_start_ns);
}
it->window_boundaries.push_back(window_end_ns);
}
}
void
ProfileDataCollector::AddData(
InferenceLoadMode& id, std::vector<RequestRecord>&& request_records)
{
auto it = FindExperiment(id);
if (it == experiments_.end()) {
Experiment new_experiment{};
new_experiment.mode = id;
new_experiment.requests = std::move(request_records);
experiments_.push_back(new_experiment);
} else {
it->requests.insert(
it->requests.end(), std::make_move_iterator(request_records.begin()),
std::make_move_iterator(request_records.end()));
}
}
}} // namespace triton::perfanalyzer
// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <algorithm>
#include <map>
#include <tuple>
#include "client_backend/client_backend.h"
#include "constants.h"
#include "perf_utils.h"
#include "request_record.h"
namespace triton { namespace perfanalyzer {
/// Data structure to hold which inference load mode was used for an experiment.
/// Only one data member will be nonzero, indicating the inference load mode for
/// a particular experiment.
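/// Example (illustrative values): InferenceLoadMode{4, 0.0} identifies a
/// concurrency-4 experiment, while InferenceLoadMode{0, 50.0} identifies a
/// request-rate-50 experiment.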
struct InferenceLoadMode {
uint32_t concurrency;
double request_rate;
InferenceLoadMode()
{
concurrency = 0;
request_rate = 0.0;
}
InferenceLoadMode(uint64_t c, double rr)
{
concurrency = c;
request_rate = rr;
}
bool operator==(const InferenceLoadMode& rhs) const
{
return (concurrency == rhs.concurrency) &&
(request_rate == rhs.request_rate);
}
};
/// Data structure to hold profile export data for an experiment (e.g.
/// concurrency 4 or request rate 50)
struct Experiment {
InferenceLoadMode mode;
std::vector<RequestRecord> requests;
std::vector<uint64_t> window_boundaries;
};
#ifndef DOCTEST_CONFIG_DISABLE
class NaggyMockProfileDataCollector;
#endif
/// Data structure and methods for storing profile export data.
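/// Typical usage (illustrative sketch; variable names are placeholders):
///   std::shared_ptr<ProfileDataCollector> collector;
///   ProfileDataCollector::Create(&collector);
///   InferenceLoadMode id{4, 0.0};  // concurrency 4
///   collector->AddWindow(id, window_start_ns, window_end_ns);
///   collector->AddData(id, std::move(request_records));
///   std::vector<Experiment>& experiments = collector->GetData();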
class ProfileDataCollector {
public:
static cb::Error Create(std::shared_ptr<ProfileDataCollector>* collector);
~ProfileDataCollector() = default;
/// Add a measurement window to the collector
/// @param id Identifier for the experiment
/// @param window_start_ns The window start timestamp in nanoseconds.
/// @param window_end_ns The window end timestamp in nanoseconds.
void AddWindow(
InferenceLoadMode& id, uint64_t window_start_ns, uint64_t window_end_ns);
/// Add request records to an experiment
/// @param id Identifier for the experiment
/// @param request_records The request information for the current experiment.
void AddData(
InferenceLoadMode& id, std::vector<RequestRecord>&& request_records);
/// Get the experiment data for the profile
/// @return Experiment data
std::vector<Experiment>& GetData() { return experiments_; }
std::string& GetVersion() { return version_; }
private:
ProfileDataCollector() = default;
virtual std::vector<Experiment>::iterator FindExperiment(
InferenceLoadMode& id)
{
return std::find_if(
experiments_.begin(), experiments_.end(),
[&id](const Experiment& e) { return e.mode == id; });
};
std::vector<Experiment> experiments_{};
std::string version_{VERSION};
#ifndef DOCTEST_CONFIG_DISABLE
friend NaggyMockProfileDataCollector;
#endif
};
}} // namespace triton::perfanalyzer
// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "profile_data_exporter.h"
#include <rapidjson/filewritestream.h>
#include <rapidjson/ostreamwrapper.h>
#include <rapidjson/writer.h>
#include "client_backend/client_backend.h"
namespace triton { namespace perfanalyzer {
cb::Error
ProfileDataExporter::Create(std::shared_ptr<ProfileDataExporter>* exporter)
{
std::shared_ptr<ProfileDataExporter> local_exporter{
new ProfileDataExporter()};
*exporter = std::move(local_exporter);
return cb::Error::Success;
}
void
ProfileDataExporter::Export(
const std::vector<Experiment>& raw_experiments, std::string& raw_version,
std::string& file_path)
{
ConvertToJson(raw_experiments, raw_version);
OutputToFile(file_path);
}
void
ProfileDataExporter::ConvertToJson(
const std::vector<Experiment>& raw_experiments, std::string& raw_version)
{
ClearDocument();
rapidjson::Value experiments(rapidjson::kArrayType);
for (const auto& raw_experiment : raw_experiments) {
rapidjson::Value entry(rapidjson::kObjectType);
rapidjson::Value experiment(rapidjson::kObjectType);
rapidjson::Value requests(rapidjson::kArrayType);
rapidjson::Value window_boundaries(rapidjson::kArrayType);
AddExperiment(entry, experiment, raw_experiment);
AddRequests(entry, requests, raw_experiment);
AddWindowBoundaries(entry, window_boundaries, raw_experiment);
experiments.PushBack(entry, document_.GetAllocator());
}
document_.AddMember("experiments", experiments, document_.GetAllocator());
AddVersion(raw_version);
}
void
ProfileDataExporter::ClearDocument()
{
rapidjson::Document d{};
document_.Swap(d);
document_.SetObject();
}
void
ProfileDataExporter::AddExperiment(
rapidjson::Value& entry, rapidjson::Value& experiment,
const Experiment& raw_experiment)
{
rapidjson::Value mode;
rapidjson::Value value;
if (raw_experiment.mode.concurrency != 0) {
mode = rapidjson::StringRef("concurrency");
value.SetUint64(raw_experiment.mode.concurrency);
} else {
mode = rapidjson::StringRef("request_rate");
value.SetDouble(raw_experiment.mode.request_rate);
}
experiment.AddMember("mode", mode, document_.GetAllocator());
experiment.AddMember("value", value, document_.GetAllocator());
entry.AddMember("experiment", experiment, document_.GetAllocator());
}
void
ProfileDataExporter::AddRequests(
rapidjson::Value& entry, rapidjson::Value& requests,
const Experiment& raw_experiment)
{
for (auto& raw_request : raw_experiment.requests) {
rapidjson::Value request(rapidjson::kObjectType);
rapidjson::Value timestamp;
timestamp.SetUint64(raw_request.start_time_.time_since_epoch().count());
request.AddMember("timestamp", timestamp, document_.GetAllocator());
if (raw_request.sequence_id_ != 0) {
rapidjson::Value sequence_id;
sequence_id.SetUint64(raw_request.sequence_id_);
request.AddMember("sequence_id", sequence_id, document_.GetAllocator());
}
rapidjson::Value responses(rapidjson::kArrayType);
AddResponses(responses, raw_request.response_times_);
request.AddMember(
"response_timestamps", responses, document_.GetAllocator());
requests.PushBack(request, document_.GetAllocator());
}
entry.AddMember("requests", requests, document_.GetAllocator());
}
void
ProfileDataExporter::AddResponses(
rapidjson::Value& responses,
const std::vector<std::chrono::time_point<std::chrono::system_clock>>&
response_times)
{
for (auto& response : response_times) {
rapidjson::Value time;
time.SetUint64(response.time_since_epoch().count());
responses.PushBack(time, document_.GetAllocator());
}
}
void
ProfileDataExporter::AddWindowBoundaries(
rapidjson::Value& entry, rapidjson::Value& window_boundaries,
const Experiment& raw_experiment)
{
for (auto& window : raw_experiment.window_boundaries) {
rapidjson::Value w;
w.SetUint64(window);
window_boundaries.PushBack(w, document_.GetAllocator());
}
entry.AddMember(
"window_boundaries", window_boundaries, document_.GetAllocator());
}
void
ProfileDataExporter::AddVersion(std::string& raw_version)
{
rapidjson::Value version;
version = rapidjson::StringRef(raw_version.c_str());
document_.AddMember("version", version, document_.GetAllocator());
}
void
ProfileDataExporter::OutputToFile(std::string& file_path)
{
FILE* fp = fopen(file_path.c_str(), "w");
if (fp == nullptr) {
throw PerfAnalyzerException(
"failed to open file for outputting raw profile data", GENERIC_ERROR);
}
char writeBuffer[65536];
rapidjson::FileWriteStream os(fp, writeBuffer, sizeof(writeBuffer));
rapidjson::Writer<rapidjson::FileWriteStream> writer(os);
document_.Accept(writer);
fclose(fp);
}
}} // namespace triton::perfanalyzer
// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <rapidjson/document.h>
#include "client_backend/client_backend.h"
#include "profile_data_collector.h"
namespace triton { namespace perfanalyzer {
#ifndef DOCTEST_CONFIG_DISABLE
class NaggyMockProfileDataExporter;
#endif
/// Exports profile data.
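/// Typical usage (illustrative sketch; variables are placeholders):
///   std::shared_ptr<ProfileDataExporter> exporter;
///   ProfileDataExporter::Create(&exporter);
///   exporter->Export(collector->GetData(), collector->GetVersion(), file_path);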
class ProfileDataExporter {
public:
static cb::Error Create(std::shared_ptr<ProfileDataExporter>* exporter);
~ProfileDataExporter() = default;
/// Export profile data to json file
/// @param raw_experiments All of the raw data for the experiments run by perf
/// analyzer
/// @param raw_version String containing the version number for the json
/// output
/// @param file_path File path to export profile data to.
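/// The resulting JSON has (illustratively) the following shape:
///   { "experiments": [ { "experiment": {"mode": "concurrency", "value": 4},
///       "requests": [ {"timestamp": ..., "response_timestamps": [...]} ],
///       "window_boundaries": [...] } ],
///     "version": "..." }
/// ("sequence_id" is added to a request only when it is nonzero; values shown
/// here are placeholders.)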
void Export(
const std::vector<Experiment>& raw_experiments, std::string& raw_version,
std::string& file_path);
private:
ProfileDataExporter() = default;
/// Convert the raw data collected to json output
/// @param raw_experiments All of the raw data for the experiments run by perf
/// analyzer
/// @param raw_version String containing the version number for the json
/// output
virtual void ConvertToJson(
const std::vector<Experiment>& raw_experiments, std::string& raw_version);
virtual void OutputToFile(std::string& file_path);
virtual void AddExperiment(
rapidjson::Value& entry, rapidjson::Value& experiment,
const Experiment& raw_experiment);
void AddRequests(
rapidjson::Value& entry, rapidjson::Value& requests,
const Experiment& raw_experiment);
void AddResponses(
rapidjson::Value& responses,
const std::vector<std::chrono::time_point<std::chrono::system_clock>>&
response_times);
void AddWindowBoundaries(
rapidjson::Value& entry, rapidjson::Value& window_boundaries,
const Experiment& raw_experiment);
void AddVersion(std::string& raw_version);
void ClearDocument();
rapidjson::Document document_{};
#ifndef DOCTEST_CONFIG_DISABLE
friend NaggyMockProfileDataExporter;
#endif
};
}} // namespace triton::perfanalyzer
// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <random>
#include "ictx_id_tracker.h"
namespace triton { namespace perfanalyzer {
// Context ID tracker that is always available and returns random Context IDs
//
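// For example (illustrative): after Reset(4), each Get() call returns a
// uniformly random context ID in [0, 3]; IsAvailable() always returns true,
// so callers never have to wait for a free context.
//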
class RandCtxIdTracker : public ICtxIdTracker {
public:
RandCtxIdTracker() = default;
void Reset(size_t count) override
{
distribution_ = std::uniform_int_distribution<uint64_t>(0, count - 1);
}
void Restore(size_t id) override{};
size_t Get() override { return distribution_(rng_generator_); };
bool IsAvailable() override { return true; };
private:
std::uniform_int_distribution<uint64_t> distribution_;
std::default_random_engine rng_generator_{};
size_t max = 0;
};
}} // namespace triton::perfanalyzer
// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <chrono>
#include <memory>
#include <vector>
namespace triton { namespace perfanalyzer {
using NanoIntervals = std::vector<std::chrono::nanoseconds>;
/// Defines a schedule: the consumer loops through the provided intervals, and
/// each time it wraps back to the start it adds an additional offset equal to
/// the duration.
///
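/// Example (illustrative values): with intervals {10ms, 20ms} and a duration
/// of 30ms, successive calls to Next() return 10ms, 20ms, 40ms, 50ms, 70ms,
/// 80ms, ... i.e. the same pattern shifted by one duration per round.
///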
struct RateSchedule {
NanoIntervals intervals;
std::chrono::nanoseconds duration;
/// Returns the next timestamp in the schedule
///
std::chrono::nanoseconds Next()
{
auto next = intervals[index_] + duration * rounds_;
index_++;
if (index_ >= intervals.size()) {
rounds_++;
index_ = 0;
}
return next;
}
private:
size_t rounds_ = 0;
size_t index_ = 0;
};
using RateSchedulePtr_t = std::shared_ptr<RateSchedule>;
}} // namespace triton::perfanalyzer
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "report_writer.h"
#include <algorithm>
#include <fstream>
#include "constants.h"
#include "perf_analyzer_exception.h"
namespace triton { namespace perfanalyzer {
cb::Error
ReportWriter::Create(
const std::string& filename, const bool target_concurrency,
const std::vector<pa::PerfStatus>& summary, const bool verbose_csv,
const bool include_server_stats, const int32_t percentile,
const std::shared_ptr<ModelParser>& parser,
std::unique_ptr<ReportWriter>* writer, const bool should_output_metrics)
{
std::unique_ptr<ReportWriter> local_writer(new ReportWriter(
filename, target_concurrency, summary, verbose_csv, include_server_stats,
percentile, parser, should_output_metrics));
*writer = std::move(local_writer);
return cb::Error::Success;
}
ReportWriter::ReportWriter(
const std::string& filename, const bool target_concurrency,
const std::vector<pa::PerfStatus>& summary, const bool verbose_csv,
const bool include_server_stats, const int32_t percentile,
const std::shared_ptr<ModelParser>& parser,
const bool should_output_metrics)
: filename_(filename), target_concurrency_(target_concurrency),
summary_(summary), verbose_csv_(verbose_csv),
include_server_stats_(include_server_stats), percentile_(percentile),
parser_(parser), should_output_metrics_(should_output_metrics)
{
}
void
ReportWriter::GenerateReport()
{
if (!filename_.empty()) {
std::ofstream ofs(filename_, std::ofstream::out);
if (target_concurrency_) {
ofs << "Concurrency,";
} else {
ofs << "Request Rate,";
}
ofs << "Inferences/Second,";
if (parser_->IsDecoupled()) {
ofs << "Response Throughput,";
}
ofs << "Client Send,";
if (include_server_stats_) {
ofs << "Network+Server Send/Recv,Server Queue,"
<< "Server Compute Input,Server Compute Infer,"
<< "Server Compute Output,";
// Only include cache hits if enabled; keep them out for backwards
// compatibility if disabled
if (parser_->ResponseCacheEnabled()) {
ofs << "Server Cache Hit,";
ofs << "Server Cache Miss,";
}
}
ofs << "Client Recv";
for (const auto& percentile :
summary_[0].client_stats.percentile_latency_ns) {
ofs << ",p" << percentile.first << " latency";
}
if (verbose_csv_) {
if (percentile_ == -1) {
ofs << ",Avg latency";
}
ofs << ",request/response";
ofs << ",response wait";
if (should_output_metrics_) {
ofs << ",Avg GPU Utilization";
ofs << ",Avg GPU Power Usage";
ofs << ",Max GPU Memory Usage";
ofs << ",Total GPU Memory";
}
}
ofs << std::endl;
// Sort summary results in order of increasing infer/sec.
std::sort(
summary_.begin(), summary_.end(),
[](const pa::PerfStatus& a, const pa::PerfStatus& b) -> bool {
return a.client_stats.infer_per_sec < b.client_stats.infer_per_sec;
});
for (pa::PerfStatus& status : summary_) {
if (target_concurrency_) {
ofs << status.concurrency << ",";
} else {
ofs << status.request_rate << ",";
}
ofs << status.client_stats.infer_per_sec << ",";
if (parser_->IsDecoupled()) {
ofs << status.client_stats.responses_per_sec << ",";
}
ofs << (status.client_stats.avg_send_time_ns / 1000) << ",";
if (include_server_stats_) {
uint64_t avg_queue_ns = status.server_stats.queue_count > 0
? (status.server_stats.queue_time_ns /
status.server_stats.queue_count)
: 0;
uint64_t avg_compute_input_ns =
status.server_stats.compute_input_count > 0
? (status.server_stats.compute_input_time_ns /
status.server_stats.compute_input_count)
: 0;
uint64_t avg_compute_infer_ns =
status.server_stats.compute_infer_count > 0
? (status.server_stats.compute_infer_time_ns /
status.server_stats.compute_infer_count)
: 0;
uint64_t avg_compute_output_ns =
status.server_stats.compute_output_count > 0
? (status.server_stats.compute_output_time_ns /
status.server_stats.compute_output_count)
: 0;
uint64_t compute_time_ns = status.server_stats.compute_input_time_ns +
status.server_stats.compute_infer_time_ns +
status.server_stats.compute_output_time_ns;
if (status.server_stats.compute_input_count !=
status.server_stats.compute_infer_count ||
status.server_stats.compute_infer_count !=
status.server_stats.compute_output_count) {
throw std::runtime_error(
"Server side statistics compute counts must be the same.");
}
uint64_t compute_cnt = status.server_stats.compute_input_count;
uint64_t avg_compute_ns =
compute_cnt > 0 ? compute_time_ns / compute_cnt : 0;
uint64_t avg_cache_hit_ns =
status.server_stats.cache_hit_count > 0
? (status.server_stats.cache_hit_time_ns /
status.server_stats.cache_hit_count)
: 0;
uint64_t avg_cache_miss_ns =
status.server_stats.cache_miss_count > 0
? (status.server_stats.cache_miss_time_ns /
status.server_stats.cache_miss_count)
: 0;
uint64_t avg_client_wait_ns = status.client_stats.avg_latency_ns -
status.client_stats.avg_send_time_ns -
status.client_stats.avg_receive_time_ns;
// Network misc is calculated by subtracting data from different
// measurements (server vs. client), so the result needs to be capped
// at 0
uint64_t avg_accounted_time = avg_queue_ns + avg_compute_ns +
avg_cache_hit_ns + avg_cache_miss_ns;
uint64_t avg_network_misc_ns =
avg_client_wait_ns > avg_accounted_time
? (avg_client_wait_ns - avg_accounted_time)
: 0;
if (avg_network_misc_ns == 0) {
std::cerr << "Server average accounted time was larger than client "
"average wait time due to small sample size. Increase "
"the measurement interval with `--measurement-interval`."
<< std::endl;
}
ofs << (avg_network_misc_ns / 1000) << "," << (avg_queue_ns / 1000)
<< "," << (avg_compute_input_ns / 1000) << ","
<< (avg_compute_infer_ns / 1000) << ","
<< (avg_compute_output_ns / 1000) << ",";
if (parser_->ResponseCacheEnabled()) {
ofs << (avg_cache_hit_ns / 1000) << ",";
ofs << (avg_cache_miss_ns / 1000) << ",";
}
}
ofs << (status.client_stats.avg_receive_time_ns / 1000);
for (const auto& percentile : status.client_stats.percentile_latency_ns) {
ofs << "," << (percentile.second / 1000);
}
if (verbose_csv_) {
const uint64_t avg_latency_us =
status.client_stats.avg_latency_ns / 1000;
const uint64_t avg_send_time_us =
status.client_stats.avg_send_time_ns / 1000;
const uint64_t avg_receive_time_us =
status.client_stats.avg_receive_time_ns / 1000;
const uint64_t avg_request_time_us =
status.client_stats.avg_request_time_ns / 1000;
const uint64_t avg_response_wait_time_us =
avg_request_time_us - avg_send_time_us - avg_receive_time_us;
if (percentile_ == -1) {
ofs << "," << avg_latency_us;
}
ofs << "," << std::to_string(avg_send_time_us + avg_receive_time_us);
ofs << "," << std::to_string(avg_response_wait_time_us);
if (should_output_metrics_) {
if (status.metrics.size() == 1) {
WriteGpuMetrics(ofs, status.metrics[0]);
} else {
throw PerfAnalyzerException(
"There should only be one entry in the metrics vector.",
GENERIC_ERROR);
}
}
}
ofs << std::endl;
}
ofs.close();
if (include_server_stats_) {
// Record composing model stats in a separate file.
if (!summary_.front().server_stats.composing_models_stat.empty()) {
// For each composing model, generate a CSV file in the same
// format as the one for the ensemble.
for (const auto& model_identifier :
summary_[0].server_stats.composing_models_stat) {
const auto& name = model_identifier.first.first;
const auto& version = model_identifier.first.second;
const auto name_ver = name + "_v" + version;
std::ofstream ofs(name_ver + "." + filename_, std::ofstream::out);
if (target_concurrency_) {
ofs << "Concurrency,";
} else {
ofs << "Request Rate,";
}
ofs << "Inferences/Second,Client Send,"
<< "Network+Server Send/Recv,Server Queue,"
<< "Server Compute Input,Server Compute Infer,"
<< "Server Compute Output,";
// Only include cache hits if enabled; keep them out for backwards
// compatibility if disabled
if (parser_->ResponseCacheEnabled()) {
ofs << "Server Cache Hit,";
ofs << "Server Cache Miss,";
}
ofs << "Client Recv" << std::endl;
for (pa::PerfStatus& status : summary_) {
auto it = status.server_stats.composing_models_stat.find(
model_identifier.first);
const auto& stats = it->second;
uint64_t avg_queue_ns =
stats.queue_count > 0 ? stats.queue_time_ns / stats.queue_count
: 0;
uint64_t avg_compute_input_ns =
stats.compute_input_count > 0
? stats.compute_input_time_ns / stats.compute_input_count
: 0;
uint64_t avg_compute_infer_ns =
stats.compute_infer_count > 0
? stats.compute_infer_time_ns / stats.compute_infer_count
: 0;
uint64_t avg_compute_output_ns =
stats.compute_output_count > 0
? stats.compute_output_time_ns / stats.compute_output_count
: 0;
uint64_t compute_time_ns = stats.compute_input_time_ns +
stats.compute_infer_time_ns +
stats.compute_output_time_ns;
if (stats.compute_input_count != stats.compute_infer_count ||
stats.compute_infer_count != stats.compute_output_count) {
throw std::runtime_error(
"Server side statistics compute counts must be the same.");
}
uint64_t compute_cnt = stats.compute_input_count;
uint64_t avg_compute_ns =
compute_cnt > 0 ? compute_time_ns / compute_cnt : 0;
uint64_t avg_cache_hit_ns =
stats.cache_hit_count > 0
? stats.cache_hit_time_ns / stats.cache_hit_count
: 0;
uint64_t avg_cache_miss_ns =
stats.cache_miss_count > 0
? stats.cache_miss_time_ns / stats.cache_miss_count
: 0;
uint64_t avg_overhead_ns =
stats.success_count > 0
? stats.cumm_time_ns / stats.success_count
: 0;
const uint64_t avg_accounted_time = avg_queue_ns + avg_compute_ns +
avg_cache_hit_ns +
avg_cache_miss_ns;
avg_overhead_ns = (avg_overhead_ns > avg_accounted_time)
? (avg_overhead_ns - avg_accounted_time)
: 0;
if (avg_overhead_ns == 0) {
std::cerr
<< "Server average accounted time was larger than client "
"average wait time due to small sample size. Increase "
"the measurement interval with `--measurement-interval`."
<< std::endl;
}
// infer / sec of the composing model is calculated using the
// request count ratio between the composing model and the
// ensemble
double infer_ratio = status.server_stats.success_count > 0
? (1.0 * stats.success_count /
status.server_stats.success_count)
: 0.0;
double infer_per_sec =
infer_ratio * status.client_stats.infer_per_sec;
if (target_concurrency_) {
ofs << status.concurrency << ",";
} else {
ofs << status.request_rate << ",";
}
ofs << infer_per_sec << ",0," << (avg_overhead_ns / 1000) << ","
<< (avg_queue_ns / 1000) << "," << (avg_compute_input_ns / 1000)
<< "," << (avg_compute_infer_ns / 1000) << ","
<< (avg_compute_output_ns / 1000) << ",";
// Only include cache hits if enabled; keep them out for backwards
// compatibility if disabled
if (parser_->ResponseCacheEnabled()) {
ofs << (avg_cache_hit_ns / 1000) << ",";
ofs << (avg_cache_miss_ns / 1000) << ",";
}
// Client recv
ofs << "0" << std::endl;
}
}
ofs.close();
}
}
}
}
void
ReportWriter::WriteGpuMetrics(std::ostream& ofs, const Metrics& metric)
{
auto& gpu_util_map = metric.gpu_utilization_per_gpu;
auto& gpu_power_usage_map = metric.gpu_power_usage_per_gpu;
auto& gpu_mem_usage_map = metric.gpu_memory_used_bytes_per_gpu;
auto& gpu_total_mem_map = metric.gpu_memory_total_bytes_per_gpu;
// Currently assumes GPU metrics will be appended to an existing line
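// Each metric becomes its own CSV cell containing "<gpu>:<value>;" pairs,
// e.g. a cell might read "gpu0:0.45;gpu1:0.62;" (IDs and values here are
// illustrative placeholders).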
ofs << ",";
for (auto& entry : gpu_util_map) {
ofs << entry.first << ":" << entry.second << ";";
}
ofs << ",";
for (auto& entry : gpu_power_usage_map) {
ofs << entry.first << ":" << entry.second << ";";
}
ofs << ",";
for (auto& entry : gpu_mem_usage_map) {
ofs << entry.first << ":" << entry.second << ";";
}
ofs << ",";
for (auto& entry : gpu_total_mem_map) {
ofs << entry.first << ":" << entry.second << ";";
}
}
}} // namespace triton::perfanalyzer
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <ostream>
#include "client_backend/client_backend.h"
#include "inference_profiler.h"
#include "metrics.h"
#include "model_parser.h"
#include "perf_utils.h"
namespace triton { namespace perfanalyzer {
#ifndef DOCTEST_CONFIG_DISABLE
class TestReportWriter;
#endif
//==============================================================================
/// ReportWriter is a helper class to generate csv files from the profiled data.
///
class ReportWriter {
public:
~ReportWriter() = default;
/// Create a ReportWriter that is responsible for generating csv output files.
/// \param filename Name of csv file.
/// \param target_concurrency Whether the measurements sweep over a concurrency
/// range (true) or a request rate range (false).
/// \param summary The trace of the measurements collected along the search
/// path.
/// \param verbose_csv Print extra information for Model Analyzer.
/// \param include_server_stats Whether server stats are included in the output.
/// \param percentile The percentile in terms of latency to be reported.
/// If it is a valid percentile value, the percentile latency will be reported
/// and used as the stability criterion instead of average latency. If it is -1,
/// average latency will be reported and used as the stability criterion.
/// \param parser The ModelParser object which holds all the details about the
/// model.
/// \param writer Returns a new ReportWriter object.
/// \param should_output_metrics Whether server-side inference server metrics
/// should be output.
/// \return cb::Error object indicating success or failure.
static cb::Error Create(
const std::string& filename, const bool target_concurrency,
const std::vector<pa::PerfStatus>& summary, const bool verbose_csv,
const bool include_server_stats, const int32_t percentile,
const std::shared_ptr<ModelParser>& parser,
std::unique_ptr<ReportWriter>* writer, const bool should_output_metrics);
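/// Write the CSV report to the configured file; when server stats for an
/// ensemble are included, also write one CSV file per composing model.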
void GenerateReport();
/// Output gpu metrics to a stream
/// \param ofs A stream to output the csv data
/// \param metric The metric container for a particular concurrency or request
/// rate
void WriteGpuMetrics(std::ostream& ofs, const Metrics& metric);
private:
ReportWriter(
const std::string& filename, const bool target_concurrency,
const std::vector<pa::PerfStatus>& summary, const bool verbose_csv,
const bool include_server_stats, const int32_t percentile,
const std::shared_ptr<ModelParser>& parser,
const bool should_output_metrics);
const std::string& filename_{""};
const bool target_concurrency_{true};
const bool include_server_stats_{true};
const bool verbose_csv_{true};
const int32_t percentile_{90};
std::vector<pa::PerfStatus> summary_{};
const std::shared_ptr<ModelParser>& parser_{nullptr};
const bool should_output_metrics_{false};
#ifndef DOCTEST_CONFIG_DISABLE
friend TestReportWriter;
public:
ReportWriter() = default;
#endif
};
}} // namespace triton::perfanalyzer
// Copyright 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "request_rate_manager.h"
namespace triton { namespace perfanalyzer {
RequestRateManager::~RequestRateManager()
{
// The destructor of the derived class should wait for all the request
// generator threads to finish
StopWorkerThreads();
}
cb::Error
RequestRateManager::Create(
const bool async, const bool streaming,
const uint64_t measurement_window_ms, const size_t max_trials,
Distribution request_distribution, const int32_t batch_size,
const size_t max_threads, const uint32_t num_of_sequences,
const SharedMemoryType shared_memory_type, const size_t output_shm_size,
const bool serial_sequences, const std::shared_ptr<ModelParser>& parser,
const std::shared_ptr<cb::ClientBackendFactory>& factory,
std::unique_ptr<LoadManager>* manager)
{
std::unique_ptr<RequestRateManager> local_manager(new RequestRateManager(
async, streaming, request_distribution, batch_size, measurement_window_ms,
max_trials, max_threads, num_of_sequences, shared_memory_type,
output_shm_size, serial_sequences, parser, factory));
*manager = std::move(local_manager);
return cb::Error::Success;
}
RequestRateManager::RequestRateManager(
const bool async, const bool streaming, Distribution request_distribution,
int32_t batch_size, const uint64_t measurement_window_ms,
const size_t max_trials, const size_t max_threads,
const uint32_t num_of_sequences, const SharedMemoryType shared_memory_type,
const size_t output_shm_size, const bool serial_sequences,
const std::shared_ptr<ModelParser>& parser,
const std::shared_ptr<cb::ClientBackendFactory>& factory)
: LoadManager(
async, streaming, batch_size, max_threads, shared_memory_type,
output_shm_size, parser, factory),
request_distribution_(request_distribution), execute_(false),
num_of_sequences_(num_of_sequences), serial_sequences_(serial_sequences)
{
gen_duration_.reset(new std::chrono::nanoseconds(
max_trials * measurement_window_ms * NANOS_PER_MILLIS));
threads_config_.reserve(max_threads);
}
void
RequestRateManager::InitManagerFinalize()
{
if (on_sequence_model_) {
sequence_manager_->InitSequenceStatuses(num_of_sequences_);
}
}
cb::Error
RequestRateManager::ChangeRequestRate(const double request_rate)
{
PauseWorkers();
ConfigureThreads();
// Can safely update the schedule
GenerateSchedule(request_rate);
ResumeWorkers();
return cb::Error::Success;
}
void
RequestRateManager::GenerateSchedule(const double request_rate)
{
std::chrono::nanoseconds max_duration;
std::function<std::chrono::nanoseconds(std::mt19937&)> distribution;
if (request_distribution_ == Distribution::POISSON) {
distribution = ScheduleDistribution<Distribution::POISSON>(request_rate);
// Poisson distribution needs to generate a schedule for the maximum
// possible duration to make sure that it is as random and as close to the
// desired rate as possible
max_duration = *gen_duration_;
} else if (request_distribution_ == Distribution::CONSTANT) {
distribution = ScheduleDistribution<Distribution::CONSTANT>(request_rate);
// Constant distribution only needs one entry per worker -- that one value
// can be repeated over and over to emulate a full schedule of any length
max_duration = std::chrono::nanoseconds(1);
} else {
return;
}
auto worker_schedules = CreateWorkerSchedules(max_duration, distribution);
GiveSchedulesToWorkers(worker_schedules);
}
std::vector<RateSchedulePtr_t>
RequestRateManager::CreateWorkerSchedules(
std::chrono::nanoseconds max_duration,
std::function<std::chrono::nanoseconds(std::mt19937&)> distribution)
{
std::mt19937 schedule_rng;
std::vector<RateSchedulePtr_t> worker_schedules =
CreateEmptyWorkerSchedules();
std::vector<size_t> thread_ids{CalculateThreadIds()};
std::chrono::nanoseconds next_timestamp(0);
size_t thread_id_index = 0;
size_t worker_index = 0;
// Generate schedule until we hit max_duration, but also make sure that all
// worker schedules follow the thread id distribution
//
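// Illustrative example (constant rate, 2 workers, 10ms interval): the first
// pass assigns 10ms to worker 0 and 20ms to worker 1; SetScheduleDurations
// then sets both schedule durations to 20ms, so worker 0 fires at
// 10, 30, 50, ... ms and worker 1 at 20, 40, 60, ... ms.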
while (next_timestamp < max_duration ||
thread_id_index % thread_ids.size() != 0) {
next_timestamp = next_timestamp + distribution(schedule_rng);
worker_index = thread_ids[thread_id_index];
thread_id_index = (thread_id_index + 1) % thread_ids.size();
worker_schedules[worker_index]->intervals.emplace_back(next_timestamp);
}
SetScheduleDurations(worker_schedules);
return worker_schedules;
}
std::vector<RateSchedulePtr_t>
RequestRateManager::CreateEmptyWorkerSchedules()
{
std::vector<RateSchedulePtr_t> worker_schedules;
for (size_t i = 0; i < workers_.size(); i++) {
worker_schedules.push_back(std::make_shared<RateSchedule>());
}
return worker_schedules;
}
std::vector<size_t>
RequestRateManager::CalculateThreadIds()
{
std::vector<size_t> thread_ids{};
// Determine number of ids to loop over for time stamps
size_t num_ids = 0;
if (on_sequence_model_) {
num_ids = num_of_sequences_;
} else {
num_ids = max_threads_;
}
for (size_t i = 0; i < num_ids; i++) {
size_t t = i % DetermineNumThreads();
thread_ids.push_back(t);
}
return thread_ids;
}
void
RequestRateManager::SetScheduleDurations(
std::vector<RateSchedulePtr_t>& schedules)
{
RateSchedulePtr_t last_schedule = schedules.back();
std::chrono::nanoseconds duration = last_schedule->intervals.back();
for (auto schedule : schedules) {
duration = std::max(schedule->intervals.back(), duration);
}
for (auto schedule : schedules) {
schedule->duration = duration;
}
}
void
RequestRateManager::GiveSchedulesToWorkers(
const std::vector<RateSchedulePtr_t>& worker_schedules)
{
for (size_t i = 0; i < workers_.size(); i++) {
auto w = std::dynamic_pointer_cast<IScheduler>(workers_[i]);
w->SetSchedule(worker_schedules[i]);
}
}
void
RequestRateManager::PauseWorkers()
{
// Pause all the threads
execute_ = false;
// Wait until all threads are paused.
for (auto& thread_config : threads_config_) {
while (!thread_config->is_paused_) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
void
RequestRateManager::ConfigureThreads()
{
if (threads_.empty()) {
size_t num_of_threads = DetermineNumThreads();
while (workers_.size() < num_of_threads) {
// Launch new thread for inferencing
threads_stat_.emplace_back(new ThreadStat());
threads_config_.emplace_back(
new RequestRateWorker::ThreadConfig(workers_.size()));
workers_.push_back(
MakeWorker(threads_stat_.back(), threads_config_.back()));
}
// Compute the number of sequences for each thread (take the floor)
// and spread the remainder across the first threads
size_t avg_num_seqs = num_of_sequences_ / workers_.size();
size_t num_seqs_add_one = num_of_sequences_ % workers_.size();
size_t seq_offset = 0;
for (size_t i = 0; i < workers_.size(); i++) {
size_t num_of_seq = avg_num_seqs + (i < num_seqs_add_one ? 1 : 0);
threads_config_[i]->num_sequences_ = num_of_seq;
threads_config_[i]->seq_stat_index_offset_ = seq_offset;
seq_offset += num_of_seq;
threads_.emplace_back(&IWorker::Infer, workers_[i]);
}
}
}
void
RequestRateManager::ResumeWorkers()
{
// Update start_time_ to point to the current time
start_time_ = std::chrono::steady_clock::now();
// Wake up all the threads to begin execution
execute_ = true;
wake_signal_.notify_all();
}
std::shared_ptr<IWorker>
RequestRateManager::MakeWorker(
std::shared_ptr<ThreadStat> thread_stat,
std::shared_ptr<RequestRateWorker::ThreadConfig> thread_config)
{
size_t id = workers_.size();
size_t num_of_threads = DetermineNumThreads();
return std::make_shared<RequestRateWorker>(
id, thread_stat, thread_config, parser_, data_loader_, factory_,
on_sequence_model_, async_, num_of_threads, using_json_data_, streaming_,
batch_size_, wake_signal_, wake_mutex_, execute_, start_time_,
serial_sequences_, infer_data_manager_, sequence_manager_);
}
size_t
RequestRateManager::DetermineNumThreads()
{
size_t num_of_threads = max_threads_;
if (on_sequence_model_) {
num_of_threads = std::min(max_threads_, num_of_sequences_);
}
return num_of_threads;
}
}} // namespace triton::perfanalyzer
// Copyright 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <condition_variable>
#include "load_manager.h"
#include "request_rate_worker.h"
namespace triton { namespace perfanalyzer {
#ifndef DOCTEST_CONFIG_DISABLE
class TestRequestRateManager;
#endif
//==============================================================================
/// RequestRateManager is a helper class that sends inference requests to the
/// inference server in accordance with the configured request distribution
/// (e.g. Poisson, which models real-world traffic patterns).
///
/// An instance of this load manager is created at the beginning of perf
/// analyzer and is used to simulate load with different target
/// requests-per-second values and to collect per-request statistics.
///
/// Detail:
/// Request Rate Manager will try to follow a pre-computed schedule while
/// issuing requests to the server and maintain a constant request rate. The
/// manager will spawn up to max_threads worker threads to meet the timeline
/// imposed by the schedule. The worker threads record the start time and
/// end time of each request into a shared vector which is used to report
/// the observed latencies in serving requests. Additionally, they report a
/// vector of the number of requests that missed their schedule.
///
class RequestRateManager : public LoadManager {
public:
~RequestRateManager();
/// Create a request rate manager object that is responsible for maintaining
/// the specified load on the inference server.
/// \param async Whether to use asynchronous or synchronous API for infer
/// request.
/// \param streaming Whether to use gRPC streaming API for infer request
/// \param measurement_window_ms The time window for measurements.
/// \param max_trials The maximum number of windows that will be measured
/// \param request_distribution The kind of distribution to use for drawing
/// out intervals between successive requests.
/// \param batch_size The batch size used for each request.
/// \param max_threads The maximum number of working threads to be spawned.
/// \param num_of_sequences The number of concurrent sequences that must be
/// maintained on the server.
/// \param shared_memory_type The type of shared memory to use for inputs.
/// \param output_shm_size The size of the shared memory to allocate for the
/// output.
/// \param serial_sequences Enable serial sequence mode.
/// \param parser The ModelParser object to get the model details.
/// \param factory The ClientBackendFactory object used to create
/// client to the server.
/// \param manager Returns a new RequestRateManager object.
/// \return cb::Error object indicating success or failure.
static cb::Error Create(
const bool async, const bool streaming,
const uint64_t measurement_window_ms, const size_t max_trials,
Distribution request_distribution, const int32_t batch_size,
const size_t max_threads, const uint32_t num_of_sequences,
const SharedMemoryType shared_memory_type, const size_t output_shm_size,
const bool serial_sequences, const std::shared_ptr<ModelParser>& parser,
const std::shared_ptr<cb::ClientBackendFactory>& factory,
std::unique_ptr<LoadManager>* manager);
/// Adjusts the rate of issuing requests to be the same as 'request_rate'
/// \param request_rate The rate at which requests must be issued to the
/// server.
/// \return cb::Error object indicating success or failure.
cb::Error ChangeRequestRate(const double target_request_rate);
protected:
RequestRateManager(
const bool async, const bool streaming, Distribution request_distribution,
const int32_t batch_size, const uint64_t measurement_window_ms,
const size_t max_trials, const size_t max_threads,
const uint32_t num_of_sequences,
const SharedMemoryType shared_memory_type, const size_t output_shm_size,
const bool serial_sequences, const std::shared_ptr<ModelParser>& parser,
const std::shared_ptr<cb::ClientBackendFactory>& factory);
void InitManagerFinalize() override;
/// Generates and updates the request schedule as per the given request rate.
/// \param request_rate The request rate to use for new schedule.
void GenerateSchedule(const double request_rate);
std::vector<RateSchedulePtr_t> CreateWorkerSchedules(
std::chrono::nanoseconds duration,
std::function<std::chrono::nanoseconds(std::mt19937&)> distribution);
std::vector<RateSchedulePtr_t> CreateEmptyWorkerSchedules();
std::vector<size_t> CalculateThreadIds();
void SetScheduleDurations(std::vector<RateSchedulePtr_t>& schedules);
void GiveSchedulesToWorkers(
const std::vector<RateSchedulePtr_t>& worker_schedules);
// Pauses the worker threads
void PauseWorkers();
void ConfigureThreads();
// Resets the counters and resumes the worker threads
void ResumeWorkers();
// Makes a new worker
virtual std::shared_ptr<IWorker> MakeWorker(
std::shared_ptr<ThreadStat>,
std::shared_ptr<RequestRateWorker::ThreadConfig>);
size_t DetermineNumThreads();
std::vector<std::shared_ptr<RequestRateWorker::ThreadConfig>> threads_config_;
std::shared_ptr<std::chrono::nanoseconds> gen_duration_;
Distribution request_distribution_;
std::chrono::steady_clock::time_point start_time_;
bool execute_;
const size_t num_of_sequences_{0};
const bool serial_sequences_{false};
#ifndef DOCTEST_CONFIG_DISABLE
friend TestRequestRateManager;
public:
RequestRateManager() = default;
#endif
};
}} // namespace triton::perfanalyzer
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "request_rate_worker.h"
#include <algorithm>
#include <thread>
#include "client_backend/client_backend.h"
#include "data_loader.h"
#include "perf_utils.h"
namespace triton { namespace perfanalyzer {
void
RequestRateWorker::Infer()
{
CreateCtxIdTracker();
CreateContexts();
// Run inferencing until an exit signal is received, to maintain server load.
do {
HandleExecuteOff();
bool is_delayed = SleepIfNecessary();
uint32_t ctx_id = GetCtxId();
SendInferRequest(ctx_id, is_delayed);
RestoreFreeCtxId(ctx_id);
if (HandleExitConditions()) {
return;
}
} while (true);
}
void
RequestRateWorker::CreateCtxIdTracker()
{
bool is_concurrency = false;
ctx_id_tracker_ = CtxIdTrackerFactory::CreateTracker(
is_concurrency, on_sequence_model_, serial_sequences_);
}
void
RequestRateWorker::CreateContexts()
{
size_t active_ctx_cnt =
on_sequence_model_ ? thread_config_->num_sequences_ : 1;
while (ctxs_.size() < active_ctx_cnt) {
CreateContext();
}
ResetFreeCtxIds();
}
void
RequestRateWorker::ResetFreeCtxIds()
{
std::lock_guard<std::mutex> lock(cb_mtx_);
ctx_id_tracker_->Reset(ctxs_.size());
}
void
RequestRateWorker::SetSchedule(RateSchedulePtr_t schedule)
{
schedule_ = schedule;
}
std::chrono::nanoseconds
RequestRateWorker::GetNextTimestamp()
{
return schedule_->Next();
}
uint32_t
RequestRateWorker::GetSeqStatIndex(uint32_t ctx_id)
{
return (thread_config_->seq_stat_index_offset_ + ctx_id);
}
void
RequestRateWorker::HandleExecuteOff()
{
// Wait until the main thread signals execution start
if (!execute_) {
CompleteOngoingSequences();
WaitForOngoingRequests();
// Reset Ctx IDs because CompleteOngoingSequences()
// has destructive side effects
ResetFreeCtxIds();
// Wait if no request should be sent and it is not exiting
thread_config_->is_paused_ = true;
std::unique_lock<std::mutex> lock(wake_mutex_);
wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
}
thread_config_->is_paused_ = false;
}
bool
RequestRateWorker::SleepIfNecessary()
{
WaitForFreeCtx();
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
std::chrono::nanoseconds next_timestamp = GetNextTimestamp();
std::chrono::nanoseconds current_timestamp = now - start_time_;
std::chrono::nanoseconds wait_time = next_timestamp - current_timestamp;
bool delayed = false;
if (wait_time.count() < 0) {
delayed = true;
} else {
thread_stat_->idle_timer.Start();
std::this_thread::sleep_for(wait_time);
thread_stat_->idle_timer.Stop();
}
return delayed;
}
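// Worked example of the arithmetic above (hypothetical numbers, for
// illustration only): if the schedule says the next request fires 500ms after
// start_time_ and 420ms have already elapsed, wait_time is ~80ms and the
// thread sleeps; if 600ms had elapsed instead, wait_time is negative and the
// request is reported as delayed.
//
//   std::chrono::nanoseconds next_ts{std::chrono::milliseconds(500)};
//   std::chrono::nanoseconds elapsed{std::chrono::milliseconds(420)};
//   std::chrono::nanoseconds wait = next_ts - elapsed;  // +80ms -> sleep
//   elapsed = std::chrono::milliseconds(600);
//   wait = next_ts - elapsed;                           // -100ms -> delayed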
void
RequestRateWorker::WaitForFreeCtx()
{
if (!ctx_id_tracker_->IsAvailable()) {
notified_ = false;
// Wait for a signal from the callback.
std::unique_lock<std::mutex> lk(cb_mtx_);
thread_stat_->idle_timer.Start();
cb_cv_.wait(lk, [this] {
if (notified_) {
notified_ = false;
return true;
}
return false;
});
thread_stat_->idle_timer.Stop();
}
}
}} // namespace triton::perfanalyzer
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <memory>
#include "ischeduler.h"
#include "load_worker.h"
#include "model_parser.h"
#include "sequence_manager.h"
namespace triton { namespace perfanalyzer {
#ifndef DOCTEST_CONFIG_DISABLE
class NaggyMockRequestRateWorker;
class TestRequestRateManager;
class TestCustomLoadManager;
#endif
/// Worker thread for RequestRateManager
///
/// If the model is a non-sequence model, each worker uses only one context
/// to maintain the concurrency assigned to it.
/// If the model is a sequence model, each worker has to use multiple contexts
/// to maintain the (sequence) concurrency assigned to it.
///
class RequestRateWorker : public LoadWorker, public IScheduler {
public:
struct ThreadConfig {
ThreadConfig(uint32_t index)
: id_(index), seq_stat_index_offset_(0), is_paused_(false),
num_sequences_(1)
{
}
uint32_t id_;
// The starting sequence stat index for this worker
size_t seq_stat_index_offset_;
uint32_t num_sequences_;
bool is_paused_;
};
RequestRateWorker(
uint32_t id, std::shared_ptr<ThreadStat> thread_stat,
std::shared_ptr<ThreadConfig> thread_config,
const std::shared_ptr<ModelParser> parser,
std::shared_ptr<DataLoader> data_loader,
const std::shared_ptr<cb::ClientBackendFactory> factory,
const bool on_sequence_model, const bool async, const size_t num_threads,
const bool using_json_data, const bool streaming,
const int32_t batch_size, std::condition_variable& wake_signal,
std::mutex& wake_mutex, bool& execute,
std::chrono::steady_clock::time_point& start_time,
const bool serial_sequences,
const std::shared_ptr<IInferDataManager>& infer_data_manager,
std::shared_ptr<SequenceManager> sequence_manager)
: LoadWorker(
id, thread_stat, parser, data_loader, factory, on_sequence_model,
async, streaming, batch_size, using_json_data, wake_signal,
wake_mutex, execute, infer_data_manager, sequence_manager),
thread_config_(thread_config), num_threads_(num_threads),
start_time_(start_time), serial_sequences_(serial_sequences)
{
}
void Infer() override;
/// Provides the schedule that should be followed
///
void SetSchedule(RateSchedulePtr_t schedule) override;
private:
RateSchedulePtr_t schedule_;
const size_t num_threads_;
const bool serial_sequences_;
std::chrono::steady_clock::time_point& start_time_;
std::shared_ptr<ThreadConfig> thread_config_;
void CreateCtxIdTracker();
std::chrono::nanoseconds GetNextTimestamp();
uint32_t GetSeqStatIndex(uint32_t ctx_id) override;
void CreateContexts();
void HandleExecuteOff();
void ResetFreeCtxIds();
// Sleep until it is time for the next part of the schedule
// Returns true if the request was delayed
bool SleepIfNecessary();
void WaitForFreeCtx();
void CreateContextFinalize(std::shared_ptr<InferContext> ctx) override
{
ctx->RegisterAsyncCallbackFinalize(std::bind(
&RequestRateWorker::AsyncCallbackFinalize, this,
std::placeholders::_1));
ctx->SetNumActiveThreads(num_threads_);
}
#ifndef DOCTEST_CONFIG_DISABLE
friend NaggyMockRequestRateWorker;
friend TestCustomLoadManager;
friend TestRequestRateManager;
#endif
};
}} // namespace triton::perfanalyzer
// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <chrono>
#include <cstdint>
#include <tuple>
#include <vector>
namespace triton { namespace perfanalyzer {
/// A record of an individual request
struct RequestRecord {
RequestRecord() = default;
RequestRecord(
std::chrono::time_point<std::chrono::system_clock> start_time,
std::vector<std::chrono::time_point<std::chrono::system_clock>>
response_times,
bool sequence_end, bool delayed, uint64_t sequence_id,
bool has_null_last_response)
: start_time_(start_time), response_times_(response_times),
sequence_end_(sequence_end), delayed_(delayed),
sequence_id_(sequence_id),
has_null_last_response_(has_null_last_response)
{
}
// The timestamp of when the request was started.
std::chrono::time_point<std::chrono::system_clock> start_time_;
// Collection of response times
std::vector<std::chrono::time_point<std::chrono::system_clock>>
response_times_;
// Whether or not the request is at the end of a sequence.
bool sequence_end_;
// Whether or not the request is delayed as per schedule.
bool delayed_;
// Sequence ID of the request
uint64_t sequence_id_;
// Whether the last response is null
bool has_null_last_response_;
};
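/// Example (illustrative only; all values below are hypothetical): a record
/// for a stand-alone request whose single response arrived 2ms after it was
/// sent could be constructed as
///
///   auto start = std::chrono::system_clock::now();
///   std::vector<std::chrono::time_point<std::chrono::system_clock>>
///       responses{start + std::chrono::milliseconds(2)};
///   RequestRecord record(
///       start, responses, /*sequence_end=*/true, /*delayed=*/false,
///       /*sequence_id=*/0, /*has_null_last_response=*/false);
///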
}} // namespace triton::perfanalyzer
// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "sequence_manager.h"
namespace triton { namespace perfanalyzer {
SequenceManager::SequenceManager(
const uint64_t start_sequence_id, const uint64_t sequence_id_range,
const size_t sequence_length, const bool sequence_length_specified,
const double sequence_length_variation, const bool using_json_data,
std::shared_ptr<DataLoader> data_loader)
: start_sequence_id_(start_sequence_id),
sequence_id_range_(sequence_id_range), sequence_length_(sequence_length),
sequence_length_specified_(sequence_length_specified),
sequence_length_variation_(sequence_length_variation),
using_json_data_(using_json_data), data_loader_(data_loader)
{
distribution_ = std::uniform_int_distribution<uint64_t>(
0, data_loader_->GetDataStreamsCount() - 1);
}
void
SequenceManager::InitSequenceStatuses(size_t num_sequence_statuses)
{
sequence_statuses_.clear();
for (size_t sequence_status_index{0};
sequence_status_index < num_sequence_statuses; sequence_status_index++) {
sequence_statuses_.push_back(std::make_shared<SequenceStatus>());
}
}
const uint64_t
SequenceManager::GetSequenceID(size_t sequence_status_index) const
{
return sequence_statuses_.at(sequence_status_index)->seq_id_;
}
std::mutex&
SequenceManager::GetMutex(size_t sequence_status_index)
{
return sequence_statuses_.at(sequence_status_index)->mtx_;
}
const uint64_t
SequenceManager::GetDataStreamID(size_t sequence_status_index) const
{
return sequence_statuses_.at(sequence_status_index)->data_stream_id_;
}
const size_t
SequenceManager::GetRemainingQueries(size_t sequence_status_index) const
{
return sequence_statuses_.at(sequence_status_index)->remaining_queries_;
}
void
SequenceManager::SetRemainingQueries(
size_t sequence_status_index, size_t remaining_queries)
{
sequence_statuses_.at(sequence_status_index)->remaining_queries_ =
remaining_queries;
}
void
SequenceManager::DecrementRemainingQueries(size_t sequence_status_index)
{
sequence_statuses_.at(sequence_status_index)->remaining_queries_--;
}
const size_t
SequenceManager::GetNumSequenceStatuses() const
{
return sequence_statuses_.size();
}
void
SequenceManager::SetInferSequenceOptions(
const uint32_t seq_stat_index, std::unique_ptr<cb::InferOptions>& options)
{
options->sequence_start_ =
(sequence_statuses_[seq_stat_index]->remaining_queries_ == 0);
// New sequence must be initialized before setting the id.
if (options->sequence_start_) {
InitNewSequence(seq_stat_index);
}
options->sequence_id_ = sequence_statuses_[seq_stat_index]->seq_id_;
options->sequence_end_ =
(sequence_statuses_[seq_stat_index]->remaining_queries_ == 1);
}
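// Illustrative call pattern (a sketch of typical usage, not code taken from
// the workers; `manager`, `options`, and `idx` are hypothetical names): a
// load worker preparing a request for sequence slot idx would typically do
//
//   std::lock_guard<std::mutex> guard(manager->GetMutex(idx));
//   manager->SetInferSequenceOptions(idx, options);
//   // ... issue the request ...
//   manager->DecrementRemainingQueries(idx);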
const size_t
SequenceManager::GetSequenceLength(size_t sequence_status_index) const
{
return sequence_statuses_.at(sequence_status_index)->sequence_length_;
}
void
SequenceManager::InitNewSequence(int seq_stat_index)
{
sequence_statuses_[seq_stat_index]->seq_id_ = GetNextSeqId(seq_stat_index);
if (!using_json_data_) {
size_t new_length = GetRandomSequenceLength(sequence_length_variation_);
sequence_statuses_[seq_stat_index]->remaining_queries_ =
new_length == 0 ? 1 : new_length;
} else {
// Selecting next available data stream based on uniform distribution.
const uint64_t data_stream_id{GetNewDataStreamId()};
sequence_statuses_[seq_stat_index]->data_stream_id_ = data_stream_id;
const size_t total_steps{data_loader_->GetTotalSteps(data_stream_id)};
if (sequence_length_specified_) {
const size_t varied_sequence_length{
GetRandomSequenceLength(sequence_length_variation_)};
sequence_statuses_[seq_stat_index]->sequence_length_ =
varied_sequence_length;
} else {
sequence_statuses_[seq_stat_index]->sequence_length_ = total_steps;
}
sequence_statuses_[seq_stat_index]->remaining_queries_ =
sequence_statuses_[seq_stat_index]->sequence_length_;
}
}
uint64_t
SequenceManager::GetNextSeqId(int seq_stat_index)
{
uint64_t old_seq_id = sequence_statuses_[seq_stat_index]->seq_id_;
uint64_t next_seq_id =
curr_seq_id_++ % sequence_id_range_ + start_sequence_id_;
// If the next sequence ID is still in use, reuse the same sequence ID
// that this sequence_status used last time
//
for (uint i = 0; i < sequence_statuses_.size(); i++) {
if (next_seq_id == sequence_statuses_[i]->seq_id_) {
next_seq_id = old_seq_id;
break;
}
}
return next_seq_id;
}
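// Worked example (hypothetical configuration): with start_sequence_id_ = 1 and
// sequence_id_range_ = 4, successive calls hand out IDs 1, 2, 3, 4, 1, 2, ...
// (curr_seq_id_ % 4 + 1). If the wrapped-around ID is still held by another
// active sequence slot, the loop above reuses this slot's previous ID instead
// of issuing a duplicate.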
size_t
SequenceManager::GetRandomSequenceLength(double offset_ratio)
{
int random_offset = ((2.0 * rand() / double(RAND_MAX)) - 1.0) * offset_ratio /
100.0 * sequence_length_;
if (int(sequence_length_) + random_offset <= 0) {
return 1;
}
return sequence_length_ + random_offset;
}
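// Worked example (hypothetical values): with sequence_length_ = 20 and
// offset_ratio = 20.0, the random offset spans roughly [-4, +4] (+/-20% of
// 20), so the returned length falls in approximately [16, 24]; a non-positive
// result is clamped to 1.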
}} // namespace triton::perfanalyzer
// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <atomic>
#include <cstdint>
#include <memory>
#include <random>
#include <vector>
#include "client_backend/client_backend.h"
#include "data_loader.h"
#include "sequence_status.h"
namespace triton { namespace perfanalyzer {
#ifndef DOCTEST_CONFIG_DISABLE
class NaggyMockSequenceManager;
#endif
/// Manages operations related to preparing requests to sequence models.
///
class SequenceManager {
public:
/// Constructs the sequence manager object. Involves initializing the
/// distribution for randomly assigning input data streams to new sequences.
/// \param start_sequence_id See associated data member description.
/// \param sequence_id_range See associated data member description.
/// \param sequence_length See associated data member description.
/// \param sequence_length_specified See associated data member description.
/// \param sequence_length_variation See associated data member description.
/// \param using_json_data See associated data member description.
/// \param data_loader See associated data member description.
/// \return The constructed sequence manager object.
///
SequenceManager(
const uint64_t start_sequence_id, const uint64_t sequence_id_range,
const size_t sequence_length, const bool sequence_length_specified,
const double sequence_length_variation, const bool using_json_data,
std::shared_ptr<DataLoader> data_loader);
/// Initializes the sequence statuses data structure.
/// \param num_sequence_statuses The number of sequence status objects to
/// create.
///
void InitSequenceStatuses(size_t num_sequence_statuses);
/// Gets the sequence ID for the specified sequence status object.
/// \param sequence_status_index The index of the sequence status object.
/// \return The sequence ID for the specified sequence status object.
///
const uint64_t GetSequenceID(size_t sequence_status_index) const;
/// Gets a non-const reference to the mutex for the specified sequence status
/// object.
/// \param sequence_status_index The index of the sequence status object.
/// \return A non-const reference to the mutex for the specified sequence
/// status object.
///
std::mutex& GetMutex(size_t sequence_status_index);
/// Gets the data stream ID for the specified sequence status object.
/// \param sequence_status_index The index of the sequence status object.
/// \return The data stream ID for the specified sequence status object.
///
const uint64_t GetDataStreamID(size_t sequence_status_index) const;
/// Gets the remaining queries for the specified sequence status object.
/// \param sequence_status_index The index of the sequence status object.
/// \return The remaining queries for the specified sequence status object.
///
const size_t GetRemainingQueries(size_t sequence_status_index) const;
/// Sets the remaining queries for the specified sequence status object.
/// \param sequence_status_index The index of the sequence status object.
/// \param remaining_queries The new value of the remaining queries for the
/// specified sequence status object.
///
void SetRemainingQueries(
size_t sequence_status_index, size_t remaining_queries);
/// Decrements the remaining queries for the specified sequence status object.
/// \param sequence_status_index The index of the sequence status object.
///
void DecrementRemainingQueries(size_t sequence_status_index);
/// Gets the number of sequence status objects in the sequence statuses data
/// structure.
/// \return The number of sequence status objects in the sequence statuses
/// data structure.
///
const size_t GetNumSequenceStatuses() const;
/// Sets options related to a single request to a sequence model.
/// \param seq_stat_index The index for the sequence status object that is
/// having its options set.
/// \param options The options object for the request that is being prepared.
///
virtual void SetInferSequenceOptions(
const uint32_t seq_stat_index,
std::unique_ptr<cb::InferOptions>& options);
/// Gets the sequence length for the specified sequence status object.
/// \param sequence_status_index The index of the sequence status object.
/// \return The sequence length for the specified sequence status object.
///
const size_t GetSequenceLength(size_t sequence_status_index) const;
private:
/// Initializes values for a sequence status object.
/// \param seq_stat_index The index for the sequence status object that is
/// being initialized.
///
virtual void InitNewSequence(int seq_stat_index);
/// Determines an appropriate next sequence ID for a renewed sequence status
/// object.
/// \param seq_stat_index The index for the sequence for which a request is
/// being prepared.
/// \return The potentially new sequence ID to be used by a renewed sequence
/// status object.
///
virtual uint64_t GetNextSeqId(int seq_stat_index);
virtual uint64_t GetNewDataStreamId()
{
return distribution_(rng_generator_);
}
/// Generates a random sequence length based on a threshold.
/// \param offset_ratio The offset ratio/threshold of the generated length.
/// \return A random sequence length.
///
virtual size_t GetRandomSequenceLength(double offset_ratio);
/// Data structure holding sequence status objects
///
std::vector<std::shared_ptr<SequenceStatus>> sequence_statuses_{};
/// Current sequence id (for issuing new sequences)
///
std::atomic<uint64_t> curr_seq_id_{0};
/// Data loader to be used for various sequence operations.
///
std::shared_ptr<DataLoader> data_loader_{nullptr};
/// The starting sequence ID to be used for iterating through valid sequence
/// IDs.
///
const uint64_t start_sequence_id_{0};
/// The maximum sequence ID to be used for iterating through valid sequence
/// IDs.
///
const uint64_t sequence_id_range_{0};
/// The base length of new sequences.
///
const size_t sequence_length_{0};
/// Whether the user specified the sequence length.
///
const bool sequence_length_specified_{false};
/// The percentage variation in length of sequences using autogenerated data
/// as input.
///
const double sequence_length_variation_{0.0};
/// Indicates whether to generate sequence request input data or read it from
/// a JSON file.
///
const bool using_json_data_{false};
/// The distribution for randomly assigning new sequences a data stream in the
/// input data JSON.
///
std::uniform_int_distribution<uint64_t> distribution_;
/// The random number generator for randomly assigning new sequences a data
/// stream in the input data JSON.
///
std::default_random_engine rng_generator_{};
#ifndef DOCTEST_CONFIG_DISABLE
friend NaggyMockSequenceManager;
public:
SequenceManager() = default;
#endif
};
}} // namespace triton::perfanalyzer
// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <cstdint>
#include <mutex>
namespace triton { namespace perfanalyzer {
// Holds the status of the inflight sequence
struct SequenceStatus {
SequenceStatus(uint64_t seq_id = 0)
: seq_id_(seq_id), data_stream_id_(0), remaining_queries_(0)
{
}
// The unique correlation id allocated to the sequence
uint64_t seq_id_;
// The data stream id providing data for the sequence
uint64_t data_stream_id_;
// The number of queries remaining to complete the sequence
size_t remaining_queries_;
// The length of the sequence
size_t sequence_length_{0};
// A lock to protect sequence data
std::mutex mtx_;
};
}} // namespace triton::perfanalyzer
// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
namespace triton { namespace perfanalyzer {
/// Data for one input or output tensor
///
struct TensorData {
const uint8_t* data_ptr{nullptr};
size_t batch1_size{0};
bool is_valid{false};
};
}} // namespace triton::perfanalyzer
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
#include <getopt.h>
#include <array>
#include "command_line_parser.h"
#include "doctest.h"
namespace triton { namespace perfanalyzer {
inline void
CHECK_STRING(const char* name, const std::string& str, const std::string& val)
{
CHECK_MESSAGE(
!str.compare(val), name, " expecting '", val, "', found '", str, "'");
}
inline void
CHECK_STRING(std::string act, std::string exp)
{
CHECK_MESSAGE(
!act.compare(exp), "Expecting: '", exp, "', Found: '", act, "'");
}
std::string
CreateUsageMessage(const std::string& option_name, const std::string& msg)
{
return "Failed to parse " + option_name + ". " + msg;
}
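// Example (drawn from the checks below): CreateUsageMessage(
//     "-m (model name)", "The value must be specified.")
// returns "Failed to parse -m (model name). The value must be specified."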
// Performs a doctest check against all the individual parameters
// in a PAParams object.
//
// \param act actual object under test
// \param exp expected value for object
//
inline void
CHECK_PARAMS(PAParamsPtr act, PAParamsPtr exp)
{
CHECK(act->verbose == exp->verbose);
CHECK(act->streaming == exp->streaming);
CHECK(act->extra_verbose == exp->extra_verbose);
CHECK(act->max_threads == exp->max_threads);
CHECK(act->max_threads_specified == exp->max_threads_specified);
CHECK(act->sequence_length == exp->sequence_length);
CHECK(act->percentile == exp->percentile);
REQUIRE(act->user_data.size() == exp->user_data.size());
for (size_t i = 0; i < act->user_data.size(); i++) {
CHECK_STRING(act->user_data[i], exp->user_data[i]);
}
CHECK(act->input_shapes.size() == exp->input_shapes.size());
for (auto act_shape : act->input_shapes) {
auto exp_shape = exp->input_shapes.find(act_shape.first);
REQUIRE_MESSAGE(
exp_shape != exp->input_shapes.end(),
"Unexpected input_shape: ", act_shape.first);
REQUIRE(act_shape.second.size() == exp_shape->second.size());
for (size_t i = 0; i < act_shape.second.size(); i++) {
CHECK_MESSAGE(
act_shape.second[i] == exp_shape->second[i],
"Unexpected shape value for: ", act_shape.first, "[", i, "]");
}
}
CHECK(act->measurement_window_ms == exp->measurement_window_ms);
CHECK(act->using_concurrency_range == exp->using_concurrency_range);
CHECK(act->concurrency_range.start == exp->concurrency_range.start);
CHECK(act->concurrency_range.end == exp->concurrency_range.end);
CHECK(act->concurrency_range.step == exp->concurrency_range.step);
CHECK(act->latency_threshold_ms == exp->latency_threshold_ms);
CHECK(act->stability_threshold == doctest::Approx(exp->stability_threshold));
CHECK(act->max_trials == exp->max_trials);
CHECK(act->zero_input == exp->zero_input);
CHECK(act->string_length == exp->string_length);
CHECK_STRING(act->string_data, exp->string_data);
CHECK(act->async == exp->async);
CHECK(act->forced_sync == exp->forced_sync);
CHECK(act->using_request_rate_range == exp->using_request_rate_range);
CHECK(
act->request_rate_range[0] ==
doctest::Approx(exp->request_rate_range[0]));
CHECK(
act->request_rate_range[1] ==
doctest::Approx(exp->request_rate_range[1]));
CHECK(
act->request_rate_range[2] ==
doctest::Approx(exp->request_rate_range[2]));
CHECK(act->num_of_sequences == exp->num_of_sequences);
CHECK(act->search_mode == exp->search_mode);
CHECK(act->request_distribution == exp->request_distribution);
CHECK(act->using_custom_intervals == exp->using_custom_intervals);
CHECK_STRING(act->request_intervals_file, exp->request_intervals_file);
CHECK(act->shared_memory_type == exp->shared_memory_type);
CHECK(act->output_shm_size == exp->output_shm_size);
CHECK(act->kind == exp->kind);
CHECK_STRING(act->model_signature_name, exp->model_signature_name);
CHECK(act->using_grpc_compression == exp->using_grpc_compression);
CHECK(act->compression_algorithm == exp->compression_algorithm);
CHECK(act->measurement_mode == exp->measurement_mode);
CHECK(act->measurement_request_count == exp->measurement_request_count);
CHECK_STRING(act->triton_server_path, exp->triton_server_path);
CHECK_STRING(act->model_repository_path, exp->model_repository_path);
CHECK(act->start_sequence_id == exp->start_sequence_id);
CHECK(act->sequence_id_range == exp->sequence_id_range);
CHECK_STRING(
act->ssl_options.ssl_grpc_certificate_chain_file,
exp->ssl_options.ssl_grpc_certificate_chain_file);
CHECK_STRING(
act->ssl_options.ssl_grpc_private_key_file,
exp->ssl_options.ssl_grpc_private_key_file);
CHECK_STRING(
act->ssl_options.ssl_grpc_root_certifications_file,
exp->ssl_options.ssl_grpc_root_certifications_file);
CHECK(act->ssl_options.ssl_grpc_use_ssl == exp->ssl_options.ssl_grpc_use_ssl);
CHECK_STRING(
act->ssl_options.ssl_https_ca_certificates_file,
exp->ssl_options.ssl_https_ca_certificates_file);
CHECK_STRING(
act->ssl_options.ssl_https_client_certificate_file,
exp->ssl_options.ssl_https_client_certificate_file);
CHECK_STRING(
act->ssl_options.ssl_https_client_certificate_type,
exp->ssl_options.ssl_https_client_certificate_type);
CHECK_STRING(
act->ssl_options.ssl_https_private_key_file,
exp->ssl_options.ssl_https_private_key_file);
CHECK_STRING(
act->ssl_options.ssl_https_private_key_type,
exp->ssl_options.ssl_https_private_key_type);
CHECK(
act->ssl_options.ssl_https_verify_host ==
exp->ssl_options.ssl_https_verify_host);
CHECK(
act->ssl_options.ssl_https_verify_peer ==
exp->ssl_options.ssl_https_verify_peer);
CHECK(act->verbose_csv == exp->verbose_csv);
CHECK(act->enable_mpi == exp->enable_mpi);
CHECK(act->trace_options.size() == exp->trace_options.size());
CHECK(act->using_old_options == exp->using_old_options);
CHECK(act->dynamic_concurrency_mode == exp->dynamic_concurrency_mode);
CHECK(act->url_specified == exp->url_specified);
CHECK_STRING(act->url, exp->url);
CHECK_STRING(act->model_name, exp->model_name);
CHECK_STRING(act->model_version, exp->model_version);
CHECK(act->batch_size == exp->batch_size);
CHECK(act->using_batch_size == exp->using_batch_size);
CHECK(act->concurrent_request_count == exp->concurrent_request_count);
CHECK(act->protocol == exp->protocol);
CHECK(act->http_headers->size() == exp->http_headers->size());
CHECK(act->max_concurrency == exp->max_concurrency);
CHECK_STRING(act->filename, exp->filename);
CHECK(act->mpi_driver != nullptr);
CHECK_STRING(act->memory_type, exp->memory_type);
}
#define CHECK_INT_OPTION(option_name, exp_val, msg) \
SUBCASE("valid value") \
{ \
int argc = 5; \
char* argv[argc] = {app_name, "-m", model_name, option_name, "2000"}; \
CAPTURE(argv[3]); \
CAPTURE(argv[4]); \
\
REQUIRE_NOTHROW(act = parser.Parse(argc, argv)); \
CHECK(!parser.UsageCalled()); \
CAPTURE(parser.GetUsageMessage()); \
\
exp_val = 2000; \
CAPTURE(exp_val); \
} \
\
SUBCASE("negative value") \
{ \
int argc = 5; \
char* argv[argc] = {app_name, "-m", model_name, option_name, "-2000"}; \
REQUIRE_NOTHROW(act = parser.Parse(argc, argv)); \
CHECK(parser.UsageCalled()); \
CHECK_STRING("Usage Message", parser.GetUsageMessage(), msg); \
} \
\
SUBCASE("floating point value") \
{ \
int argc = 5; \
char* argv[argc] = {app_name, "-m", model_name, option_name, "29.5"}; \
\
REQUIRE_NOTHROW(act = parser.Parse(argc, argv)); \
CHECK(!parser.UsageCalled()); \
\
exp_val = 29; \
} \
\
SUBCASE("missing value") \
{ \
int argc = 4; \
char* argv[argc] = {app_name, "-m", model_name, option_name}; \
\
opterr = 0; \
REQUIRE_NOTHROW(act = parser.Parse(argc, argv)); \
CHECK(parser.UsageCalled()); \
CHECK_STRING("Usage Message", parser.GetUsageMessage(), ""); \
}
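// Example usage of CHECK_INT_OPTION (an illustrative sketch; it assumes the
// enclosing TEST_CASE defines app_name, model_name, parser, act, and exp just
// as "Testing Command Line Parser" below does, and "--some-int-option" /
// exp->some_int_field are hypothetical names):
//
//   CHECK_INT_OPTION(
//       "--some-int-option", exp->some_int_field,
//       CreateUsageMessage("--some-int-option", "The value must be > 0."));
//
// The macro's SUBCASEs then exercise valid, negative, floating point, and
// missing values for that option.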
TEST_CASE("Testing PerfAnalyzerParameters")
{
PAParamsPtr params(new PerfAnalyzerParameters{});
CHECK(params->verbose == false);
CHECK(params->streaming == false);
CHECK(params->extra_verbose == false);
CHECK(params->max_threads == 4);
CHECK(params->max_threads_specified == false);
CHECK(params->sequence_length == 20);
CHECK(params->percentile == -1);
CHECK(params->user_data.size() == 0);
CHECK(params->input_shapes.size() == 0);
CHECK(params->measurement_window_ms == 5000);
CHECK(params->using_concurrency_range == false);
CHECK(params->concurrency_range.start == 1);
CHECK(params->concurrency_range.end == 1);
CHECK(params->concurrency_range.step == 1);
CHECK(params->latency_threshold_ms == NO_LIMIT);
CHECK(params->stability_threshold == doctest::Approx(0.1));
CHECK(params->max_trials == 10);
CHECK(params->zero_input == false);
CHECK(params->string_length == 128);
CHECK_STRING("string_data", params->string_data, "");
CHECK(params->async == false);
CHECK(params->forced_sync == false);
CHECK(params->using_request_rate_range == false);
CHECK(params->request_rate_range[0] == doctest::Approx(1.0));
CHECK(params->request_rate_range[1] == doctest::Approx(1.0));
CHECK(params->request_rate_range[2] == doctest::Approx(1.0));
CHECK(params->num_of_sequences == 4);
CHECK(params->search_mode == SearchMode::LINEAR);
CHECK(params->request_distribution == Distribution::CONSTANT);
CHECK(params->using_custom_intervals == false);
CHECK_STRING("request_intervals_file", params->request_intervals_file, "");
CHECK(params->shared_memory_type == NO_SHARED_MEMORY);
CHECK(params->output_shm_size == 102400);
CHECK(params->kind == clientbackend::BackendKind::TRITON);
CHECK_STRING(
"model_signature_name", params->model_signature_name, "serving_default");
CHECK(params->using_grpc_compression == false);
CHECK(
params->compression_algorithm ==
clientbackend::GrpcCompressionAlgorithm::COMPRESS_NONE);
CHECK(params->measurement_mode == MeasurementMode::TIME_WINDOWS);
CHECK(params->measurement_request_count == 50);
CHECK_STRING(
"triton_server_path", params->triton_server_path, "/opt/tritonserver");
CHECK_STRING("model_repository_path", params->model_repository_path, "");
CHECK(params->start_sequence_id == 1);
CHECK(params->sequence_id_range == UINT32_MAX);
CHECK_STRING(
"ssl_grpc_certificate_chain_file",
params->ssl_options.ssl_grpc_certificate_chain_file, "");
CHECK_STRING(
"ssl_grpc_private_key_file",
params->ssl_options.ssl_grpc_private_key_file, "");
CHECK_STRING(
"ssl_grpc_root_certifications_file",
params->ssl_options.ssl_grpc_root_certifications_file, "");
CHECK(params->ssl_options.ssl_grpc_use_ssl == false);
CHECK_STRING(
"ssl_https_ca_certificates_file",
params->ssl_options.ssl_https_ca_certificates_file, "");
CHECK_STRING(
"ssl_https_client_certificate_file",
params->ssl_options.ssl_https_client_certificate_file, "");
CHECK_STRING(
"ssl_https_client_certificate_type",
params->ssl_options.ssl_https_client_certificate_type, "");
CHECK_STRING(
"ssl_https_private_key_file",
params->ssl_options.ssl_https_private_key_file, "");
CHECK_STRING(
"ssl_https_private_key_type",
params->ssl_options.ssl_https_private_key_type, "");
CHECK(params->ssl_options.ssl_https_verify_host == 2);
CHECK(params->ssl_options.ssl_https_verify_peer == 1);
CHECK(params->verbose_csv == false);
CHECK(params->enable_mpi == false);
CHECK(params->trace_options.size() == 0);
CHECK(params->using_old_options == false);
CHECK(params->dynamic_concurrency_mode == false);
CHECK(params->url_specified == false);
CHECK_STRING("url", params->url, "localhost:8000");
CHECK_STRING("model_name", params->model_name, "");
CHECK_STRING("model_version", params->model_version, "");
CHECK(params->batch_size == 1);
CHECK(params->using_batch_size == false);
CHECK(params->concurrent_request_count == 1);
CHECK(params->protocol == clientbackend::ProtocolType::HTTP);
CHECK(params->http_headers->size() == 0);
CHECK(params->max_concurrency == 0);
CHECK_STRING("filename", params->filename, "");
CHECK(params->mpi_driver == nullptr);
CHECK_STRING("memory_type", params->memory_type, "system");
}
// Test CLParser Class that captures the usage string but suppresses the output
//
class TestCLParser : public CLParser {
public:
std::string GetUsageMessage() const { return usage_message_; }
bool UsageCalled() const { return usage_called_; }
private:
std::string usage_message_;
bool usage_called_ = false;
virtual void Usage(const std::string& msg = std::string())
{
usage_called_ = true;
usage_message_ = msg;
}
};
TEST_CASE("Testing Command Line Parser")
{
char* model_name = "my_model";
char* app_name = "test_perf_analyzer";
std::string expected_msg;
opterr = 1; // Enable error output for GetOpt library
bool check_params = true;
TestCLParser parser; // Command Line parser under test
PAParamsPtr act; // Actual options parsed from parser
PAParamsPtr exp{new PerfAnalyzerParameters()}; // Expected results
// Most common defaults
exp->model_name = model_name; // model_name;
exp->max_threads = 16;
SUBCASE("with no parameters")
{
int argc = 1;
char* argv[argc] = {app_name};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
expected_msg =
CreateUsageMessage("-m (model name)", "The value must be specified.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->model_name = "";
}
SUBCASE("with min parameters")
{
int argc = 3;
char* argv[argc] = {app_name, "-m", model_name};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(!parser.UsageCalled());
}
SUBCASE("Option : --streaming")
{
SUBCASE("streaming option - without model")
{
int argc = 2;
char* argv[argc] = {app_name, "--streaming"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"Streaming is only allowed with gRPC protocol.");
exp->model_name = "";
exp->streaming = true;
}
SUBCASE("with model")
{
int argc = 4;
char* argv[argc] = {app_name, "-m", model_name, "--streaming"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
// NOTE: This is not an informative error message. How does the user specify
// the gRPC protocol? The error output should list the missing params.
//
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"Streaming is only allowed with gRPC protocol.");
exp->streaming = true;
}
SUBCASE("with model last")
{
int argc = 4;
char* argv[argc] = {app_name, "--streaming", "-m", model_name};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"Streaming is only allowed with gRPC protocol.");
exp->streaming = true;
}
}
SUBCASE("Option : --max-threads")
{
SUBCASE("set to 1")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--max-threads", "1"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(!parser.UsageCalled());
exp->max_threads = 1;
exp->max_threads_specified = true;
}
SUBCASE("set to max")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--max-threads", "65535"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(!parser.UsageCalled());
exp->max_threads = 65535;
exp->max_threads_specified = true;
}
SUBCASE("missing value")
{
int argc = 4;
char* argv[argc] = {app_name, "-m", model_name, "--max-threads"};
opterr = 0; // Disable error output for GetOpt library for this case
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
// NOTE: Empty message is not helpful
//
CHECK_STRING("Usage Message", parser.GetUsageMessage(), "");
// BUG: Dumping string "option '--max-threads' requires an argument"
// directly to std::out, instead of through usage()
//
}
SUBCASE("bad value")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--max-threads", "bad"};
opterr = 0; // Disable error output for GetOpt library for this case
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
// NOTE: Empty message is not helpful
//
CHECK_STRING("Usage Message", parser.GetUsageMessage(), "");
// BUG: Dumping string "option '--max-threads' requires an argument"
// directly to std::out, instead of through usage()
//
}
}
SUBCASE("Option : --sequence-length")
{
SUBCASE("set to 2000")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-length", "2000"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->sequence_length = 2000;
}
SUBCASE("set to 0")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--sequence-length", "0"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->sequence_length = 20;
}
}
SUBCASE("Option : --sequence-length-variation")
{
SUBCASE("non-negative")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-length-variation", "33.3"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->sequence_length_variation = 33.3;
}
SUBCASE("negative")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-length-variation", "-10"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
REQUIRE(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--sequence-length-variation", "The value must be >= 0.0.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->sequence_length_variation = -10.0;
}
}
SUBCASE("Option : --percentile")
{
SUBCASE("set to 25")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--percentile", "25"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->percentile = 25;
}
SUBCASE("set to 225 - overflow check")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--percentile", "225"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--percentile",
"The value must be -1 for not reporting or in range (0, 100).");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->percentile = 225;
}
SUBCASE("set to -1 - use average latency")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--percentile", "-1"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->percentile = -1;
}
}
SUBCASE("Option : --data-directory")
{
SUBCASE("set to `/usr/data`")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--data-directory", "/usr/data"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->user_data.push_back("/usr/data");
}
SUBCASE("call twice")
{
// QUESTION: Is this the expected behavior? There are not enough details
// in the output. It is marked as deprecated; what does that mean? Is it
// used?
//
int argc = 7;
char* argv[argc] = {app_name, "-m", model_name,
"--data-directory", "/usr/data", "--data-directory",
"/another/dir"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->user_data.push_back("/usr/data");
exp->user_data.push_back("/another/dir");
}
}
SUBCASE("Option : --sequence-id-range")
{
SUBCASE("One arg")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-id-range", "53"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->start_sequence_id = 53;
exp->sequence_id_range = UINT32_MAX;
}
SUBCASE("Two args")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-id-range", "53:67"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->start_sequence_id = 53;
exp->sequence_id_range = 14;
}
SUBCASE("Three args")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-id-range", "53:67:92"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--sequence-id-range", "The value does not match <start:end>.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
// It will get the final 2 values
//
exp->start_sequence_id = 67;
exp->sequence_id_range = 25;
}
SUBCASE("Not a number")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-id-range", "BAD"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--sequence-id-range", "Invalid value provided: BAD");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
check_params = false; // Usage message called
}
SUBCASE("Not a number 2")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--sequence-id-range", "53:BAD"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--sequence-id-range", "Invalid value provided: 53:BAD");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
check_params = false; // Usage message called
}
}
SUBCASE("Option : --input-tensor-format")
{
SUBCASE("binary")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--input-tensor-format", "binary"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->input_tensor_format = cb::TensorFormat::BINARY;
}
SUBCASE("json")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--input-tensor-format", "json"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->input_tensor_format = cb::TensorFormat::JSON;
}
SUBCASE("invalid")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--input-tensor-format", "invalid"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
exp->input_tensor_format = cb::TensorFormat::UNKNOWN;
}
}
SUBCASE("Option : --shape")
{
SUBCASE("expected input, single shape")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--shape", "input_name:1,2,3"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->input_shapes.emplace(
std::string("input_name"), std::vector<int64_t>{1, 2, 3});
}
SUBCASE("expected input, multiple shapes")
{
int argc = 9;
char* argv[argc] = {
app_name,
"-m",
model_name,
"--shape",
"input_name:1,2,3",
"--shape",
"alpha:10,24",
"--shape",
"beta:10,200,34,15,9000"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->input_shapes.emplace(
std::string("input_name"), std::vector<int64_t>{1, 2, 3});
exp->input_shapes.emplace(
std::string("alpha"), std::vector<int64_t>{10, 24});
exp->input_shapes.emplace(
std::string("beta"), std::vector<int64_t>{10, 200, 34, 15, 9000});
}
SUBCASE("using negative dims")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--shape", "input_name:-1,2,3"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--shape", "The dimensions of input tensor must be > 0.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->input_shapes.emplace(
std::string("input_name"), std::vector<int64_t>{-1, 2, 3});
}
SUBCASE("equals sign, not colon")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--shape", "input_name=-1,2,3"};
// BUG: this should call Usage() with the message
// "failed to parse input shape. There must be a colon after input name."
//
CHECK_THROWS_WITH(
act = parser.Parse(argc, argv),
"basic_string::substr: __pos (which is 18) > this->size() (which is "
"17)");
check_params = false;
}
SUBCASE("missing shape")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--shape", "input_name"};
// BUG: this should call Usage() with the message
// "failed to parse input shape. There must be a colon after input name."
//
CHECK_THROWS_WITH(
act = parser.Parse(argc, argv),
"basic_string::substr: __pos (which is 11) > this->size() (which is "
"10)");
check_params = false;
}
SUBCASE("missing colon")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--shape", "input_name1,2,3"};
// BUG: this should call Usage() with the message
// "failed to parse input shape. There must be a colon after input name."
//
CHECK_THROWS_WITH(
act = parser.Parse(argc, argv),
"basic_string::substr: __pos (which is 16) > this->size() (which is "
"15)");
check_params = false;
}
SUBCASE("bad shapes - a,b,c")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--shape", "input_name:a,b,c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--shape", "Invalid value provided: input_name:a,b,c");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
check_params = false; // Usage message called
}
SUBCASE("bad shapes - [1,2,3]")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--shape", "input_name:[1,2,3]"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--shape", "Invalid value provided: input_name:[1,2,3]");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
check_params = false; // Usage message called
}
}
SUBCASE("Option : --measurement-interval")
{
SUBCASE("set to 500")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "", "500"};
SUBCASE("Long form")
{
argv[3] = "--measurement-interval";
}
SUBCASE("Short form")
{
argv[3] = "-p";
}
CAPTURE(argv[3]);
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->measurement_window_ms = 500;
}
SUBCASE("set to -200")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "", "-200"};
SUBCASE("Long form")
{
argv[3] = "--measurement-interval";
}
SUBCASE("Short form")
{
argv[3] = "-p";
}
CAPTURE(argv[3]);
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--measurement-interval (-p)", "The value must be > 0 msec.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
}
SUBCASE("set to non-numeric value")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "", "foobar"};
SUBCASE("Long form")
{
argv[3] = "--measurement-interval";
expected_msg = CreateUsageMessage(
"--measurement-interval", "Invalid value provided: foobar");
}
SUBCASE("Short form")
{
argv[3] = "-p";
expected_msg =
CreateUsageMessage("-p", "Invalid value provided: foobar");
}
CAPTURE(argv[3]);
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
check_params = false; // Usage message called
}
}
SUBCASE("Option : --concurrency-range")
{
SUBCASE("expected use")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "100:400:10"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
exp->concurrency_range.end = 400;
exp->concurrency_range.step = 10;
}
SUBCASE("only two options")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "100:400"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
exp->concurrency_range.end = 400;
}
SUBCASE("only one options")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "100"};
// QUESTION: What does this mean? Why pass only one?
//
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
}
SUBCASE("no options")
{
int argc = 4;
char* argv[argc] = {app_name, "-m", model_name, "--concurrency-range"};
opterr = 0; // Disable error output for GetOpt library for this case
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
// BUG: Usage message does not contain error. Error statement
// "option '--concurrency-range' requires an argument" written directly
// to std::out
//
CHECK_STRING("Usage Message", parser.GetUsageMessage(), "");
}
SUBCASE("too many options")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "200:100:25:10"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--concurrency-range", "The value does not match <start:end:step>.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->using_concurrency_range = true;
exp->concurrency_range.start = 200;
exp->concurrency_range.end = 100;
exp->concurrency_range.step = 25;
}
SUBCASE("way too many options")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range",
"200:100:25:10:20:30"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--concurrency-range", "The value does not match <start:end:step>.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->using_concurrency_range = true;
exp->concurrency_range.start = 200;
exp->concurrency_range.end = 100;
exp->concurrency_range.step = 25;
}
SUBCASE("wrong separator")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "100,400,10"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
// BUG: Should detect this and throw an error. Users who enter this will
// have no clue why the end and step values are not applied.
//
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
}
SUBCASE("bad start value")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "bad:400:10"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--concurrency-range", "Invalid value provided: bad:400:10");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->using_concurrency_range = true;
}
SUBCASE("bad end value")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "100:bad:10"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--concurrency-range", "Invalid value provided: 100:bad:10");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
}
SUBCASE("bad step value")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--concurrency-range", "100:400:bad"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--concurrency-range", "Invalid value provided: 100:400:bad");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
exp->concurrency_range.end = 400;
}
SUBCASE("invalid condition - end and latency threshold are 0")
{
int argc = 7;
char* argv[argc] = {app_name, "-m",
model_name, "--concurrency-range",
"100:0:25", "--latency-threshold",
"0"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"The end of the search range and the latency limit can not be both 0 "
"(or 0.0) simultaneously");
exp->using_concurrency_range = true;
exp->concurrency_range.start = 100;
exp->concurrency_range.end = 0;
exp->concurrency_range.step = 25;
exp->latency_threshold_ms = 0;
}
}
SUBCASE("Option : --latency-threshold")
{
expected_msg = CreateUsageMessage(
"--latency-threshold (-l)", "The value must be >= 0 msecs.");
CHECK_INT_OPTION(
"--latency-threshold", exp->latency_threshold_ms, expected_msg);
SUBCASE("set to 0")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--latency-threshold", "0"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
}
}
SUBCASE("Option : --stability-percentage")
{
SUBCASE("valid value")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--stability-percentage", "80"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->stability_threshold = .8f;
}
SUBCASE("set to 0")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--stability-percentage", "0"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
}
SUBCASE("negative value")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--stability-percentage", "-20"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--stability-percentage (-s)", "The value must be >= 0.0.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
}
SUBCASE("floating point value")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--stability-percentage", "29.5"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(!parser.UsageCalled());
exp->stability_threshold = .295f;
}
SUBCASE("missing value")
{
int argc = 4;
char* argv[argc] = {app_name, "-m", model_name, "--stability-percentage"};
opterr = 0;
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING("Usage Message", parser.GetUsageMessage(), "");
}
}
SUBCASE("Option : --max-trials")
{
expected_msg =
CreateUsageMessage("--max-trials (-r)", "The value must be > 0.");
CHECK_INT_OPTION("--max-trials", exp->max_trials, expected_msg);
SUBCASE("set to 0")
{
int argc = 5;
char* argv[argc] = {app_name, "-m", model_name, "--max-trials", "0"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
}
}
SUBCASE("Option : --collect-metrics")
{
SUBCASE("with --service-kind != triton")
{
int argc = 8;
char* argv[argc] = {
app_name, "-m", model_name, "--collect-metrics",
"--service-kind", "tfserving", "-i", "grpc"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"Server-side metric collection is only supported with Triton client "
"backend.");
exp->kind = cb::BackendKind::TENSORFLOW_SERVING;
exp->url = "localhost:8500";
exp->batch_size = 0;
exp->protocol = cb::ProtocolType::GRPC;
}
}
SUBCASE("Option : --metrics-url")
{
// missing --collect-metrics
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--metrics-url", "localhost:8002/metrics"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"Must specify --collect-metrics when using the --metrics-url option.");
}
SUBCASE("Option : --metrics-interval")
{
SUBCASE("missing --collect-metrics")
{
int argc = 5;
char* argv[argc] = {
app_name, "-m", model_name, "--metrics-interval", "1000"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
CHECK_STRING(
"Usage Message", parser.GetUsageMessage(),
"Must specify --collect-metrics when using the --metrics-interval "
"option.");
}
SUBCASE("metrics interval 0")
{
int argc = 6;
char* argv[argc] = {
app_name, "-m", model_name, "--collect-metrics", "--metrics-interval",
"0"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(parser.UsageCalled());
expected_msg = CreateUsageMessage(
"--metrics-interval", "The value must be > 0 msecs.");
CHECK_STRING("Usage Message", parser.GetUsageMessage(), expected_msg);
exp->metrics_interval_ms = 0;
}
}
SUBCASE("Option : --bls-composing-models")
{
int argc = 5;
SUBCASE("one model")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
CHECK(act->bls_composing_models.size() == 1);
CHECK_STRING(act->bls_composing_models[0].first, "a");
CHECK_STRING(act->bls_composing_models[0].second, "");
}
SUBCASE("lists with no version")
{
SUBCASE("a,b,c")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a,b,c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a, b, c")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a, b, c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a,b, c")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a,b, c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a, b,c")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a, b,c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a, b, c")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a, b, c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
CHECK(!parser.UsageCalled());
REQUIRE(act->bls_composing_models.size() == 3);
CHECK_STRING(act->bls_composing_models[0].first, "a");
CHECK_STRING(act->bls_composing_models[1].first, "b");
CHECK_STRING(act->bls_composing_models[2].first, "c");
CHECK_STRING(act->bls_composing_models[0].second, "");
CHECK_STRING(act->bls_composing_models[1].second, "");
CHECK_STRING(act->bls_composing_models[2].second, "");
}
SUBCASE("list with version")
{
SUBCASE("a:1,b:2,c:1")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models",
"a:1,b:2,c:1"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a:1, b:2, c:1")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models",
"a:1, b:2, c:1"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a:1, b:2, c:1")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models",
"a:1, b:2, c:1"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
SUBCASE("a:1 , b:2, c:1")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models",
"a:1 , b:2, c:1"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
CHECK(!parser.UsageCalled());
REQUIRE(act->bls_composing_models.size() == 3);
CHECK_STRING(act->bls_composing_models[0].first, "a");
CHECK_STRING(act->bls_composing_models[1].first, "b");
CHECK_STRING(act->bls_composing_models[2].first, "c");
CHECK_STRING(act->bls_composing_models[0].second, "1");
CHECK_STRING(act->bls_composing_models[1].second, "2");
CHECK_STRING(act->bls_composing_models[2].second, "1");
}
SUBCASE("list with some versions")
{
SUBCASE("a,b:3,c")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a,b:3,c"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
CHECK(!parser.UsageCalled());
REQUIRE(act->bls_composing_models.size() == 3);
CHECK_STRING(act->bls_composing_models[0].first, "a");
CHECK_STRING(act->bls_composing_models[1].first, "b");
CHECK_STRING(act->bls_composing_models[2].first, "c");
CHECK_STRING(act->bls_composing_models[0].second, "");
CHECK_STRING(act->bls_composing_models[1].second, "3");
CHECK_STRING(act->bls_composing_models[2].second, "");
}
SUBCASE("multiple versions of the same model")
{
SUBCASE("a:1,b:2,a:2")
{
char* argv[argc] = {
app_name, "-m", model_name, "--bls-composing-models", "a:1,b,a:2"};
REQUIRE_NOTHROW(act = parser.Parse(argc, argv));
}
CHECK(!parser.UsageCalled());
REQUIRE(act->bls_composing_models.size() == 3);
CHECK_STRING(act->bls_composing_models[0].first, "a");
CHECK_STRING(act->bls_composing_models[1].first, "b");
CHECK_STRING(act->bls_composing_models[2].first, "a");
CHECK_STRING(act->bls_composing_models[0].second, "1");
CHECK_STRING(act->bls_composing_models[1].second, "");
CHECK_STRING(act->bls_composing_models[2].second, "2");
}
}
if (check_params) {
CHECK_PARAMS(act, exp);
}
optind = 1; // Reset GetOpt index, needed to parse the next command line
}
}} // namespace triton::perfanalyzer
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <future>
#include <memory>
#include "command_line_parser.h"
#include "concurrency_manager.h"
#include "doctest.h"
#include "mock_client_backend.h"
#include "mock_concurrency_worker.h"
#include "mock_data_loader.h"
#include "mock_infer_data_manager.h"
#include "mock_model_parser.h"
#include "mock_sequence_manager.h"
#include "sequence_manager.h"
#include "test_load_manager_base.h"
#include "test_utils.h"
namespace triton { namespace perfanalyzer {
class TestConcurrencyManager : public TestLoadManagerBase,
public ConcurrencyManager {
public:
TestConcurrencyManager(
PerfAnalyzerParameters params, bool is_sequence_model = false,
bool is_decoupled_model = false, bool use_mock_infer = false)
: use_mock_infer_(use_mock_infer),
TestLoadManagerBase(params, is_sequence_model, is_decoupled_model),
ConcurrencyManager(
params.async, params.streaming, params.batch_size,
params.max_threads, params.max_concurrency,
params.shared_memory_type, params.output_shm_size, GetParser(),
GetFactory())
{
}
std::shared_ptr<IWorker> MakeWorker(
std::shared_ptr<ThreadStat> thread_stat,
std::shared_ptr<ConcurrencyWorker::ThreadConfig> thread_config) override
{
size_t id = workers_.size();
auto worker = std::make_shared<MockConcurrencyWorker>(
id, thread_stat, thread_config, parser_, data_loader_, factory_,
on_sequence_model_, async_, max_concurrency_, using_json_data_,
streaming_, batch_size_, wake_signal_, wake_mutex_, active_threads_,
execute_, infer_data_manager_, sequence_manager_);
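// When mock inference is requested, route Infer() to EmptyInfer so the
// worker loop runs without exercising the real inference path
//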
if (use_mock_infer_) {
EXPECT_CALL(*worker, Infer())
.WillRepeatedly(testing::Invoke(
worker.get(), &MockConcurrencyWorker::EmptyInfer));
}
return worker;
}
void TestReconfigThreads(
const size_t concurrent_request_count,
std::vector<ConcurrencyWorker::ThreadConfig>& expected_configs)
{
ConcurrencyManager::ReconfigThreads(concurrent_request_count);
auto expected_size = expected_configs.size();
// Check that the correct number of threads are created
//
CHECK(threads_.size() == expected_size);
// Check that threads_config has correct concurrency and seq stat index
// offset
for (size_t i = 0; i < expected_configs.size(); i++) {
CHECK(
threads_config_[i]->concurrency_ == expected_configs[i].concurrency_);
CHECK(
threads_config_[i]->seq_stat_index_offset_ ==
expected_configs[i].seq_stat_index_offset_);
}
}
void StopWorkerThreads() { LoadManager::StopWorkerThreads(); }
/// Test that the correct Infer function is called in the backend
///
void TestInferType()
{
// FIXME TMA-982: This delay is to avoid deadlock. Investigate why delay is
// needed.
stats_->SetDelays({50});
ChangeConcurrencyLevel(params_.max_concurrency);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
CheckInferType();
}
/// Test that the correct concurrency is maintained in the load manager
///
void TestConcurrency(
size_t response_delay, std::chrono::milliseconds sleep_time)
{
stats_->SetDelays({response_delay});
ChangeConcurrencyLevel(params_.max_concurrency);
std::this_thread::sleep_for(sleep_time);
CheckConcurrency();
}
/// Test sequence handling
///
void TestSequences()
{
size_t delay_ms = 10;
stats_->SetDelays({delay_ms});
auto stats = cb::InferStat();
double concurrency1 = params_.max_concurrency / 2;
double concurrency2 = params_.max_concurrency;
int sleep_ms = 500;
auto sleep_time = std::chrono::milliseconds(sleep_ms);
size_t expected_count1 = sleep_ms * concurrency1 / delay_ms;
size_t expected_count2 =
sleep_ms * concurrency2 / delay_ms + expected_count1;
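// Each concurrency slot completes roughly one request every delay_ms, so a
// window of sleep_ms should yield about sleep_ms * concurrency / delay_ms
// requests (the second expectation adds the first window's count on top)
//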
// Run and check concurrency level 1
//
ChangeConcurrencyLevel(concurrency1);
std::this_thread::sleep_for(sleep_time);
stats = cb::InferStat();
GetAccumulatedClientStat(&stats);
CHECK(
stats.completed_request_count ==
doctest::Approx(expected_count1).epsilon(0.10));
PauseSequenceWorkers();
CheckSequences(concurrency1);
// Make sure that the client and the manager agree on the request count
// between concurrency levels
//
stats = cb::InferStat();
GetAccumulatedClientStat(&stats);
int client_total_requests = stats_->num_async_infer_calls +
stats_->num_async_stream_infer_calls +
stats_->num_infer_calls;
CHECK(stats.completed_request_count == client_total_requests);
ResetStats();
// Run and check concurrency level 2
//
ChangeConcurrencyLevel(concurrency2);
std::this_thread::sleep_for(sleep_time);
stats = cb::InferStat();
GetAccumulatedClientStat(&stats);
CHECK(
stats.completed_request_count ==
doctest::Approx(expected_count2).epsilon(0.10));
// Stop all threads and make sure everything is as expected
//
StopWorkerThreads();
CheckSequences(concurrency2);
}
/// Test that tries to find deadlocks and livelocks
///
void TestTimeouts()
{
TestWatchDog watchdog(1000);
ChangeConcurrencyLevel(params_.max_concurrency);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
StopWorkerThreads();
watchdog.stop();
}
/// Test that idle time is tracked correctly
void TestOverhead()
{
stats_->SetDelays({1});
ChangeConcurrencyLevel(params_.max_concurrency);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// During a run of 100 ms (100,000,000 ns), make sure that the idle time is
// at least 95% of that
//
auto idle_time_ns = GetIdleTime();
CHECK(idle_time_ns > 95000000);
StopWorkerThreads();
}
std::shared_ptr<ModelParser>& parser_{LoadManager::parser_};
std::shared_ptr<DataLoader>& data_loader_{LoadManager::data_loader_};
std::shared_ptr<SequenceManager>& sequence_manager_{
LoadManager::sequence_manager_};
bool& using_json_data_{LoadManager::using_json_data_};
bool& execute_{ConcurrencyManager::execute_};
size_t& batch_size_{LoadManager::batch_size_};
size_t& max_threads_{LoadManager::max_threads_};
std::shared_ptr<cb::ClientBackendFactory> factory_{
TestLoadManagerBase::factory_};
std::shared_ptr<IInferDataManager>& infer_data_manager_{
LoadManager::infer_data_manager_};
private:
bool use_mock_infer_{false};
void CheckConcurrency()
{
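// Small concurrency values must be saturated exactly; larger values are
// checked approximately to allow for requests that are momentarily between
// completion and reissue
//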
if (params_.max_concurrency < 4) {
CHECK(stats_->num_active_infer_calls == params_.max_concurrency);
} else {
CHECK(
stats_->num_active_infer_calls ==
doctest::Approx(params_.max_concurrency).epsilon(0.25));
}
}
std::shared_ptr<SequenceManager> MakeSequenceManager(
const uint64_t start_sequence_id, const uint64_t sequence_id_range,
const size_t sequence_length, const bool sequence_length_specified,
const double sequence_length_variation, const bool using_json_data,
std::shared_ptr<DataLoader> data_loader) override
{
return std::make_shared<MockSequenceManager>(
start_sequence_id, sequence_id_range, sequence_length,
sequence_length_specified, sequence_length_variation, using_json_data,
data_loader);
}
};
/// Test that the correct Infer function is called in the backend
///
TEST_CASE("concurrency_infer_type")
{
PerfAnalyzerParameters params{};
params.max_concurrency = 1;
SUBCASE("async_streaming")
{
params.async = true;
params.streaming = true;
}
SUBCASE("async_no_streaming")
{
params.async = true;
params.streaming = false;
}
SUBCASE("no_async_streaming")
{
params.async = false;
params.streaming = true;
}
SUBCASE("no_async_no_streaming")
{
params.async = false;
params.streaming = false;
}
TestConcurrencyManager tcm(params);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.TestInferType();
}
/// Test that the correct concurrency is maintained in the load manager
///
TEST_CASE("concurrency_concurrency")
{
PerfAnalyzerParameters params{};
size_t response_delay{50};
std::chrono::milliseconds sleep_time{225};
SUBCASE("sync, no-streaming, 1 concurrency, 1 thread")
{
params.forced_sync = true;
params.async = false;
params.streaming = false;
params.max_concurrency = 1;
params.max_threads = 1;
}
SUBCASE("sync, no-streaming, 4 concurrency, 4 threads")
{
params.forced_sync = true;
params.async = false;
params.streaming = false;
params.max_concurrency = 4;
params.max_threads = 4;
}
SUBCASE("async, no-streaming, 1 concurrency, 1 thread")
{
params.forced_sync = false;
params.async = true;
params.streaming = false;
params.max_concurrency = 1;
params.max_threads = 1;
}
SUBCASE("async, no-streaming, 4 concurrency, 1 thread")
{
params.forced_sync = false;
params.async = true;
params.streaming = false;
params.max_concurrency = 4;
params.max_threads = 1;
}
SUBCASE("async, no-streaming, 4 concurrency, 2 threads")
{
params.forced_sync = false;
params.async = true;
params.streaming = false;
params.max_concurrency = 4;
params.max_threads = 2;
}
SUBCASE("async, no-streaming, 4 concurrency, 4 threads")
{
params.forced_sync = false;
params.async = true;
params.streaming = false;
params.max_concurrency = 4;
params.max_threads = 4;
}
SUBCASE("async, streaming, 1 concurrency, 1 thread")
{
params.forced_sync = false;
params.async = true;
params.streaming = true;
params.max_concurrency = 1;
params.max_threads = 1;
}
SUBCASE("async, streaming, 4 concurrency, 1 thread")
{
params.forced_sync = false;
params.async = true;
params.streaming = true;
params.max_concurrency = 4;
params.max_threads = 1;
}
SUBCASE("async, streaming, 4 concurrency, 2 threads")
{
params.forced_sync = false;
params.async = true;
params.streaming = true;
params.max_concurrency = 4;
params.max_threads = 2;
}
SUBCASE("async, streaming, 4 concurrency, 4 threads")
{
params.forced_sync = false;
params.async = true;
params.streaming = true;
params.max_concurrency = 4;
params.max_threads = 4;
}
TestConcurrencyManager tcm(params);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.TestConcurrency(response_delay, sleep_time);
}
/// Check that the inference requests for sequences follow all rules and
/// parameters
///
TEST_CASE("concurrency_sequence")
{
PerfAnalyzerParameters params = TestLoadManagerBase::GetSequenceTestParams();
const bool is_sequence_model{true};
TestConcurrencyManager tcm(params, is_sequence_model);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.TestSequences();
}
/// Create the case where the sequences do NOT go round robin due to
/// the first request taking longer than the rest.
///
/// This exposed a bug where we were constantly resetting ctx IDs
/// and issuing over and over again to the first sequence even though
/// it was the only sequence that should NOT be issued because it was
/// still outstanding
///
TEST_CASE("concurrency_free_ctx_ids")
{
PerfAnalyzerParameters params{};
params.async = true;
params.streaming = true;
params.max_concurrency = 6;
bool is_sequence_model{true};
TestConcurrencyManager tcm(params, is_sequence_model);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
// Have the first request (sequence ID 1) take very long, and all the other
// requests are fast
//
tcm.stats_->SetDelays({50, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5});
std::shared_ptr<ThreadStat> thread_stat{std::make_shared<ThreadStat>()};
std::shared_ptr<ConcurrencyWorker::ThreadConfig> thread_config{
std::make_shared<ConcurrencyWorker::ThreadConfig>(0)};
thread_config->concurrency_ = 4;
std::shared_ptr<IWorker> worker{tcm.MakeWorker(thread_stat, thread_config)};
std::future<void> infer_future{std::async(&IWorker::Infer, worker)};
std::this_thread::sleep_for(std::chrono::milliseconds(15));
early_exit = true;
infer_future.get();
// The first sequence should only be called two times, once at the very start,
// and once during shutdown
//
CHECK(tcm.stats_->sequence_status.seq_ids_to_count.at(1) == 2);
}
TEST_CASE("Concurrency - shared memory infer input calls")
{
PerfAnalyzerParameters params{};
params.max_concurrency = 4;
bool is_sequence_model{false};
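// The nested lambdas below use SUBCASEs to build the full cross product of
// memory type x (non-)sequence x async/streaming combinations
//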
const auto& ParameterizeAsyncAndStreaming{[&]() {
SUBCASE("sync non-streaming")
{
params.async = false;
params.streaming = false;
}
SUBCASE("async non-streaming")
{
params.async = true;
params.streaming = false;
}
SUBCASE("async streaming")
{
params.async = true;
params.streaming = true;
}
}};
const auto& ParameterizeSequence{[&]() {
SUBCASE("non-sequence")
{
is_sequence_model = false;
ParameterizeAsyncAndStreaming();
}
SUBCASE("sequence")
{
is_sequence_model = true;
params.num_of_sequences = 1;
ParameterizeAsyncAndStreaming();
}
}};
const auto& ParameterizeMemory{[&]() {
SUBCASE("No shared memory")
{
params.shared_memory_type = NO_SHARED_MEMORY;
ParameterizeSequence();
}
SUBCASE("system shared memory")
{
params.shared_memory_type = SYSTEM_SHARED_MEMORY;
ParameterizeSequence();
}
SUBCASE("cuda shared memory")
{
params.shared_memory_type = CUDA_SHARED_MEMORY;
ParameterizeSequence();
}
}};
ParameterizeMemory();
const std::string json_str{R"(
{
"data": [
{
"INPUT0": [2000000000]
},
{
"INPUT0": [2000000001]
}
]
}
)"};
MockInputPipeline mip =
TestLoadManagerBase::ProcessCustomJsonData(json_str, is_sequence_model);
TestConcurrencyManager tcm(params, is_sequence_model);
tcm.infer_data_manager_ =
MockInferDataManagerFactory::CreateMockInferDataManager(
params.max_threads, params.batch_size, params.shared_memory_type,
params.output_shm_size, mip.mock_model_parser_, tcm.factory_,
mip.mock_data_loader_);
std::shared_ptr<ThreadStat> thread_stat{std::make_shared<ThreadStat>()};
std::shared_ptr<ConcurrencyWorker::ThreadConfig> thread_config{
std::make_shared<ConcurrencyWorker::ThreadConfig>(0)};
thread_config->concurrency_ = 1;
tcm.parser_ = mip.mock_model_parser_;
tcm.data_loader_ = mip.mock_data_loader_;
tcm.using_json_data_ = true;
tcm.execute_ = true;
tcm.batch_size_ = 1;
tcm.max_threads_ = 1;
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
std::shared_ptr<IWorker> worker{tcm.MakeWorker(thread_stat, thread_config)};
std::future<void> infer_future{std::async(&IWorker::Infer, worker)};
std::this_thread::sleep_for(std::chrono::milliseconds(18));
early_exit = true;
infer_future.get();
const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls};
const auto& actual_set_shared_memory_calls{
tcm.stats_->num_set_shared_memory_calls};
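// Without shared memory, input data should flow through AppendRaw calls; with
// system or CUDA shared memory it should flow through the shared-memory path
// and AppendRaw should never be called
//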
if (params.shared_memory_type == NO_SHARED_MEMORY) {
CHECK(actual_append_raw_calls > 0);
CHECK(actual_set_shared_memory_calls == 0);
} else {
CHECK(actual_append_raw_calls == 0);
CHECK(actual_set_shared_memory_calls > 0);
}
}
/// Verify Shared Memory api calls
///
TEST_CASE("Concurrency - Shared memory methods")
{
PerfAnalyzerParameters params;
bool is_sequence = false;
bool is_decoupled = false;
bool use_mock_infer = true;
const std::string json_str{R"(
{
"data": [
{
"INPUT0": [2123456789]
}
]
}
)"};
MockInputPipeline mip = TestLoadManagerBase::ProcessCustomJsonData(json_str);
cb::MockClientStats::SharedMemoryStats expected_stats;
SUBCASE("System shared memory usage")
{
params.shared_memory_type = SYSTEM_SHARED_MEMORY;
TestConcurrencyManager tcm(
params, is_sequence, is_decoupled, use_mock_infer);
tcm.infer_data_manager_ =
MockInferDataManagerFactory::CreateMockInferDataManager(
params.max_threads, params.batch_size, params.shared_memory_type,
params.output_shm_size, mip.mock_model_parser_, tcm.factory_,
mip.mock_data_loader_);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
expected_stats.num_unregister_all_shared_memory_calls = 1;
expected_stats.num_register_system_shared_memory_calls = 1;
expected_stats.num_create_shared_memory_region_calls = 1;
expected_stats.num_map_shared_memory_calls = 1;
tcm.CheckSharedMemory(expected_stats);
}
SUBCASE("Cuda shared memory usage")
{
params.shared_memory_type = CUDA_SHARED_MEMORY;
TestConcurrencyManager tcm(
params, is_sequence, is_decoupled, use_mock_infer);
tcm.infer_data_manager_ =
MockInferDataManagerFactory::CreateMockInferDataManager(
params.max_threads, params.batch_size, params.shared_memory_type,
params.output_shm_size, mip.mock_model_parser_, tcm.factory_,
mip.mock_data_loader_);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
expected_stats.num_unregister_all_shared_memory_calls = 1;
expected_stats.num_register_cuda_shared_memory_calls = 1;
tcm.CheckSharedMemory(expected_stats);
}
SUBCASE("No shared memory usage")
{
params.shared_memory_type = NO_SHARED_MEMORY;
TestConcurrencyManager tcm(
params, is_sequence, is_decoupled, use_mock_infer);
tcm.infer_data_manager_ =
MockInferDataManagerFactory::CreateMockInferDataManager(
params.max_threads, params.batch_size, params.shared_memory_type,
params.output_shm_size, mip.mock_model_parser_, tcm.factory_,
mip.mock_data_loader_);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.CheckSharedMemory(expected_stats);
}
}
TEST_CASE("concurrency_deadlock")
{
PerfAnalyzerParameters params{};
params.max_concurrency = 6;
bool is_sequence_model{true};
bool some_infer_failures{false};
const auto& ParameterizeSyncStreaming{[&]() {
SUBCASE("sync")
{
params.async = false;
params.streaming = false;
}
SUBCASE("aync no streaming")
{
params.async = true;
params.streaming = false;
}
SUBCASE("async streaming")
{
params.async = true;
params.streaming = true;
}
}};
const auto& ParameterizeConcurrency{[&]() {
SUBCASE("10 concurrency, 10 thread")
{
ParameterizeSyncStreaming();
params.max_concurrency = 10;
params.max_threads = 10;
}
SUBCASE("10 concurrency, 4 thread")
{
ParameterizeSyncStreaming();
params.max_concurrency = 10;
params.max_threads = 4;
}
}};
const auto& ParameterizeSequence{[&]() {
SUBCASE("non-sequence")
{
ParameterizeConcurrency();
is_sequence_model = false;
}
SUBCASE("sequence")
{
ParameterizeConcurrency();
is_sequence_model = true;
}
}};
const auto& ParameterizeFailures{[&]() {
SUBCASE("yes_failures")
{
some_infer_failures = true;
ParameterizeSequence();
}
SUBCASE("no_failures")
{
some_infer_failures = false;
ParameterizeSequence();
}
}};
std::vector<uint64_t> delays;
const auto& ParameterizeDelays{[&]() {
SUBCASE("no_delay")
{
delays = {0};
ParameterizeFailures();
}
SUBCASE("random_delay")
{
delays = {1, 5, 20, 4, 3};
ParameterizeFailures();
}
}};
ParameterizeDelays();
TestConcurrencyManager tcm(params, is_sequence_model);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.stats_->SetDelays(delays);
// Sometimes have a request fail
if (some_infer_failures) {
tcm.stats_->SetReturnStatuses({true, true, true, false});
}
tcm.TestTimeouts();
}
TEST_CASE("concurrency_overhead")
{
PerfAnalyzerParameters params{};
SUBCASE("sync, conc 1")
{
params.async = false;
params.max_concurrency = 1;
}
SUBCASE("sync, conc 4")
{
params.async = false;
params.max_concurrency = 4;
}
SUBCASE("async, conc 1")
{
params.async = true;
params.max_concurrency = 1;
}
SUBCASE("async, conc 1")
{
params.async = true;
params.max_concurrency = 4;
}
TestConcurrencyManager tcm(params, false);
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.TestOverhead();
}
TEST_CASE(
"send_request_rate_concurrency_manager: testing logic around detecting "
"send request count")
{
PerfAnalyzerParameters params{};
SUBCASE("sync")
{
params.async = false;
}
SUBCASE("async")
{
params.async = true;
}
TestConcurrencyManager tcm(params);
tcm.stats_->SetDelays({10});
tcm.InitManager(
params.string_length, params.string_data, params.zero_input,
params.user_data, params.start_sequence_id, params.sequence_id_range,
params.sequence_length, params.sequence_length_specified,
params.sequence_length_variation);
tcm.ChangeConcurrencyLevel(4);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
tcm.StopWorkerThreads();
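// With a concurrency of 4, a 10 ms response delay, and a ~100 ms run, roughly
// 4 * 100 / 10 = 40 requests should have been sent
//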
const size_t num_sent_requests{tcm.GetAndResetNumSentRequests()};
CHECK(num_sent_requests == doctest::Approx(40).epsilon(0.1));
}
TEST_CASE(
"reconfigure_threads" *
doctest::description(
"This test confirms the side-effects of ReconfigureThreads(). Namely, "
"that the correct number of threads are created and that they are "
"configured properly"))
{
PerfAnalyzerParameters params{};
std::vector<ConcurrencyWorker::ThreadConfig> expected_config_values;
std::vector<size_t> expected_concurrencies;
std::vector<size_t> expected_seq_stat_index_offsets;
size_t target_concurrency = 0;
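// Expected values: concurrency is split as evenly as possible across
// min(max_threads, target_concurrency) threads, with the remainder going to
// the earliest threads, and each thread's seq_stat_index_offset_ is the
// running sum of the concurrencies of the threads before it
//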
SUBCASE("normal")
{
params.max_threads = 10;
target_concurrency = 5;
expected_concurrencies = {1, 1, 1, 1, 1};
expected_seq_stat_index_offsets = {0, 1, 2, 3, 4};
}
SUBCASE("thread_limited")
{
params.max_threads = 5;
target_concurrency = 10;
expected_concurrencies = {2, 2, 2, 2, 2};
expected_seq_stat_index_offsets = {0, 2, 4, 6, 8};
}
SUBCASE("unbalanced")
{
params.max_threads = 6;
target_concurrency = 14;
expected_concurrencies = {3, 3, 2, 2, 2, 2};
expected_seq_stat_index_offsets = {0, 3, 6, 8, 10, 12};
}
for (size_t i = 0; i < expected_concurrencies.size(); i++) {
ConcurrencyWorker::ThreadConfig tc(i);
tc.concurrency_ = expected_concurrencies[i];
tc.seq_stat_index_offset_ = expected_seq_stat_index_offsets[i];
expected_config_values.push_back(tc);
}
TestConcurrencyManager tcm(params);
tcm.TestReconfigThreads(target_concurrency, expected_config_values);
}
}} // namespace triton::perfanalyzer
// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of NVIDIA CORPORATION nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <cmath>
#include <iostream>
#include <memory>
#include <numeric>
#include "concurrency_ctx_id_tracker.h"
#include "doctest.h"
#include "fifo_ctx_id_tracker.h"
#include "rand_ctx_id_tracker.h"
namespace triton { namespace perfanalyzer {
TEST_CASE("CtxIdTrackers: FIFO")
{
std::shared_ptr<ICtxIdTracker> tracker = std::make_shared<FifoCtxIdTracker>();
// Reset will load up context IDs 0-9 into the queue and return them in order
// on consecutive Get calls
size_t count = 10;
CHECK_FALSE(tracker->IsAvailable());
tracker->Reset(count);
CHECK(tracker->IsAvailable());
for (size_t i = 0; i < count; i++) {
CHECK(tracker->Get() == i);
}
// Manually restoring values should be returned in-order
CHECK_FALSE(tracker->IsAvailable());
tracker->Restore(7);
CHECK(tracker->IsAvailable());
tracker->Restore(13);
CHECK(tracker->Get() == 7);
CHECK(tracker->Get() == 13);
// A reset should throw away any values on the old list
tracker->Reset(10);
tracker->Reset(1);
tracker->Get();
CHECK(!tracker->IsAvailable());
// Calling Get when not available should Throw
CHECK_THROWS_AS(tracker->Get(), const std::exception&);
}
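// The behavior exercised above can be summarized with a minimal sketch (a
// hypothetical illustration, not the production FifoCtxIdTracker declared in
// fifo_ctx_id_tracker.h): Reset(n) discards any queued IDs and queues 0..n-1,
// Get() pops the front and throws when the queue is empty, Restore(id) pushes
// to the back, and IsAvailable() reports a non-empty queue. Assumes <queue>
// and <stdexcept> are available.
class SketchFifoCtxIdTracker {
 public:
  void Reset(size_t count)
  {
    ids_ = std::queue<size_t>();  // A reset drops any previously queued IDs
    for (size_t i = 0; i < count; i++) {
      ids_.push(i);
    }
  }
  void Restore(size_t id) { ids_.push(id); }
  size_t Get()
  {
    if (ids_.empty()) {
      throw std::runtime_error("No context IDs available");
    }
    size_t id = ids_.front();
    ids_.pop();
    return id;
  }
  bool IsAvailable() const { return !ids_.empty(); }
 private:
  std::queue<size_t> ids_;
};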
TEST_CASE("CtxIdTrackers: Conc")
{
std::shared_ptr<ICtxIdTracker> tracker =
std::make_shared<ConcurrencyCtxIdTracker>();
// Reset will load up 10 instances of context ID 0 into the queue and return
// them in order on consecutive Get calls
size_t count = 10;
tracker->Reset(count);
for (size_t i = 0; i < count; i++) {
CHECK(tracker->Get() == 0);
}
// Manually restoring values should be returned in-order
CHECK_FALSE(tracker->IsAvailable());
tracker->Restore(7);
tracker->Restore(13);
CHECK(tracker->IsAvailable());
CHECK(tracker->Get() == 7);
CHECK(tracker->Get() == 13);
// A reset should throw away any values on the old list
tracker->Reset(10);
tracker->Reset(1);
tracker->Get();
CHECK(!tracker->IsAvailable());
// Calling Get when not available should Throw
CHECK_THROWS_AS(tracker->Get(), const std::exception&);
}
TEST_CASE("CtxIdTrackers: Rand")
{
std::shared_ptr<ICtxIdTracker> tracker = std::make_shared<RandCtxIdTracker>();
size_t max;
auto check_range_and_variance = [&]() {
size_t num_trials = 1000;
std::vector<size_t> results(max, 0);
for (size_t i = 0; i < num_trials; i++) {
auto x = tracker->Get();
REQUIRE((x < max && x >= 0));
results[x]++;
}
// Confirm that the distribution of the picked CTX IDs is random
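// (a near-zero variance would suggest a fixed round-robin pattern, while a
// very large variance would suggest the IDs are concentrated on a few values)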
double mean =
std::accumulate(results.begin(), results.end(), 0.0) / results.size();
double variance = 0;
for (size_t i = 0; i < results.size(); i++) {
variance += std::pow(results[i] - mean, 2);
}
variance /= results.size();
CHECK((variance > 10 && variance < 100));
};
// IsAvailable is always true for this class
CHECK(tracker->IsAvailable());
// Reset should define the bounds of random CTX id picking
max = 10;
tracker->Reset(max);
// Restore should have no impact on this class.
tracker->Restore(9999);
check_range_and_variance();
// Reset should RE-define the bounds of random CTX id picking
max = 5;
tracker->Reset(max);
check_range_and_variance();
}
}} // namespace triton::perfanalyzer