Commit f3fe550a authored by peastman's avatar peastman
Browse files

Created ThreadPool to simplify parallel code

parent 0be0772b
#ifndef OPENMM_THREAD_POOL_H_
#define OPENMM_THREAD_POOL_H_
/* -------------------------------------------------------------------------- *
* OpenMM *
* -------------------------------------------------------------------------- *
* This is part of the OpenMM molecular simulation toolkit originating from *
* Simbios, the NIH National Center for Physics-Based Simulation of *
* Biological Structures at Stanford, funded under the NIH Roadmap for *
* Medical Research, grant U54 GM072970. See https://simtk.org. *
* *
* Portions copyright (c) 2013 Stanford University and the Authors. *
* Authors: Peter Eastman *
* Contributors: *
* *
* Permission is hereby granted, free of charge, to any person obtaining a *
* copy of this software and associated documentation files (the "Software"), *
* to deal in the Software without restriction, including without limitation *
* the rights to use, copy, modify, merge, publish, distribute, sublicense, *
* and/or sell copies of the Software, and to permit persons to whom the *
* Software is furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in *
* all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
* THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, *
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR *
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE *
* USE OR OTHER DEALINGS IN THE SOFTWARE. *
* -------------------------------------------------------------------------- */
#include "windowsExport.h"
#include <pthread.h>
#include <vector>
namespace OpenMM {
/**
* A ThreadPool creates a set of worker threads that can be used to execute tasks in parallel.
* After creating a ThreadPool, call execute() to start a task running then waitForThreads()
* to block until all threads have finished. You also can synchronize the threads in the middle
* of the task by having them call syncThreads(). In this case, the parent thread should call
* waitForThreads() an additional time; each call waits until all worker threads have reached the
* next syncThreads(), and the final call waits until they exit from the Task's execute() method.
* After calling waitForThreads() to block at a synchronization point, the parent thread should
* call resumeThreads() to instruct the worker threads to resume.
*/
class OPENMM_EXPORT ThreadPool {
public:
class Task;
class ThreadData;
ThreadPool();
~ThreadPool();
/**
* Get the number of worker threads in the pool.
*/
int getNumThreads() const;
/**
* Execute a Task in parallel on the worker threads.
*/
void execute(Task& task);
/**
* This is called by the worker threads to block until all threads have reached the same point
* and the master thread instructs them to continue by calling resumeThreads().
*/
void syncThreads();
/**
* This is called by the master thread to wait until all threads have completed the Task. Alternatively,
* if the threads call syncThreads(), this blocks until all threads have reached the synchronization point.
*/
void waitForThreads();
/**
* Instruct the threads to resume running after blocking at a synchronization point.
*/
void resumeThreads();
private:
bool isDeleted;
int numThreads, waitCount;
std::vector<pthread_t> thread;
std::vector<ThreadData*> threadData;
pthread_cond_t startCondition, endCondition;
pthread_mutex_t lock;
};
/**
* This defines a task that can be executed in parallel by the worker threads.
*/
class OPENMM_EXPORT ThreadPool::Task {
public:
/**
* Execute the task on each thread.
*
* @param pool the ThreadPool being used to execute the task
* @param threadIndex the index of the thread invoking this method
*/
virtual void execute(ThreadPool& pool, int threadIndex) = 0;
};
} // namespace OpenMM
#endif // OPENMM_THREAD_POOL_H_
/* -------------------------------------------------------------------------- *
* OpenMM *
* -------------------------------------------------------------------------- *
* This is part of the OpenMM molecular simulation toolkit originating from *
* Simbios, the NIH National Center for Physics-Based Simulation of *
* Biological Structures at Stanford, funded under the NIH Roadmap for *
* Medical Research, grant U54 GM072970. See https://simtk.org. *
* *
* Portions copyright (c) 2013 Stanford University and the Authors. *
* Authors: Peter Eastman *
* Contributors: *
* *
* Permission is hereby granted, free of charge, to any person obtaining a *
* copy of this software and associated documentation files (the "Software"), *
* to deal in the Software without restriction, including without limitation *
* the rights to use, copy, modify, merge, publish, distribute, sublicense, *
* and/or sell copies of the Software, and to permit persons to whom the *
* Software is furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in *
* all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
* THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, *
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR *
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE *
* USE OR OTHER DEALINGS IN THE SOFTWARE. *
* -------------------------------------------------------------------------- */
#include "openmm/internal/ThreadPool.h"
#include "openmm/internal/hardware.h"
using namespace std;
namespace OpenMM {
class ThreadPool::ThreadData {
public:
ThreadData(ThreadPool& owner, int index) : owner(owner), index(index), isDeleted(false) {
}
ThreadPool& owner;
int index;
bool isDeleted;
Task* currentTask;
};
static void* threadBody(void* args) {
ThreadPool::ThreadData& data = *reinterpret_cast<ThreadPool::ThreadData*>(args);
while (true) {
// Wait for the signal to start running.
data.owner.syncThreads();
if (data.isDeleted)
break;
data.currentTask->execute(data.owner, data.index);
}
delete &data;
return 0;
}
ThreadPool::ThreadPool() {
numThreads = getNumProcessors();
pthread_cond_init(&startCondition, NULL);
pthread_cond_init(&endCondition, NULL);
pthread_mutex_init(&lock, NULL);
thread.resize(numThreads);
pthread_mutex_lock(&lock);
waitCount = 0;
for (int i = 0; i < numThreads; i++) {
ThreadData* data = new ThreadData(*this, i);
data->isDeleted = false;
threadData.push_back(data);
pthread_create(&thread[i], NULL, threadBody, data);
}
while (waitCount < numThreads)
pthread_cond_wait(&endCondition, &lock);
pthread_mutex_unlock(&lock);
}
ThreadPool::~ThreadPool() {
for (int i = 0; i < (int) threadData.size(); i++)
threadData[i]->isDeleted = true;
pthread_mutex_lock(&lock);
pthread_cond_broadcast(&startCondition);
pthread_mutex_unlock(&lock);
for (int i = 0; i < (int) thread.size(); i++)
pthread_join(thread[i], NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&startCondition);
pthread_cond_destroy(&endCondition);
}
int ThreadPool::getNumThreads() const {
return numThreads;
}
void ThreadPool::execute(Task& task) {
for (int i = 0; i < (int) threadData.size(); i++)
threadData[i]->currentTask = &task;
resumeThreads();
}
void ThreadPool::syncThreads() {
pthread_mutex_lock(&lock);
waitCount++;
pthread_cond_signal(&endCondition);
pthread_cond_wait(&startCondition, &lock);
pthread_mutex_unlock(&lock);
}
void ThreadPool::waitForThreads() {
pthread_mutex_lock(&lock);
while (waitCount < numThreads)
pthread_cond_wait(&endCondition, &lock);
pthread_mutex_unlock(&lock);
}
void ThreadPool::resumeThreads() {
pthread_mutex_lock(&lock);
waitCount = 0;
pthread_cond_broadcast(&startCondition);
pthread_mutex_unlock(&lock);
}
} // namespace OpenMM
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "CpuNonbondedForce.h" #include "CpuNonbondedForce.h"
#include "openmm/kernels.h" #include "openmm/kernels.h"
#include "openmm/System.h" #include "openmm/System.h"
#include "openmm/internal/ThreadPool.h"
namespace OpenMM { namespace OpenMM {
...@@ -90,6 +91,7 @@ private: ...@@ -90,6 +91,7 @@ private:
NonbondedMethod nonbondedMethod; NonbondedMethod nonbondedMethod;
CpuNeighborList neighborList; CpuNeighborList neighborList;
CpuNonbondedForce nonbonded; CpuNonbondedForce nonbonded;
ThreadPool threads;
Kernel optimizedPme; Kernel optimizedPme;
}; };
......
#ifndef OPENMM_CPU_NEIGHBORLIST_H_ #ifndef OPENMM_CPU_NEIGHBORLIST_H_
#define OPENMM_CPU_NEIGHBORLIST_H_ #define OPENMM_CPU_NEIGHBORLIST_H_
/* -------------------------------------------------------------------------- *
* OpenMM *
* -------------------------------------------------------------------------- *
* This is part of the OpenMM molecular simulation toolkit originating from *
* Simbios, the NIH National Center for Physics-Based Simulation of *
* Biological Structures at Stanford, funded under the NIH Roadmap for *
* Medical Research, grant U54 GM072970. See https://simtk.org. *
* *
* Portions copyright (c) 2013 Stanford University and the Authors. *
* Authors: Peter Eastman *
* Contributors: *
* *
* Permission is hereby granted, free of charge, to any person obtaining a *
* copy of this software and associated documentation files (the "Software"), *
* to deal in the Software without restriction, including without limitation *
* the rights to use, copy, modify, merge, publish, distribute, sublicense, *
* and/or sell copies of the Software, and to permit persons to whom the *
* Software is furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in *
* all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
* THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, *
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR *
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE *
* USE OR OTHER DEALINGS IN THE SOFTWARE. *
* -------------------------------------------------------------------------- */
#include "windowsExportCpu.h" #include "windowsExportCpu.h"
#include <pthread.h> #include "openmm/internal/ThreadPool.h"
#include <set> #include <set>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -11,13 +42,12 @@ namespace OpenMM { ...@@ -11,13 +42,12 @@ namespace OpenMM {
class OPENMM_EXPORT_CPU CpuNeighborList { class OPENMM_EXPORT_CPU CpuNeighborList {
public: public:
class ThreadData; class ThreadTask;
class Voxels; class Voxels;
static const int BlockSize; static const int BlockSize;
CpuNeighborList(); CpuNeighborList();
~CpuNeighborList();
void computeNeighborList(int numAtoms, const std::vector<float>& atomLocations, const std::vector<std::set<int> >& exclusions, void computeNeighborList(int numAtoms, const std::vector<float>& atomLocations, const std::vector<std::set<int> >& exclusions,
const float* periodicBoxSize, bool usePeriodic, float maxDistance); const float* periodicBoxSize, bool usePeriodic, float maxDistance, ThreadPool& threads);
int getNumBlocks() const; int getNumBlocks() const;
const std::vector<int>& getSortedAtoms() const; const std::vector<int>& getSortedAtoms() const;
const std::vector<int>& getBlockNeighbors(int blockIndex) const; const std::vector<int>& getBlockNeighbors(int blockIndex) const;
...@@ -25,25 +55,12 @@ public: ...@@ -25,25 +55,12 @@ public:
/** /**
* This routine contains the code executed by each thread. * This routine contains the code executed by each thread.
*/ */
void threadComputeNeighborList(ThreadPool& threads, int threadIndex);
void runThread(int index); void runThread(int index);
private: private:
/**
* This is called by the worker threads to wait until the master thread instructs them to advance.
*/
void threadWait();
/**
* This is called by the master thread to instruct all the worker threads to advance.
*/
void advanceThreads();
bool isDeleted;
int numThreads, waitCount;
std::vector<int> sortedAtoms; std::vector<int> sortedAtoms;
std::vector<std::vector<int> > blockNeighbors; std::vector<std::vector<int> > blockNeighbors;
std::vector<std::vector<char> > blockExclusions; std::vector<std::vector<char> > blockExclusions;
std::vector<pthread_t> thread;
std::vector<ThreadData*> threadData;
pthread_cond_t startCondition, endCondition;
pthread_mutex_t lock;
// The following variables are used to make information accessible to the individual threads. // The following variables are used to make information accessible to the individual threads.
float minx, maxx, miny, maxy, minz, maxz; float minx, maxx, miny, maxy, minz, maxz;
std::vector<std::pair<int, int> > atomBins; std::vector<std::pair<int, int> > atomBins;
...@@ -58,4 +75,4 @@ private: ...@@ -58,4 +75,4 @@ private:
} // namespace OpenMM } // namespace OpenMM
#endif // OPENMM_REFERENCE_NEIGHBORLIST_H_ #endif // OPENMM_CPU_NEIGHBORLIST_H_
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
#include "CpuNeighborList.h" #include "CpuNeighborList.h"
#include "ReferencePairIxn.h" #include "ReferencePairIxn.h"
#include "openmm/internal/ThreadPool.h"
#include "openmm/internal/vectorize.h" #include "openmm/internal/vectorize.h"
#include <pthread.h>
#include <set> #include <set>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -38,7 +38,7 @@ namespace OpenMM { ...@@ -38,7 +38,7 @@ namespace OpenMM {
class CpuNonbondedForce { class CpuNonbondedForce {
public: public:
class ThreadData; class ComputeDirectTask;
/**--------------------------------------------------------------------------------------- /**---------------------------------------------------------------------------------------
...@@ -48,14 +48,6 @@ class CpuNonbondedForce { ...@@ -48,14 +48,6 @@ class CpuNonbondedForce {
CpuNonbondedForce(); CpuNonbondedForce();
/**---------------------------------------------------------------------------------------
Destructor
--------------------------------------------------------------------------------------- */
~CpuNonbondedForce();
/**--------------------------------------------------------------------------------------- /**---------------------------------------------------------------------------------------
Set the force to use a cutoff. Set the force to use a cutoff.
...@@ -145,16 +137,17 @@ class CpuNonbondedForce { ...@@ -145,16 +137,17 @@ class CpuNonbondedForce {
exclusions[atomIndex] contains the list of exclusions for that atom exclusions[atomIndex] contains the list of exclusions for that atom
@param forces force array (forces added) @param forces force array (forces added)
@param totalEnergy total energy @param totalEnergy total energy
@param threads the thread pool to use
--------------------------------------------------------------------------------------- */ --------------------------------------------------------------------------------------- */
void calculateDirectIxn(int numberOfAtoms, float* posq, const std::vector<std::pair<float, float> >& atomParameters, void calculateDirectIxn(int numberOfAtoms, float* posq, const std::vector<std::pair<float, float> >& atomParameters,
const std::vector<std::set<int> >& exclusions, float* forces, float* totalEnergy); const std::vector<std::set<int> >& exclusions, float* forces, float* totalEnergy, ThreadPool& threads);
/** /**
* This routine contains the code executed by each thread. * This routine contains the code executed by each thread.
*/ */
void runThread(int index, std::vector<float>& threadForce, double& threadEnergy); void threadComputeDirect(ThreadPool& threads, int threadIndex);
private: private:
bool cutoff; bool cutoff;
...@@ -171,12 +164,8 @@ private: ...@@ -171,12 +164,8 @@ private:
int meshDim[3]; int meshDim[3];
std::vector<float> ewaldScaleTable; std::vector<float> ewaldScaleTable;
float ewaldDX, ewaldDXInv; float ewaldDX, ewaldDXInv;
bool isDeleted; std::vector<std::vector<float> > threadForce;
int numThreads, waitCount; std::vector<double> threadEnergy;
std::vector<pthread_t> thread;
std::vector<ThreadData*> threadData;
pthread_cond_t startCondition, endCondition;
pthread_mutex_t lock;
// The following variables are used to make information accessible to the individual threads. // The following variables are used to make information accessible to the individual threads.
int numberOfAtoms; int numberOfAtoms;
float* posq; float* posq;
......
...@@ -260,7 +260,7 @@ double CpuCalcNonbondedForceKernel::execute(ContextImpl& context, bool includeFo ...@@ -260,7 +260,7 @@ double CpuCalcNonbondedForceKernel::execute(ContextImpl& context, bool includeFo
} }
} }
if (needRecompute) { if (needRecompute) {
neighborList.computeNeighborList(numParticles, posq, exclusions, floatBoxSize, periodic || ewald || pme, nonbondedCutoff+padding); neighborList.computeNeighborList(numParticles, posq, exclusions, floatBoxSize, periodic || ewald || pme, nonbondedCutoff+padding, threads);
lastPositions = posData; lastPositions = posData;
} }
nonbonded.setUseCutoff(nonbondedCutoff, neighborList, rfDielectric); nonbonded.setUseCutoff(nonbondedCutoff, neighborList, rfDielectric);
...@@ -279,7 +279,7 @@ double CpuCalcNonbondedForceKernel::execute(ContextImpl& context, bool includeFo ...@@ -279,7 +279,7 @@ double CpuCalcNonbondedForceKernel::execute(ContextImpl& context, bool includeFo
nonbonded.setUseSwitchingFunction(switchingDistance); nonbonded.setUseSwitchingFunction(switchingDistance);
float nonbondedEnergy = 0; float nonbondedEnergy = 0;
if (includeDirect) if (includeDirect)
nonbonded.calculateDirectIxn(numParticles, &posq[0], particleParams, exclusions, &forces[0], includeEnergy ? &nonbondedEnergy : NULL); nonbonded.calculateDirectIxn(numParticles, &posq[0], particleParams, exclusions, &forces[0], includeEnergy ? &nonbondedEnergy : NULL, threads);
if (includeReciprocal) { if (includeReciprocal) {
if (useOptimizedPme) { if (useOptimizedPme) {
PmeIO io(&posq[0], &forces[0], numParticles); PmeIO io(&posq[0], &forces[0], numParticles);
......
/* -------------------------------------------------------------------------- *
* OpenMM *
* -------------------------------------------------------------------------- *
* This is part of the OpenMM molecular simulation toolkit originating from *
* Simbios, the NIH National Center for Physics-Based Simulation of *
* Biological Structures at Stanford, funded under the NIH Roadmap for *
* Medical Research, grant U54 GM072970. See https://simtk.org. *
* *
* Portions copyright (c) 2013 Stanford University and the Authors. *
* Authors: Peter Eastman *
* Contributors: *
* *
* Permission is hereby granted, free of charge, to any person obtaining a *
* copy of this software and associated documentation files (the "Software"), *
* to deal in the Software without restriction, including without limitation *
* the rights to use, copy, modify, merge, publish, distribute, sublicense, *
* and/or sell copies of the Software, and to permit persons to whom the *
* Software is furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in *
* all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
* THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, *
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR *
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE *
* USE OR OTHER DEALINGS IN THE SOFTWARE. *
* -------------------------------------------------------------------------- */
#include "CpuNeighborList.h" #include "CpuNeighborList.h"
#include "openmm/internal/hardware.h" #include "openmm/internal/hardware.h"
#include "openmm/internal/vectorize.h" #include "openmm/internal/vectorize.h"
...@@ -255,54 +286,21 @@ private: ...@@ -255,54 +286,21 @@ private:
vector<vector<vector<pair<float, int> > > > bins; vector<vector<vector<pair<float, int> > > > bins;
}; };
class CpuNeighborList::ThreadData { class CpuNeighborList::ThreadTask : public ThreadPool::Task {
public: public:
ThreadData(int index, CpuNeighborList& owner) : index(index), owner(owner) { ThreadTask(CpuNeighborList& owner) : owner(owner) {
}
void execute(ThreadPool& threads, int threadIndex) {
owner.threadComputeNeighborList(threads, threadIndex);
} }
int index;
CpuNeighborList& owner; CpuNeighborList& owner;
}; };
static void* threadBody(void* args) {
CpuNeighborList::ThreadData& data = *reinterpret_cast<CpuNeighborList::ThreadData*>(args);
data.owner.runThread(data.index);
delete &data;
return 0;
}
CpuNeighborList::CpuNeighborList() { CpuNeighborList::CpuNeighborList() {
isDeleted = false;
numThreads = getNumProcessors();
pthread_cond_init(&startCondition, NULL);
pthread_cond_init(&endCondition, NULL);
pthread_mutex_init(&lock, NULL);
thread.resize(numThreads);
pthread_mutex_lock(&lock);
waitCount = 0;
for (int i = 0; i < numThreads; i++) {
ThreadData* data = new ThreadData(i, *this);
threadData.push_back(data);
pthread_create(&thread[i], NULL, threadBody, data);
}
while (waitCount < numThreads)
pthread_cond_wait(&endCondition, &lock);
pthread_mutex_unlock(&lock);
}
CpuNeighborList::~CpuNeighborList() {
isDeleted = true;
pthread_mutex_lock(&lock);
pthread_cond_broadcast(&startCondition);
pthread_mutex_unlock(&lock);
for (int i = 0; i < (int) thread.size(); i++)
pthread_join(thread[i], NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&startCondition);
pthread_cond_destroy(&endCondition);
} }
void CpuNeighborList::computeNeighborList(int numAtoms, const vector<float>& atomLocations, const vector<set<int> >& exclusions, void CpuNeighborList::computeNeighborList(int numAtoms, const vector<float>& atomLocations, const vector<set<int> >& exclusions,
const float* periodicBoxSize, bool usePeriodic, float maxDistance) { const float* periodicBoxSize, bool usePeriodic, float maxDistance, ThreadPool& threads) {
int numBlocks = (numAtoms+BlockSize-1)/BlockSize; int numBlocks = (numAtoms+BlockSize-1)/BlockSize;
blockNeighbors.resize(numBlocks); blockNeighbors.resize(numBlocks);
blockExclusions.resize(numBlocks); blockExclusions.resize(numBlocks);
...@@ -336,8 +334,9 @@ void CpuNeighborList::computeNeighborList(int numAtoms, const vector<float>& ato ...@@ -336,8 +334,9 @@ void CpuNeighborList::computeNeighborList(int numAtoms, const vector<float>& ato
// Sort the atoms based on a Hilbert curve. // Sort the atoms based on a Hilbert curve.
atomBins.resize(numAtoms); atomBins.resize(numAtoms);
pthread_mutex_lock(&lock); ThreadTask task(*this);
advanceThreads(); threads.execute(task);
threads.waitForThreads();
sort(atomBins.begin(), atomBins.end()); sort(atomBins.begin(), atomBins.end());
// Build the voxel hash. // Build the voxel hash.
...@@ -360,8 +359,8 @@ void CpuNeighborList::computeNeighborList(int numAtoms, const vector<float>& ato ...@@ -360,8 +359,8 @@ void CpuNeighborList::computeNeighborList(int numAtoms, const vector<float>& ato
// Signal the threads to start running and wait for them to finish. // Signal the threads to start running and wait for them to finish.
advanceThreads(); threads.resumeThreads();
pthread_mutex_unlock(&lock); threads.waitForThreads();
// Add padding atoms to fill up the last block. // Add padding atoms to fill up the last block.
...@@ -393,82 +392,55 @@ const std::vector<char>& CpuNeighborList::getBlockExclusions(int blockIndex) con ...@@ -393,82 +392,55 @@ const std::vector<char>& CpuNeighborList::getBlockExclusions(int blockIndex) con
} }
void CpuNeighborList::runThread(int index) { void CpuNeighborList::threadComputeNeighborList(ThreadPool& threads, int threadIndex) {
while (true) { // Compute the positions of atoms along the Hilbert curve.
// Wait for the signal to start running.
threadWait();
if (isDeleted)
break;
// Compute the positions of atoms along the Hilbert curve.
float binWidth = max(max(maxx-minx, maxy-miny), maxz-minz)/255.0f; float binWidth = max(max(maxx-minx, maxy-miny), maxz-minz)/255.0f;
float invBinWidth = 1.0f/binWidth; float invBinWidth = 1.0f/binWidth;
bitmask_t coords[3]; bitmask_t coords[3];
for (int i = index; i < numAtoms; i += numThreads) { int numThreads = threads.getNumThreads();
const float* pos = &atomLocations[4*i]; for (int i = threadIndex; i < numAtoms; i += numThreads) {
coords[0] = (bitmask_t) ((pos[0]-minx)*invBinWidth); const float* pos = &atomLocations[4*i];
coords[1] = (bitmask_t) ((pos[1]-miny)*invBinWidth); coords[0] = (bitmask_t) ((pos[0]-minx)*invBinWidth);
coords[2] = (bitmask_t) ((pos[2]-minz)*invBinWidth); coords[1] = (bitmask_t) ((pos[1]-miny)*invBinWidth);
int bin = (int) hilbert_c2i(3, 8, coords); coords[2] = (bitmask_t) ((pos[2]-minz)*invBinWidth);
atomBins[i] = pair<int, int>(bin, i); int bin = (int) hilbert_c2i(3, 8, coords);
} atomBins[i] = pair<int, int>(bin, i);
threadWait(); }
threads.syncThreads();
// Compute this thread's subset of neighbors.
int numBlocks = blockNeighbors.size();
vector<int> blockAtoms;
for (int i = index; i < numBlocks; i += numThreads) {
{
int firstIndex = BlockSize*i;
int atomsInBlock = min(BlockSize, numAtoms-firstIndex);
blockAtoms.resize(atomsInBlock);
for (int j = 0; j < atomsInBlock; j++)
blockAtoms[j] = sortedAtoms[firstIndex+j];
}
// Compute this thread's subset of neighbors.
int firstIndex = BlockSize*i;
fvec4 minPos(&atomLocations[4*sortedAtoms[firstIndex]]); int numBlocks = blockNeighbors.size();
fvec4 maxPos = minPos; vector<int> blockAtoms;
int atomsInBlock = min(BlockSize, numAtoms-firstIndex); for (int i = threadIndex; i < numBlocks; i += numThreads) {
for (int j = 1; j < atomsInBlock; j++) { // Find the atoms in this block and compute their bounding box.
fvec4 pos(&atomLocations[4*sortedAtoms[firstIndex+j]]);
minPos = min(minPos, pos); int firstIndex = BlockSize*i;
maxPos = max(maxPos, pos); int atomsInBlock = min(BlockSize, numAtoms-firstIndex);
} blockAtoms.resize(atomsInBlock);
voxels->getNeighbors(blockNeighbors[i], i, (maxPos+minPos)*0.5f, (maxPos-minPos)*0.5f, sortedAtoms, blockExclusions[i], maxDistance, blockAtoms, atomLocations); for (int j = 0; j < atomsInBlock; j++)
blockAtoms[j] = sortedAtoms[firstIndex+j];
// Record the exclusions for this block. fvec4 minPos(&atomLocations[4*sortedAtoms[firstIndex]]);
fvec4 maxPos = minPos;
for (int j = 0; j < atomsInBlock; j++) { for (int j = 1; j < atomsInBlock; j++) {
const set<int>& atomExclusions = (*exclusions)[sortedAtoms[firstIndex+j]]; fvec4 pos(&atomLocations[4*sortedAtoms[firstIndex+j]]);
char mask = 1<<j; minPos = min(minPos, pos);
for (int k = 0; k < (int) blockNeighbors[i].size(); k++) { maxPos = max(maxPos, pos);
int atomIndex = blockNeighbors[i][k];
if (atomExclusions.find(atomIndex) != atomExclusions.end())
blockExclusions[i][k] |= mask;
}
}
} }
} voxels->getNeighbors(blockNeighbors[i], i, (maxPos+minPos)*0.5f, (maxPos-minPos)*0.5f, sortedAtoms, blockExclusions[i], maxDistance, blockAtoms, atomLocations);
}
void CpuNeighborList::threadWait() { // Record the exclusions for this block.
pthread_mutex_lock(&lock);
waitCount++;
pthread_cond_signal(&endCondition);
pthread_cond_wait(&startCondition, &lock);
pthread_mutex_unlock(&lock);
}
void CpuNeighborList::advanceThreads() { for (int j = 0; j < atomsInBlock; j++) {
waitCount = 0; const set<int>& atomExclusions = (*exclusions)[sortedAtoms[firstIndex+j]];
pthread_cond_broadcast(&startCondition); char mask = 1<<j;
while (waitCount < numThreads) { for (int k = 0; k < (int) blockNeighbors[i].size(); k++) {
pthread_cond_wait(&endCondition, &lock); int atomIndex = blockNeighbors[i][k];
if (atomExclusions.find(atomIndex) != atomExclusions.end())
blockExclusions[i][k] |= mask;
}
}
} }
} }
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#include "CpuNonbondedForce.h" #include "CpuNonbondedForce.h"
#include "ReferenceForce.h" #include "ReferenceForce.h"
#include "ReferencePME.h" #include "ReferencePME.h"
#include "openmm/internal/hardware.h"
#include "openmm/internal/SplineFitter.h" #include "openmm/internal/SplineFitter.h"
#include "openmm/internal/vectorize.h" #include "openmm/internal/vectorize.h"
...@@ -44,23 +43,16 @@ using namespace OpenMM; ...@@ -44,23 +43,16 @@ using namespace OpenMM;
const float CpuNonbondedForce::TWO_OVER_SQRT_PI = (float) (2/sqrt(PI_M)); const float CpuNonbondedForce::TWO_OVER_SQRT_PI = (float) (2/sqrt(PI_M));
const int CpuNonbondedForce::NUM_TABLE_POINTS = 1025; const int CpuNonbondedForce::NUM_TABLE_POINTS = 1025;
class CpuNonbondedForce::ThreadData { class CpuNonbondedForce::ComputeDirectTask : public ThreadPool::Task {
public: public:
ThreadData(int index, CpuNonbondedForce& owner) : index(index), owner(owner) { ComputeDirectTask(CpuNonbondedForce& owner) : owner(owner) {
}
void execute(ThreadPool& threads, int threadIndex) {
owner.threadComputeDirect(threads, threadIndex);
} }
int index;
CpuNonbondedForce& owner; CpuNonbondedForce& owner;
vector<float> threadForce;
double threadEnergy;
}; };
static void* threadBody(void* args) {
CpuNonbondedForce::ThreadData& data = *reinterpret_cast<CpuNonbondedForce::ThreadData*>(args);
data.owner.runThread(data.index, data.threadForce, data.threadEnergy);
delete &data;
return 0;
}
/**--------------------------------------------------------------------------------------- /**---------------------------------------------------------------------------------------
CpuNonbondedForce constructor CpuNonbondedForce constructor
...@@ -68,40 +60,6 @@ static void* threadBody(void* args) { ...@@ -68,40 +60,6 @@ static void* threadBody(void* args) {
--------------------------------------------------------------------------------------- */ --------------------------------------------------------------------------------------- */
CpuNonbondedForce::CpuNonbondedForce() : cutoff(false), useSwitch(false), periodic(false), ewald(false), pme(false) { CpuNonbondedForce::CpuNonbondedForce() : cutoff(false), useSwitch(false), periodic(false), ewald(false), pme(false) {
isDeleted = false;
numThreads = getNumProcessors();
pthread_cond_init(&startCondition, NULL);
pthread_cond_init(&endCondition, NULL);
pthread_mutex_init(&lock, NULL);
thread.resize(numThreads);
pthread_mutex_lock(&lock);
waitCount = 0;
for (int i = 0; i < numThreads; i++) {
ThreadData* data = new ThreadData(i, *this);
threadData.push_back(data);
pthread_create(&thread[i], NULL, threadBody, data);
}
while (waitCount < numThreads)
pthread_cond_wait(&endCondition, &lock);
pthread_mutex_unlock(&lock);
}
/**---------------------------------------------------------------------------------------
CpuNonbondedForce destructor
--------------------------------------------------------------------------------------- */
CpuNonbondedForce::~CpuNonbondedForce(){
isDeleted = true;
pthread_mutex_lock(&lock);
pthread_cond_broadcast(&startCondition);
pthread_mutex_unlock(&lock);
for (int i = 0; i < (int) thread.size(); i++)
pthread_join(thread[i], NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&startCondition);
pthread_cond_destroy(&endCondition);
} }
/**--------------------------------------------------------------------------------------- /**---------------------------------------------------------------------------------------
...@@ -334,7 +292,7 @@ void CpuNonbondedForce::calculateReciprocalIxn(int numberOfAtoms, float* posq, v ...@@ -334,7 +292,7 @@ void CpuNonbondedForce::calculateReciprocalIxn(int numberOfAtoms, float* posq, v
void CpuNonbondedForce::calculateDirectIxn(int numberOfAtoms, float* posq, const vector<pair<float, float> >& atomParameters, void CpuNonbondedForce::calculateDirectIxn(int numberOfAtoms, float* posq, const vector<pair<float, float> >& atomParameters,
const vector<set<int> >& exclusions, float* forces, float* totalEnergy) { const vector<set<int> >& exclusions, float* forces, float* totalEnergy, ThreadPool& threads) {
// Record the parameters for the threads. // Record the parameters for the threads.
this->numberOfAtoms = numberOfAtoms; this->numberOfAtoms = numberOfAtoms;
...@@ -342,25 +300,25 @@ void CpuNonbondedForce::calculateDirectIxn(int numberOfAtoms, float* posq, const ...@@ -342,25 +300,25 @@ void CpuNonbondedForce::calculateDirectIxn(int numberOfAtoms, float* posq, const
this->atomParameters = &atomParameters[0]; this->atomParameters = &atomParameters[0];
this->exclusions = &exclusions[0]; this->exclusions = &exclusions[0];
includeEnergy = (totalEnergy != NULL); includeEnergy = (totalEnergy != NULL);
threadEnergy.resize(threads.getNumThreads());
threadForce.resize(threads.getNumThreads());
// Signal the threads to start running and wait for them to finish. // Signal the threads to start running and wait for them to finish.
pthread_mutex_lock(&lock); ComputeDirectTask task(*this);
waitCount = 0; threads.execute(task);
pthread_cond_broadcast(&startCondition); threads.waitForThreads();
while (waitCount < numThreads)
pthread_cond_wait(&endCondition, &lock);
pthread_mutex_unlock(&lock);
// Combine the results from all the threads. // Combine the results from all the threads.
double directEnergy = 0; double directEnergy = 0;
int numThreads = threads.getNumThreads();
for (int i = 0; i < numThreads; i++) for (int i = 0; i < numThreads; i++)
directEnergy += threadData[i]->threadEnergy; directEnergy += threadEnergy[i];
for (int i = 0; i < numberOfAtoms; i++) { for (int i = 0; i < numberOfAtoms; i++) {
fvec4 f(forces+4*i); fvec4 f(forces+4*i);
for (int j = 0; j < numThreads; j++) for (int j = 0; j < numThreads; j++)
f += fvec4(&threadData[j]->threadForce[4*i]); f += fvec4(&threadForce[j][4*i]);
f.store(forces+4*i); f.store(forces+4*i);
} }
...@@ -368,76 +326,65 @@ void CpuNonbondedForce::calculateDirectIxn(int numberOfAtoms, float* posq, const ...@@ -368,76 +326,65 @@ void CpuNonbondedForce::calculateDirectIxn(int numberOfAtoms, float* posq, const
*totalEnergy += (float) directEnergy; *totalEnergy += (float) directEnergy;
} }
void CpuNonbondedForce::threadComputeDirect(ThreadPool& threads, int threadIndex) {
// Compute this thread's subset of interactions.
int numThreads = threads.getNumThreads();
threadEnergy[threadIndex] = 0;
double* energyPtr = (includeEnergy ? &threadEnergy[threadIndex] : NULL);
threadForce[threadIndex].resize(4*numberOfAtoms, 0.0f);
float* forces = &threadForce[threadIndex][0];
for (int i = 0; i < 4*numberOfAtoms; i++)
forces[i] = 0.0f;
fvec4 boxSize(periodicBoxSize[0], periodicBoxSize[1], periodicBoxSize[2], 0);
fvec4 invBoxSize((1/periodicBoxSize[0]), (1/periodicBoxSize[1]), (1/periodicBoxSize[2]), 0);
if (ewald || pme) {
// Compute the interactions from the neighbor list.
for (int i = threadIndex; i < neighborList->getNumBlocks(); i += numThreads)
calculateBlockEwaldIxn(i, forces, energyPtr, boxSize, invBoxSize);
// Now subtract off the exclusions, since they were implicitly included in the reciprocal space sum.
void CpuNonbondedForce::runThread(int index, vector<float>& threadForce, double& threadEnergy) {
while (true) {
// Wait for the signal to start running.
pthread_mutex_lock(&lock);
waitCount++;
pthread_cond_signal(&endCondition);
pthread_cond_wait(&startCondition, &lock);
pthread_mutex_unlock(&lock);
if (isDeleted)
break;
// Compute this thread's subset of interactions.
threadEnergy = 0;
double* energyPtr = (includeEnergy ? &threadEnergy : NULL);
threadForce.resize(4*numberOfAtoms, 0.0f);
for (int i = 0; i < 4*numberOfAtoms; i++)
threadForce[i] = 0.0f;
fvec4 boxSize(periodicBoxSize[0], periodicBoxSize[1], periodicBoxSize[2], 0); fvec4 boxSize(periodicBoxSize[0], periodicBoxSize[1], periodicBoxSize[2], 0);
fvec4 invBoxSize((1/periodicBoxSize[0]), (1/periodicBoxSize[1]), (1/periodicBoxSize[2]), 0); fvec4 invBoxSize((1/periodicBoxSize[0]), (1/periodicBoxSize[1]), (1/periodicBoxSize[2]), 0);
if (ewald || pme) { for (int i = threadIndex; i < numberOfAtoms; i += numThreads)
// Compute the interactions from the neighbor list. for (set<int>::const_iterator iter = exclusions[i].begin(); iter != exclusions[i].end(); ++iter) {
if (*iter > i) {
for (int i = index; i < neighborList->getNumBlocks(); i += numThreads) int j = *iter;
calculateBlockEwaldIxn(i, &threadForce[0], energyPtr, boxSize, invBoxSize); fvec4 deltaR;
fvec4 posI(posq+4*i);
// Now subtract off the exclusions, since they were implicitly included in the reciprocal space sum. fvec4 posJ(posq+4*j);
float r2;
fvec4 boxSize(periodicBoxSize[0], periodicBoxSize[1], periodicBoxSize[2], 0); getDeltaR(posJ, posI, deltaR, r2, false, boxSize, invBoxSize);
fvec4 invBoxSize((1/periodicBoxSize[0]), (1/periodicBoxSize[1]), (1/periodicBoxSize[2]), 0); float r = sqrtf(r2);
for (int i = index; i < numberOfAtoms; i += numThreads) float inverseR = 1/r;
for (set<int>::const_iterator iter = exclusions[i].begin(); iter != exclusions[i].end(); ++iter) { float chargeProd = ONE_4PI_EPS0*posq[4*i+3]*posq[4*j+3];
if (*iter > i) { float alphaR = alphaEwald*r;
int j = *iter; float erfcAlphaR = erfcApprox(alphaR)[0];
fvec4 deltaR; float dEdR = (float) (chargeProd * inverseR * inverseR * inverseR);
fvec4 posI(posq+4*i); dEdR = (float) (dEdR * (1.0f-erfcAlphaR-TWO_OVER_SQRT_PI*alphaR*exp(-alphaR*alphaR)));
fvec4 posJ(posq+4*j); fvec4 result = deltaR*dEdR;
float r2; (fvec4(forces+4*i)-result).store(forces+4*i);
getDeltaR(posJ, posI, deltaR, r2, false, boxSize, invBoxSize); (fvec4(forces+4*j)+result).store(forces+4*j);
float r = sqrtf(r2); if (includeEnergy)
float inverseR = 1/r; threadEnergy[threadIndex] -= chargeProd*inverseR*(1.0f-erfcAlphaR);
float chargeProd = ONE_4PI_EPS0*posq[4*i+3]*posq[4*j+3];
float alphaR = alphaEwald*r;
float erfcAlphaR = erfcApprox(alphaR)[0];
float dEdR = (float) (chargeProd * inverseR * inverseR * inverseR);
dEdR = (float) (dEdR * (1.0f-erfcAlphaR-TWO_OVER_SQRT_PI*alphaR*exp(-alphaR*alphaR)));
fvec4 result = deltaR*dEdR;
(fvec4(&threadForce[4*i])-result).store(&threadForce[4*i]);
(fvec4(&threadForce[4*j])+result).store(&threadForce[4*j]);
if (includeEnergy)
threadEnergy -= chargeProd*inverseR*(1.0f-erfcAlphaR);
}
} }
} }
else if (cutoff) { }
// Compute the interactions from the neighbor list. else if (cutoff) {
// Compute the interactions from the neighbor list.
for (int i = index; i < neighborList->getNumBlocks(); i += numThreads) for (int i = threadIndex; i < neighborList->getNumBlocks(); i += numThreads)
calculateBlockIxn(i, &threadForce[0], energyPtr, boxSize, invBoxSize); calculateBlockIxn(i, forces, energyPtr, boxSize, invBoxSize);
} }
else { else {
// Loop over all atom pairs // Loop over all atom pairs
for (int i = index; i < numberOfAtoms; i += numThreads){ for (int i = threadIndex; i < numberOfAtoms; i += numThreads){
for (int j = i+1; j < numberOfAtoms; j++) for (int j = i+1; j < numberOfAtoms; j++)
if (exclusions[j].find(i) == exclusions[j].end()) if (exclusions[j].find(i) == exclusions[j].end())
calculateOneIxn(i, j, &threadForce[0], energyPtr, boxSize, invBoxSize); calculateOneIxn(i, j, forces, energyPtr, boxSize, invBoxSize);
}
} }
} }
} }
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
*/ */
#include "openmm/internal/AssertionUtilities.h" #include "openmm/internal/AssertionUtilities.h"
#include "openmm/internal/ThreadPool.h"
#include "CpuNeighborList.h" #include "CpuNeighborList.h"
#include "CpuPlatform.h" #include "CpuPlatform.h"
#include "sfmt/SFMT.h" #include "sfmt/SFMT.h"
...@@ -63,8 +64,9 @@ void testNeighborList(bool periodic) { ...@@ -63,8 +64,9 @@ void testNeighborList(bool periodic) {
exclusions[i-j].insert(i); exclusions[i-j].insert(i);
} }
} }
ThreadPool threads;
CpuNeighborList neighborList; CpuNeighborList neighborList;
neighborList.computeNeighborList(numParticles, positions, exclusions, boxSize, periodic, cutoff); neighborList.computeNeighborList(numParticles, positions, exclusions, boxSize, periodic, cutoff, threads);
// Convert the neighbor list to a set for faster lookup. // Convert the neighbor list to a set for faster lookup.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment