2.1.1. Parallel Analysis building blocks — pmda.parallel

A collection of useful building blocks for creating Analysis classes.

class pmda.parallel.ParallelAnalysisBase(universe, atomgroups)[source]

Base class for defining parallel multi frame analysis

The class it is designed as a template for creating multiframe analyses. This class will automatically take care of setting up the trajectory reader for iterating in parallel.

To parallelize the analysis ParallelAnalysisBase separates the trajectory into work blocks containing multiple frames. The number of blocks is equal to the number of available cores or dask workers. This minimizes the number of python processes that are started during a calculation. Accumulation of frames within a block happens in the self._reduce function. A consequence when using dask is that adding additional workers during a computation will not result in an reduction of run-time.

To define a new Analysis, ParallelAnalysisBase needs to be subclassed and _single_frame() and _conclude() must be defined. It is also possible to define _prepare() for pre-processing and _reduce() for custom reduce operation on the work blocks. See the example below.

class NewAnalysis(ParallelAnalysisBase):
    def __init__(self, atomgroup, parameter):
        self._ag = atomgroup
        super(NewAnalysis, self).__init__(atomgroup.universe,
                                          self._ag)

    def _single_frame(self, ts, agroups):
        # REQUIRED
        # called for every frame. ``ts`` contains the current time step
        # and ``agroups`` a tuple of atomgroups that are updated to the
        # current frame. Return result of `some_function` for a single
        # frame
        return some_function(agroups[0], self._parameter)

    def _conclude(self):
        # REQUIRED
        # Called once iteration on the trajectory is finished. Results
        # for each frame are stored in ``self._results`` in a per block
        # basis. Here those results should be moved and reshaped into a
        # sensible new variable.
        self.results = np.hstack(self._results)
        # Apply normalisation and averaging to results here if wanted.
        self.results /= np.sum(self.results

    @staticmethod
    def _reduce(res, result_single_frame):
        # NOT REQUIRED
        # Called for every frame. ``res`` contains all the results
        # before current time step, and ``result_single_frame`` is the
        # result of self._single_frame for the current time step. The
        # return value is the updated ``res``. The default is to append
        # results to a python list. This approach is sufficient for
        # time-series data.
        res.append(results_single_frame)
        # This is not suitable for every analysis. To add results over
        # multiple frames this function can be overwritten. The default
        # value for ``res`` is an empty list. Here we change the type to
        # the return type of `self._single_frame`. Afterwards we can
        # safely use addition to accumulate the results.
        if res == []:
            res = result_single_frame
        else:
            res += result_single_frame
        # If you overwrite this function *always* return the updated
        # ``res`` at the end.
        return res

Afterwards the new analysis can be run like this.

na = NewAnalysis(u.select_atoms('name CA'), 35).run()
print(na.result)
Parameters
  • Universe (Universe) – a MDAnalysis.core.groups.Universe (the atomgroups must belong to this Universe)

  • atomgroups (tuple of AtomGroup) – atomgroups that are iterated in parallel

_results

The raw data from each process are stored as a list of lists, with each sublist containing the return values from pmda.parallel.ParallelAnalysisBase._single_frame().

Type

list

readonly_attributes()[source]

Set the attributes of this class to be read only

Useful to avoid the class being modified when passing it around.

To be used as a context manager:

with analysis.readonly_attributes():
    some_function(analysis)
run(start=None, stop=None, step=None, n_jobs=1, n_blocks=None)[source]

Perform the calculation

Parameters
  • start (int, optional) – start frame of analysis

  • stop (int, optional) – stop frame of analysis

  • step (int, optional) – number of frames to skip between each analysed frame

  • n_jobs (int, optional) – number of jobs to start, if -1 use number of logical cpu cores. This argument will be ignored when the distributed scheduler is used

  • n_blocks (int, optional) – number of blocks to divide trajectory into. If None set equal to n_jobs or number of available workers in scheduler.

class pmda.parallel.Timing(io, compute, total, universe, prepare, conclude, wait=None, io_block=None, compute_block=None)[source]

store various timeing results of obtained during a parallel analysis run

property compute

compute time per frame

property compute_block

compute time per block

property conclude

time to conclude

property cumulate_time

cumulative time of io and compute for each frame. This isn’t equal to self.total / n_jobs because self.total also includes the scheduler overhead

property io

io time per frame

property io_block

io time per block

property prepare

time to prepare

property total

wall time

property universe

time to create a universe for each block

property wait

time for blocks to start working