Commit 3c15726c authored by yangzhong's avatar yangzhong
Browse files

git init

parents
loadgen_build
build
\ No newline at end of file
Note: please install jemalloc first. See: http://jemalloc.net/
Command: bash run.sh <target_qps> <0=Basic,1=Queue> <numCompleteThreads> <maxSizeInComplete> <server_coalesce_queries=0or1>
Experiments:
- On Intel(R) Xeon(R) CPU E5-1650 v4 @ 3.60GHz
- Basic SUT : 500-600k i/s
- Basic SUT + jemalloc: 800-900k i/s (`bash run.sh 800000 0`)
- Queued SUT (2 complete threads) + jemalloc: 1.2-1.3M i/s (`bash run.sh 1200000 1 2 2048`)
- Queued SUT (2 complete threads) + jemalloc + server_coalesce_queries: 1.4-1.5M i/s (`bash run.sh 1400000 1 2 512 1`)
- Basic SUT + jemalloc + server_coalesce_queries + 4 IssueQueryThreads: 2.4-2.5M i/s (`bash run.sh 2400000 0 2 512 1 4`)
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cassert>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <vector>
#include "loadgen.h"
#include "query_sample_library.h"
#include "system_under_test.h"
#include "test_settings.h"
class QSL : public mlperf::QuerySampleLibrary {
public:
~QSL() override{};
const std::string& Name() override { return mName; }
size_t TotalSampleCount() override { return 1000000; }
size_t PerformanceSampleCount() override { return TotalSampleCount(); }
void LoadSamplesToRam(const std::vector<mlperf::QuerySampleIndex>&) override {
}
void UnloadSamplesFromRam(
const std::vector<mlperf::QuerySampleIndex>&) override {}
private:
std::string mName{"Dummy QSL"};
};
// Simplest SUT: completes every sample synchronously inside IssueQuery,
// reusing a single pre-sized response buffer.
class BasicSUT : public mlperf::SystemUnderTest {
 public:
  BasicSUT() {
    // Start with some large value so that we don't reallocate memory.
    initResponse(10000);
  }
  ~BasicSUT() override {}
  const std::string& Name() override { return mName; }
  void IssueQuery(const std::vector<mlperf::QuerySample>& samples) override {
    const size_t count = samples.size();
    if (count > mResponses.size()) {
      std::cerr << "Warning: reallocating response buffer in BasicSUT. Maybe "
                   "you should initResponse with larger value!?"
                << std::endl;
      initResponse(samples.size());
    }
    // Copy the ids over; every response already points at the dummy payload.
    for (size_t idx = 0; idx < count; ++idx) {
      mResponses[idx].id = samples[idx].id;
    }
    mlperf::QuerySamplesComplete(mResponses.data(), count);
  }
  void FlushQueries() override {}

 private:
  // Size the buffer; each entry shares the same dummy int payload (mBuf).
  void initResponse(int size) {
    mResponses.resize(size,
                      {0, reinterpret_cast<uintptr_t>(&mBuf), sizeof(int)});
  }
  int mBuf{0};
  std::string mName{"BasicSUT"};
  std::vector<mlperf::QuerySampleResponse> mResponses;
};
// SUT that decouples issue from completion: IssueQuery only enqueues the
// sample ids; a pool of completion threads drains the queue in batches of at
// most `maxSize` and calls QuerySamplesComplete outside the lock.
class QueueSUT : public mlperf::SystemUnderTest {
 public:
  QueueSUT(int numCompleteThreads, int maxSize) {
    // Each thread handle at most maxSize at a time.
    std::cout << "QueueSUT: maxSize = " << maxSize << std::endl;
    initResponse(numCompleteThreads, maxSize);
    // Launch complete threads
    for (int i = 0; i < numCompleteThreads; i++) {
      mThreads.emplace_back(&QueueSUT::CompleteThread, this, i);
    }
  }
  ~QueueSUT() override {
    {
      // Set the shutdown flag under the lock so no worker can miss it between
      // checking the queue and blocking on the condition variable.
      std::unique_lock<std::mutex> lck(mMtx);
      mDone = true;
      mCondVar.notify_all();
    }
    for (auto& thread : mThreads) {
      thread.join();
    }
  }
  const std::string& Name() override { return mName; }
  void IssueQuery(const std::vector<mlperf::QuerySample>& samples) override {
    std::unique_lock<std::mutex> lck(mMtx);
    for (const auto& sample : samples) {
      mIdQueue.push_back(sample.id);
    }
    // Let some worker thread to consume tasks
    mCondVar.notify_one();
  }
  void FlushQueries() override {}

 private:
  // Worker loop: take up to `maxSize` ids per wakeup, then complete the batch
  // outside the critical section so other workers can drain concurrently.
  void CompleteThread(int threadIdx) {
    auto& responses = mResponses[threadIdx];
    size_t maxSize{responses.size()};
    size_t actualSize{0};
    while (true) {
      {
        std::unique_lock<std::mutex> lck(mMtx);
        mCondVar.wait(lck, [&]() { return !mIdQueue.empty() || mDone; });
        // NOTE(review): shutdown takes priority — any ids still queued when
        // mDone flips are dropped without completion. The destructor only
        // runs after the test finishes, so presumably acceptable; confirm.
        if (mDone) {
          break;
        }
        actualSize = std::min(maxSize, mIdQueue.size());
        for (size_t i = 0; i < actualSize; i++) {
          responses[i].id = mIdQueue.front();
          mIdQueue.pop_front();
        }
        // Chain-wake another worker in case the queue still has entries; the
        // single notify_one from IssueQuery would otherwise serialize drains.
        mCondVar.notify_one();
      }
      // Completion happens outside the lock.
      mlperf::QuerySamplesComplete(responses.data(), actualSize);
    }
  }
  // One response buffer per completion thread; every entry points at the same
  // dummy payload (mBuf), only the ids are rewritten per batch.
  void initResponse(int numCompleteThreads, int size) {
    mResponses.resize(numCompleteThreads);
    for (auto& responses : mResponses) {
      responses.resize(size,
                       {0, reinterpret_cast<uintptr_t>(&mBuf), sizeof(int)});
    }
  }
  int mBuf{0};
  std::string mName{"QueueSUT"};
  std::vector<std::vector<mlperf::QuerySampleResponse>> mResponses;
  std::vector<std::thread> mThreads;
  std::deque<mlperf::ResponseId> mIdQueue;  // guarded by mMtx
  std::mutex mMtx;
  std::condition_variable mCondVar;
  bool mDone{false};  // guarded by mMtx
};
// SUT variant for server_num_issue_query_threads > 0: each issue thread
// registers itself with the loadgen and completes samples synchronously out
// of its own pre-allocated response buffer.
class MultiBasicSUT : public mlperf::SystemUnderTest {
 public:
  MultiBasicSUT(int numThreads)
      : mNumThreads(numThreads), mResponses(numThreads) {
    // Start with some large value so that we don't reallocate memory.
    initResponse(10000);
    for (int i = 0; i < mNumThreads; ++i) {
      mThreads.emplace_back(&MultiBasicSUT::startIssueThread, this, i);
    }
  }
  ~MultiBasicSUT() override {
    for (auto& thread : mThreads) {
      thread.join();
    }
  }
  const std::string& Name() override { return mName; }
  void IssueQuery(const std::vector<mlperf::QuerySample>& samples) override {
    int thread_idx;
    {
      // Fix: guard the map lookup. Other issue threads may still be inserting
      // their ids in startIssueThread(), and a concurrent read/write on a
      // std::map is a data race.
      std::lock_guard<std::mutex> lock(mMtx);
      thread_idx = mThreadMap[std::this_thread::get_id()];
    }
    size_t n = samples.size();
    auto& responses = mResponses[thread_idx];
    if (n > responses.size()) {
      // Warning goes to stderr, matching BasicSUT.
      // NOTE(review): initResponse resizes *all* threads' buffers while other
      // issue threads may be using theirs; keep the initial size large enough
      // that this path never triggers in practice.
      std::cerr
          << "Warning: reallocating response buffer in MultiBasicSUT. Maybe "
             "you should initResponse with larger value!?"
          << std::endl;
      initResponse(samples.size());
    }
    for (size_t i = 0; i < n; i++) {
      responses[i].id = samples[i].id;
    }
    mlperf::QuerySamplesComplete(responses.data(), n);
  }
  void FlushQueries() override {}

 private:
  // Resize every thread's buffer; entries all point at the dummy payload.
  void initResponse(int size) {
    for (auto& responses : mResponses) {
      responses.resize(size,
                       {0, reinterpret_cast<uintptr_t>(&mBuf), sizeof(int)});
    }
  }
  // Runs on each issue thread: record the thread-id -> index mapping, then
  // hand the thread over to the loadgen (blocks until the test ends).
  void startIssueThread(int thread_idx) {
    {
      std::lock_guard<std::mutex> lock(mMtx);
      mThreadMap[std::this_thread::get_id()] = thread_idx;
    }
    mlperf::RegisterIssueQueryThread();
  }
  int mBuf{0};
  int mNumThreads{0};
  std::string mName{"MultiBasicSUT"};
  std::vector<std::vector<mlperf::QuerySampleResponse>> mResponses;
  std::mutex mMtx;  // guards mThreadMap
  std::vector<std::thread> mThreads;
  std::map<std::thread::id, int> mThreadMap;
};
// Usage:
//   repro.exe <target_qps> [0=Basic,1=Queue] [numCompleteThreads]
//             [maxSizeInComplete] [server_coalesce_queries=0or1]
//             [num_issue_threads]
int main(int argc, char** argv) {
  // Fix: validate argc explicitly. The previous assert() compiles away under
  // -DNDEBUG, after which argv[1] would be read out of bounds.
  if (argc < 2) {
    std::cerr << "Need to pass in at least one argument: target_qps"
              << std::endl;
    return 1;
  }
  int target_qps = std::stoi(argv[1]);
  std::cout << "target_qps = " << target_qps << std::endl;
  // Optional arguments and their defaults.
  bool useQueue{false};
  int numCompleteThreads{4};
  int maxSize{1};
  bool server_coalesce_queries{false};
  int num_issue_threads{0};
  if (argc >= 3) {
    useQueue = std::stoi(argv[2]) != 0;
  }
  if (argc >= 4) {
    numCompleteThreads = std::stoi(argv[3]);
  }
  if (argc >= 5) {
    maxSize = std::stoi(argv[4]);
  }
  if (argc >= 6) {
    server_coalesce_queries = std::stoi(argv[5]) != 0;
  }
  if (argc >= 7) {
    num_issue_threads = std::stoi(argv[6]);
  }
  QSL qsl;
  std::unique_ptr<mlperf::SystemUnderTest> sut;
  // Configure the test settings
  mlperf::TestSettings testSettings;
  testSettings.scenario = mlperf::TestScenario::Server;
  testSettings.mode = mlperf::TestMode::PerformanceOnly;
  testSettings.server_target_qps = target_qps;
  testSettings.server_target_latency_ns = 10000000;  // 10ms
  testSettings.server_target_latency_percentile = 0.99;
  testSettings.min_duration_ms = 60000;
  testSettings.min_query_count = 270000;
  testSettings.server_coalesce_queries = server_coalesce_queries;
  std::cout << "testSettings.server_coalesce_queries = "
            << (server_coalesce_queries ? "True" : "False") << std::endl;
  testSettings.server_num_issue_query_threads = num_issue_threads;
  std::cout << "num_issue_threads = " << num_issue_threads << std::endl;
  // Configure the logging settings
  mlperf::LogSettings logSettings;
  logSettings.log_output.outdir = "build";
  logSettings.log_output.prefix = "mlperf_log_";
  logSettings.log_output.suffix = "";
  logSettings.log_output.prefix_with_datetime = false;
  logSettings.log_output.copy_detail_to_stdout = false;
  logSettings.log_output.copy_summary_to_stdout = true;
  logSettings.log_mode = mlperf::LoggingMode::AsyncPoll;
  logSettings.log_mode_async_poll_interval_ms = 1000;
  logSettings.enable_trace = false;
  // Choose SUT
  if (num_issue_threads == 0) {
    if (useQueue) {
      std::cout << "Using QueueSUT with " << numCompleteThreads
                << " complete threads" << std::endl;
      sut.reset(new QueueSUT(numCompleteThreads, maxSize));
    } else {
      std::cout << "Using BasicSUT" << std::endl;
      sut.reset(new BasicSUT());
    }
  } else {
    if (useQueue) {
      std::cout << "Using MultiQueueSUT with " << numCompleteThreads
                << " complete threads" << std::endl;
      std::cerr << "!!!! MultiQueueSUT is NOT implemented yet !!!!"
                << std::endl;
      return 1;
      // sut.reset(new MultiQueueSUT(num_issue_threads, numCompleteThreads,
      // maxSize));
    } else {
      std::cout << "Using MultiBasicSUT" << std::endl;
      sut.reset(new MultiBasicSUT(num_issue_threads));
    }
  }
  // Start test
  std::cout << "Start test..." << std::endl;
  mlperf::StartTest(sut.get(), &qsl, testSettings, logSettings);
  std::cout << "Test done. Clean up SUT..." << std::endl;
  sut.reset();
  std::cout << "Done!" << std::endl;
  return 0;
}
#!/usr/bin/bash
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Abort on the first failing step. The previous `cd dir && ... && cd ..`
# chain left the shell stranded inside loadgen_build when the build failed,
# so later steps ran in the wrong directory.
set -e

echo "Building loadgen..."
mkdir -p loadgen_build
# Build in a subshell so the working directory is restored even on failure.
(cd loadgen_build && cmake ../.. && make -j)

echo "Building test program..."
mkdir -p build
g++ --std=c++11 -O3 -I.. -o build/repro.exe repro.cpp -Lloadgen_build -lmlperf_loadgen -lpthread

# "$@" forwards exactly the arguments given; the unquoted $1..$6 forwarded at
# most six and would split any argument containing whitespace.
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 build/repro.exe "$@"
#!/usr/bin/bash
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Abort on the first failing step; the previous `cd dir && ... && cd ..`
# chain left the shell inside loadgen_build when the build failed.
set -e

echo "Building loadgen in Debug mode..."
mkdir -p loadgen_build
# Build in a subshell so the working directory is restored even on failure.
(cd loadgen_build && cmake -DCMAKE_BUILD_TYPE=Debug ../.. && make -j)

echo "Building test program in Debug mode..."
mkdir -p build
g++ --std=c++11 -O0 -g -I.. -o build/repro.exe repro.cpp -Lloadgen_build -lmlperf_loadgen -lpthread

# "$@" forwards exactly the arguments given, preserving any whitespace.
gdb --args build/repro.exe "$@"
/* Copyright 2019 The MLPerf Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "c_api.h"
#include <string>
#include "../loadgen.h"
#include "../query_sample.h"
#include "../query_sample_library.h"
#include "../system_under_test.h"
#include "../test_settings.h"
namespace mlperf {
namespace c {
namespace {
// Forwards SystemUnderTest calls to relevant callbacks.
class SystemUnderTestTrampoline : public SystemUnderTest {
 public:
  /// Captures the client's opaque data and C callbacks; every virtual below
  /// simply trampolines into the corresponding callback.
  SystemUnderTestTrampoline(ClientData client_data, std::string name,
                            IssueQueryCallback issue_cb,
                            FlushQueriesCallback flush_queries_cb)
      : client_data_(client_data),
        name_(std::move(name)),
        issue_cb_(issue_cb),
        flush_queries_cb_(flush_queries_cb) {}
  ~SystemUnderTestTrampoline() override = default;

  const std::string& Name() override { return name_; }
  void IssueQuery(const std::vector<QuerySample>& samples) override {
    // Hand the raw sample array to the C side along with the client data.
    issue_cb_(client_data_, samples.data(), samples.size());
  }
  void FlushQueries() override { flush_queries_cb_(); }

 private:
  ClientData client_data_;
  std::string name_;
  IssueQueryCallback issue_cb_;
  FlushQueriesCallback flush_queries_cb_;
};
} // namespace
/// \brief Wraps the C callbacks in a SystemUnderTestTrampoline and returns it
/// as an opaque handle. The caller owns the object; release via DestroySUT.
void* ConstructSUT(ClientData client_data, const char* name, size_t name_length,
                   IssueQueryCallback issue_cb,
                   FlushQueriesCallback flush_queries_cb) {
  SystemUnderTestTrampoline* sut = new SystemUnderTestTrampoline(
      client_data, std::string(name, name_length), issue_cb, flush_queries_cb);
  // An object pointer converts to void* implicitly; no reinterpret_cast
  // is needed (named casts should be reserved for where they are required).
  return sut;
}
/// \brief Destroys a SUT previously created by ConstructSUT.
void DestroySUT(void* sut) {
  // static_cast is the correct named cast for void* -> object pointer;
  // reinterpret_cast is unnecessarily strong here.
  delete static_cast<SystemUnderTestTrampoline*>(sut);
}
namespace {
// Forwards QuerySampleLibrary calls to relevant callbacks.
class QuerySampleLibraryTrampoline : public QuerySampleLibrary {
 public:
  /// All metadata (name, sample counts) is captured at construction time;
  /// the load/unload hooks trampoline into the client's C callbacks.
  QuerySampleLibraryTrampoline(
      ClientData client_data, std::string name, size_t total_sample_count,
      size_t performance_sample_count,
      LoadSamplesToRamCallback load_samples_to_ram_cb,
      UnloadSamplesFromRamCallback unload_samples_from_ram_cb)
      : client_data_(client_data),
        name_(std::move(name)),
        total_sample_count_(total_sample_count),
        performance_sample_count_(performance_sample_count),
        load_samples_to_ram_cb_(load_samples_to_ram_cb),
        unload_samples_from_ram_cb_(unload_samples_from_ram_cb) {}
  ~QuerySampleLibraryTrampoline() override = default;

  const std::string& Name() override { return name_; }
  size_t TotalSampleCount() override { return total_sample_count_; }
  size_t PerformanceSampleCount() override { return performance_sample_count_; }

  void LoadSamplesToRam(const std::vector<QuerySampleIndex>& samples) override {
    load_samples_to_ram_cb_(client_data_, samples.data(), samples.size());
  }
  void UnloadSamplesFromRam(
      const std::vector<QuerySampleIndex>& samples) override {
    unload_samples_from_ram_cb_(client_data_, samples.data(), samples.size());
  }

 private:
  ClientData client_data_;
  std::string name_;
  size_t total_sample_count_;
  size_t performance_sample_count_;
  LoadSamplesToRamCallback load_samples_to_ram_cb_;
  UnloadSamplesFromRamCallback unload_samples_from_ram_cb_;
};
} // namespace
/// \brief Wraps the C callbacks and metadata in a QuerySampleLibraryTrampoline
/// and returns it as an opaque handle. Caller owns it; release via DestroyQSL.
void* ConstructQSL(ClientData client_data, const char* name, size_t name_length,
                   size_t total_sample_count, size_t performance_sample_count,
                   LoadSamplesToRamCallback load_samples_to_ram_cb,
                   UnloadSamplesFromRamCallback unload_samples_from_ram_cb) {
  QuerySampleLibraryTrampoline* qsl = new QuerySampleLibraryTrampoline(
      client_data, std::string(name, name_length), total_sample_count,
      performance_sample_count, load_samples_to_ram_cb,
      unload_samples_from_ram_cb);
  // An object pointer converts to void* implicitly; no cast needed.
  return qsl;
}
/// \brief Destroys a QSL previously created by ConstructQSL.
void DestroyQSL(void* qsl) {
  // static_cast is the correct named cast for void* -> object pointer.
  delete static_cast<QuerySampleLibraryTrampoline*>(qsl);
}
// mlperf::c::StartTest just forwards to mlperf::StartTest after doing the
// proper cast.
void StartTest(void* sut, void* qsl, const TestSettings& settings,
               const std::string& audit_config_filename = "audit.config") {
  // static_cast suffices to recover object pointers from the opaque void*
  // handles handed out by ConstructSUT/ConstructQSL.
  SystemUnderTestTrampoline* sut_cast =
      static_cast<SystemUnderTestTrampoline*>(sut);
  QuerySampleLibraryTrampoline* qsl_cast =
      static_cast<QuerySampleLibraryTrampoline*>(qsl);
  // The C API does not expose log settings; use the defaults.
  LogSettings default_log_settings;
  mlperf::StartTest(sut_cast, qsl_cast, settings, default_log_settings,
                    audit_config_filename);
}
/// \brief Reports completed query samples; thin forwarder to the C++ API.
void QuerySamplesComplete(QuerySampleResponse* responses,
                          size_t response_count) {
  mlperf::QuerySamplesComplete(responses, response_count);
}
/// \brief Like QuerySamplesComplete, but each response is additionally passed
/// back through `response_cb` together with the caller's `client_data`.
void QuerySamplesCompleteResponseCb(QuerySampleResponse* responses,
                                    size_t response_count,
                                    ResponseCallback response_cb,
                                    ClientData client_data) {
  mlperf::QuerySamplesComplete(
      responses, response_count,
      [client_data, response_cb](QuerySampleResponse* response) {
        response_cb(client_data, response);
      });
}
/// \brief Thin forwarder to mlperf::FirstTokenComplete.
void FirstTokenComplete(QuerySampleResponse* responses, size_t response_count) {
  mlperf::FirstTokenComplete(responses, response_count);
}
/// \brief FirstTokenComplete variant with a per-response callback (see
/// QuerySamplesCompleteResponseCb).
void FirstTokenCompleteResponseCb(QuerySampleResponse* responses,
                                  size_t response_count,
                                  ResponseCallback response_cb,
                                  ClientData client_data) {
  mlperf::FirstTokenComplete(
      responses, response_count,
      [client_data, response_cb](QuerySampleResponse* response) {
        response_cb(client_data, response);
      });
}
/// \brief Registers the calling thread as a loadgen issue-query thread.
void RegisterIssueQueryThread() { mlperf::RegisterIssueQueryThread(); }
} // namespace c
} // namespace mlperf
/* Copyright 2019 The MLPerf Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
/// \file
/// \brief A C API wrapping the C++ loadgen. Not tested. Needs work.
/// \details The C API allows a C or Python client to easily create
/// a SystemUnderTest without having to expose the SystemUnderTest class
/// directly.
/// ConstructSUT instead works with a set of function pointers that are
/// called from an underlying trampoline class.
#ifndef SYSTEM_UNDER_TEST_C_API_H_
#define SYSTEM_UNDER_TEST_C_API_H_
#include <stddef.h>
#include <stdint.h>
#include "../query_sample.h"
#include "../test_settings.h"
namespace mlperf {
namespace c {
/// \brief Optional opaque client data that creators of SUTs and QSLs can have
/// the loadgen pass back to their callback invocations.
/// Helps avoid global variables.
typedef uintptr_t ClientData;
typedef void (*IssueQueryCallback)(ClientData, const QuerySample*, size_t);
typedef void (*FlushQueriesCallback)();
typedef void (*ResponseCallback)(ClientData, QuerySampleResponse*);
/// \brief SUT calls this function to report query result back to loadgen
void QuerySamplesComplete(QuerySampleResponse* responses,
size_t response_count);
void QuerySamplesCompleteResponseCb(QuerySampleResponse* responses,
size_t response_count,
ResponseCallback response_cb,
ClientData client_data);
void FirstTokenComplete(QuerySampleResponse* responses, size_t response_count);
void FirstTokenCompleteResponseCb(QuerySampleResponse* responses,
size_t response_count,
ResponseCallback response_cb,
ClientData client_data);
/// \brief Create an opaque SUT pointer based on C callbacks.
void* ConstructSUT(ClientData client_data, const char* name, size_t name_length,
IssueQueryCallback issue_cb,
FlushQueriesCallback flush_queries_cb);
/// \brief Destroys the SUT created by ConstructSUT.
void DestroySUT(void* sut);
typedef void (*LoadSamplesToRamCallback)(ClientData, const QuerySampleIndex*,
size_t);
typedef void (*UnloadSamplesFromRamCallback)(ClientData,
const QuerySampleIndex*, size_t);
/// \brief Create an opaque QSL pointer based on C callbacks.
void* ConstructQSL(ClientData client_data, const char* name, size_t name_length,
size_t total_sample_count, size_t performance_sample_count,
LoadSamplesToRamCallback load_samples_to_ram_cb,
UnloadSamplesFromRamCallback unload_samples_from_ram_cb);
/// \brief Destroys the QSL created by ConstructQSL.
void DestroyQSL(void* qsl);
/// \brief Run tests on a SUT created by ConstructSUT().
/// \details This is the C entry point. See mlperf::StartTest for the C++ entry
/// point.
void StartTest(void* sut, void* qsl, const TestSettings& settings,
const std::string& audit_config_filename);
///
/// \brief Register a thread for query issuing in Server scenario.
/// \details This is the C entry point. See mlperf::RegisterIssueQueryThread for
/// the C++ entry point.
///
void RegisterIssueQueryThread();
} // namespace c
} // namespace mlperf
#endif // SYSTEM_UNDER_TEST_C_API_H_
/* Copyright 2019 The MLPerf Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
/// \file
/// \brief Python bindings for the loadgen using pybind11.
#ifndef PYTHON_BINDINGS_H
#define PYTHON_BINDINGS_H
#include <functional>
#include "../loadgen.h"
#include "../query_dispatch_library.h"
#include "../query_sample.h"
#include "../query_sample_library.h"
#include "../system_under_test.h"
#include "../test_settings.h"
#include "pybind11/functional.h"
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
#include "pybind11/stl_bind.h"
namespace mlperf {
namespace {
using IssueQueryCallback = std::function<void(std::vector<QuerySample>)>;
using FastIssueQueriesCallback =
std::function<void(std::vector<ResponseId>, std::vector<QuerySampleIndex>)>;
using FlushQueriesCallback = std::function<void()>;
using NameCallback = std::function<std::string()>;
// Forwards SystemUnderTest calls to relevant callbacks.
class SystemUnderTestTrampoline : public SystemUnderTest {
 public:
  // `name` identifies the SUT in logs; the callbacks are std::function
  // wrappers around the Python callables supplied by the binding layer.
  SystemUnderTestTrampoline(std::string name, IssueQueryCallback issue_cb,
                            FlushQueriesCallback flush_queries_cb)
      : name_(std::move(name)),
        issue_cb_(issue_cb),
        flush_queries_cb_(flush_queries_cb) {}
  ~SystemUnderTestTrampoline() override = default;

  const std::string& Name() override { return name_; }
  void IssueQuery(const std::vector<QuerySample>& samples) override {
    // Reacquire the GIL before calling back into Python: this is invoked
    // from loadgen threads that do not hold it.
    pybind11::gil_scoped_acquire gil_acquirer;
    issue_cb_(samples);
  }
  // NOTE(review): unlike IssueQuery, this calls into Python without
  // acquiring the GIL — presumably it is only invoked from a thread that
  // already holds it; confirm against the loadgen's calling context.
  void FlushQueries() override { flush_queries_cb_(); }

 protected:
  // Protected so FastSystemUnderTestTrampoline can reuse this storage.
  std::string name_;
  IssueQueryCallback issue_cb_;
  FlushQueriesCallback flush_queries_cb_;
};
class FastSystemUnderTestTrampoline : public SystemUnderTestTrampoline {
public:
FastSystemUnderTestTrampoline(std::string name,
FastIssueQueriesCallback fast_issue_cb,
FlushQueriesCallback flush_queries_cb)
: SystemUnderTestTrampoline(name, nullptr, flush_queries_cb),
fast_issue_cb_(fast_issue_cb) {}
~FastSystemUnderTestTrampoline() override = default;
void IssueQuery(const std::vector<QuerySample>& samples) override {
pybind11::gil_scoped_acquire gil_acquirer;
std::vector<ResponseId> responseIds;
std::vector<QuerySampleIndex> querySampleIndices;
for (auto& s : samples) {
responseIds.push_back(s.id);
querySampleIndices.push_back(s.index);
}
fast_issue_cb_(responseIds, querySampleIndices);
}
private:
FastIssueQueriesCallback fast_issue_cb_;
};
using LoadSamplesToRamCallback =
std::function<void(std::vector<QuerySampleIndex>)>;
using UnloadSamplesFromRamCallback =
std::function<void(std::vector<QuerySampleIndex>)>;
// Forwards QuerySampleLibrary calls to relevant callbacks.
class QuerySampleLibraryTrampoline : public QuerySampleLibrary {
public:
QuerySampleLibraryTrampoline(
std::string name, size_t total_sample_count,
size_t performance_sample_count,
LoadSamplesToRamCallback load_samples_to_ram_cb,
UnloadSamplesFromRamCallback unload_samples_from_ram_cb)
: name_(std::move(name)),
total_sample_count_(total_sample_count),
performance_sample_count_(performance_sample_count),
load_samples_to_ram_cb_(load_samples_to_ram_cb),
unload_samples_from_ram_cb_(unload_samples_from_ram_cb) {}
~QuerySampleLibraryTrampoline() override = default;
const std::string& Name() override { return name_; }
size_t TotalSampleCount() { return total_sample_count_; }
size_t PerformanceSampleCount() { return performance_sample_count_; }
void LoadSamplesToRam(const std::vector<QuerySampleIndex>& samples) override {
pybind11::gil_scoped_acquire gil_acquirer;
load_samples_to_ram_cb_(samples);
}
void UnloadSamplesFromRam(
const std::vector<QuerySampleIndex>& samples) override {
pybind11::gil_scoped_acquire gil_acquirer;
unload_samples_from_ram_cb_(samples);
}
private:
std::string name_;
size_t total_sample_count_;
size_t performance_sample_count_;
LoadSamplesToRamCallback load_samples_to_ram_cb_;
UnloadSamplesFromRamCallback unload_samples_from_ram_cb_;
};
// A QDL that allows defining callbacks for
// IssueQuery, FlushQueries, and Name methods.
class QueryDispatchLibraryTrampoline : public QueryDispatchLibrary {
 public:
  QueryDispatchLibraryTrampoline(IssueQueryCallback issue_query_callback,
                                 FlushQueriesCallback flush_queries_callback,
                                 NameCallback name_callback)
      : issue_query_callback_(issue_query_callback),
        flush_queries_callback_(flush_queries_callback),
        name_callback_(name_callback) {}

  // Returns the name of the SUT. Name shall be returned over the network
  // TODO: other bindings should also be fixed eventually to be used over the
  // network
  const std::string& Name() override {
    pybind11::gil_scoped_acquire gil_acquirer;
    // Fix: cache the callback result in a per-instance member instead of a
    // function-local static. The static was shared by every instance and
    // mutated without synchronization, so two QDLs (or two threads) could
    // race and observe each other's names. The returned reference stays
    // valid for this object's lifetime.
    cached_name_ = name_callback_();  // name_callback_() shall returned name
                                      // over the network.
    return cached_name_;
  }
  void IssueQuery(const std::vector<QuerySample>& samples) override {
    pybind11::gil_scoped_acquire gil_acquirer;
    issue_query_callback_(samples);
  }
  void FlushQueries() override { flush_queries_callback_(); }

 protected:
  IssueQueryCallback issue_query_callback_;
  FlushQueriesCallback flush_queries_callback_;
  NameCallback name_callback_;
  std::string cached_name_;  // backing storage for Name()'s reference
};
} // namespace
/// \brief Python bindings.
namespace py {
/// \brief Wraps the Python callbacks in a SystemUnderTestTrampoline ("PySUT")
/// and returns it as an opaque integer handle. The caller owns the object;
/// free with DestroySUT.
uintptr_t ConstructSUT(IssueQueryCallback issue_cb,
                       FlushQueriesCallback flush_queries_cb) {
  SystemUnderTestTrampoline* sut =
      new SystemUnderTestTrampoline("PySUT", issue_cb, flush_queries_cb);
  return reinterpret_cast<uintptr_t>(sut);
}
/// \brief Destroys a handle returned by ConstructSUT.
void DestroySUT(uintptr_t sut) {
  SystemUnderTestTrampoline* sut_cast =
      reinterpret_cast<SystemUnderTestTrampoline*>(sut);
  delete sut_cast;
}
/// \brief Like ConstructSUT, but the issue callback receives parallel
/// id/index vectors ("PyFastSUT"). Free with DestroyFastSUT.
uintptr_t ConstructFastSUT(FastIssueQueriesCallback fast_issue_cb,
                           FlushQueriesCallback flush_queries_cb) {
  FastSystemUnderTestTrampoline* sut = new FastSystemUnderTestTrampoline(
      "PyFastSUT", fast_issue_cb, flush_queries_cb);
  return reinterpret_cast<uintptr_t>(sut);
}
/// \brief Destroys a handle returned by ConstructFastSUT.
void DestroyFastSUT(uintptr_t sut) {
  FastSystemUnderTestTrampoline* sut_cast =
      reinterpret_cast<FastSystemUnderTestTrampoline*>(sut);
  delete sut_cast;
}
/// \brief Creates a QuerySampleLibraryTrampoline ("PyQSL") around the Python
/// callbacks and returns it as an opaque integer handle. Free with
/// DestroyQSL.
uintptr_t ConstructQSL(
    size_t total_sample_count, size_t performance_sample_count,
    LoadSamplesToRamCallback load_samples_to_ram_cb,
    UnloadSamplesFromRamCallback unload_samples_from_ram_cb) {
  QuerySampleLibraryTrampoline* qsl = new QuerySampleLibraryTrampoline(
      "PyQSL", total_sample_count, performance_sample_count,
      load_samples_to_ram_cb, unload_samples_from_ram_cb);
  return reinterpret_cast<uintptr_t>(qsl);
}
/// \brief Destroys a handle returned by ConstructQSL.
void DestroyQSL(uintptr_t qsl) {
  QuerySampleLibraryTrampoline* qsl_cast =
      reinterpret_cast<QuerySampleLibraryTrampoline*>(qsl);
  delete qsl_cast;
}
/// \brief Creates a QueryDispatchLibraryTrampoline from the three callbacks
/// and returns it as an opaque integer handle. Free with DestroyQDL.
uintptr_t ConstructQDL(IssueQueryCallback issue_cb,
                       FlushQueriesCallback flush_queries_cb,
                       NameCallback name_callback) {
  QueryDispatchLibraryTrampoline* qdl = new QueryDispatchLibraryTrampoline(
      issue_cb, flush_queries_cb, name_callback);
  return reinterpret_cast<uintptr_t>(qdl);
}
/// \brief Destroys a handle returned by ConstructQDL.
void DestroyQDL(uintptr_t qdl) {
  QueryDispatchLibraryTrampoline* qdl_cast =
      reinterpret_cast<QueryDispatchLibraryTrampoline*>(qdl);
  delete qdl_cast;
}
/// \brief Runs the test with default log settings.
/// \details `sut` and `qsl` must be handles produced by the Construct*
/// functions above. The GIL is released for the whole run so that loadgen
/// worker threads can reacquire it inside the SUT/QSL callbacks.
void StartTest(uintptr_t sut, uintptr_t qsl, mlperf::TestSettings test_settings,
               const std::string& audit_config_filename) {
  pybind11::gil_scoped_release gil_releaser;
  SystemUnderTestTrampoline* sut_cast =
      reinterpret_cast<SystemUnderTestTrampoline*>(sut);
  QuerySampleLibraryTrampoline* qsl_cast =
      reinterpret_cast<QuerySampleLibraryTrampoline*>(qsl);
  LogSettings default_log_settings;
  mlperf::StartTest(sut_cast, qsl_cast, test_settings, default_log_settings,
                    audit_config_filename);
}
/// \brief Same as StartTest but with caller-provided log settings.
void StartTestWithLogSettings(uintptr_t sut, uintptr_t qsl,
                              mlperf::TestSettings test_settings,
                              mlperf::LogSettings log_settings,
                              const std::string& audit_config_filename) {
  pybind11::gil_scoped_release gil_releaser;
  SystemUnderTestTrampoline* sut_cast =
      reinterpret_cast<SystemUnderTestTrampoline*>(sut);
  QuerySampleLibraryTrampoline* qsl_cast =
      reinterpret_cast<QuerySampleLibraryTrampoline*>(qsl);
  mlperf::StartTest(sut_cast, qsl_cast, test_settings, log_settings,
                    audit_config_filename);
}
using ResponseCallback = std::function<void(QuerySampleResponse*)>;
/// TODO: Get rid of copies.
/// \brief Reports completed samples to the loadgen. The response vector is
/// taken by value (copied from the Python list); the GIL is released while
/// the loadgen processes the batch.
void QuerySamplesComplete(std::vector<QuerySampleResponse> responses,
                          ResponseCallback response_cb = {}) {
  pybind11::gil_scoped_release gil_releaser;
  mlperf::QuerySamplesComplete(responses.data(), responses.size(), response_cb);
}
/// \brief Forwards first-token completions to mlperf::FirstTokenComplete,
/// releasing the GIL for the duration of the call.
void FirstTokenComplete(std::vector<QuerySampleResponse> responses,
                        ResponseCallback response_cb = {}) {
  pybind11::gil_scoped_release gil_releaser;
  mlperf::FirstTokenComplete(responses.data(), responses.size(), response_cb);
}
/// Python module definition for the mlperf_loadgen bindings.
PYBIND11_MODULE(mlperf_loadgen, m) {
  m.doc() = "MLPerf Inference load generator.";
  pybind11::enum_<TestScenario>(m, "TestScenario")
      .value("SingleStream", TestScenario::SingleStream)
      .value("MultiStream", TestScenario::MultiStream)
      .value("Server", TestScenario::Server)
      .value("Offline", TestScenario::Offline);
  pybind11::enum_<TestMode>(m, "TestMode")
      .value("SubmissionRun", TestMode::SubmissionRun)
      .value("AccuracyOnly", TestMode::AccuracyOnly)
      .value("PerformanceOnly", TestMode::PerformanceOnly)
      .value("FindPeakPerformance", TestMode::FindPeakPerformance);
  pybind11::class_<TestSettings>(m, "TestSettings")
      .def(pybind11::init<>())
      .def_readwrite("scenario", &TestSettings::scenario)
      .def_readwrite("mode", &TestSettings::mode)
      .def_readwrite("single_stream_expected_latency_ns",
                     &TestSettings::single_stream_expected_latency_ns)
      .def_readwrite("single_stream_target_latency_percentile",
                     &TestSettings::single_stream_target_latency_percentile)
      .def_readwrite("multi_stream_expected_latency_ns",
                     &TestSettings::multi_stream_expected_latency_ns)
      .def_readwrite("multi_stream_target_latency_percentile",
                     &TestSettings::multi_stream_target_latency_percentile)
      .def_readwrite("multi_stream_samples_per_query",
                     &TestSettings::multi_stream_samples_per_query)
      .def_readwrite("server_target_qps", &TestSettings::server_target_qps)
      .def_readwrite("server_target_latency_ns",
                     &TestSettings::server_target_latency_ns)
      .def_readwrite("server_target_latency_percentile",
                     &TestSettings::server_target_latency_percentile)
      .def_readwrite("server_coalesce_queries",
                     &TestSettings::server_coalesce_queries)
      .def_readwrite("server_find_peak_qps_decimals_of_precision",
                     &TestSettings::server_find_peak_qps_decimals_of_precision)
      .def_readwrite("server_find_peak_qps_boundary_step_size",
                     &TestSettings::server_find_peak_qps_boundary_step_size)
      .def_readwrite("server_max_async_queries",
                     &TestSettings::server_max_async_queries)
      .def_readwrite("server_num_issue_query_threads",
                     &TestSettings::server_num_issue_query_threads)
      .def_readwrite("offline_expected_qps",
                     &TestSettings::offline_expected_qps)
      .def_readwrite("min_duration_ms", &TestSettings::min_duration_ms)
      .def_readwrite("max_duration_ms", &TestSettings::max_duration_ms)
      .def_readwrite("min_query_count", &TestSettings::min_query_count)
      .def_readwrite("max_query_count", &TestSettings::max_query_count)
      .def_readwrite("qsl_rng_seed", &TestSettings::qsl_rng_seed)
      .def_readwrite("sample_index_rng_seed",
                     &TestSettings::sample_index_rng_seed)
      .def_readwrite("schedule_rng_seed", &TestSettings::schedule_rng_seed)
      .def_readwrite("accuracy_log_rng_seed",
                     &TestSettings::accuracy_log_rng_seed)
      .def_readwrite("accuracy_log_probability",
                     &TestSettings::accuracy_log_probability)
      .def_readwrite("print_timestamps", &TestSettings::print_timestamps)
      .def_readwrite("performance_issue_unique",
                     &TestSettings::performance_issue_unique)
      .def_readwrite("performance_issue_same",
                     &TestSettings::performance_issue_same)
      .def_readwrite("performance_issue_same_index",
                     &TestSettings::performance_issue_same_index)
      .def_readwrite("performance_sample_count_override",
                     &TestSettings::performance_sample_count_override)
      .def_readwrite("test05", &TestSettings::test05)
      .def_readwrite("test05_qsl_rng_seed", &TestSettings::test05_qsl_rng_seed)
      .def_readwrite("test05_sample_index_rng_seed",
                     &TestSettings::test05_sample_index_rng_seed)
      .def_readwrite("test05_schedule_rng_seed",
                     &TestSettings::test05_schedule_rng_seed)
      .def_readwrite("use_token_latencies", &TestSettings::use_token_latencies)
      // Note: the Python names intentionally drop the "server_" prefix here.
      .def_readwrite("ttft_latency", &TestSettings::server_ttft_latency)
      .def_readwrite("tpot_latency", &TestSettings::server_tpot_latency)
      .def_readwrite("infer_token_latencies",
                     &TestSettings::infer_token_latencies)
      .def_readwrite("token_latency_scaling_factor",
                     &TestSettings::token_latency_scaling_factor)
      .def("FromConfig", &TestSettings::FromConfig, pybind11::arg("path"),
           pybind11::arg("model"), pybind11::arg("scenario"),
           pybind11::arg("conf_type") = 1,
           "This function configures settings from the given user "
           "configuration file, model, and scenario. The conf_type flag "
           "should be set to 1 for loading user.conf or else only the default "
           "mlperf_conf file "
           "will be loaded by the loadgen.");
  pybind11::enum_<LoggingMode>(m, "LoggingMode")
      .value("AsyncPoll", LoggingMode::AsyncPoll)
      .value("EndOfTestOnly", LoggingMode::EndOfTestOnly)
      .value("Synchronous", LoggingMode::Synchronous);
  pybind11::class_<LogOutputSettings>(m, "LogOutputSettings")
      .def(pybind11::init<>())
      .def_readwrite("outdir", &LogOutputSettings::outdir)
      .def_readwrite("prefix", &LogOutputSettings::prefix)
      .def_readwrite("suffix", &LogOutputSettings::suffix)
      .def_readwrite("prefix_with_datetime",
                     &LogOutputSettings::prefix_with_datetime)
      .def_readwrite("copy_detail_to_stdout",
                     &LogOutputSettings::copy_detail_to_stdout)
      .def_readwrite("copy_summary_to_stdout",
                     &LogOutputSettings::copy_summary_to_stdout);
  pybind11::class_<LogSettings>(m, "LogSettings")
      .def(pybind11::init<>())
      .def_readwrite("log_output", &LogSettings::log_output)
      .def_readwrite("log_mode", &LogSettings::log_mode)
      .def_readwrite("log_mode_async_poll_interval_ms",
                     &LogSettings::log_mode_async_poll_interval_ms)
      .def_readwrite("enable_trace", &LogSettings::enable_trace);
  pybind11::class_<QuerySample>(m, "QuerySample")
      .def(pybind11::init<>())
      .def(pybind11::init<ResponseId, QuerySampleIndex>())
      .def_readwrite("id", &QuerySample::id)
      .def_readwrite("index", &QuerySample::index)
      .def(pybind11::pickle(
          [](const QuerySample& qs) {  // __getstate__
            /* Return a tuple that fully encodes state of object */
            return pybind11::make_tuple(qs.id, qs.index);
          },
          [](pybind11::tuple t) {  // __setstate__
            if (t.size() != 2)
              throw std::runtime_error("Invalid state for QuerySample");
            /* Create a new C++ instance */
            QuerySample q;
            q.id = t[0].cast<uintptr_t>();
            q.index = t[1].cast<size_t>();
            return q;
          }));
  pybind11::class_<QuerySampleResponse>(m, "QuerySampleResponse")
      .def(pybind11::init<>())
      .def(pybind11::init<ResponseId, uintptr_t, size_t>())
      .def(pybind11::init<ResponseId, uintptr_t, size_t, int64_t>())
      .def_readwrite("id", &QuerySampleResponse::id)
      .def_readwrite("data", &QuerySampleResponse::data)
      .def_readwrite("size", &QuerySampleResponse::size)
      .def_readwrite("n_tokens", &QuerySampleResponse::n_tokens)
      .def(pybind11::pickle(
          [](const QuerySampleResponse& qsr) {  // __getstate__
            /* Return a tuple that fully encodes state of object.
             * Fix: include n_tokens so pickling round-trips it (it was
             * previously dropped even though __setstate__ accepts it). */
            return pybind11::make_tuple(qsr.id, qsr.data, qsr.size,
                                        qsr.n_tokens);
          },
          [](pybind11::tuple t) {  // __setstate__
            // Fix: this condition used ||, which is true for every size and
            // therefore rejected all states. Accept 3- or 4-tuples.
            if ((t.size() != 3) && (t.size() != 4))
              throw std::runtime_error("Invalid state for QuerySampleResponse");
            /* Create a new C++ instance */
            QuerySampleResponse q;
            q.id = t[0].cast<uintptr_t>();
            q.data = t[1].cast<uintptr_t>();
            q.size = t[2].cast<size_t>();
            if (t.size() == 4) {
              q.n_tokens = t[3].cast<int64_t>();
            } else {
              q.n_tokens = 0;
            }
            return q;
          }));
  // TODO: Use PYBIND11_MAKE_OPAQUE for the following vector types.
  pybind11::bind_vector<std::vector<QuerySample>>(m, "VectorQuerySample");
  pybind11::bind_vector<std::vector<QuerySampleResponse>>(
      m, "VectorQuerySampleResponse");
  m.def("ConstructSUT", &py::ConstructSUT, "Construct the system under test.");
  m.def("DestroySUT", &py::DestroySUT,
        "Destroy the object created by ConstructSUT.");
  m.def("ConstructFastSUT", &py::ConstructFastSUT,
        "Construct the system under test, fast issue query");
  m.def("DestroyFastSUT", &py::DestroyFastSUT,
        "Destroy the object created by ConstructFastSUT.");
  m.def("ConstructQSL", &py::ConstructQSL,
        "Construct the query sample library.");
  m.def("DestroyQSL", &py::DestroyQSL,
        "Destroy the object created by ConstructQSL.");
  m.def("ConstructQDL", &py::ConstructQDL,
        "Construct the query sample library, communicating with the SUT over "
        "the network.");
  m.def("DestroyQDL", &py::DestroyQDL,
        "Destroy the object created by ConstructQDL.");
  m.def("StartTest", &py::StartTest,
        "Run tests on a SUT created by ConstructSUT() with the provided QSL. "
        "Uses default log settings.",
        pybind11::arg("sut"), pybind11::arg("qsl"),
        pybind11::arg("test_settings"),
        pybind11::arg("audit_config_filename") = "audit.config");
  m.def("StartTestWithLogSettings", &py::StartTestWithLogSettings,
        "Run tests on a SUT created by ConstructSUT() with the provided QSL. "
        "Accepts custom log settings.",
        pybind11::arg("sut"), pybind11::arg("qsl"),
        pybind11::arg("test_settings"), pybind11::arg("log_settings"),
        pybind11::arg("audit_config_filename") = "audit.config");
  // Fix: added the missing space between "of" and "IssueQuery" in the two
  // docstrings below.
  m.def("QuerySamplesComplete", &py::QuerySamplesComplete,
        "Called by the SUT to indicate that samples from some combination of "
        "IssueQuery calls have finished.",
        pybind11::arg("responses"),
        pybind11::arg("response_cb") = ResponseCallback{});
  m.def("FirstTokenComplete", &py::FirstTokenComplete,
        "Called by the SUT to indicate that tokens from some combination of "
        "IssueQuery calls have finished.",
        pybind11::arg("responses"),
        pybind11::arg("response_cb") = ResponseCallback{});
}
} // namespace py
} // namespace mlperf
#endif // PYTHON_BINDINGS_H
# Demo
## Loadgen Over the Network
### Overview
This folder provides a demo implementation for LoadGen over the network.\
Two sides are implemented:
1. The SUT side which is implemented in [sut_over_network_demo.py](sut_over_network_demo.py). Each Node should run it for multiple Nodes operation.
2. The LoadGen node running the LoadGen, QSL and QDL instances, implemented in [py_demo_server_lon.py](py_demo_server_lon.py)
The demo SUT is implemented with a Flask server. The LON node implements a Flask client for network operation.
The test runs in MLPerf Server mode. The SUT does not implement a real benchmark; it contains dummy interfaces for the preprocessing, postprocessing and model-calling functions.
### Setup
Install python packages:
```sh
pip install absl-py numpy wheel flask requests
```
Clone:
```sh
git clone --recurse-submodules https://github.com/mlcommons/inference.git mlperf_inference
```
Build:
```sh
cd mlperf_inference/loadgen
CFLAGS="-std=c++14 -O3" python setup.py bdist_wheel
cd ..; pip install --force-reinstall loadgen/dist/`ls -r loadgen/dist/ | head -n1` ; cd -
```
### Run the demo (single machine)
Start the demo SUT server (run this at a separate terminal):
```sh
python demos/lon/sut_over_network_demo.py --port 8000
```
Start the test:
```sh
python demos/lon/py_demo_server_lon.py --sut_server http://localhost:8000
```
### Run the demo (over the network)
To run over a network, simply run the demo SUT on a different machine. For multiple Nodes, run the demo SUT on each machine, specifying the node number.\
```sh
python demos/lon/sut_over_network_demo.py --port 8000 --node N1
```
Then, when running the client, replace `localhost` with the correct IP.
```sh
python demos/lon/py_demo_server_lon.py --sut_server IP1:8000,IP2:8000,IP3:8000
```
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""
Python demo showing how to use the MLPerf Inference LoadGen over the Network bindings.
This program runs on the LON Node side.
It runs the demo in MLPerf server mode over the network.
It communicates over the network with a Network SUT node,
which runs the Network SUT demo based on a Flask server, implemented in sut_over_network_demo.py.
"""
import threading
import requests
import array
import time
from absl import app
from absl import flags
import mlperf_loadgen
FLAGS = flags.FLAGS
flags.DEFINE_list(
"sut_server", "http://localhost:8000", "Address of the server(s) under test."
)
class QSL:
    """Demo QuerySampleLibrary backed by synthetic (dummy) text features."""

    def __init__(self, total_sample_count, performance_sample_count):
        # One fake text feature per sample index.
        self.eval_features = {
            i: f"what_is_my_dummy_feature_{i}?" for i in range(total_sample_count)
        }
        # Handle to the native QSL object; released in __del__.
        self.qsl = mlperf_loadgen.ConstructQSL(
            total_sample_count,
            performance_sample_count,
            self.load_samples_to_ram,
            self.unload_samples_from_ram,
        )

    def get_features(self, sample_id):
        """Return the feature string for the given sample index."""
        return self.eval_features[sample_id]

    def load_samples_to_ram(self, query_samples):
        """Loadgen callback; this demo keeps no sample cache, so it is a no-op."""
        del query_samples

    def unload_samples_from_ram(self, query_samples):
        """Loadgen callback; this demo keeps no sample cache, so it is a no-op."""
        del query_samples

    def __del__(self):
        # Release the native QSL handle created in __init__.
        mlperf_loadgen.DestroyQSL(self.qsl)
class QDL:
    """QDL acting as a proxy to the SUT.

    This QDL communicates with the SUT via HTTP.
    It uses two endpoints to communicate with the SUT:
    - /predict/ : Send a query to the SUT and get a response.
    - /getname/ : Get the name of the SUT. Send a getname to the SUT and get a response.
    """

    def __init__(self, qsl: QSL, sut_server_addr: list):
        """
        Constructor for the QDL.
        Args:
            qsl: The QSL to use.
            sut_server_addr: A list of addresses of the SUT.
        """
        self.qsl = qsl
        # Construct QDL from the python binding
        self.qdl = mlperf_loadgen.ConstructQDL(
            self.issue_query, self.flush_queries, self.client_get_name
        )
        self.sut_server_addr = sut_server_addr
        self.num_nodes = len(sut_server_addr)
        # For round robin between the SUTs:
        self.next_sut_id = 0
        self.lock = threading.Lock()

    def issue_query(self, query_samples):
        """Process the query to send to the SUT on a background thread."""
        threading.Thread(
            target=self.process_query_async,
            args=[query_samples]).start()

    def flush_queries(self):
        """Flush the queries. Dummy implementation."""
        pass

    def process_query_async(self, query_samples):
        """
        This function is called by the Loadgen in a separate thread.
        It is responsible for
            1. Creating a query for the SUT, by reading the features from the QSL.
            2. Sending the query to the SUT.
            3. Waiting for the response from the SUT.
            4. Deserializing the response.
            5. Calling mlperf_loadgen.QuerySamplesComplete(query_samples, response)
        Args:
            query_samples: A list of QuerySample objects.
        """
        responses = []
        for s in query_samples:
            # Overall process:
            # QDL builds a real-world query and sends to SUT --> SUT processes --> SUT sends back to QDL
            # Read features from the QSL
            features = self.qsl.get_features(s.index)
            time.sleep(0.001)  # Ensure a maximal rate of queries to the SUT
            # Send the query to SUT in round robin; wait for a response.
            sut_result = self.client_predict(features, s.index)
            # Keep the encoded payload alive until QuerySamplesComplete copies it.
            response_array = array.array("B", sut_result.encode("utf-8"))
            bi = response_array.buffer_info()
            responses.append(
                mlperf_loadgen.QuerySampleResponse(
                    s.id, bi[0], bi[1]))
        mlperf_loadgen.QuerySamplesComplete(responses)

    def get_sut_id_round_robin(self):
        """Get the SUT id in round robin."""
        with self.lock:
            res = self.next_sut_id
            self.next_sut_id = (self.next_sut_id + 1) % self.num_nodes
            return res

    def client_predict(self, query, id):
        """Serialize the query, send it to the SUT in round robin, and return the deserialized response."""
        url = "{}/predict/".format(
            self.sut_server_addr[self.get_sut_id_round_robin()])
        # BUG FIX: the payload previously used the *value* of `id` as the JSON
        # key ({id: id}); the literal key "id" is what the SUT should receive.
        response = requests.post(url, json={"query": query, "id": id})
        return response.json()["result"]

    def client_get_name(self):
        """Get the name of the SUT from ALL the SUTS."""
        if len(self.sut_server_addr) == 1:
            return requests.post(
                f"{self.sut_server_addr[0]}/getname/").json()["name"]
        sut_names = [
            requests.post(f"{addr}/getname/").json()["name"]
            for addr in self.sut_server_addr
        ]
        return "Multi-node SUT: " + ", ".join(sut_names)

    def __del__(self):
        # Release the native QDL handle created in __init__.
        mlperf_loadgen.DestroyQDL(self.qdl)
def main(argv):
    """Configure and run the Server-scenario test against the networked SUT."""
    del argv
    settings = mlperf_loadgen.TestSettings()
    settings.scenario = mlperf_loadgen.TestScenario.Server
    settings.mode = mlperf_loadgen.TestMode.PerformanceOnly
    # Target load and latency bound for the Server scenario.
    settings.server_target_qps = 100
    settings.server_target_latency_ns = 100000000
    settings.min_query_count = 100
    settings.min_duration_ms = 10000
    # QDL and QSL
    qsl = QSL(1024, 128)
    qdl = QDL(qsl, sut_server_addr=FLAGS.sut_server)
    # Native handles are cleaned up by QSL.__del__ / QDL.__del__.
    mlperf_loadgen.StartTest(qdl.qdl, qsl.qsl, settings)
if __name__ == "__main__":
    app.run(main)
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""
Python demo showing how to use the MLPerf Inference load generator bindings over the network.
This part of the demo runs the "demo SUT" which is connected over the network to the LON node.
A corresponding "demo LON node" with the demo test is implemented in py_demo_server_lon.py.
The SUT is implemented using a Flask server, with dummy implementation of the inference processing.
Two endpoints are exposed:
- /predict/ : Receives a query (e.g., a text) runs inference, and returns a prediction.
- /getname/ : Get the name of the SUT.
The current implementation is a dummy implementation, which does not use
a real DNN model, batching, or pre/postprocessing code,
but rather just returns subset of the input query as a response,
Yet, it illustrates the basic structure of a SUT server.
"""
import argparse
from flask import Flask, request, jsonify
app = Flask(__name__)
node = ""
def preprocess(query):
    """[SUT Node] A dummy preprocess: returns the query unchanged."""
    # A real SUT would do batching, tokenization, resizing,
    # normalization, etc. here.
    return query
def dnn_model(query):
    """[SUT Node] A dummy DNN model: returns the query unchanged."""
    # A real SUT would invoke a model (resnet, bert, ...) here.
    return query
def postprocess(query):
    """[SUT Node] A dummy postprocess: returns the query unchanged."""
    # A real SUT would do e.g. NMS, detokenization, etc. here.
    return query
@app.route("/predict/", methods=["POST"])
def predict():
"""Receives a query (e.g., a text) runs inference, and returns a prediction."""
query = request.get_json(force=True)["query"]
result = postprocess(dnn_model(preprocess(query)))
return jsonify(result=result)
@app.route("/getname/", methods=["POST", "GET"])
def getname():
"""Returns the name of the SUT."""
return jsonify(name=f"Demo SUT (Network SUT) node" +
(" " + node) if node else "")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--node", type=str, default="")
args = parser.parse_args()
node = args.node
app.run(debug=False, port=args.port)
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import threading
import time
from absl import app
import mlperf_loadgen
from datetime import datetime
# Global var
NUM_AGENTS = 8
LOOPBACK_LATENCY_S = 0.001
def load_samples_to_ram(query_samples):
    """QSL callback: nothing to load in this demo."""
    _ = query_samples
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing to unload in this demo."""
    _ = query_samples
# Processes queries in NUM_AGENTS slices that complete at different times.
def process_query_async(query_samples, i_slice):
    """Complete the i_slice-th interleaved stride of query_samples.

    Args:
        query_samples: the QuerySample list handed to issue_query.
        i_slice: which of the NUM_AGENTS interleaved slices to complete.
    """
    # Staggered delay so each slice completes at a different time.
    time.sleep(LOOPBACK_LATENCY_S * (i_slice + 1))
    responses = []
    samples_to_complete = query_samples[i_slice: len(
        query_samples): NUM_AGENTS]
    # Fix: the index from enumerate() was unused; iterate directly.
    for s in samples_to_complete:
        responses.append(mlperf_loadgen.QuerySampleResponse(s.id, 0, 0))
    mlperf_loadgen.QuerySamplesComplete(responses)
def issue_query(query_samples):
    """SUT callback: fan each query out to NUM_AGENTS completion threads."""
    # Fix: use NUM_AGENTS instead of a hard-coded 8 so the thread count
    # cannot drift from the slicing stride in process_query_async.
    for i in range(NUM_AGENTS):
        threading.Thread(
            target=process_query_async, args=(
                query_samples, i)).start()
def flush_queries():
    """SUT callback: nothing buffered, so nothing to flush."""
    pass
def main(argv):
    """Configure and run a MultiStream-scenario performance test."""
    del argv
    settings = mlperf_loadgen.TestSettings()
    settings.scenario = mlperf_loadgen.TestScenario.MultiStream
    settings.mode = mlperf_loadgen.TestMode.PerformanceOnly
    settings.multi_stream_expected_latency_ns = 8000000
    settings.multi_stream_samples_per_query = 8
    settings.min_query_count = 100
    settings.min_duration_ms = 10000
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram
    )
    mlperf_loadgen.StartTest(sut, qsl, settings)
    # Release native handles once the test is done.
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)
if __name__ == "__main__":
    app.run(main)
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import threading
import time
from absl import app
import mlperf_loadgen
def load_samples_to_ram(query_samples):
    """QSL callback: nothing to load in this demo."""
    _ = query_samples
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing to unload in this demo."""
    _ = query_samples
# Processes queries in 3 slices that complete at different times.
def process_query_async(query_samples, i_slice):
    """Complete every third sample, starting at i_slice, after a delay."""
    time.sleep(3 * (i_slice + 1))
    slice_samples = query_samples[i_slice: len(query_samples): 3]
    responses = [
        mlperf_loadgen.QuerySampleResponse(s.id, 0, 0) for s in slice_samples
    ]
    mlperf_loadgen.QuerySamplesComplete(responses)
def issue_query(query_samples):
    """SUT callback: complete the query in three asynchronous slices."""
    for slice_idx in (0, 1, 2):
        threading.Thread(
            target=process_query_async,
            args=(query_samples, slice_idx)).start()
def flush_queries():
    """SUT callback: nothing buffered, so nothing to flush."""
    pass
def main(argv):
    """Configure and run an Offline-scenario performance test."""
    del argv
    settings = mlperf_loadgen.TestSettings()
    settings.scenario = mlperf_loadgen.TestScenario.Offline
    settings.mode = mlperf_loadgen.TestMode.PerformanceOnly
    # Expected throughput used by loadgen to size the offline query.
    settings.offline_expected_qps = 1000
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram
    )
    mlperf_loadgen.StartTest(sut, qsl, settings)
    # Release native handles once the test is done.
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)
if __name__ == "__main__":
    app.run(main)
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import threading
import time
from absl import app
import mlperf_loadgen
def load_samples_to_ram(query_samples):
    """QSL callback: nothing to load in this demo."""
    _ = query_samples
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing to unload in this demo."""
    _ = query_samples
def process_query_async(query_samples):
    """Complete all samples with empty responses after a short delay."""
    time.sleep(0.001)
    responses = [
        mlperf_loadgen.QuerySampleResponse(s.id, 0, 0) for s in query_samples
    ]
    mlperf_loadgen.QuerySamplesComplete(responses)
def issue_query(query_samples):
    """SUT callback: process the query on a background thread."""
    threading.Thread(target=process_query_async, args=(query_samples,)).start()
def flush_queries():
    """SUT callback: nothing buffered, so nothing to flush."""
    pass
def main(argv):
    """Configure and run a Server-scenario performance test."""
    del argv
    settings = mlperf_loadgen.TestSettings()
    settings.scenario = mlperf_loadgen.TestScenario.Server
    settings.mode = mlperf_loadgen.TestMode.PerformanceOnly
    # Server-mode target load and latency bound.
    settings.server_target_qps = 100
    settings.server_target_latency_ns = 100000000
    settings.min_query_count = 100
    settings.min_duration_ms = 10000
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram
    )
    mlperf_loadgen.StartTest(sut, qsl, settings)
    # Release native handles once the test is done.
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)
if __name__ == "__main__":
    app.run(main)
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import array
import threading
import time
from absl import app
import mlperf_loadgen
def load_samples_to_ram(query_samples):
    """QSL callback: nothing to load in this demo."""
    _ = query_samples
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing to unload in this demo."""
    _ = query_samples
def process_query_async(query_samples):
    """Processes the list of queries, answering each with a fixed float payload."""
    time.sleep(0.001)
    # One shared payload buffer; loadgen copies it before this returns.
    payload = array.array(
        "f", [0, 1, 7, 8, 15, 16, 31, 32, 63, 64, 127, 128, 254, 255]
    )
    addr, count = payload.buffer_info()
    nbytes = count * payload.itemsize
    responses = [
        mlperf_loadgen.QuerySampleResponse(s.id, addr, nbytes)
        for s in query_samples
    ]
    mlperf_loadgen.QuerySamplesComplete(responses)
def issue_query(query_samples):
    """SUT callback: process the query on a background thread."""
    threading.Thread(target=process_query_async, args=(query_samples,)).start()
def flush_queries():
    """SUT callback: nothing buffered, so nothing to flush."""
    pass
def main(argv):
    """Configure and run a SingleStream-scenario performance test."""
    del argv
    settings = mlperf_loadgen.TestSettings()
    settings.scenario = mlperf_loadgen.TestScenario.SingleStream
    settings.mode = mlperf_loadgen.TestMode.PerformanceOnly
    settings.single_stream_expected_latency_ns = 1000000
    settings.min_query_count = 100
    settings.min_duration_ms = 10000
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram
    )
    mlperf_loadgen.StartTest(sut, qsl, settings)
    # Release native handles once the test is done.
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)
if __name__ == "__main__":
    app.run(main)
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import argparse
import threading
import time
import numpy as np
import array
import mlperf_loadgen
from datetime import datetime
# Global var
NUM_AGENTS = 8
LOOPBACK_LATENCY_S = 0.001
def f(x, y):
    """Deterministic dummy response value: 4 + 3xy + x^3 + y^2."""
    return 4 + (3 * x * y) + (x ** 3) + (y ** 2)
def create_responses(n, m, mod=4):
    """Build n dummy responses; row i holds f(i, j) for m + (i % mod) values."""
    return [[f(i, j) for j in range(m + (i % mod))] for i in range(n)]
# Precomputed dummy responses, one variable-length row per sample index.
responses = create_responses(1024, 20)
def load_samples_to_ram(query_samples):
    """QSL callback: nothing to load in this demo."""
    _ = query_samples
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing to unload in this demo."""
    _ = query_samples
# Processes queries in NUM_AGENTS slices that complete at different times.
def process_query_async(query_samples, i_slice):
    """Complete one interleaved slice of query_samples with token reporting.

    For each sample in the slice, the first token is reported via
    FirstTokenComplete, then the full response (with its token count) is
    collected and reported via QuerySamplesComplete.
    """
    time.sleep(LOOPBACK_LATENCY_S * (i_slice + 1))
    query_responses = []
    samples_to_complete = query_samples[i_slice: len(
        query_samples): NUM_AGENTS]
    # Fix: the index from enumerate() was unused; iterate directly.
    for s in samples_to_complete:
        response_array = np.array(responses[s.index], np.int32)
        token = response_array[0]
        time.sleep(0.0002)  # Simulated time to first token.
        response_token = array.array("B", token.tobytes())
        response_token_info = response_token.buffer_info()
        response_token_data = response_token_info[0]
        response_token_size = response_token_info[1] * response_token.itemsize
        mlperf_loadgen.FirstTokenComplete(
            [
                mlperf_loadgen.QuerySampleResponse(
                    s.id, response_token_data, response_token_size
                )
            ]
        )
        time.sleep(0.02)  # Simulated remaining generation time.
        n_tokens = len(response_array)
        response_array = array.array("B", response_array.tobytes())
        response_info = response_array.buffer_info()
        response_data = response_info[0]
        response_size = response_info[1] * response_array.itemsize
        query_responses.append(
            mlperf_loadgen.QuerySampleResponse(
                s.id, response_data, response_size, n_tokens
            )
        )
    mlperf_loadgen.QuerySamplesComplete(query_responses)
def issue_query(query_samples):
    """SUT callback: fan each query out to NUM_AGENTS completion threads."""
    # Fix: use NUM_AGENTS rather than a hard-coded 8 so the thread count
    # always matches the slicing stride used by process_query_async.
    for i in range(NUM_AGENTS):
        threading.Thread(
            target=process_query_async, args=(
                query_samples, i)).start()
def flush_queries():
    """SUT callback: nothing buffered, so nothing to flush."""
    pass
def get_args():
    """Parse command-line options for the token-latency MultiStream demo."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode", choices=["performance", "accuracy"], default="performance"
    )
    # The remaining options are all plain integers.
    for flag, default in (
        ("--expected-latency", 8000000),
        ("--samples-per-query", 8),
        ("--min-query-count", 100),
        ("--min-duration-ms", 30000),
    ):
        parser.add_argument(flag, type=int, default=default)
    return parser.parse_args()
def main():
    """Configure and run the MultiStream token-latency demo end to end."""
    opts = get_args()
    cfg = mlperf_loadgen.TestSettings()
    cfg.scenario = mlperf_loadgen.TestScenario.MultiStream
    cfg.mode = (
        mlperf_loadgen.TestMode.PerformanceOnly
        if opts.mode == "performance"
        else mlperf_loadgen.TestMode.AccuracyOnly
    )
    cfg.multi_stream_expected_latency_ns = opts.expected_latency
    cfg.multi_stream_samples_per_query = opts.samples_per_query
    cfg.min_query_count = opts.min_query_count
    cfg.min_duration_ms = opts.min_duration_ms
    cfg.use_token_latencies = True
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram)
    mlperf_loadgen.StartTest(sut, qsl, cfg)
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)


if __name__ == "__main__":
    main()
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import argparse
import threading
import time
import numpy as np
import array
import mlperf_loadgen
def f(x, y):
    """Deterministic polynomial used to fabricate fake response tokens."""
    # Equivalent to 4 + 3*x*y + x**3 + y**2, factored differently.
    return y * y + x * (3 * y + x * x) + 4
def create_responses(n, m, mod=4):
    """Build ``n`` fake response rows; row ``i`` holds ``m + (i % mod)``
    values of ``f(i, j)`` so row lengths cycle through ``mod`` sizes."""
    return [[f(i, j) for j in range(m + (i % mod))] for i in range(n)]
# Precompute a fake response (list of ints) for each of the 1024 sample
# indices; row lengths cycle through 20..23 via the default mod=4.
responses = create_responses(1024, 20)
def load_samples_to_ram(query_samples):
    """QSL callback: this demo keeps nothing resident, so do nothing."""
    _ = query_samples  # intentionally unused
    return None
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing was loaded, so there is nothing to unload."""
    _ = query_samples  # intentionally unused
    return None
# Processes queries in 3 slices that complete at different times.
def process_query_async(query_samples, i_slice):
    """Complete slice ``i_slice`` (samples ``query_samples[i_slice::3]``)
    on a worker thread.

    Builds each sample's response from the precomputed ``responses``
    table and reports the whole slice with one QuerySamplesComplete
    call, passing ``n_tokens`` so loadgen can compute token latencies.
    """
    # Stagger the three slices so they finish at different times.
    time.sleep(3 * (i_slice + 1))
    query_responses = []
    samples_to_complete = query_samples[i_slice: len(query_samples): 3]
    for s in samples_to_complete:
        response_array = np.array(responses[s.index], np.int32)
        token = response_array[0]
        time.sleep(0.0002)  # simulated time-to-first-token
        # NOTE(review): the first-token buffer below is prepared but the
        # FirstTokenComplete call is commented out, so these four lines
        # are currently dead work — presumably kept as a placeholder.
        response_token = array.array("B", token.tobytes())
        response_token_info = response_token.buffer_info()
        response_token_data = response_token_info[0]
        response_token_size = response_token_info[1] * response_token.itemsize
        # mlperf_loadgen.FirstTokenComplete([mlperf_loadgen.QuerySampleResponse(s.id, response_token_data, response_token_size)])
        time.sleep(0.02)  # simulated decode time for the remaining tokens
        n_tokens = len(response_array)
        # Repack the full response; buffer_info() yields (address, count).
        response_array = array.array("B", response_array.tobytes())
        response_info = response_array.buffer_info()
        response_data = response_info[0]
        response_size = response_info[1] * response_array.itemsize
        query_responses.append(
            mlperf_loadgen.QuerySampleResponse(
                s.id, response_data, response_size, n_tokens
            )
        )
    mlperf_loadgen.QuerySamplesComplete(query_responses)
def issue_query(query_samples):
    """SUT callback: hand the batch to three staggered worker threads."""
    for slice_idx in (0, 1, 2):
        worker = threading.Thread(
            target=process_query_async,
            args=(query_samples, slice_idx),
        )
        worker.start()
def flush_queries():
    """SUT flush callback — this demo buffers nothing, so it is a no-op."""
    return None
def get_args():
    """Parse the demo's command-line flags into an argparse Namespace."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mode", choices=["performance", "accuracy"], default="performance"
    )
    # Both remaining flags are plain integers with a default.
    for flag, fallback in (
        ("--expected-qps", 1000),
        ("--min-duration-ms", 30000),
    ):
        cli.add_argument(flag, type=int, default=fallback)
    return cli.parse_args()
def main():
    """Configure and run the Offline token-latency demo end to end."""
    opts = get_args()
    cfg = mlperf_loadgen.TestSettings()
    cfg.scenario = mlperf_loadgen.TestScenario.Offline
    cfg.mode = (
        mlperf_loadgen.TestMode.PerformanceOnly
        if opts.mode == "performance"
        else mlperf_loadgen.TestMode.AccuracyOnly
    )
    cfg.offline_expected_qps = opts.expected_qps
    cfg.min_duration_ms = opts.min_duration_ms
    cfg.use_token_latencies = True
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram)
    mlperf_loadgen.StartTest(sut, qsl, cfg)
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)


if __name__ == "__main__":
    main()
# Copyright 2019 The MLPerf Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Python demo showing how to use the MLPerf Inference load generator bindings.
"""
from __future__ import print_function
import argparse
import threading
import time
import numpy as np
import array
import mlperf_loadgen
def f(x, y):
    """Deterministic polynomial used to fabricate fake response tokens."""
    # Equivalent to 4 + 3*x*y + x**3 + y**2, factored differently.
    return y * y + x * (3 * y + x * x) + 4
def create_responses(n, m, mod=4):
    """Build ``n`` fake response rows; row ``i`` holds ``m + (i % mod)``
    values of ``f(i, j)`` so row lengths cycle through ``mod`` sizes."""
    return [[f(i, j) for j in range(m + (i % mod))] for i in range(n)]
# Precompute a fake response (list of ints) for each of the 1024 sample
# indices; mod=3 makes row lengths cycle through 20..22.
responses = create_responses(1024, 20, mod=3)
def load_samples_to_ram(query_samples):
    """QSL callback: this demo keeps nothing resident, so do nothing."""
    _ = query_samples  # intentionally unused
    return None
def unload_samples_from_ram(query_samples):
    """QSL callback: nothing was loaded, so there is nothing to unload."""
    _ = query_samples  # intentionally unused
    return None
# Processes queries in 3 slices that complete at different times.
def process_query_async(query_samples, i_slice):
    """Complete slice ``i_slice`` (samples ``query_samples[i_slice::3]``)
    on a worker thread.

    Builds each sample's response from the precomputed ``responses``
    table and reports the whole slice with one QuerySamplesComplete
    call. No token count is passed: this variant relies on loadgen's
    ``infer_token_latencies`` setting (see ``main``).

    Fix: removed the dead ``n_tokens = len(response_array)`` assignment;
    unlike the sibling demo, this variant never passes it to
    QuerySampleResponse.
    """
    # Stagger the three slices so they finish at different times.
    time.sleep(3 * (i_slice + 1))
    query_responses = []
    samples_to_complete = query_samples[i_slice: len(query_samples): 3]
    for s in samples_to_complete:
        response_array = np.array(responses[s.index], np.int32)
        token = response_array[0]
        time.sleep(0.0002)  # simulated time-to-first-token
        # NOTE(review): the first-token buffer below is prepared but the
        # FirstTokenComplete call is commented out — kept as placeholder.
        response_token = array.array("B", token.tobytes())
        response_token_info = response_token.buffer_info()
        response_token_data = response_token_info[0]
        response_token_size = response_token_info[1] * response_token.itemsize
        # mlperf_loadgen.FirstTokenComplete([mlperf_loadgen.QuerySampleResponse(s.id, response_token_data, response_token_size)])
        time.sleep(0.02)  # simulated decode time for the remaining tokens
        # Repack the full response; buffer_info() yields (address, count).
        response_array = array.array("B", response_array.tobytes())
        response_info = response_array.buffer_info()
        response_data = response_info[0]
        response_size = response_info[1] * response_array.itemsize
        query_responses.append(
            mlperf_loadgen.QuerySampleResponse(
                s.id, response_data, response_size)
        )
    mlperf_loadgen.QuerySamplesComplete(query_responses)
def issue_query(query_samples):
    """SUT callback: hand the batch to three staggered worker threads."""
    for slice_idx in (0, 1, 2):
        worker = threading.Thread(
            target=process_query_async,
            args=(query_samples, slice_idx),
        )
        worker.start()
def flush_queries():
    """SUT flush callback — this demo buffers nothing, so it is a no-op."""
    return None
def get_args():
    """Parse the demo's command-line flags into an argparse Namespace."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mode", choices=["performance", "accuracy"], default="performance"
    )
    # Both remaining flags are plain integers with a default.
    for flag, fallback in (
        ("--expected-qps", 1000),
        ("--min-duration-ms", 30000),
    ):
        cli.add_argument(flag, type=int, default=fallback)
    return cli.parse_args()
def main():
    """Configure and run the Offline demo with inferred token latencies."""
    opts = get_args()
    cfg = mlperf_loadgen.TestSettings()
    cfg.scenario = mlperf_loadgen.TestScenario.Offline
    cfg.mode = (
        mlperf_loadgen.TestMode.PerformanceOnly
        if opts.mode == "performance"
        else mlperf_loadgen.TestMode.AccuracyOnly
    )
    cfg.offline_expected_qps = opts.expected_qps
    cfg.min_duration_ms = opts.min_duration_ms
    # Token counts are not reported by the SUT; let loadgen infer them.
    cfg.infer_token_latencies = 1
    cfg.token_latency_scaling_factor = 21
    sut = mlperf_loadgen.ConstructSUT(issue_query, flush_queries)
    qsl = mlperf_loadgen.ConstructQSL(
        1024, 128, load_samples_to_ram, unload_samples_from_ram)
    mlperf_loadgen.StartTest(sut, qsl, cfg)
    mlperf_loadgen.DestroyQSL(qsl)
    mlperf_loadgen.DestroySUT(sut)


if __name__ == "__main__":
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment