Source code for pmda.leaflet

# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2018 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
"""
LeafletFinder Analysis tool --- :mod:`pmda.leaflet`
===================================================

This module contains parallel versions of analysis tasks in
:mod:`MDAnalysis.analysis.leaflet`.

.. autoclass:: LeafletFinder
   :members:
   :undoc-members:
   :inherited-members:

"""
from __future__ import absolute_import, division

import numpy as np
import dask.bag as db
import networkx as nx
from scipy.spatial import cKDTree

import MDAnalysis as mda
import dask
# explicit import: run() below relies on dask.distributed being available
import dask.distributed
from joblib import cpu_count

from .parallel import ParallelAnalysisBase, Timing
from .util import timeit


class LeafletFinder(ParallelAnalysisBase):
    """Parallel Leaflet Finder analysis.

    Identify atoms in the same leaflet of a lipid bilayer.
    This class implements and parallelizes the *LeafletFinder* algorithm
    [Michaud-Agrawal2011]_.

    The parallelization is done based on [Paraskevakos2018]_.

    Parameters
    ----------
    universe : :class:`~MDAnalysis.core.groups.Universe`
        a :class:`MDAnalysis.core.groups.Universe` (the `atomgroups` must
        belong to this Universe)
    atomgroups : :class:`~MDAnalysis.core.groups.AtomGroup`
        an AtomGroup of head group-defining atoms for which the leaflets
        are determined

    Note
    ----
    At the moment, this class has far fewer features than the serial
    version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`.

    This version offers the LeafletFinder algorithm 4 ("Tree-based Nearest
    Neighbor and Parallel-Connected Components (Tree-Search)") from
    [Paraskevakos2018]_.

    Currently, periodic boundaries are not taken into account.

    The calculation is parallelized on a per-frame basis; at the moment,
    no parallelization over trajectory blocks is performed.
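
    Example
    -------
    A minimal usage sketch; the file names and the phosphorus selection
    are placeholders for your own data::

        import MDAnalysis as mda
        from pmda.leaflet import LeafletFinder

        u = mda.Universe("system.gro", "system.xtc")  # hypothetical files
        headgroups = u.select_atoms("name P*")        # head group atoms
        lf = LeafletFinder(u, headgroups).run(n_jobs=4, cutoff=15.0)
        # one [leaflet1, leaflet2] pair of AtomGroups per analysed frame
        leaflet1, leaflet2 = lf.results[0]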
    """

    def __init__(self, universe, atomgroups):
        self._atomgroup = atomgroups
        self._results = list()

        super(LeafletFinder, self).__init__(universe, (atomgroups,))

    def _find_connected_components(self, data, cutoff=15.0):
        """Perform the connected components discovery for the atoms in
        `data`.

        Parameters
        ----------
        data : tuple
            A data and index tuple organized as
            ``([AtomPositions1, AtomPositions2], [index1, index2])``, where
            each ``AtomPositions`` is a NumPy array of coordinates. `index1`
            and `index2` locate the two position blocks in the adjacency
            matrix and allow the node numbers of the produced graph to be
            corrected. (When called through
            :meth:`dask.bag.Bag.map_partitions` the tuple arrives wrapped in
            a single-element partition list, hence the ``data[0]`` in the
            implementation.)
        cutoff : float (optional)
            head group-defining atoms within a distance of `cutoff`
            Angstroms are deemed to be in the same leaflet [15.0]

        Returns
        -------
        values : list
            A list of all the connected components of the graph that is
            generated from `data`.
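
        Example
        -------
        An illustrative sketch of one such data tuple as built by
        :meth:`_single_frame` (the coordinates are placeholders)::

            import numpy as np
            # two 100-atom position blocks; the 1-based offsets [1, 101]
            # mark where each block starts in the full coordinate array
            data = ([np.zeros((100, 3)), np.zeros((100, 3))], [1, 101])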
        """
        # pylint: disable=unsubscriptable-object
        # each dask.bag partition holds exactly one (window, index) tuple
        window, index = data[0]
        num = window[0].shape[0]
        i_index = index[0]
        j_index = index[1]
        graph = nx.Graph()

        if i_index == j_index:
            train = window[0]
            test = window[1]
        else:
            train = np.vstack([window[0], window[1]])
            test = np.vstack([window[0], window[1]])

        tree = cKDTree(train, leafsize=40)
        edges = tree.query_ball_point(test, cutoff)
        edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
                     for idx, dest_list in enumerate(edges)]

        edge_list_flat = np.array([list(item) for sublist in edge_list
                                   for item in sublist])

        if i_index == j_index:
            res = edge_list_flat.transpose()
            res[0] = res[0] + i_index - 1
            res[1] = res[1] + j_index - 1
        else:
            # Off-diagonal block: drop pairs that fall within a single
            # position block (the diagonal partitions cover those) as well
            # as mirror pairs running from the second block into the first,
            # keeping only edges from the first block into the second.
            removed_elements = list()
            for i in range(edge_list_flat.shape[0]):
                if ((edge_list_flat[i, 0] >= 0 and
                     edge_list_flat[i, 0] <= num - 1) and
                    (edge_list_flat[i, 1] >= 0 and
                     edge_list_flat[i, 1] <= num - 1)) or \
                   ((edge_list_flat[i, 0] >= num and
                     edge_list_flat[i, 0] <= 2 * num - 1) and
                    (edge_list_flat[i, 1] >= num and
                     edge_list_flat[i, 1] <= 2 * num - 1)) or \
                   ((edge_list_flat[i, 0] >= num and
                     edge_list_flat[i, 0] <= 2 * num - 1) and
                    (edge_list_flat[i, 1] >= 0 and
                     edge_list_flat[i, 1] <= num - 1)):
                    removed_elements.append(i)
            res = np.delete(edge_list_flat, removed_elements,
                            axis=0).transpose()
            res[0] = res[0] + i_index - 1
            res[1] = res[1] - num + j_index - 1
        if res.shape[1] == 0:
            # fall back to a single dummy edge when no pairs were found
            # (plain int instead of the deprecated np.int)
            res = np.zeros((2, 1), dtype=int)

        edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
        graph.add_edges_from(edges)

        # partial connected components
        subgraphs = nx.connected_components(graph)
        comp = [g for g in subgraphs]
        return comp

    # pylint: disable=arguments-differ
    def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
                      cutoff=15.0):
        """Perform computation on a single trajectory frame.

        Must return computed values as a list. You can only **read** from
        member variables stored in ``self``. Changing them during a run
        will result in undefined behavior. `ts` and any of the atomgroups
        can be changed (but changes will be overwritten when the next time
        step is read).

        Parameters
        ----------
        ts : :class:`~MDAnalysis.coordinates.base.Timestep`
            the current coordinate frame (time step) of the trajectory
        atomgroups : :class:`~MDAnalysis.core.groups.AtomGroup`
            head group-defining atoms to be assigned to leaflets
        scheduler_kwargs : dict
            Dask scheduler parameters
        n_jobs : int
            number of blocks into which the coordinates are partitioned
            along each dimension of the adjacency matrix
        cutoff : float (optional)
            head group-defining atoms within a distance of `cutoff`
            Angstroms are deemed to be in the same leaflet [15.0]

        Returns
        -------
        values : anything
            The output from the computation over a single frame must be
            returned. The `value` will be added to a list for each block
            and the list of blocks is stored as :attr:`_results` before
            :meth:`_conclude` is run. In order to simplify processing, the
            `values` should be "simple" shallow data structures such as
            arrays or lists of numbers.

        """
        # Get positions of the atoms in the atomgroup and find their number.
        atoms = ts.positions[atomgroups.indices]
        matrix_size = atoms.shape[0]
        arranged_coord = list()
        # Guard against a zero range step when n_jobs exceeds the number
        # of atoms.
        part_size = max(1, matrix_size // n_jobs)
        # Partition the data based on a 2-dimensional partitioning.
        for i in range(1, matrix_size + 1, part_size):
            for j in range(i, matrix_size + 1, part_size):
                arranged_coord.append(([atoms[i - 1:i - 1 + part_size],
                                        atoms[j - 1:j - 1 + part_size]],
                                       [i, j]))
        # Distribute the data over the available cores, apply the map
        # function and execute.
        parAtoms = db.from_sequence(arranged_coord,
                                    npartitions=len(arranged_coord))
        parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
                                              cutoff=cutoff)
        Components = parAtomsMap.compute(**scheduler_kwargs)

        # Gather the results and start the reduction. TODO: think if it can
        # go to the private _reduce method of the base class.
        result = list(Components)

        # Merge the partial results into the overall connected components
        # of the graph.
        while len(result) != 0:
            item1 = result[0]
            result.pop(0)
            ind = []
            for i, item2 in enumerate(Components):
                if item1.intersection(item2):
                    item1 = item1.union(item2)
                    ind.append(i)
            ind.reverse()
            for j in ind:
                Components.pop(j)
            Components.append(item1)

        # Convert the components to sorted arrays of atom indices and return.
        indices = [np.sort(list(g)) for g in Components]
        return indices
    # pylint: disable=arguments-differ
    def run(self,
            start=None,
            stop=None,
            step=None,
            n_jobs=-1,
            cutoff=15.0):
        """Perform the calculation.

        Parameters
        ----------
        start : int, optional
            start frame of analysis
        stop : int, optional
            stop frame of analysis
        step : int, optional
            number of frames to skip between each analysed frame
        n_jobs : int, optional
            number of tasks to start, if `-1` use number of logical cpu
            cores. This argument will be ignored when the distributed
            scheduler is used
        cutoff : float, optional
            head group-defining atoms within a distance of `cutoff`
            Angstroms are deemed to be in the same leaflet [15.0]
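
        Example
        -------
        A sketch of the scheduler selection; ``u`` and ``headgroups`` are
        placeholders for a Universe and an AtomGroup::

            from dask.distributed import Client

            client = Client()  # an active client is picked up automatically
            lf = LeafletFinder(u, headgroups).run(cutoff=15.0)

            # without a client, fall back to multiprocessing with 4 workers
            lf = LeafletFinder(u, headgroups).run(n_jobs=4)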
        """
        # are we using a distributed scheduler or should we use
        # multiprocessing?
        scheduler = dask.config.get('scheduler', None)
        if scheduler is None:
            # maybe we can grab a global worker
            try:
                scheduler = dask.distributed.worker.get_client()
            except ValueError:
                pass
        if n_jobs == -1:
            n_jobs = cpu_count()

        # we could not find a global scheduler to use and we ask for a single
        # job. Therefore we run this on the single threaded scheduler for
        # debugging.
        if scheduler is None and n_jobs == 1:
            scheduler = 'single-threaded'

        # fall back to multiprocessing, we tried everything
        if scheduler is None:
            scheduler = 'multiprocessing'

        scheduler_kwargs = {'scheduler': scheduler}
        if scheduler == 'multiprocessing':
            scheduler_kwargs['num_workers'] = n_jobs

        with timeit() as b_universe:
            universe = mda.Universe(self._top, self._traj)

        start, stop, step = self._trajectory.check_slice_indices(
            start, stop, step)
        with timeit() as total:
            with timeit() as prepare:
                self._prepare()

            with self.readonly_attributes():
                timings = list()
                times_io = []
                for frame in range(start, stop, step):
                    with timeit() as b_io:
                        ts = universe.trajectory[frame]
                    times_io.append(b_io.elapsed)
                    with timeit() as b_compute:
                        components = self. \
                            _single_frame(ts=ts,
                                          atomgroups=self._atomgroup,
                                          scheduler_kwargs=scheduler_kwargs,
                                          n_jobs=n_jobs,
                                          cutoff=cutoff)
                    timings.append(b_compute.elapsed)
                    leaflet1 = self._atomgroup[components[0]]
                    leaflet2 = self._atomgroup[components[1]]
                    self._results.append([leaflet1, leaflet2])
            with timeit() as conclude:
                self._conclude()
        self.timing = Timing(times_io, np.hstack(timings), total.elapsed,
                             b_universe.elapsed, prepare.elapsed,
                             conclude.elapsed)
        return self

    def _conclude(self):
        self.results = self._results