Parallelization

Dask is used to parallelize analysis in PMDA. It provides a flexible approach to task-based parallelism and can scale from multi-core laptops to large compute clusters.

Single machine

By default, all available cores on the local machine (laptop or workstation) are used with the n_jobs=-1 keyword, but any number can be set, e.g., n_jobs=4 to split the trajectory into 4 blocks.
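For example, continuing the RMSD analysis used later in this section (a minimal sketch that assumes a Universe u, a reference ref, and the pmda.rms module imported as rms, exactly as in the example below):

rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(n_jobs=4)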

Internally, this uses dask's processes (multiprocessing) scheduler. If you want to make use of more advanced scheduler features or scale your analysis to multiple nodes, e.g., in an HPC (high performance computing) environment, then use the distributed scheduler, as described next. If n_jobs=1, a synchronous (single-threaded) scheduler is used [1].
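The scheduler can also be selected explicitly through dask's configuration, using the same mechanism shown for the synchronous scheduler in the footnote below; as a sketch, the multiprocessing scheduler is named 'processes' in dask:

import dask
dask.config.set(scheduler='processes')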

dask.distributed

With the distributed scheduler one can run analysis in a distributed fashion on HPC or ad-hoc clusters (see setting up a dask.distributed network) or on a single machine. (In addition, distributed also provides diagnostics in the form of a dashboard in the browser and a progress bar.)

Local cluster (single machine)

You can try out dask.distributed with a local cluster, which sets up a scheduler and workers on the local machine.

import distributed
# start a scheduler and 8 worker processes on the local machine
lc = distributed.LocalCluster(n_workers=8, processes=True)
# connecting a client makes this cluster the default scheduler for dask
client = distributed.Client(lc)

Setting up the client is sufficient for Dask, and hence for PMDA's run() method, to use it. We continue to use the RMSD example:

rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()

Because the local cluster contains 8 workers, the RMSD trajectory analysis will be parallelized over 8 trajectory segments.
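The diagnostics dashboard mentioned above is served by the scheduler; its address can be read off the client (a small sketch; dashboard_link is an attribute of distributed.Client in recent versions of distributed):

# print the URL of the browser dashboard for this cluster
print(client.dashboard_link)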

Cluster

In order to run on a larger cluster with multiple nodes (see setting up a dask.distributed network), one needs to know how to connect to the running scheduler (e.g., address and port number or shared state file). Assuming that the scheduler is running at 192.168.0.1:8786, one only needs to initialize a distributed.Client; this is enough for all subsequent analysis to use distributed (creating the client registers it as the default scheduler):

import distributed
# connect to the scheduler that is already running at this address
client = distributed.Client('192.168.0.1:8786')
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()

In this way one can spread an analysis task over many different nodes.

Footnotes

[1]

The synchronous scheduler is very useful for debugging. It is used automatically when n_jobs=1 and no distributed scheduler has been set up. Alternatively, set the synchronous scheduler explicitly with

import dask
dask.config.set(scheduler='synchronous')

for any n_jobs.
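
dask.config.set can also be used as a context manager, which limits the synchronous scheduler to a single analysis run; a minimal sketch, reusing the RMSD example from above:

import dask
# run only this analysis single-threaded, then restore the previous scheduler
with dask.config.set(scheduler='synchronous'):
    rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()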