Source code for pmda.rdf

# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
"""
Radial Distribution Functions --- :mod:`pmda.rdf`
=================================================

This module contains parallel versions of analysis tasks in
:mod:`MDAnalysis.analysis.rdf`.

See Also
--------
MDAnalysis.analysis.rdf


Classes
-------
.. autoclass:: InterRDF
   :members:
   :inherited-members:

"""

from __future__ import absolute_import, division

import numpy as np

from MDAnalysis.lib import distances

from .parallel import ParallelAnalysisBase


[docs]class InterRDF(ParallelAnalysisBase): """Intermolecular pair distribution function Attributes ---------- bins : array The distance :math:`r` at which the distribution function :math:`g(r)` is determined; these are calculated as the centers of the bins that were used for histogramming. rdf : array The value of the pair distribution function :math:`g(r)` at :math:`r`. Parameters ---------- g1 : AtomGroup First AtomGroup g2 : AtomGroup Second AtomGroup nbins : int (optional) Number of bins in the histogram [75] range : tuple or list (optional) The size of the RDF [0.0, 15.0] exclusion_block : tuple (optional) A tuple representing the tile to exclude from the distance array. [None] Example ------- First create the :class:`InterRDF` object, by supplying two AtomGroups then use the :meth:`run` method :: rdf = InterRDF(ag1, ag2) rdf.run() Results are available through the :attr:`bins` and :attr:`rdf` attributes:: plt.plot(rdf.bins, rdf.rdf) The `exclusion_block` keyword allows the masking of pairs from within the same molecule. For example, if there are 7 of each atom in each molecule, the exclusion mask `(7, 7)` can be used. See Also -------- MDAnalysis.analysis.rdf.InterRDF .. versionadded:: 0.2.0 """ # pylint: disable=redefined-builtin # continue to use 'range' as long as MDAnalysis uses it so that # the user interface remains consistent def __init__(self, g1, g2, nbins=75, range=(0.0, 15.0), exclusion_block=None): u = g1.universe super(InterRDF, self).__init__(u, (g1, g2)) # collect all atomgroups with the same trajectory object as universe self.nA = len(g1) self.nB = len(g2) self.rdf_settings = {'bins': nbins, 'range': range} self._exclusion_block = exclusion_block edges = np.histogram([-1], **self.rdf_settings)[1] self.edges = edges self.bins = 0.5 * (edges[:-1] + edges[1:]) # pylint: enable=redefined-builtin def _prepare(self): # Empty histogram to store the RDF self.count = self.bins * 0.0 # Need to know average volume self.volume = 0.0 # Set the max range to filter the search radius self._maxrange = self.rdf_settings['range'][1] def _single_frame(self, ts, atomgroups): g1, g2 = atomgroups u = g1.universe pairs, dist = distances.capped_distance(g1.positions, g2.positions, self._maxrange, box=u.dimensions) # If provided exclusions, create a mask of _result which # lets us take these out. if self._exclusion_block is not None: idxA = pairs[:, 0]//self._exclusion_block[0] idxB = pairs[:, 1]//self._exclusion_block[1] mask = np.where(idxA != idxB)[0] dist = dist[mask] count = np.histogram(dist, **self.rdf_settings)[0] volume = u.trajectory.ts.volume return np.array([count, np.array(volume, dtype=np.float64)]) def _conclude(self, ): self.count = np.sum(self._results[:, 0]) self.volume = np.sum(self._results[:, 1]) # Number of each selection N = self.nA * self.nB # If we had exclusions, take these into account if self._exclusion_block: xA, xB = self._exclusion_block nblocks = self.nA / xA N -= xA * xB * nblocks # Volume in each radial shell vol = np.power(self.edges[1:], 3) - np.power(self.edges[:-1], 3) vol *= 4/3.0 * np.pi # Average number density box_vol = self.volume / self.n_frames density = N / box_vol rdf = self.count / (density * vol * self.n_frames) self.rdf = rdf @property def cdf(self): """Calculate the cumulative distribution functions (CDF). Note that this is the actual count within a given radius, i.e., :math:`N(r)`. Returns ------- cdf : numpy array a numpy array with the same structure as :attr:`rdf` .. versionadded:: 0.3.0 """ cdf = np.cumsum(self.count) / self.n_frames return cdf @staticmethod def _reduce(res, result_single_frame): """ 'add' action for an accumulator""" if isinstance(res, list) and len(res) == 0: # Convert res from an empty list to a numpy array # which has the same shape as the single frame result res = result_single_frame else: # Add two numpy arrays res += result_single_frame return res
class InterRDF_s(ParallelAnalysisBase): """Site-specific intermolecular pair distribution function Arguments --------- u : Universe a Universe that contains atoms in `ags` ags : list a list of pairs of :class:`~MDAnalysis.core.groups.AtomGroup` instances nbins : int (optional) Number of bins in the histogram [75] range : tuple or list (optional) The size of the RDF [0.0, 15.0] Example ------- First create the :class:`InterRDF_s` object, by supplying one Universe and one list of pairs of AtomGroups, then use the :meth:`~InterRDF_s.run` method:: from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT u = mda.Universe(GRO_MEMPROT, XTC_MEMPROT) s1 = u.select_atoms('name ZND and resid 289') s2 = u.select_atoms('(name OD1 or name OD2) and resid 51 and sphzone 5.0 (resid 289)') s3 = u.select_atoms('name ZND and (resid 291 or resid 292)') s4 = u.select_atoms('(name OD1 or name OD2) and sphzone 5.0 (resid 291)') ags = [[s1, s2], [s3, s4]] rdf = InterRDF_s(u, ags) rdf.run() Results are available through the :attr:`bins` and :attr:`rdf` attributes:: plt.plot(rdf.bins, rdf.rdf[0][0][0]) (Which plots the rdf between the first atom in ``s1`` and the first atom in ``s2``) To generate the *cumulative distribution function* (cdf), use the :meth:`~InterRDF_s.get_cdf` method :: cdf = rdf.get_cdf() Results are available through the :attr:'cdf' attribute:: plt.plot(rdf.bins, rdf.cdf[0][0][0]) (Which plots the cdf between the first atom in ``s1`` and the first atom in ``s2``) See Also -------- MDAnalysis.analysis.rdf.InterRDF_s .. versionadded:: 0.2.0 """ # pylint: disable=redefined-builtin # continue to use 'range' as long as MDAnalysis uses it so that # the user interface remains consistent def __init__(self, u, ags, nbins=75, range=(0.0, 15.0), density=True): atomgroups = [] for pair in ags: atomgroups.append(pair[0]) atomgroups.append(pair[1]) super(InterRDF_s, self).__init__(u, atomgroups) # List of pairs of AtomGroups self._density = density self.n = len(ags) self.rdf_settings = {'bins': nbins, 'range': range} ag_shape = [] indices = [] for (ag1, ag2) in ags: indices.append([ag1.indices, ag2.indices]) ag_shape.append([len(ag1), len(ag2)]) self.indices = indices self.ag_shape = ag_shape # pylint: enable=redefined-builtin def _prepare(self): # Empty list to store the RDF count, edges = np.histogram([-1], **self.rdf_settings) self.len = len(count) self.edges = edges self.bins = 0.5 * (edges[:-1] + edges[1:]) # Need to know average volume self.volume = 0.0 self._maxrange = self.rdf_settings['range'][1] def _single_frame(self, ts, atomgroups): ags = [[atomgroups[2*i], atomgroups[2*i+1]] for i in range(self.n)] count = [np.zeros((ag1.n_atoms, ag2.n_atoms, self.len), dtype=np.float64) for ag1, ag2 in ags] for i, (ag1, ag2) in enumerate(ags): u = ag1.universe pairs, dist = distances.capped_distance(ag1.positions, ag2.positions, self._maxrange, box=u.dimensions) for j, (idx1, idx2) in enumerate(pairs): count[i][idx1, idx2, :] = np.histogram(dist[j], **self.rdf_settings)[0] volume = u.trajectory.ts.volume return np.array([np.array(count), np.array(volume, dtype=np.float64)]) def _conclude(self): self.count = np.sum(self._results[:, 0]) self.volume = np.sum(self._results[:, 1]) # Volume in each radial shell vol = np.power(self.edges[1:], 3) - np.power(self.edges[:-1], 3) vol *= 4/3.0 * np.pi # Empty lists to restore indices, RDF rdf = [] for i in range(len(self.ag_shape)): # Number of each selection N = 1 # Average number density box_vol = self.volume / self.n_frames density = N / box_vol if self._density: rdf.append(self.count[i] / (density * vol * self.n_frames)) else: rdf.append(self.count[i] / (vol * self.n_frames)) self.rdf = rdf @property def cdf(self): """Calculate the cumulative distribution functions (CDF) for all sites. Note that this is the actual count within a given radius, i.e., :math:`N(r)`. Returns ------- cdf : list list of arrays with the same structure as :attr:`rdf` .. versionadded:: 0.3.0 Method ``get_cdf()`` was removed and :attr:`cdf` is now a managed attribute that computes the CDF when accessed. """ # Calculate cumulative distribution function # Empty list to restore CDF cdf = [] # cdf is a list of cdf between pairs of AtomGroups in ags for count in self.count: cdf.append(np.cumsum(count, axis=2) / self.n_frames) return cdf @staticmethod def _reduce(res, result_single_frame): """ 'add' action for an accumulator""" if isinstance(res, list) and len(res) == 0: # Convert res from an empty list to a numpy array # which has the same shape as the single frame result res = result_single_frame else: # Add two numpy arrays res += result_single_frame return res