Commit 6e3526b4 authored by Peter Eastman
Browse files

Use a separate host thread for each device

parent b51a1b3c
......@@ -68,7 +68,9 @@ public:
PlatformData(int numParticles, const std::string& deviceIndexProperty);
~PlatformData();
void initializeContexts(const System& system);
void syncContexts();
std::vector<OpenCLContext*> contexts;
std::vector<double> contextEnergy;
bool removeCM;
int cmMotionFrequency;
int stepCount, computeForceCount;
......
......@@ -54,7 +54,7 @@ const int OpenCLContext::TileSize = 32;
OpenCLContext::OpenCLContext(int numParticles, int deviceIndex, OpenCLPlatform::PlatformData& platformData) :
time(0.0), platformData(platformData), stepCount(0), computeForceCount(0), posq(NULL), velm(NULL),
forceBuffers(NULL), energyBuffer(NULL), atomIndex(NULL), integration(NULL), nonbonded(NULL) {
forceBuffers(NULL), energyBuffer(NULL), atomIndex(NULL), integration(NULL), nonbonded(NULL), thread(NULL) {
try {
contextIndex = platformData.contexts.size();
std::vector<cl::Platform> platforms;
......@@ -166,6 +166,10 @@ OpenCLContext::OpenCLContext(int numParticles, int deviceIndex, OpenCLPlatform::
compilationOptions += " -DLOG=native_log";
else
compilationOptions += " -DLOG=log";
// Create the work thread used for parallelization when running on multiple devices.
thread = new WorkThread();
}
OpenCLContext::~OpenCLContext() {
......@@ -187,6 +191,8 @@ OpenCLContext::~OpenCLContext() {
delete integration;
if (nonbonded != NULL)
delete nonbonded;
if (thread != NULL)
delete thread;
}
void OpenCLContext::initialize(const System& system) {
......@@ -625,3 +631,86 @@ void OpenCLContext::reorderAtoms() {
velm->upload();
atomIndex->upload();
}
/**
 * The argument block handed to the worker thread's entry function. It holds nothing but references
 * to the queue, the two status flags and the synchronization primitives that were passed to its
 * constructor, so the thread function can operate on them without access to the private members
 * of WorkThread.
 */
struct OpenCLContext::WorkThread::ThreadData {
    std::queue<OpenCLContext::WorkTask*>& tasks;
    bool& waiting;
    bool& finished;
    pthread_mutex_t& queueLock;
    pthread_cond_t& waitForTaskCondition;
    pthread_cond_t& queueEmptyCondition;

    /**
     * Bind every reference member to the corresponding object supplied by the caller.
     */
    ThreadData(std::queue<OpenCLContext::WorkTask*>& taskQueue, bool& waitingFlag, bool& finishedFlag,
               pthread_mutex_t& lock, pthread_cond_t& taskAvailable, pthread_cond_t& queueDrained) :
            tasks(taskQueue),
            waiting(waitingFlag),
            finished(finishedFlag),
            queueLock(lock),
            waitForTaskCondition(taskAvailable),
            queueEmptyCondition(queueDrained) {
    }
};
/**
 * Entry point of the worker thread.
 *
 * Repeatedly removes the task at the front of the queue, runs its execute() method and deletes it.
 * Whenever the queue is found empty, the thread marks itself as waiting, signals queueEmptyCondition
 * and sleeps on waitForTaskCondition. Once the finished flag has been set and the queue has been
 * drained, the thread marks itself as waiting one last time, frees its ThreadData and returns.
 *
 * Every access to the shared queue and flags is made while holding queueLock; the lock is released
 * only while a task is executing.
 *
 * @param args  pointer to a heap allocated ThreadData; this function takes ownership and deletes it
 * @return always 0
 */
static void* threadBody(void* args) {
    OpenCLContext::WorkThread::ThreadData* data = static_cast<OpenCLContext::WorkThread::ThreadData*>(args);
    pthread_mutex_lock(&data->queueLock);
    while (true) {
        // Sleep until there is something to do or we have been told to shut down.
        while (data->tasks.empty() && !data->finished) {
            data->waiting = true;
            pthread_cond_signal(&data->queueEmptyCondition);
            pthread_cond_wait(&data->waitForTaskCondition, &data->queueLock);
        }
        if (data->tasks.empty())
            break; // Shutdown was requested and no queued work remains.
        data->waiting = false;
        OpenCLContext::WorkTask* task = data->tasks.front();
        data->tasks.pop();

        // Run the task without holding the lock so that new tasks can be queued in the meantime.
        pthread_mutex_unlock(&data->queueLock);
        task->execute();
        delete task;
        pthread_mutex_lock(&data->queueLock);
    }

    // Publish the final idle state under the lock so that a thread blocked in flush() cannot miss it.
    data->waiting = true;
    pthread_cond_signal(&data->queueEmptyCondition);
    pthread_mutex_unlock(&data->queueLock);
    delete data;
    return 0;
}
/**
 * Set up the mutex and both condition variables, then launch the worker thread. The thread starts
 * out flagged as waiting and not finished.
 */
OpenCLContext::WorkThread::WorkThread() :
        waiting(true),
        finished(false) {
    pthread_mutex_init(&queueLock, NULL);
    pthread_cond_init(&waitForTaskCondition, NULL);
    pthread_cond_init(&queueEmptyCondition, NULL);

    // The worker receives references to our members through this object, and deletes it when it exits.
    ThreadData* sharedState = new ThreadData(tasks, waiting, finished, queueLock,
                                             waitForTaskCondition, queueEmptyCondition);
    pthread_create(&thread, NULL, threadBody, sharedState);
}
/**
 * Ask the worker thread to exit, wait for it to do so, and then destroy the mutex and condition
 * variables. The worker's loop keeps running while tasks remain in the queue, so tasks that are
 * already queued are still executed before the join returns.
 */
OpenCLContext::WorkThread::~WorkThread() {
// Set the shutdown flag while holding the lock, and wake the worker in case it is blocked waiting for a task.
pthread_mutex_lock(&queueLock);
finished = true;
pthread_cond_broadcast(&waitForTaskCondition);
pthread_mutex_unlock(&queueLock);
// The worker still uses the mutex and condition variables, so they are destroyed only after it has been joined.
pthread_join(thread, NULL);
pthread_mutex_destroy(&queueLock);
pthread_cond_destroy(&waitForTaskCondition);
pthread_cond_destroy(&queueEmptyCondition);
}
/**
 * Append a task to the queue and wake the worker thread.
 *
 * @param task  the task to run; the worker thread deletes it after calling its execute() method
 */
void OpenCLContext::WorkThread::addTask(OpenCLContext::WorkTask* task) {
pthread_mutex_lock(&queueLock);
tasks.push(task);
// Clear the idle flag immediately, so that a flush() issued right after this call blocks until the worker has become idle again.
waiting = false;
pthread_cond_signal(&waitForTaskCondition);
pthread_mutex_unlock(&queueLock);
}
bool OpenCLContext::WorkThread::isWaiting() {
return waiting;
}
bool OpenCLContext::WorkThread::isFinished() {
return finished;
}
/**
 * Block the calling thread until the worker thread has flagged itself as waiting, which it does
 * when it finds the task queue empty.
 */
void OpenCLContext::WorkThread::flush() {
pthread_mutex_lock(&queueLock);
// The flag is re-tested after every wakeup, because a condition wait may return without the flag having been set.
while (!waiting)
pthread_cond_wait(&queueEmptyCondition, &queueLock);
pthread_mutex_unlock(&queueLock);
}
......@@ -28,7 +28,9 @@
* -------------------------------------------------------------------------- */
#include <map>
#include <queue>
#include <string>
#include <pthread.h>
#define __CL_ENABLE_EXCEPTIONS
#ifdef _MSC_VER
// Prevent Windows from defining macros that interfere with other code.
......@@ -130,10 +132,16 @@ struct mm_int16 {
* specific to a particular device, and manages data structures and kernels for that device. When running a simulation
* in parallel on multiple devices, there is a separate OpenCLContext for each one. The list of all contexts is
* stored in the OpenCLPlatform::PlatformData.
* <p>
* In addition, a worker thread is created for each OpenCLContext. This is used for parallel computations, so that
* blocking calls to one device will not block other devices. When only a single device is being used, the worker
* thread is not used and calculations are performed on the main application thread.
*/
class OPENMM_EXPORT OpenCLContext {
public:
class WorkTask;
class WorkThread;
static const int ThreadBlockSize;
static const int TileSize;
OpenCLContext(int numParticles, int deviceIndex, OpenCLPlatform::PlatformData& platformData);
......@@ -407,6 +415,12 @@ public:
OpenCLNonbondedUtilities& getNonbondedUtilities() {
return *nonbonded;
}
/**
 * Get the thread used by this context for executing parallel computations.
 *
 * @return a reference to the WorkThread that this context's `thread` member points to. The pointer
 *         is dereferenced without a NULL check.
 */
WorkThread& getWorkThread() {
return *thread;
}
/**
* Reorder the internal arrays of atoms to try to keep spatially contiguous atoms close
* together in the arrays.
......@@ -454,6 +468,7 @@ private:
std::vector<int> autoclearBufferSizes;
OpenCLIntegrationUtilities* integration;
OpenCLNonbondedUtilities* nonbonded;
WorkThread* thread;
};
struct OpenCLContext::MoleculeGroup {
......@@ -461,6 +476,44 @@ struct OpenCLContext::MoleculeGroup {
std::vector<int> instances;
};
/**
 * This abstract class defines a task to be executed on the worker thread. Subclasses implement
 * execute() to perform the work.
 * <p>
 * Tasks are deleted through a pointer to this base class once they have run, so the destructor is
 * virtual to make sure the subclass destructor is invoked.
 */
class OpenCLContext::WorkTask {
public:
    /**
     * Perform the work of this task. This is invoked on the worker thread.
     */
    virtual void execute() = 0;
    virtual ~WorkTask() {
    }
};
/**
 * This class owns a single worker thread together with a queue of tasks for it to execute. Tasks
 * are executed one at a time, in the order in which they were added.
 * <p>
 * Instances own a pthread, a mutex and two condition variables, so they cannot be copied.
 */
class OpenCLContext::WorkThread {
public:
    struct ThreadData;
    /**
     * Create the worker thread. It starts out idle.
     */
    WorkThread();
    /**
     * Tell the worker thread to exit and wait until it has done so.
     */
    ~WorkThread();
    /**
     * Request that a task be executed on the worker thread.  The argument should have been allocated on the
     * heap with the "new" operator.  After its execute() method finishes, the object will be deleted automatically.
     */
    void addTask(OpenCLContext::WorkTask* task);
    /**
     * Get whether the worker thread is idle, waiting for a task to be added.
     */
    bool isWaiting();
    /**
     * Get whether the worker thread has been told to exit.  The flag is set when destruction of this
     * object begins; it does not by itself indicate that the thread has already terminated.
     */
    bool isFinished();
    /**
     * Block until all tasks have finished executing and the worker thread is idle.
     */
    void flush();
private:
    // Declared but intentionally not implemented: a copy would share the thread handle, mutex and
    // condition variables with the original, and both destructors would then join and destroy them.
    WorkThread(const WorkThread&);
    WorkThread& operator=(const WorkThread&);
    std::queue<OpenCLContext::WorkTask*> tasks;
    bool waiting, finished;
    pthread_mutex_t queueLock;
    pthread_cond_t waitForTaskCondition, queueEmptyCondition;
    pthread_t thread;
};
} // namespace OpenMM
#endif /*OPENMM_OPENCLCONTEXT_H_*/
......@@ -56,6 +56,8 @@ KernelImpl* OpenCLKernelFactory::createKernelImpl(std::string name, const Platfo
return new OpenCLParallelCalcCustomTorsionForceKernel(name, platform, data, context.getSystem());
if (name == CalcNonbondedForceKernel::Name())
return new OpenCLParallelCalcNonbondedForceKernel(name, platform, data, context.getSystem());
if (name == CalcCustomNonbondedForceKernel::Name())
return new OpenCLParallelCalcCustomNonbondedForceKernel(name, platform, data, context.getSystem());
if (name == CalcCustomExternalForceKernel::Name())
return new OpenCLParallelCalcCustomExternalForceKernel(name, platform, data, context.getSystem());
if (name == CalcCustomHbondForceKernel::Name())
......
......@@ -72,6 +72,8 @@ public:
*/
double finishComputation(ContextImpl& context, bool includeForce, bool includeEnergy);
private:
class BeginComputationTask;
class FinishComputationTask;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -102,6 +104,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -132,6 +135,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -162,6 +166,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -192,6 +197,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -221,6 +227,7 @@ public:
* @return the potential energy due to the force
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
class Task;
private:
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
......@@ -252,6 +259,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -282,6 +290,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -312,6 +321,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -342,6 +352,38 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
/**
 * This kernel is invoked by CustomNonbondedForce to calculate the forces acting on the system.
 * It holds a list of Kernel objects and exposes each one as an OpenCLCalcCustomNonbondedForceKernel
 * through getKernel().
 */
class OpenCLParallelCalcCustomNonbondedForceKernel : public CalcCustomNonbondedForceKernel {
public:
/**
 * Create the kernel. The definition is not part of this header.
 *
 * @param name      the name of the kernel
 * @param platform  the Platform the kernel belongs to
 * @param data      the platform data; presumably the source of the per-device contexts (confirm against the definition)
 * @param system    the System the kernel will be applied to
 */
OpenCLParallelCalcCustomNonbondedForceKernel(std::string name, const Platform& platform, OpenCLPlatform::PlatformData& data, System& system);
/**
 * Get the implementation of the kernel stored at a given position in the kernel list, cast to
 * OpenCLCalcCustomNonbondedForceKernel. The index is not range checked here.
 *
 * @param index  the position of the kernel in the list
 */
OpenCLCalcCustomNonbondedForceKernel& getKernel(int index) {
return dynamic_cast<OpenCLCalcCustomNonbondedForceKernel&>(kernels[index].getImpl());
}
/**
 * Initialize the kernel.
 *
 * @param system     the System this kernel will be applied to
 * @param force      the CustomNonbondedForce this kernel will be used for
 */
void initialize(const System& system, const CustomNonbondedForce& force);
/**
 * Execute the kernel to calculate the forces and/or energy.
 *
 * @param context        the context in which to execute this kernel
 * @param includeForces  true if forces should be calculated
 * @param includeEnergy  true if the energy should be calculated
 * @return the potential energy due to the force
 */
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
// Defined in the implementation file; presumably the work task submitted to the worker threads (confirm there).
class Task;
OpenCLPlatform::PlatformData& data;
// The kernels that getKernel() indexes into.
std::vector<Kernel> kernels;
};
......@@ -372,6 +414,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......@@ -402,6 +445,7 @@ public:
*/
double execute(ContextImpl& context, bool includeForces, bool includeEnergy);
private:
class Task;
OpenCLPlatform::PlatformData& data;
std::vector<Kernel> kernels;
};
......
......@@ -114,7 +114,7 @@ OpenCLPlatform::PlatformData::PlatformData(int numParticles, const string& devic
for (int i = 0; i < (int) devices.size(); i++) {
if (devices[i].length() > 0) {
unsigned int deviceIndex;
stringstream(deviceIndexProperty) >> deviceIndex;
stringstream(devices[i]) >> deviceIndex;
contexts.push_back(new OpenCLContext(numParticles, deviceIndex, *this));
}
}
......@@ -127,6 +127,7 @@ OpenCLPlatform::PlatformData::PlatformData(int numParticles, const string& devic
device << contexts[i]->getDeviceIndex();
}
propertyValues[OpenCLPlatform::OpenCLDeviceIndex()] = device.str();
contextEnergy.resize(contexts.size());
}
OpenCLPlatform::PlatformData::~PlatformData() {
......@@ -138,3 +139,8 @@ void OpenCLPlatform::PlatformData::initializeContexts(const System& system) {
for (int i = 0; i < (int) contexts.size(); i++)
contexts[i]->initialize(system);
}
void OpenCLPlatform::PlatformData::syncContexts() {
for (int i = 0; i < (int) contexts.size(); i++)
contexts[i]->getWorkThread().flush();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment