Parallel Analysis building blocks —
A collection of useful building blocks for creating Analysis classes.
Base class for defining parallel multi frame analysis
The class it is designed as a template for creating multiframe analyses. This class will automatically take care of setting up the trajectory reader for iterating in parallel.
To parallelize the analysis
ParallelAnalysisBaseseparates the trajectory into work blocks containing multiple frames. The number of blocks is equal to the number of available cores or dask workers. This minimizes the number of python processes that are started during a calculation. Accumulation of frames within a block happens in the self._reduce function. A consequence when using dask is that adding additional workers during a computation will not result in an reduction of run-time.
To define a new Analysis,
ParallelAnalysisBaseneeds to be subclassed and
_conclude()must be defined. It is also possible to define
_prepare()for pre-processing and
_reduce()for custom reduce operation on the work blocks. See the example below.
class NewAnalysis(ParallelAnalysisBase): def __init__(self, atomgroup, parameter): self._ag = atomgroup super(NewAnalysis, self).__init__(atomgroup.universe, self._ag) def _single_frame(self, ts, agroups): # REQUIRED # called for every frame. ``ts`` contains the current time step # and ``agroups`` a tuple of atomgroups that are updated to the # current frame. Return result of `some_function` for a single # frame return some_function(agroups, self._parameter) def _conclude(self): # REQUIRED # Called once iteration on the trajectory is finished. Results # for each frame are stored in ``self._results`` in a per block # basis. Here those results should be moved and reshaped into a # sensible new variable. self.results = np.concatenate(self._results) # Apply normalisation and averaging to results here if wanted. self.results /= np.sum(self.results @staticmethod def _reduce(res, result_single_frame): # NOT REQUIRED # Called for every frame. ``res`` contains all the results # before current time step, and ``result_single_frame`` is the # result of self._single_frame for the current time step. The # return value is the updated ``res``. The default is to append # results to a python list. This approach is sufficient for # time-series data. res.append(results_single_frame) # This is not suitable for every analysis. To add results over # multiple frames this function can be overwritten. The default # value for ``res`` is an empty list. Here we change the type to # the return type of `self._single_frame`. Afterwards we can # safely use addition to accumulate the results. if res == : res = result_single_frame else: res += result_single_frame # If you overwrite this function *always* return the updated # ``res`` at the end. return res
Afterwards the new analysis can be run like this.
na = NewAnalysis(u.select_atoms('name CA'), 35).run() print(na.result)
Universe) – a
MDAnalysis.core.groups.Universe(the atomgroups must belong to this Universe)
atomgroups (tuple of
AtomGroup) – atomgroups that are iterated in parallel
The raw data from each process are stored as a list of lists, with each sublist containing the return values from
Set the attributes of this class to be read only
Useful to avoid the class being modified when passing it around.
To be used as a context manager:
with analysis.readonly_attributes(): some_function(analysis)
run(start=None, stop=None, step=None, n_jobs=1, n_blocks=None)¶
Perform the calculation
start (int, optional) – start frame of analysis
stop (int, optional) – stop frame of analysis
step (int, optional) – number of frames to skip between each analysed frame
n_jobs (int, optional) – number of jobs to start, if -1 use number of logical cpu cores. This argument will be ignored when the distributed scheduler is used
n_blocks (int, optional) – number of blocks to divide trajectory into. If
Noneset equal to n_jobs or number of available workers in scheduler.
Timing(io, compute, total, universe, prepare, conclude, wait=None, io_block=None, compute_block=None)¶
store various timeing results of obtained during a parallel analysis run
compute time per frame
compute time per block
time to conclude
cumulative time of io and compute for each frame. This isn’t equal to self.total / n_jobs because self.total also includes the scheduler overhead
io time per frame
io time per block
time to prepare
time to create a universe for each block
time for blocks to start working