Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
tsoc
openmm
Commits
05ad28f2
Commit
05ad28f2
authored
May 28, 2026
by
one
Browse files
Merge OpenMM master into dtk26.04
parents
97239ca6
fc21fd19
Changes
62
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
286 additions
and
1 deletion
+286
-1
wrappers/python/tests/TestExpandedEnsembleSampler.py
wrappers/python/tests/TestExpandedEnsembleSampler.py
+176
-0
wrappers/python/tests/TestReplicaExchangeSampler.py
wrappers/python/tests/TestReplicaExchangeSampler.py
+110
-1
No files found.
wrappers/python/tests/TestExpandedEnsembleSampler.py
0 → 100644
View file @
05ad28f2
from
openmm
import
*
from
openmm.app
import
*
from
openmm.unit
import
*
import
numpy
as
np
import
os
import
tempfile
import
unittest
class
TestExpandedEnsembleSampler
(
unittest
.
TestCase
):
def
testTemperature
(
self
):
"""Test a set of states that differ in temperature."""
system
=
System
()
system
.
addParticle
(
1.0
)
force
=
CustomExternalForce
(
'x*x+y*y+z*z'
)
force
.
addParticle
(
0
)
system
.
addForce
(
force
)
states
=
[{
'temperature'
:
t
*
kelvin
}
for
t
in
np
.
geomspace
(
300.0
,
600.0
,
5
)]
for
reinitialize
in
[
False
,
True
]:
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
10
/
picosecond
,
0.01
*
picosecond
)
simulation
=
Simulation
(
Topology
(),
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
simulation
.
context
.
setPositions
([
Vec3
(
0
,
0
,
0
)])
sampler
=
ExpandedEnsembleSampler
(
states
,
simulation
,
10
,
reinitialize
)
# Run for a little while to let the weights stabilize.
sampler
.
step
(
10000
)
# Run for a while and record the states and energies.
energies
=
[[]
for
_
in
range
(
len
(
states
))]
iterations
=
20000
for
i
in
range
(
iterations
):
sampler
.
step
(
10
)
energies
[
sampler
.
currentStateIndex
].
append
(
simulation
.
context
.
getState
(
energy
=
True
).
getPotentialEnergy
())
# Check that it spent roughly equal time in each state, and that the energies are correct.
for
energy
,
state
in
zip
(
energies
,
states
):
n
=
len
(
energy
)
assert
iterations
/
10
<
n
<
iterations
/
2
average
=
sum
(
energy
)
/
n
expected
=
1.5
*
(
state
[
'temperature'
]
*
MOLAR_GAS_CONSTANT_R
)
self
.
assertTrue
(
0.7
<
average
/
expected
<
1.3
)
def
testParameter
(
self
):
"""Test a set of states that differ in a force parameter."""
system
=
System
()
system
.
addParticle
(
1.0
)
force
=
CustomExternalForce
(
'0.5*k*x*x'
)
force
.
addGlobalParameter
(
'k'
,
1.0
)
force
.
addParticle
(
0
)
system
.
addForce
(
force
)
states
=
[{
'k'
:
k
*
kilojoules_per_mole
/
(
nanometer
**
2
)}
for
k
in
np
.
geomspace
(
10.0
,
100.0
,
5
)]
for
reinitialize
in
[
False
,
True
]:
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
10
/
picosecond
,
0.01
*
picosecond
)
simulation
=
Simulation
(
Topology
(),
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
simulation
.
context
.
setPositions
([
Vec3
(
0
,
0
,
0
)])
sampler
=
ExpandedEnsembleSampler
(
states
,
simulation
,
10
,
reinitialize
)
# Run for a little while to let the weights stabilize.
sampler
.
step
(
10000
)
# Run for a while and record the states and displacements.
r2
=
[[]
for
_
in
range
(
len
(
states
))]
iterations
=
20000
for
i
in
range
(
iterations
):
sampler
.
step
(
10
)
x
=
simulation
.
context
.
getState
(
positions
=
True
).
getPositions
()[
0
][
0
]
r2
[
sampler
.
currentStateIndex
].
append
(
x
*
x
)
# Check that it spent roughly equal time in each state, and that the energies are correct.
expected
=
0.5
*
integrator
.
getTemperature
()
*
MOLAR_GAS_CONSTANT_R
for
i
in
range
(
len
(
r2
)):
n
=
len
(
r2
[
i
])
assert
iterations
/
10
<
n
<
iterations
/
2
average
=
0.5
*
states
[
i
][
'k'
]
*
sum
(
r2
[
i
])
/
n
self
.
assertTrue
(
0.7
<
average
/
expected
<
1.3
)
def
testReporter
(
self
):
"""Test reporting output from an expanded ensemble simulation."""
system
=
System
()
force
=
CustomExternalForce
(
'0.5*k*(x*x+y*y+z*z)'
)
force
.
addGlobalParameter
(
'k'
,
1.0
)
system
.
addForce
(
force
)
for
i
in
range
(
3
):
system
.
addParticle
(
1.0
)
force
.
addParticle
(
0
)
states
=
[{
'k'
:
k
}
for
k
in
(
200.0
,
300.0
,
400.0
)]
with
tempfile
.
NamedTemporaryFile
(
mode
=
'w'
,
delete
=
False
)
as
logFile
:
with
tempfile
.
NamedTemporaryFile
(
mode
=
'w'
,
delete
=
False
)
as
energyFile
:
with
tempfile
.
NamedTemporaryFile
(
mode
=
'w'
,
delete
=
False
)
as
checkpointFile
:
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
1
/
picosecond
,
0.001
*
picosecond
)
simulation
=
Simulation
(
Topology
(),
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
simulation
.
context
.
setPositions
([
Vec3
(
0
,
0
,
0
)]
*
3
)
sampler
=
ExpandedEnsembleSampler
(
states
,
simulation
,
5
,
reportInterval
=
5
,
logFile
=
logFile
.
name
,
energyFile
=
energyFile
.
name
,
checkpointFile
=
checkpointFile
.
name
)
# Run a simulation.
step
=
[]
iteration
=
[]
stateIndex
=
[]
weights
=
[]
energies
=
[]
def
runIteration
():
simulation
.
step
(
5
)
step
.
append
(
simulation
.
currentStep
)
iteration
.
append
(
sampler
.
currentIteration
)
stateIndex
.
append
(
sampler
.
currentStateIndex
)
weights
.
append
(
sampler
.
weights
)
kT
=
MOLAR_GAS_CONSTANT_R
*
simulation
.
integrator
.
getTemperature
()
energies
.
append
(
sampler
.
_sampler
.
computeAllEnergies
()
/
kT
)
sampler
.
_sampler
.
applyState
(
sampler
.
currentStateIndex
)
try
:
for
_
in
range
(
4
):
runIteration
()
except
PermissionError
:
# tempfile is kind of broken on Windows. Just skip the test.
return
state1
=
simulation
.
context
.
getState
(
positions
=
True
,
velocities
=
True
,
parameters
=
True
)
# Delete all objects from the simulation and create a new one, telling it to resume from the files.
del
sampler
del
simulation
del
integrator
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
1
/
picosecond
,
0.001
*
picosecond
)
simulation
=
Simulation
(
Topology
(),
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
sampler
=
ExpandedEnsembleSampler
(
states
,
simulation
,
5
,
reportInterval
=
5
,
logFile
=
logFile
.
name
,
energyFile
=
energyFile
.
name
,
checkpointFile
=
checkpointFile
.
name
,
resume
=
True
)
# Make sure everything was loaded correctly.
state2
=
simulation
.
context
.
getState
(
positions
=
True
,
velocities
=
True
,
parameters
=
True
)
self
.
assertEqual
(
XmlSerializer
.
serialize
(
state1
),
XmlSerializer
.
serialize
(
state2
))
self
.
assertEqual
(
step
[
-
1
],
simulation
.
currentStep
)
self
.
assertEqual
(
iteration
[
-
1
],
sampler
.
currentIteration
)
self
.
assertEqual
(
stateIndex
[
-
1
],
sampler
.
currentStateIndex
)
self
.
assertEqual
(
weights
[
-
1
],
sampler
.
weights
)
# Generate some more output.
for
_
in
range
(
4
):
runIteration
()
# Check the log file.
logFile
.
close
()
with
open
(
logFile
.
name
)
as
input
:
lines
=
input
.
readlines
()[
1
:]
os
.
remove
(
logFile
.
name
)
self
.
assertEqual
(
8
,
len
(
lines
))
for
i
,
line
in
enumerate
(
lines
):
fields
=
line
.
split
(
','
)
self
.
assertEqual
(
int
(
fields
[
0
]),
step
[
i
])
self
.
assertEqual
(
int
(
fields
[
1
]),
iteration
[
i
])
self
.
assertEqual
(
int
(
fields
[
2
]),
stateIndex
[
i
])
self
.
assertTrue
(
np
.
allclose
([
float
(
x
)
for
x
in
fields
[
3
:]],
weights
[
i
]))
# Check the energy file.
energyFile
.
close
()
with
open
(
energyFile
.
name
)
as
input
:
lines
=
input
.
readlines
()[
1
:]
os
.
remove
(
energyFile
.
name
)
self
.
assertEqual
(
8
,
len
(
lines
))
for
i
,
line
in
enumerate
(
lines
):
fields
=
line
.
split
(
','
)
self
.
assertEqual
(
int
(
fields
[
0
]),
step
[
i
])
self
.
assertTrue
(
np
.
allclose
([
float
(
x
)
for
x
in
fields
[
1
:]],
energies
[
i
]))
wrappers/python/tests/TestReplicaExchangeSampler.py
View file @
05ad28f2
from
openmm
import
*
from
openmm.app
import
*
from
openmm.unit
import
*
from
openmm.app.internal.xtc_utils
import
read_xtc
import
numpy
as
np
import
os
import
tempfile
import
unittest
class
TestReplicaExchangeSampler
(
unittest
.
TestCase
):
...
...
@@ -16,6 +19,7 @@ class TestReplicaExchangeSampler(unittest.TestCase):
for
reinitialize
in
[
False
,
True
]:
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
10
/
picosecond
,
0.01
*
picosecond
)
simulation
=
Simulation
(
Topology
(),
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
simulation
.
context
.
setPositions
([
Vec3
(
0
,
0
,
0
)])
repex
=
ReplicaExchangeSampler
(
states
,
simulation
,
20
,
reinitialize
)
energies
=
[
0.0
*
kilojoules_per_mole
]
*
len
(
states
)
exchanged
=
False
...
...
@@ -48,10 +52,11 @@ class TestReplicaExchangeSampler(unittest.TestCase):
force
.
addGlobalParameter
(
'k'
,
1.0
)
force
.
addParticle
(
0
)
system
.
addForce
(
force
)
states
=
[{
'k'
:
k
*
kilojoules_per_mole
/
(
nanometer
**
2
)}
for
k
in
np
.
geomspace
(
5
.0
,
100.0
,
5
)]
states
=
[{
'k'
:
k
*
kilojoules_per_mole
/
(
nanometer
**
2
)}
for
k
in
np
.
geomspace
(
10
.0
,
100.0
,
5
)]
for
reinitialize
in
[
False
,
True
]:
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
10
/
picosecond
,
0.01
*
picosecond
)
simulation
=
Simulation
(
Topology
(),
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
simulation
.
context
.
setPositions
([
Vec3
(
0
,
0
,
0
)])
repex
=
ReplicaExchangeSampler
(
states
,
simulation
,
20
,
reinitialize
)
r2
=
[
0.0
*
nanometer
**
2
]
*
len
(
states
)
exchanged
=
False
...
...
@@ -74,3 +79,107 @@ class TestReplicaExchangeSampler(unittest.TestCase):
for
i
in
range
(
len
(
r2
)):
average
=
0.5
*
states
[
i
][
'k'
]
*
r2
[
i
]
/
steps
self
.
assertTrue
(
0.7
<
average
/
expected
<
1.3
)
def
testReporter
(
self
):
"""Test reporting output from a replica exchange simulation."""
# Set up a replica exchange simulation.
pdb
=
PDBFile
(
'systems/alanine-dipeptide-implicit.pdb'
)
ff
=
ForceField
(
'amber19-all.xml'
)
system
=
ff
.
createSystem
(
pdb
.
topology
)
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
1
/
picosecond
,
0.001
*
picosecond
)
simulation
=
Simulation
(
pdb
.
topology
,
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
simulation
.
context
.
setPositions
(
pdb
.
positions
)
states
=
[{
'temperature'
:
t
*
kelvin
}
for
t
in
[
300.0
,
320.0
,
340.0
]]
sampler
=
ReplicaExchangeSampler
(
states
,
simulation
,
5
)
# Add reporters to it.
stateIndices
=
[]
energies
=
[]
conformations
=
[]
def
report
(
sampler
):
if
sampler
.
currentIteration
%
3
==
0
:
stateIndices
.
append
(
sampler
.
replicaStateIndex
[:])
energies
.
append
(
sampler
.
replicaStateEnergy
[:])
conformations
.
append
(
sampler
.
replicaConformation
[:])
with
tempfile
.
TemporaryDirectory
()
as
directory
:
sampler
.
reporters
.
append
(
ReplicaExchangeReporter
(
directory
,
3
,
sampler
,
trajectoryPerState
=
True
,
trajectoryPerReplica
=
True
,
energy
=
True
))
sampler
.
reporters
.
append
(
report
)
# Generate some output.
sampler
.
simulate
(
15
)
# Delete all objects from the simulation and create a new one, telling it to resume from the files.
del
sampler
del
simulation
del
integrator
integrator
=
LangevinIntegrator
(
300
*
kelvin
,
1
/
picosecond
,
0.001
*
picosecond
)
simulation
=
Simulation
(
pdb
.
topology
,
system
,
integrator
,
Platform
.
getPlatform
(
'Reference'
))
sampler
=
ReplicaExchangeSampler
(
states
,
simulation
,
5
)
sampler
.
reporters
.
append
(
ReplicaExchangeReporter
(
directory
,
3
,
sampler
,
trajectoryPerState
=
True
,
trajectoryPerReplica
=
True
,
energy
=
True
,
resume
=
True
))
sampler
.
reporters
.
append
(
report
)
# Check that it loaded the checkpoints correctly.
for
i
in
range
(
len
(
states
)):
xml1
=
XmlSerializer
.
serialize
(
sampler
.
replicaConformation
[
i
])
xml2
=
open
(
os
.
path
.
join
(
directory
,
f
'checkpoint_
{
i
}
.xml'
)).
read
()
self
.
assertEqual
(
xml1
,
xml2
)
# Generate some more output.
sampler
.
simulate
(
15
)
# Check the log file.
with
open
(
os
.
path
.
join
(
directory
,
'log.csv'
))
as
input
:
lines
=
input
.
readlines
()[
1
:]
for
i
,
line
in
enumerate
(
lines
):
fields
=
[
int
(
x
)
for
x
in
line
.
split
(
','
)]
self
.
assertEqual
(
fields
[
0
],
3
*
(
i
+
1
))
self
.
assertEqual
(
fields
[
1
],
15
*
(
i
+
1
))
for
j
in
range
(
len
(
states
)):
self
.
assertEqual
(
stateIndices
[
i
][
j
],
fields
[
j
+
2
])
# Check the energy file.
energy
=
np
.
loadtxt
(
os
.
path
.
join
(
directory
,
'energy.csv'
),
delimiter
=
','
).
reshape
(
-
1
,
len
(
states
),
len
(
states
))
for
i
in
range
(
10
):
for
j
in
range
(
len
(
states
)):
for
k
in
range
(
len
(
states
)):
self
.
assertAlmostEqual
(
energy
[
i
][
j
][
k
],
energies
[
i
][
j
][
k
]
/
sampler
.
_kT
[
k
])
# Check the trajectory files.
for
i
in
range
(
len
(
states
)):
# Check the per-replica trajectories.
coords_read
,
box_read
,
time
,
step
=
read_xtc
(
os
.
path
.
join
(
directory
,
f
'replica_
{
i
}
.xtc'
).
encode
(
'utf-8'
))
self
.
assertTrue
(
np
.
allclose
(
step
,
np
.
linspace
(
15
,
150
,
10
)))
self
.
assertTrue
(
np
.
allclose
(
time
,
0.005
*
np
.
linspace
(
3
,
30
,
10
)))
for
j
in
range
(
10
):
conf
=
conformations
[
j
][
i
].
getPositions
().
value_in_unit
(
nanometers
)
self
.
assertTrue
(
np
.
allclose
(
conf
,
coords_read
[:,:,
j
],
atol
=
0.001
))
# Check the per-state trajectories.
coords_read
,
box_read
,
time
,
step
=
read_xtc
(
os
.
path
.
join
(
directory
,
f
'state_
{
i
}
.xtc'
).
encode
(
'utf-8'
))
self
.
assertTrue
(
np
.
allclose
(
step
,
np
.
linspace
(
15
,
150
,
10
)))
self
.
assertTrue
(
np
.
allclose
(
time
,
0.005
*
np
.
linspace
(
3
,
30
,
10
)))
for
j
in
range
(
10
):
replica
=
stateIndices
[
j
].
index
(
i
)
conf
=
conformations
[
j
][
replica
].
getPositions
().
value_in_unit
(
nanometers
)
self
.
assertTrue
(
np
.
allclose
(
conf
,
coords_read
[:,:,
j
],
atol
=
0.001
))
# Creating a new reporter for the same directory should fail.
with
self
.
assertRaises
(
ValueError
):
ReplicaExchangeReporter
(
directory
,
3
,
sampler
)
del
sampler
del
simulation
del
integrator
Prev
1
2
3
4
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment