Unverified Commit 81271f93 authored by Stephen Farr's avatar Stephen Farr Committed by GitHub
Browse files

Add support for writing atom subsets to PDBReporter (#3931)

* enable atom subset in PDBReporter

* adds optional atomSubset argument to PDBReporter
* adds functions in PDBReporter which create a new topology
  and positions with chosen subset of atoms
* PDBReporter write PDB files with the subset topology and positions.

* refactor PDBReporter

* make _createTopologySubset a method that is called only the first time it is needed
* more efficient creation of subset positions
* check that atomSubset is ordered

* refactoring PDBReporter and add PDBx

* move checks on atomSubset to _createSubsetTopology
* copy periodic box vectors from topology to subsetTopology
* add atomSubset to PDBx reporter

* add bond subset to createSubsetTopology

* formatting changes to pdbreporter

* loop over atoms cleaned up
* put file opening inside parameter loop in
  TestPdbReporter::testinvalidSubsets to try and fix failing tests on windows

* add thorough tests to TestPdbReporter

* spelling changes
* Add tests for atom positions, elements, names etc
* attempt at fixing PyPy and Windows failing test cases

* fix spellings

* close output file before raising exceptions

* closes output files before raising exception in pdbreporter
createSubsetTopology
* changes assertVecAlmostEqual from a method to function to avoid
  repeating
parent cc8cdf60
......@@ -32,7 +32,8 @@ from __future__ import absolute_import
__author__ = "Peter Eastman"
__version__ = "1.0"
from openmm.app import PDBFile, PDBxFile
from openmm.app import PDBFile, PDBxFile, Topology
from openmm.unit import angstroms
class PDBReporter(object):
"""PDBReporter outputs a series of frames from a Simulation to a PDB file.
......@@ -40,7 +41,7 @@ class PDBReporter(object):
To use it, create a PDBReporter, then add it to the Simulation's list of reporters.
"""
def __init__(self, file, reportInterval, enforcePeriodicBox=None):
def __init__(self, file, reportInterval, enforcePeriodicBox=None, atomSubset=None):
"""Create a PDBReporter.
Parameters
......@@ -54,12 +55,17 @@ class PDBReporter(object):
lies in the same periodic box. If None (the default), it will automatically decide whether
to translate molecules based on whether the system being simulated uses periodic boundary
conditions.
atomSubset: list
Atom indices (zero indexed) of the particles to output. if None (the default), all particles will be output.
"""
self._reportInterval = reportInterval
self._enforcePeriodicBox = enforcePeriodicBox
self._out = open(file, 'w')
self._topology = None
self._nextModel = 0
self._atomSubset = atomSubset
self._subsetTopology = None
def describeNextReport(self, simulation):
"""Get information about the next report this object will generate.
......@@ -91,15 +97,82 @@ class PDBReporter(object):
state : State
The current state of the simulation
"""
if self._atomSubset is not None:
if self._subsetTopology is None:
self._createSubsetTopology(simulation.topology)
topology = self._subsetTopology
#PDBFile will convert to angstroms so do it here first instead
positions = state.getPositions().value_in_unit(angstroms)
positions = [positions[i] for i in self._atomSubset]
else:
topology = simulation.topology
positions = state.getPositions()
if self._nextModel == 0:
PDBFile.writeHeader(simulation.topology, self._out)
self._topology = simulation.topology
PDBFile.writeHeader(topology, self._out)
self._topology = topology
self._nextModel += 1
PDBFile.writeModel(simulation.topology, state.getPositions(), self._out, self._nextModel)
PDBFile.writeModel(topology, positions, self._out, self._nextModel)
self._nextModel += 1
if hasattr(self._out, 'flush') and callable(self._out.flush):
self._out.flush()
def _createSubsetTopology(self, topology):
"""Create a subset of an existing topology.
Parameters
----------
topology : Topology
The Topology to create a subset from
"""
# check atomSubset is valid
if len(self._atomSubset) == 0:
self._out.close()
raise ValueError('atomSubset cannot be an empty list')
if not all(a == int(a) for a in self._atomSubset):
self._out.close()
raise ValueError('all of the indices in atomSubset must be integers')
if len(set(self._atomSubset)) != len(self._atomSubset):
self._out.close()
raise ValueError('atomSubset must contain unique indices')
if sorted(self._atomSubset) != self._atomSubset:
self._out.close()
raise ValueError('atomSubset must be sorted in ascending order')
if self._atomSubset[0] < 0:
self._out.close()
raise ValueError('The smallest allowed value in atomSubset is zero')
if self._atomSubset[-1] >= topology.getNumAtoms():
self._out.close()
raise ValueError('The maximum allowed value in atomSubset must be less than the total number of particles')
self._subsetTopology = Topology()
# convert to set for fast look up
atomSubsetSet = set(self._atomSubset)
# store a map from posIndex to Atom object for when we add the bonds
indexToAtom = {}
for chain in topology.chains():
c = self._subsetTopology.addChain(chain.id)
for res in chain.residues():
r = self._subsetTopology.addResidue(res.name, c, res.id, res.insertionCode)
for atom in res.atoms():
if atom.index in atomSubsetSet:
indexToAtom[atom.index] = self._subsetTopology.addAtom(atom.name, atom.element, r, atom.id)
self._subsetTopology.setPeriodicBoxVectors(topology.getPeriodicBoxVectors())
for bond in topology.bonds():
if bond[0].index in atomSubsetSet and bond[1].index in atomSubsetSet:
atom1 = indexToAtom[bond[0].index]
atom2 = indexToAtom[bond[1].index]
self._subsetTopology.addBond(atom1, atom2, bond.type, bond.order)
def __del__(self):
if self._topology is not None:
PDBFile.writeFooter(self._topology, self._out)
......@@ -121,10 +194,25 @@ class PDBxReporter(PDBReporter):
state : State
The current state of the simulation
"""
if self._atomSubset is not None:
if self._subsetTopology is None:
self._createSubsetTopology(simulation.topology)
topology = self._subsetTopology
#PDBFile will convert to angstroms so do it here first instead
positions = state.getPositions().value_in_unit(angstroms)
positions = [positions[i] for i in self._atomSubset]
else:
topology = simulation.topology
positions = state.getPositions()
if self._nextModel == 0:
PDBxFile.writeHeader(simulation.topology, self._out)
PDBxFile.writeHeader(topology, self._out)
self._nextModel += 1
PDBxFile.writeModel(simulation.topology, state.getPositions(), self._out, self._nextModel)
PDBxFile.writeModel(topology, positions, self._out, self._nextModel)
self._nextModel += 1
if hasattr(self._out, 'flush') and callable(self._out.flush):
self._out.flush()
......
import unittest
import tempfile
from openmm import app
import openmm as mm
from openmm import unit
from openmm.unit.unit_math import norm
import os
import gc
def assertVecAlmostEqual(p1, p2, tol=1e-7):
unit = p1.unit
p1 = p1.value_in_unit(unit)
p2 = p2.value_in_unit(unit)
scale = max(1.0, norm(p1),)
for i in range(3):
diff = abs(p1[i]-p2[i])/scale
assert(diff < tol)
class TestPDBReporter(unittest.TestCase):
def setUp(self):
self.pdb = app.PDBFile('systems/alanine-dipeptide-explicit.pdb')
self.forcefield = app.ForceField('amber99sbildn.xml','tip3p.xml')
self.system = self.forcefield.createSystem(self.pdb.topology, nonbondedMethod=app.CutoffNonPeriodic, constraints=app.HBonds)
def testSubset(self):
"""Test writing out a subset of atoms"""
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, 'temptraj.pdb')
simulation = app.Simulation(self.pdb.topology, self.system, mm.LangevinMiddleIntegrator(300*unit.kelvin, 1.0/unit.picosecond, 0.002*unit.picoseconds))
simulation.context.setPositions(self.pdb.positions)
# just the alanine-dipeptide atoms
subset = list(range(0,22))
simulation.reporters.append(app.PDBReporter(filename, 1, atomSubset=subset))
simulation.step(1)
# clear reporters to ensure PDBReporter calls writeFooter and file.close
simulation.reporters.clear()
# check if the output out trajectory contains the expected number of atoms
checkpdb = app.PDBFile(filename)
self.assertEqual(len(checkpdb.positions), len(subset))
# check the positions are correct
validPositions = [simulation.context.getState(getPositions=True).getPositions()[i] for i in subset]
# round to 4 decimal places before comparing
for i in range(len(validPositions)):
validPositions[i] = [round(validPositions[i][j]._value, 4) for j in (0, 1, 2)]*unit.nanometer
for (p1, p2) in zip(checkpdb.positions, validPositions):
assertVecAlmostEqual(p1, p2)
# check elements and residue names are correct
validAtoms = [list(self.pdb.topology.atoms())[i] for i in subset]
for atom1, atom2 in zip(checkpdb.topology.atoms(), validAtoms):
self.assertEqual(atom1.element, atom2.element)
self.assertEqual(atom1.name, atom2.name)
self.assertEqual(atom1.residue.name, atom2.residue.name)
def testWriteAll(self):
"""Test all atoms are written when atomSubset is not specified"""
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, 'temptraj.pdb')
simulation = app.Simulation(self.pdb.topology, self.system, mm.LangevinMiddleIntegrator(300*unit.kelvin, 1.0/unit.picosecond, 0.002*unit.picoseconds))
simulation.context.setPositions(self.pdb.positions)
simulation.reporters.append(app.PDBReporter(filename, 1))
simulation.step(1)
# clear reporters to ensure PDBReporter calls writeFooter and file.close
simulation.reporters.clear()
# check if the output out trajectory contains the expected number of atoms
checkpdb = app.PDBFile(filename)
self.assertEqual(len(checkpdb.positions), simulation.topology.getNumAtoms())
# check the positions are correct
validPositions = simulation.context.getState(getPositions=True).getPositions()
# round to 4 decimal places before comparing
for i in range(len(validPositions)):
validPositions[i] = [round(validPositions[i][j]._value, 4) for j in (0, 1, 2)]*unit.nanometer
for (p1, p2) in zip(checkpdb.positions, validPositions):
assertVecAlmostEqual(p1, p2)
# check elements and residue names are correct
validAtoms = list(self.pdb.topology.atoms())
for atom1, atom2 in zip(checkpdb.topology.atoms(), validAtoms):
self.assertEqual(atom1.element, atom2.element)
self.assertEqual(atom1.name, atom2.name)
self.assertEqual(atom1.residue.name, atom2.residue.name)
def testInvalidSubsets(self):
"""Test that an exception is raised when the indices in atomSubset are invalid"""
with tempfile.TemporaryDirectory() as tempdir:
for i, subset in enumerate([[-1,10], [0,99999], [0,0,0,1], [0.1,0.2], [5,10,0,9], ["C", "H"],[]]):
filename = os.path.join(tempdir, 'temptraj'+str(i)+'.pdb')
simulation = app.Simulation(self.pdb.topology, self.system, mm.LangevinMiddleIntegrator(300*unit.kelvin, 1.0/unit.picosecond, 0.002*unit.picoseconds))
simulation.context.setPositions(self.pdb.positions)
simulation.reporters.append(app.PDBReporter(filename, 1, atomSubset=subset))
self.assertRaises(ValueError, lambda: simulation.step(1))
def testBondSubset(self):
""" Test that CONECT records are output correctly when using atomSubset"""
# use a testcase that has CONECT records in the input pdb file
ff = app.ForceField('amber14/protein.ff14SB.xml', 'amber14/GLYCAM_06j-1.xml','amber14/tip3pfb.xml')
pdb = app.PDBFile('systems/glycopeptide.pdb')
# add in water molecules
modeller = app.Modeller(pdb.topology, pdb.positions)
modeller.addSolvent(ff, padding=1.0*unit.nanometer)
system = ff.createSystem(modeller.topology)
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, 'temptraj.pdb')
simulation = app.Simulation(modeller.topology, system, mm.LangevinMiddleIntegrator(1.0*unit.kelvin, 1.0/unit.picosecond, 0.0000001*unit.picoseconds))
simulation.context.setPositions(modeller.positions)
# output just the glycopeptide atoms
atomSubset = list(range(pdb.topology.getNumAtoms()))
simulation.reporters.append(app.PDBReporter(filename, 1, atomSubset=atomSubset))
simulation.step(1)
# clear reporters to ensure PDBReporter calls writeFooter and file.close
simulation.reporters.clear()
# for PyPy the above line is not enough, we need to force garbage collection to ensure the
# PDBReporter.__del__ method has been called before we open the file for reading
gc.collect()
validpdb = pdb
testpdb = app.PDBFile(filename)
validBonds = list(validpdb.topology.bonds())
testBonds = list(testpdb.topology.bonds())
self.assertEqual(len(validBonds), len(testBonds))
for validBond, testBond in zip(validBonds, testBonds):
self.assertEqual(str(validBond), str(testBond))
class TestPDBxReporter(unittest.TestCase):
def setUp(self):
self.pdb = app.PDBFile('systems/alanine-dipeptide-explicit.pdb')
self.forcefield = app.ForceField('amber99sbildn.xml','tip3p.xml')
self.system = self.forcefield.createSystem(self.pdb.topology, nonbondedMethod=app.CutoffNonPeriodic, constraints=app.HBonds)
def testSubset(self):
"""Test writing out a subset of atoms"""
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, 'temptraj.pdbx')
simulation = app.Simulation(self.pdb.topology, self.system, mm.LangevinMiddleIntegrator(300*unit.kelvin, 1.0/unit.picosecond, 0.002*unit.picoseconds))
simulation.context.setPositions(self.pdb.positions)
# just the alanine-dipeptide atoms
subset = list(range(0,22))
simulation.reporters.append(app.PDBxReporter(filename, 1, atomSubset=subset))
simulation.step(1)
# clear reporters to ensure PDBxReporter calls file.close
simulation.reporters.clear()
# check if the output out trajectory contains the expected number of atoms
checkpdb = app.PDBxFile(filename)
self.assertEqual(len(checkpdb.positions), len(subset))
# check the positions are correct
validPositions = [simulation.context.getState(getPositions=True).getPositions()[i] for i in subset]
# round to 5 decimal places before comparing
for i in range(len(validPositions)):
validPositions[i] = [round(validPositions[i][j]._value, 5) for j in (0, 1, 2)]*unit.nanometer
for (p1, p2) in zip(checkpdb.positions, validPositions):
assertVecAlmostEqual(p1, p2)
# check elements and residue names are correct
validAtoms = [list(self.pdb.topology.atoms())[i] for i in subset]
for atom1, atom2 in zip(checkpdb.topology.atoms(), validAtoms):
self.assertEqual(atom1.element, atom2.element)
self.assertEqual(atom1.name, atom2.name)
self.assertEqual(atom1.residue.name, atom2.residue.name)
def testWriteAll(self):
"""Test all atoms are written when atomSubset is not specified"""
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, 'temptraj.pdbx')
simulation = app.Simulation(self.pdb.topology, self.system, mm.LangevinMiddleIntegrator(300*unit.kelvin, 1.0/unit.picosecond, 0.002*unit.picoseconds))
simulation.context.setPositions(self.pdb.positions)
simulation.reporters.append(app.PDBxReporter(filename, 1))
simulation.step(1)
# clear reporters to ensure PDBxReporter calls file.close
simulation.reporters.clear()
# check if the output out trajectory contains the expected number of atoms
checkpdb = app.PDBxFile(filename)
self.assertEqual(len(checkpdb.positions), simulation.topology.getNumAtoms())
# check the positions are correct
validPositions = simulation.context.getState(getPositions=True).getPositions()
# round to 5 decimal places before comparing
for i in range(len(validPositions)):
validPositions[i] = [round(validPositions[i][j]._value, 5) for j in (0, 1, 2)]*unit.nanometer
for (p1, p2) in zip(checkpdb.positions, validPositions):
assertVecAlmostEqual(p1, p2)
# check elements and residue names are correct
validAtoms = list(self.pdb.topology.atoms())
for atom1, atom2 in zip(checkpdb.topology.atoms(), validAtoms):
self.assertEqual(atom1.element, atom2.element)
self.assertEqual(atom1.name, atom2.name)
self.assertEqual(atom1.residue.name, atom2.residue.name)
def testInvalidSubsets(self):
"""Test that an exception is raised when the indices in atomSubset are invalid"""
with tempfile.TemporaryDirectory() as tempdir:
for i,subset in enumerate([[-1,10], [0,99999], [0,0,0,1], [0.1,0.2], [5,10,0,9], ["C", "H"],[]]):
filename = os.path.join(tempdir, 'temptraj'+str(i)+'.pdbx')
simulation = app.Simulation(self.pdb.topology, self.system, mm.LangevinMiddleIntegrator(300*unit.kelvin, 1.0/unit.picosecond, 0.002*unit.picoseconds))
simulation.context.setPositions(self.pdb.positions)
simulation.reporters.append(app.PDBxReporter(filename, 1, atomSubset=subset))
self.assertRaises(ValueError, lambda: simulation.step(1))
def testBondSubset(self):
""" Test that struct_conn records are output correctly when using atomSubset"""
# use a testcase that has CONECT records in the input pdb file
ff = app.ForceField('amber14/protein.ff14SB.xml', 'amber14/GLYCAM_06j-1.xml','amber14/tip3pfb.xml')
pdb = app.PDBFile('systems/glycopeptide.pdb')
# add in water molecules
modeller = app.Modeller(pdb.topology, pdb.positions)
modeller.addSolvent(ff, padding=1.0*unit.nanometer)
system = ff.createSystem(modeller.topology)
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, 'temptraj.pdbx')
simulation = app.Simulation(modeller.topology, system, mm.LangevinMiddleIntegrator(1.0*unit.kelvin, 1.0/unit.picosecond, 0.0000001*unit.picoseconds))
simulation.context.setPositions(modeller.positions)
# output just the glycopeptide atoms
atomSubset = list(range(pdb.topology.getNumAtoms()))
simulation.reporters.append(app.PDBxReporter(filename, 1, atomSubset=atomSubset))
simulation.step(1)
# clear reporters to ensure PDBxReporter calls file.close
simulation.reporters.clear()
validpdb = pdb
testpdb = app.PDBxFile(filename)
validBonds = list(validpdb.topology.bonds())
testBonds = list(testpdb.topology.bonds())
self.assertEqual(len(validBonds), len(testBonds))
for validBond, testBond in zip(validBonds, testBonds):
self.assertEqual(str(validBond), str(testBond))
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment