.. -*- coding: utf-8 -*- .. _parallelization: ================= Parallelization ================= Dask_ is used to parallelize analysis in PMDA. It provides a flexible approach to task-based parallelism and can scale from multi-core laptops to large compute clusters. Single machine ============== By default, all the available cores on the local machine (laptop or workstation) are used with the ``n_jobs=-1`` keyword but any number can be set, e.g., ``n_jobs=4`` to split the trajectory into 4 blocks. Internally, this uses the processes (multiprocessing) `scheduler`_ of dask. If you want to make use of more advanced scheduler features or scale your analysis to multiple nodes, e.g., in an HPC (high performance computing) environment, then use the :mod:`distributed` scheduler, as described next. If ``n_jobs==1`` a synchronous (single threaded) scheduler is used [#threads]_. .. _`scheduler`: https://docs.dask.org/en/latest/scheduler-overview.html ``dask.distributed`` ==================== With the `distributed`_ scheduler on can run analysis in a distributed fashion on HPC or ad-hoc clusters (see `setting up a dask.distributed network`_) or on a `single machine`_. (In addition, *distributed* also provides `diagnostics`_ in form of a dashboard in the browser and a progress bar.) .. _Dask: https://dask.org .. _`distributed`: https://distributed.readthedocs.io/ .. _`setting up a dask.distributed network`: https://distributed.readthedocs.io/en/latest/setup.html .. _`single machine`: http://docs.dask.org/en/latest/setup/single-distributed.html .. _diagnostics: http://docs.dask.org/en/latest/diagnostics-distributed.html Local cluster (single machine) ------------------------------ You can try out `dask.distributed`_ with a `local cluster`_, which sets up a scheduler and workers on the local machine. .. code:: python import distributed lc = distributed.LocalCluster(n_workers=8, processes=True) client = distributed.Client(lc) Setting up the ``client`` is sufficient for Dask_ (and PMDA, namely the :meth:`~pmda.parallel.ParallelAnalysisBase.run` method) to use it. We continue to use the :ref:`RMSD example`: .. code:: python rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run() Because the local cluster contains 8 workers, the RMSD trajectory analysis will be parallelized over 8 trajectory segments. Cluster ------- In order to run on a larger cluster with multiple nodes (see `setting up a dask.distributed network`_) one needs to know how to connect to the running scheduler (e.g., address and port number or shared state file). Assuming that the scheduler is running on, one would initialize the `distributed.Client`_ and this is enough to use *distributed* for all analysis (it `configures the scheduler`_ to be *distributed*): .. code:: python import distributed client = distributed.Client('') rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run() In this way one can spread an analysis task over many different nodes. .. _`local cluster`: https://distributed.readthedocs.io/en/latest/local-cluster.html .. _`distributed.Client`: https://distributed.readthedocs.io/en/latest/client.html .. _`configures the scheduler`: https://docs.dask.org/en/latest/scheduling.html#configuration .. rubric:: Footnotes .. [#threads] The *synchronous* scheduler is very useful for debugging_. By setting ``n_jobs=1`` and not using a *distributed* scheduler, the synchronous scheduler is automatically used. Alternatively, set the synchronous scheduler with .. code:: python dask.config.set(scheduler='synchronous') for any ``n_jobs``. .. _debugging: https://docs.dask.org/en/latest/debugging.html