Unverified Commit 46376ea3 authored by Peter Eastman's avatar Peter Eastman Committed by GitHub
Browse files

Created ReplicaExchangeReporter (#5261)

* Created ReplicaExchangeReporter

* Documentation for ReplicaExchangeReporter
parent efd89169
......@@ -38,6 +38,7 @@ from .charmmpsffile import CharmmPsfFile, CharmmPSFWarning
from .simulatedtempering import SimulatedTempering
from .metadynamics import Metadynamics, BiasVariable
from .replicaexchangesampler import ReplicaExchangeSampler
from .replicaexchangereporter import ReplicaExchangeReporter
# Enumerated values
......
"""
replicaexchangereporter.py: Writes output for replica exchange simulations
This is part of the OpenMM molecular simulation toolkit.
See https://openmm.org/development.
Portions copyright (c) 2026 Stanford University and the Authors.
Authors: Peter Eastman
Contributors:
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import openmm as mm
import openmm.app as app
import openmm.unit as unit
from openmm.app.internal import safesave
import os
class ReplicaExchangeReporter(object):
"""
This is a reporter for use with ReplicaExchangeSampler. It outputs the most important information required for
nearly all replica exchange simulations. It must be added to the sampler's list of reporters:
>>> sampler.reporters.append(ReplicaExchangeReporter(directory, interval, sampler))
As the simulation runs, it creates the following files in a user specified directory.
- A CSV file containing a log of what state each replica was in at each iteration.
- A set of XML files containing serialized State objects for each replica. These serve as checkpoints, allowing a
simulation to be resumed later.
- (optional) A trajectory file for each thermodynamic state. The coordinates in these files jump discontinuously
whenever an exchange happens.
- (optional) A trajectory file for each replica. The coordinates in these files are continuous, but the
thermodynamic states they explore change with time.
- (optional) A CSV file containing the reduced energy (potential energy divided by kT) of every replica in every
state at each iteration. This information is useful for calculating free energies.
The reduced energies are written as a matrix in flattened order for each iteration. You can load the file with
NumPy using the command
>>> energy = np.loadtxt('energy.csv', delimiter=',').reshape(-1, n_states, n_states)
`energy[i][j][k]` is the reduced energy of state k for replica j in iteration i.
To resume a simulation from the saved files, pass `resume=True` to the constructor. It will load all necessary
information and configure the ReplicaExchangeSampler correctly.
"""
def __init__(self, directory: str, reportInterval: int, sampler: "app.ReplicaExchangeSampler",
trajectoryPerState: bool = True, trajectoryPerReplica: bool = False, trajectoryFormat: str = 'xtc',
enforcePeriodicBox: bool | None = None, energy: bool = False, resume: bool = False):
"""
Create a ReplicaExchangeReporter.
Parameters
----------
directory: str
The path to the directory where the files will be written. The directory will be created if it does not
already exist. The directory must be empty unless resume is True, in which case it must contain the files
from an earlier simulation.
reportInterval: int
The frequency at which to write output, measured in iterations
sampler: openmm.app.ReplicaExchangeSampler
The sampler this reporter will be added to
trajectoryPerState: bool
If True, a trajectory file will be saved for each thermodynamic state.
trajectoryPerReplica: bool
If True, a trajectory file will be saved for each replica.
trajectoryFormat: str
The format in which to save trajectories. Supported options are 'xtc' and 'dcd'.
enforcePeriodicBox: bool
Specifies whether particle positions should be translated so the center of every molecule
lies in the same periodic box. If None (the default), it will automatically decide whether
to translate molecules based on whether the system being simulated uses periodic boundary
conditions.
energy: bool
If True, a CSV file will be saved containing reduced energies.
resume: bool
Specifies whether to resume an earlier simulation. If True, the checkpoint and log data will be loaded into
the ReplicaExchangeSampler, and future output will be appended to the existing files.
"""
trajectoryFormat = trajectoryFormat.lower()
self.directory = directory
self.reportInterval = reportInterval
self.trajectoryPerState = trajectoryPerState
self.trajectoryPerReplica = trajectoryPerReplica
self.format = format
self._log = None
self._energy = None
numStates = len(sampler.states)
# Validate the inputs and create the output directory if necessary.
if trajectoryFormat not in ('xtc', 'dcd'):
raise ValueError(f'Unsupported trajectory format: {trajectoryFormat}. Allowed values are "xtc" and "dcd".')
if resume:
if not os.path.isdir(directory):
raise ValueError(f'Cannot resume because the directory does not exist: {directory}')
def checkExists(filename):
if not os.path.isfile(os.path.join(directory, filename)):
raise ValueError(f'Cannot resume because the file {filename} does not exist.')
checkExists('log.csv')
if energy:
checkExists('energy.csv')
for i in range(numStates):
checkExists(f'checkpoint_{i}.xml')
if trajectoryPerState:
checkExists(f'state_{i}.{trajectoryFormat}')
if trajectoryPerReplica:
checkExists(f'replica_{i}.{trajectoryFormat}')
else:
if os.path.isdir(directory):
if len(os.listdir(directory)) > 0:
raise ValueError(f'The directory {directory} is not empty.')
else:
os.makedirs(directory)
# Load state assignments and checkpoints if resuming another simulation.
if resume:
for i in range(numStates):
with open(os.path.join(directory, f'checkpoint_{i}.xml')) as input:
sampler.replicaConformation[i] = mm.XmlSerializer.deserialize(input.read())
log = None
with open(os.path.join(directory, 'log.csv')) as input:
for line in input:
log = line
if log is None:
raise ValueError('Cannot resume because log file is empty.')
fields = log.split(',')
sampler.replicaStateIndex = [int(x) for x in fields[2:(numStates+2)]]
sampler._previousReplicaStateIndex = sampler.replicaStateIndex[:]
sampler.currentIteration = int(fields[0])
# Create reporters and open files for writing output.
def createReporter(filename):
path = os.path.join(directory, filename)
interval = reportInterval*sampler.stepsPerIteration
if trajectoryFormat == 'xtc':
return app.XTCReporter(path, reportInterval*sampler.stepsPerIteration, append=resume, enforcePeriodicBox=enforcePeriodicBox)
return app.DCDReporter(path, interval, append=resume, enforcePeriodicBox=enforcePeriodicBox)
self._stateReporters = []
self._replicaReporters = []
for i in range(numStates):
if trajectoryPerState:
self._stateReporters.append(createReporter(f'state_{i}.{trajectoryFormat}'))
if trajectoryPerReplica:
self._replicaReporters.append(createReporter(f'replica_{i}.{trajectoryFormat}'))
self._log = open(os.path.join(directory, 'log.csv'), 'a' if resume else 'w')
if not resume:
print(','.join(['Iteration', 'Step']+[f'Replica_{i}_State' for i in range(numStates)]), file=self._log)
self._log.flush()
if energy:
self._energy = open(os.path.join(directory, 'energy.csv'), 'a' if resume else 'w')
def __del__(self):
if self._log is not None:
self._log.close()
if self._energy is not None:
self._energy.close()
def __call__(self, sampler: "app.ReplicaExchangeSampler"):
"""This is invoked by the ReplicaExchangeSampler at the end of every iteration to generate output."""
if sampler.currentIteration % self.reportInterval != 0:
return
logData = [sampler.currentIteration, sampler.simulation.currentStep]+sampler.replicaStateIndex
print(','.join([str(x) for x in logData]), file=self._log)
self._log.flush()
if self._energy is not None:
import numpy as np
energy = np.array(sampler.replicaStateEnergy)
if sampler._kT is None:
energy /= unit.MOLAR_GAS_CONSTANT_R*sampler.simulation.integrator.getTemperature()
else:
energy /= sampler._kT
print(','.join([str(x) for x in energy.flatten()]), file=self._energy)
self._energy.flush()
for i, conf in enumerate(sampler.replicaConformation):
if len(self._stateReporters) > 0:
self._stateReporters[sampler.replicaStateIndex[i]].report(sampler.simulation, conf)
if len(self._replicaReporters) > 0:
self._replicaReporters[i].report(sampler.simulation, conf)
safesave.save(mm.XmlSerializer.serialize(conf), os.path.join(self.directory, f'checkpoint_{i}.xml'))
......@@ -35,7 +35,7 @@ import random
class ReplicaExchangeSampler(object):
"""
ReplicaExchangeSampler uses replica exchange to simulate a system in a collection of different states. It supports
ReplicaExchangeSampler uses replica exchange to simulate a system in a collection of thermodynamic states. It supports
both temperature replica exchange, in which the states correspond to different temperatures, and Hamiltonian replica
exchange, in which they correspond to different potential functions. It also can combine them to vary temperature
and potential function at the same time.
......@@ -54,9 +54,9 @@ class ReplicaExchangeSampler(object):
- Hamiltonian replica exchange may be used to sample an alchemical transition between two endpoints. It produces
efficient sampling at every point along the transition pathway from which a free energy difference can be computed.
Each state (not to be confused with a State object) is represented by a dict containing property values. All states
must specify values for the same properties. They describe the ways in which the states differ from each other. The
following properties are supported.
Each thermodynamic state (not to be confused with a State object) is represented by a dict containing property
values. All states must specify values for the same properties. They describe the ways in which the states differ
from each other. The following properties are supported.
- 'temperature': the simulation temperature
- 'groups': a set containing the force groups to include when computing the energy, for example {0, 2}. It also may
......@@ -90,7 +90,16 @@ class ReplicaExchangeSampler(object):
Because replica exchange involves simulating many distinct replicas, the standard reporters used to generate output
from a simulation are often not appropriate. ReplicaExchangeSampler instead provides its own reporting mechanism.
Define a function that takes a ReplicaExchangeSampler as its only argument:
For most purposes, you can use the ReplicaExchangeReporter class. Create a reporter and add it to the sampler:
>>> sampler.reporters.append(ReplicaExchangeReporter(directory, interval, sampler))
It can write out a variety of types of information including a log of state assignments, trajectories for each
replica and/or state, reduced energies, and checkpoint files for resuming a simulation. See the documentation on
ReplicaExchangeReporter for details.
If you want to output other information, you can create your own reporters. Define a function that takes a
ReplicaExchangeSampler as its only argument:
>>> def report(sampler):
>>> # Generate whatever output you want here
......
from openmm import *
from openmm.app import *
from openmm.unit import *
from openmm.app.internal.xtc_utils import read_xtc
import numpy as np
import os
import tempfile
import unittest
class TestReplicaExchangeSampler(unittest.TestCase):
......@@ -74,3 +77,103 @@ class TestReplicaExchangeSampler(unittest.TestCase):
for i in range(len(r2)):
average = 0.5*states[i]['k']*r2[i]/steps
self.assertTrue(0.7 < average/expected < 1.3)
def testReporter(self):
"""Test reporting output from a replica exchange simulation."""
# Set up a replica exchange simulation.
pdb = PDBFile('systems/alanine-dipeptide-implicit.pdb')
ff = ForceField('amber19-all.xml')
system = ff.createSystem(pdb.topology)
integrator = LangevinIntegrator(300*kelvin, 1/picosecond, 0.001*picosecond)
simulation = Simulation(pdb.topology, system, integrator, Platform.getPlatform('Reference'))
simulation.context.setPositions(pdb.positions)
states = [{'temperature':t*kelvin} for t in [300.0, 320.0, 340.0]]
sampler = ReplicaExchangeSampler(states, simulation, 5)
# Add reporters to it.
stateIndices = []
energies = []
conformations = []
def report(sampler):
if sampler.currentIteration % 3 == 0:
stateIndices.append(sampler.replicaStateIndex[:])
energies.append(sampler.replicaStateEnergy[:])
conformations.append(sampler.replicaConformation[:])
with tempfile.TemporaryDirectory() as directory:
sampler.reporters.append(ReplicaExchangeReporter(directory, 3, sampler, trajectoryPerState=True, trajectoryPerReplica=True, energy=True))
sampler.reporters.append(report)
# Generate some output.
sampler.simulate(15)
# Delete all objects from the simulation and create a new one, telling it to resume from the files.
del sampler
del simulation
del integrator
integrator = LangevinIntegrator(300*kelvin, 1/picosecond, 0.001*picosecond)
simulation = Simulation(pdb.topology, system, integrator, Platform.getPlatform('Reference'))
sampler = ReplicaExchangeSampler(states, simulation, 5)
sampler.reporters.append(ReplicaExchangeReporter(directory, 3, sampler, trajectoryPerState=True, trajectoryPerReplica=True, energy=True, resume=True))
sampler.reporters.append(report)
# Check that it loaded the checkpoints correctly.
for i in range(len(states)):
xml1 = XmlSerializer.serialize(sampler.replicaConformation[i])
xml2 = open(os.path.join(directory, f'checkpoint_{i}.xml')).read()
self.assertEqual(xml1, xml2)
# Generate some more output.
sampler.simulate(15)
# Check the log file.
lines = open(os.path.join(directory, 'log.csv')).readlines()[1:]
for i, line in enumerate(lines):
fields = [int(x) for x in line.split(',')]
self.assertEqual(fields[0], 3*(i+1))
self.assertEqual(fields[1], 15*(i+1))
for j in range(len(states)):
self.assertEqual(stateIndices[i][j], fields[j+2])
# Check the energy file.
energy = np.loadtxt(os.path.join(directory, 'energy.csv'), delimiter=',').reshape(-1, len(states), len(states))
for i in range(10):
for j in range(len(states)):
for k in range(len(states)):
self.assertAlmostEqual(energy[i][j][k], energies[i][j][k]/sampler._kT[k])
# Check the trajectory files.
for i in range(len(states)):
# Check the per-replica trajectories.
coords_read, box_read, time, step = read_xtc(os.path.join(directory, f'replica_{i}.xtc').encode('utf-8'))
self.assertTrue(np.allclose(step, np.linspace(15, 150, 10)))
self.assertTrue(np.allclose(time, 0.005*np.linspace(3, 30, 10)))
for j in range(10):
conf = conformations[j][i].getPositions().value_in_unit(nanometers)
self.assertTrue(np.allclose(conf, coords_read[:,:,j], atol=0.001))
# Check the per-state trajectories.
coords_read, box_read, time, step = read_xtc(os.path.join(directory, f'state_{i}.xtc').encode('utf-8'))
self.assertTrue(np.allclose(step, np.linspace(15, 150, 10)))
self.assertTrue(np.allclose(time, 0.005*np.linspace(3, 30, 10)))
for j in range(10):
replica = stateIndices[j].index(i)
conf = conformations[j][replica].getPositions().value_in_unit(nanometers)
self.assertTrue(np.allclose(conf, coords_read[:,:,j], atol=0.001))
# Creating a new reporter for the same directory should fail.
with self.assertRaises(ValueError):
ReplicaExchangeReporter(directory, 3, sampler)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment