-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathParallelMDAnalysis.py
366 lines (292 loc) · 12.3 KB
/
ParallelMDAnalysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# ------------------------------------------------------------------------------------ #
# COPIED FROM MDAnalysis v1.1.1
# Changed AnalysisBase to ParallelAnalysisBase, which now includes a `multiprocessing`
# option in the `run()` method.
#
# Changed InterRDF to ParallelInterRDF to take advantage of ParallelAnalysisBase
#
# ------------------------------------------------------------------------------------ #
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- https://www.mdanalysis.org
# Copyright (c) 2006-2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
#
# Please cite your use of MDAnalysis in published work:
#
# R. J. Gowers, M. Linke, J. Barnoud, T. J. E. Reddy, M. N. Melo, S. L. Seyler,
# D. L. Dotson, J. Domanski, S. Buchoux, I. M. Kenney, and O. Beckstein.
# MDAnalysis: A Python package for the rapid analysis of molecular dynamics
# simulations. In S. Benthall and S. Rostrup editors, Proceedings of the 15th
# Python in Science Conference, pages 102-109, Austin, TX, 2016. SciPy.
# doi: 10.25080/majora-629e541a-00e
#
# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
#
from __future__ import division, absolute_import
import six
from six.moves import range, zip
import inspect
import logging
import itertools
import warnings
import numpy as np
from MDAnalysis import coordinates
from MDAnalysis.core.groups import AtomGroup
from MDAnalysis.lib.log import ProgressBar
from MDAnalysis.analysis.base import Results
from MDAnalysis.lib.util import blocks_of
from MDAnalysis.lib import distances
import multiprocessing
from multiprocessing import Pool
from functools import partial
logger = logging.getLogger(__name__)
class ParallelAnalysisBase(object):
"""Base class for defining multi frame analysis
The class it is designed as a template for creating multiframe analyses.
This class will automatically take care of setting up the trajectory
reader for iterating, and it offers to show a progress meter.
To define a new Analysis, `AnalysisBase` needs to be subclassed
`_single_frame` must be defined. It is also possible to define
`_prepare` and `_conclude` for pre and post processing. See the example
below.
.. code-block:: python
class NewAnalysis(AnalysisBase):
def __init__(self, atomgroup, parameter, **kwargs):
super(NewAnalysis, self).__init__(atomgroup.universe.trajectory,
**kwargs)
self._parameter = parameter
self._ag = atomgroup
def _prepare(self):
# OPTIONAL
# Called before iteration on the trajectory has begun.
# Data structures can be set up at this time
self.result = []
def _single_frame(self):
# REQUIRED
# Called after the trajectory is moved onto each new frame.
# store result of `some_function` for a single frame
self.result.append(some_function(self._ag, self._parameter))
def _conclude(self):
# OPTIONAL
# Called once iteration on the trajectory is finished.
# Apply normalisation and averaging to results here.
self.result = np.asarray(self.result) / np.sum(self.result)
Afterwards the new analysis can be run like this.
.. code-block:: python
na = NewAnalysis(u.select_atoms('name CA'), 35).run(start=10, stop=20)
print(na.result)
Attributes
----------
times: np.ndarray
array of Timestep times. Only exists after calling run()
frames: np.ndarray
array of Timestep frame indices. Only exists after calling run()
"""
def __init__(self, trajectory, verbose=False, **kwargs):
"""
Parameters
----------
trajectory : mda.Reader
A trajectory Reader
verbose : bool, optional
Turn on more logging and debugging, default ``False``
.. versionchanged:: 1.0.0
Support for setting ``start``, ``stop``, and ``step`` has been
removed. These should now be directly passed to
:meth:`AnalysisBase.run`.
"""
self._trajectory = trajectory
self._verbose = verbose
def _setup_frames(self, trajectory, start=None, stop=None, step=None):
"""
Pass a Reader object and define the desired iteration pattern
through the trajectory
Parameters
----------
trajectory : mda.Reader
A trajectory Reader
start : int, optional
start frame of analysis
stop : int, optional
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
.. versionchanged:: 1.0.0
Added .frames and .times arrays as attributes
"""
self._trajectory = trajectory
start, stop, step = trajectory.check_slice_indices(start, stop, step)
self.start = start
self.stop = stop
self.step = step
self.n_frames = len(range(start, stop, step))
self.frames = np.zeros(self.n_frames, dtype=int)
self.times = np.zeros(self.n_frames)
def _single_frame(self, frame_idx):
"""Calculate data from a single frame of trajectory
Don't worry about normalising, just deal with a single frame.
"""
raise NotImplementedError("Only implemented in child classes")
def _prepare(self):
"""Set things up before the analysis loop begins"""
pass # pylint: disable=unnecessary-pass
def _conclude(self):
"""Finalise the results you've gathered.
Called at the end of the run() method to finish everything up.
"""
pass # pylint: disable=unnecessary-pass
def run(self, start=None, stop=None, step=None, verbose=None, njobs=-1):
"""Perform the calculation
Parameters
----------
start : int, optional
start frame of analysis
stop : int, optional
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
verbose : bool, optional
Turn on verbosity
njobs : int, optional
number of cores to use for parallelization, default=-1 means all available
"""
logger.info("Choosing frames to analyze")
# if verbose unchanged, use class default
verbose = getattr(self, '_verbose',
False) if verbose is None else verbose
self._setup_frames(self._trajectory, start, stop, step)
logger.info("Starting preparation")
self._prepare()
if njobs == 1:
self._result = []
for i, ts in enumerate(ProgressBar(
self._trajectory[self.start:self.stop:self.step],
verbose=verbose)):
self._frame_index = i
self._ts = ts
self.frames[i] = ts.frame
self.times[i] = ts.time
# logger.info("--> Doing frame {} of {}".format(i+1, self.n_frames))
self._result.append(self._single_frame(i))
else:
if njobs == -1:
n = multiprocessing.cpu_count()
else:
n = njobs
print(f'Running with {n} CPUs')
frame_values = np.arange(len(self._trajectory[self.start:self.stop:self.step]))
with Pool(n) as worker_pool:
self._result = worker_pool.map(self._single_frame, frame_values)
logger.info("Finishing up")
self._conclude()
return self
class ParallelInterRDF(ParallelAnalysisBase):
r"""Intermolecular pair distribution function
The :ref:`radial distribution function<equation-gab>` is calculated by
histogramming distances between all particles in `g1` and `g2` while taking
periodic boundary conditions into account via the minimum image
convention.
The `exclusion_block` keyword may be used to exclude a set of distances
from the calculations.
Results are available in the attributes :attr:`rdf` and :attr:`count`.
Arguments
---------
g1 : AtomGroup
First AtomGroup
g2 : AtomGroup
Second AtomGroup
nbins : int (optional)
Number of bins in the histogram [75]
range : tuple or list (optional)
The size of the RDF [0.0, 15.0]
exclusion_block : tuple (optional)
A tuple representing the tile to exclude from the distance
array. [None]
verbose : bool (optional)
Show detailed progress of the calculation if set to ``True``; the
default is ``False``.
Example
-------
First create the :class:`InterRDF` object, by supplying two
AtomGroups then use the :meth:`run` method ::
rdf = InterRDF(ag1, ag2)
rdf.run()
Results are available through the :attr:`bins` and :attr:`rdf`
attributes::
plt.plot(rdf.bins, rdf.rdf)
The `exclusion_block` keyword allows the masking of pairs from
within the same molecule. For example, if there are 7 of each
atom in each molecule, the exclusion mask `(7, 7)` can be used.
.. versionadded:: 0.13.0
.. versionchanged:: 1.0.0
Support for the ``start``, ``stop``, and ``step`` keywords has been
removed. These should instead be passed to :meth:`InterRDF.run`.
"""
def __init__(self, g1, g2,
nbins=75, range=(0.0, 15.0), exclusion_block=None,
**kwargs):
super(ParallelInterRDF, self).__init__(g1.universe.trajectory, **kwargs)
self.g1 = g1
self.g2 = g2
self.u = g1.universe
self.rdf_settings = {'bins': nbins,
'range': range}
self._exclusion_block = exclusion_block
def _prepare(self):
# Empty histogram to store the RDF
count, edges = np.histogram([-1], **self.rdf_settings)
count = count.astype(np.float64)
count *= 0.0
self.count = count
self.edges = edges
self.bins = 0.5 * (edges[:-1] + edges[1:])
# Need to know average volume
self.volume = 0.0
# Set the max range to filter the search radius
self._maxrange = self.rdf_settings['range'][1]
# create a Results object to hold the results
self.results = Results()
self.results.bins = self.bins
def _single_frame(self, frame_idx):
self._frame_index = frame_idx
self._ts = self.u.trajectory[frame_idx]
pairs, dist = distances.capped_distance(self.g1.positions,
self.g2.positions,
self._maxrange,
box=self.u.dimensions)
# Maybe exclude same molecule distances
if self._exclusion_block is not None:
idxA, idxB = pairs[:, 0]//self._exclusion_block[0], pairs[:, 1]//self._exclusion_block[1]
mask = np.where(idxA != idxB)[0]
dist = dist[mask]
count = np.histogram(dist, **self.rdf_settings)[0]
return count, self._ts.volume # return, rather than accumulate
def _conclude(self):
# gather the results from all processes and sum
for res in self._result:
self.count += res[0]
self.volume += res[1]
# Number of each selection
nA = len(self.g1)
nB = len(self.g2)
N = nA * nB
# If we had exclusions, take these into account
if self._exclusion_block:
xA, xB = self._exclusion_block
nblocks = nA / xA
N -= xA * xB * nblocks
# Volume in each radial shell
vol = np.power(self.edges[1:], 3) - np.power(self.edges[:-1], 3)
vol *= 4/3.0 * np.pi
# Average number density
box_vol = self.volume / self.n_frames
density = N / box_vol
rdf = self.count / (density * vol * self.n_frames)
self.rdf = rdf
self.results.rdf = rdf