Source code for mdtools.dynamics

# This file is part of MDTools.
# Copyright (C) 2021, The MDTools Development Team and all contributors
# listed in the file AUTHORS.rst
#
# MDTools is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# MDTools is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with MDTools.  If not, see <http://www.gnu.org/licenses/>.


"""
Functions to calculate dynamic properties.

This module can be called from :mod:`mdtools` via the shortcut ``dyn``::

    import mdtools as mdt
    mdt.dyn  # insetad of mdt.dynamics

"""


# Standard libraries
import warnings
from copy import deepcopy
from datetime import datetime, timedelta
# Third party libraries
import psutil
import numpy as np
from scipy import sparse
# Local application/library specific imports
import mdtools as mdt


[docs] def msd(t, D, d=3): r""" Mean square displacement (MSD) as fuction of time according to the Einstein relation: .. math:: \langle \Delta\mathbf{r}^2(\tau) \rangle = 2d \cdot D \cdot \tau with :math:`\Delta\mathbf{r}(\tau) = |\mathbf{r}(t_0+\tau) - \mathbf{r}(t_0)|` being the displacement vector after a diffusion time :math:`\tau`, :math:`D` being the diffusion coefficient and :math:`d` being the number of spatial dimensions of the diffusion process. The brackets :math:`\langle ... \rangle` denote averaging over all particles of the same species and over all possible starting times :math:`t_0`. Parameters ---------- t : scalar or numpy.ndarray Array of diffusion times :math:`\tau` for which to calculate the MSD. D : scalar or numpy.ndarray Diffustion coefficient. d : scalar or numpy.ndarray, optional Number of spatial dimensions of the diffusion process. Returns ------- msd : scalar or numpy.ndarray Mean square displacement MSD(t) = 2d*D*t. See Also -------- :func:`mdtools.dynamics.msd_non_diffusive` : Calculate a non-diffusive MSD as function of time. Notes ----- If more than one input argument is a :class:`numpy.ndarray`, the arrays must be broadcastable to a common shape. Examples -------- >>> mdt.dyn.msd(t=np.arange(4), D=2, d=3) array([ 0, 12, 24, 36]) """ return 2 * d * D * t
[docs] def msd_non_diffusive(t, a, x): r""" Non-diffusive Mean Square Displacement (MSD) as fuction of time. .. math:: \langle \Delta\mathbf{r}^2(\tau) \rangle = a \tau^x with :math:`\Delta\mathbf{r}(\tau) = |\mathbf{r}(t_0+\tau) - \mathbf{r}(t_0)|` being the displacement vector after a diffusion time :math:`\tau`. :math:`a` and :math:`x` are just parameters. If :math:`x=1`, the equation describes diffusive motion. In this case, :math:`a` is equal to :math:`2dD`, with :math:`D` being the diffusion coefficient and :math:`d` being the number of spatial dimensions of the diffusion process. The brackets :math:`\langle ... \rangle` denote averaging over all particles of the same species and over all possible starting times :math:`t_0`. Parameters ---------- t : scalar or numpy.ndarray Array of times for which to calculate the (non-diffusive) MSD. a : scalar or numpy.ndarray Prefactor. x : scalar or numpy.ndarray Exponent. Returns ------- msd : scalar or numpy.ndarray (Non-diffusive) mean square displacement MSD(t) = a*t^x. See Also -------- :func:`mdtools.dynamics.msd` : Calculate the diffusive MSD according to the Einstin relation as function of time Notes ----- If more than one input argument is a :class:`numpy.ndarray`, the arrays must be broadcastable to a common shape. Examples -------- >>> mdt.dyn.msd_non_diffusive(t=np.arange(4), a=2, x=3) array([ 0, 2, 16, 54]) """ return a * t**x
[docs] def n_restarts(n_frames, restart): """ Get the number of restarting points for each lag time. Useful to normalize quantities that depend on a lag time and should be averaged over all possible restarting times. The most prominent example for such a quantity is the Mean Square Displacement (MSD), another example is autocorrelation functions. Parameters ---------- n_frames : int Number of frames in the trajectory. Must be greater than zero. restart : int Number of frames between restarting points. Must be greater than zero and should be less than `n_frames`. Returns ------- n_restarts : numpy.ndarray Array of shape ``(n_frames,)`` and dtype :attr:`numpy.uint32` containing the number of restarting points for each lag time. Examples -------- >>> n_frames = 5 >>> for restart in range(1, n_frames+1): ... mdt.dyn.n_restarts(n_frames, restart) array([5, 4, 3, 2, 1], dtype=uint32) array([3, 2, 2, 1, 1], dtype=uint32) array([2, 2, 1, 1, 1], dtype=uint32) array([2, 1, 1, 1, 1], dtype=uint32) array([1, 1, 1, 1, 1], dtype=uint32) >>> # 0 1 2 3 4 Corresponding lag times [in trajectory frames] :func:`mdtools.dynamics.n_restarts` is equivalent to but faster than the following nested loops >>> n_frames, restart = 5, 2 >>> n_restarts = np.zeros(n_frames, dtype=np.uint32) >>> for t0 in range(0, n_frames, restart): ... for lag in range(n_frames-t0): ... n_restarts[lag] += 1 >>> n_restarts array([3, 2, 2, 1, 1], dtype=uint32) >>> np.array_equal(n_restarts, mdt.dyn.n_restarts(n_frames, restart)) True The nested loop from above can also be reduced to a single loop. But still, :func:`mdtools.dynamics.n_restarts` is faster. >>> n_frames, restart = 5, 2 >>> n_restarts = np.zeros(n_frames, dtype=np.uint32) >>> for t0 in range(0, n_frames, restart): ... n_restarts[:n_frames-t0] += 1 >>> n_restarts array([3, 2, 2, 1, 1], dtype=uint32) >>> np.array_equal(n_restarts, mdt.dyn.n_restarts(n_frames, restart)) True """ if n_frames <= 0: raise ValueError("n_frames must be greater than zero") if restart <= 0: raise ValueError("restart must be greater than zero") if restart > n_frames: warnings.warn("The number of frames between restarting points" " ({}) is equal to or greater than the number of" " frames in the trajectory ({})" .format(restart, n_frames), RuntimeWarning) n_restarts = np.arange(n_frames, 0, -1, dtype=np.float32) n_restarts /= restart np.ceil(n_restarts, out=n_restarts) return n_restarts.astype(np.uint32, copy=False)
[docs] def correct_intermittency( list_of_arrays, intermittency, sparse2array=True, inplace=True, verbose=True, debug=False): r""" Correct for intermittent behavior of discrete variables stored in a sequence of arrays. Analogue of :func:`MDAnalysis.lib.correlations.correct_intermittency` for sequences of arbitrarily shaped arrays (e.g. a list of contact matrices as generated by :func:`mdtools.structure.contact_matrices`). Fill gaps between same values with a gap size smaller or equal to a given maximum gap size with the enclosing values. Parameters ---------- list_of_arrays : tuple or list or numpy.ndarray Sequence of arrays (usually one array per trajectory frame). All arrays in the sequence must have the same shape and must be either :class:`NumPy arrays <numpy.ndarray>` or :mod:`SciPy sparse matrices <scipy.sparse>`. A mix of :class:`NumPy arrays <numpy.ndarray>` and :mod:`SciPy sparse matrices <scipy.sparse>` is not possible. intermittency : int The maximum allowed gap size. All gaps with a size smaller or equal to the maximum gap size are considered to be negligible and are filled with their enclosing values. If `intermittency` is less than one, `list_of_arrays` will be returned without any changes. sparse2array : bool, optional If ``True``, :mod:`SciPy sparse matrices <scipy.sparse>` are internally converted on the fly to :class:`NumPy arrays <numpy.ndarray>` for the computations and back again to :mod:`SciPy sparse matrices <scipy.sparse>` using the sliding window method. The window size is automatically determined by the intermittency value. This conversion can increase the performance considerably, but may also lead to a higher memory consumption. `sparse2array` is ignored, if `list_of_arrays` is already a sequence of :class:`NumPy arrays <numpy.ndarray>`. The back conversion to :mod:`SciPy sparse matrices <scipy.sparse>` will always convert back to the format of the first sparse matrix in `list_of_arrays`. inplace : bool, optional If ``True``, modify `list_of_arrays` inplace instead of creating a copy of `list_of_arrays`. verbose : bool, optional If ``True``, print progress information to standard output. debug : bool, optional If ``True``, run in :ref:`debug mode <debug-mode-label>`. .. deprecated:: 0.0.0.dev0 This argument is without use and will be removed in a future release. Returns ------- list_of_arrays : tuple or list or numpy.ndarray The input sequence of arrays with gaps of size ``<=intermittency`` between same values filled with the enclosing values. See Also -------- :func:`mdtools.dynamics.correct_intermittency_1d` : Correct for intermittent behavior of discrete variables stored in a 1-dimensional array :func:`mdtools.dynamics.replace_short_sequences` : Replace consecutive occurrences of the same value that are shorter than a given minimum length by a given value :func:`mdtools.dynamics.replace_short_sequences_global` : TODO Notes ----- Possible use case of this function: Calculating lifetimes from the autocorrelation of contact matrices (autocorrelation of the existence function). Lifetimes may be calculated either with a strict continuous requirement or a less strict intermittency. If you are for instance calculating the lifetime of a ligand around a metal ion, in the former case the ligand must be within a cutoff distance of the metal ion at every frame from :math:`t_0` to :math:`t_0 + \tau` in order to be considered present at :math:`t_0 + \tau`. In the intermittent case, the ligand is allowed to leave the region of interest for up to a specified consecutive number of frames whilst still being considered present at :math:`t_0 + \tau`. For example, if a ligand is absent for a number of frames equal to or smaller than the parameter `intermittency`, then this absence will be removed and thus the ligand is considered to have not left. For instance, T,F,F,T with `intermittency` set to ``2`` will be replaced by T,T,T,T, where T=True=present, F=False=absent. On the other hand, F,T,T,F will be replaced by F,F,F,F. Note the following behaviour when the elements of the arrays in `list_of_arrays` can take more than two different values: 0,1,1,0,1,1,2 will be replaced by 0,0,0,0,1,1,2 with `intermittency=2`, because the last 1,1 block is not enclosed by the same value. If you want 0,1,1,0,1,1,2 to be replaced by 0,0,0,0,0,0,0, take a look at :func:`mdtools.dynamics.replace_short_sequences`. If you want to apply this function on a 1-dimensional array, parse the array as ``numpy.expand_dims(array, axis=0).T`` and convert the output back to a 1-dimensional array via ``np.squeeze(result.T)``. However, we recommend to use :func:`mdtools.dynamics.correct_intermittency_1d` instead, which is a fast version of :func:`mdtools.dynamics.correct_intermittency` for 1-dimensional arrays. The `intermittency` value must not exceed 65534 (``np.uint16(-1) - 1``). If you really need a higher intermittency value, change the dtype of ``seen_nframes_ago`` in the source code of this function. Be aware that high intermittency values increase the computational cost. Ask yourself if you could not better read-in only every ``intermittency/2``-th frame and set `intermittency` to one or even read-in only every `intermittency`-th frame and set `intermittency` to zero. Examples -------- >>> a = np.array([[0, 0, 0], ... [1, 1, 1]]) >>> b = np.array([[1, 1, 1], ... [0, 0, 0]]) >>> c = np.array([[1, 0, 1], ... [0, 1, 0]]) >>> d = np.array([[0, 1, 0], ... [1, 0, 1]]) >>> x = (a, b, c, d) >>> y = mdt.dyn.correct_intermittency(x, ... intermittency=1, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 0 0] [1 1 1]] [[1 0 1] [0 1 0]] [[1 0 1] [0 1 0]] [[0 1 0] [1 0 1]] >>> y = mdt.dyn.correct_intermittency(x, ... intermittency=2, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 1 0] [1 0 1]] An `intermittency` value larger than ``len(list_of_arrays)-2`` leads to the same result as an `intermittency` value of ``len(list_of_arrays)-2``. The first and the last arrays are never changed: >>> y = mdt.dyn.correct_intermittency(x, ... intermittency=len(x)-1, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 1 0] [1 0 1]] >>> np.array_equal(y[0], a) True >>> np.array_equal(y[3], d) True If intermittency is less than one, the input `list_of_arrays` is returned without any changes: >>> y = mdt.dyn.correct_intermittency(x, ... intermittency=0, ... inplace=False, ... verbose=False) >>> all(np.array_equal(x[i], arr) for i, arr in enumerate(y)) True Using :mod:`SciPy sparse matrices <scipy.sparse>`: >>> from scipy.sparse import csr_matrix >>> x_sparse = [csr_matrix(arr) for arr in x] >>> y = mdt.dyn.correct_intermittency(x_sparse, ... intermittency=2, ... sparse2array=False, ... inplace=False, ... verbose=False) >>> for csr_mat in y: ... print(csr_mat.toarray()) [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 1 0] [1 0 1]] >>> y = mdt.dyn.correct_intermittency(x_sparse, ... intermittency=2, ... sparse2array=True, ... inplace=False, ... verbose=False) >>> for csr_mat in y: ... print(csr_mat.toarray()) [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 0 0] [1 1 1]] [[0 1 0] [1 0 1]] Inplace modification: >>> y = mdt.dyn.correct_intermittency(x, ... intermittency=2, ... inplace=True, ... verbose=False) >>> x is y True """ if verbose: timer = datetime.now() proc = psutil.Process() proc.cpu_percent() # Initiate monitoring of CPU usage print("Running mdtools.dynamics.correct_intermittency()...") if intermittency > np.uint16(-1) - 1: raise ValueError("intermittency must be less than {}, which is" " np.uint16(-1)-1. If you really need a higher" " intermittency value, change the dtype of" " 'seen_nframes_ago' in the source code of this" " function".format(np.uint16(-1) - 1)) mdt.check.list_of_cms(list_of_arrays) if not inplace: list_of_arrays = deepcopy(list_of_arrays) if intermittency <= 0: if verbose: print("mdtools.dynamics.correct_intermittency() done") print("Elapsed time: {}".format(datetime.now() - timer)) print("CPU time: {}" .format(timedelta(seconds=sum(proc.cpu_times()[:4])))) print("CPU usage: {:.2f} %" .format(proc.cpu_percent())) print("Current memory usage: {:.2f} MiB" .format(proc.memory_info().rss / 2**20)) return list_of_arrays if sparse.issparse(list_of_arrays[0]): issparse = True else: issparse = False sparse2array = False if issparse and sparse2array: sparse_format = list_of_arrays[0].format stop = min(intermittency + 2, len(list_of_arrays)) for i in range(0, stop - 1): list_of_arrays[i] = list_of_arrays[i].toarray() if verbose: print("Arrays to process: {}".format(len(list_of_arrays))) print("Shape of the arrays: {}".format(list_of_arrays[0].shape)) loa = mdt.rti.ProgressBar(list_of_arrays, unit="arrays") else: loa = list_of_arrays for i, a in enumerate(loa): ix_max = i + intermittency + 1 if sparse2array and ix_max < len(list_of_arrays): list_of_arrays[ix_max] = list_of_arrays[ix_max].toarray() seen_nframes_ago = np.zeros(a.shape, dtype=np.uint16) stop = min(intermittency + 2, len(list_of_arrays) - i) for j in range(1, stop): mask = (list_of_arrays[i + j] != a) if issparse and not sparse2array: mask = mask.toarray() seen_nframes_ago += mask seen_nframes_ago[~mask] = 0 for j in range(1, stop - 1): mask = (seen_nframes_ago <= stop - 2 - j) list_of_arrays[i + j][mask] = a[mask] if sparse2array: if sparse_format == 'bsr': list_of_arrays[i] = sparse.bsr_matrix(list_of_arrays[i]) elif sparse_format == 'coo': list_of_arrays[i] = sparse.coo_matrix(list_of_arrays[i]) elif sparse_format == 'csc': list_of_arrays[i] = sparse.csc_matrix(list_of_arrays[i]) elif sparse_format == 'csr': list_of_arrays[i] = sparse.csr_matrix(list_of_arrays[i]) elif sparse_format == 'dia': list_of_arrays[i] = sparse.dia_matrix(list_of_arrays[i]) elif sparse_format == 'dok': list_of_arrays[i] = sparse.dok_matrix(list_of_arrays[i]) elif sparse_format == 'lil': list_of_arrays[i] = sparse.lil_matrix(list_of_arrays[i]) else: raise ValueError("Unknown scipy sparse matrix format" " ({}). Allowed formats: bsr, coo, csc," " csr, dia, dok, lil" .format(sparse_format)) if verbose: progress_bar_mem = proc.memory_info().rss / 2**20 loa.set_postfix_str("{:>7.2f}MiB".format(progress_bar_mem), refresh=False) if verbose: loa.close() print("mdtools.dynamics.correct_intermittency() done") print("Elapsed time: {}".format(datetime.now() - timer)) print("CPU time: {}" .format(timedelta(seconds=sum(proc.cpu_times()[:4])))) print("CPU usage: {:.2f} %" .format(proc.cpu_percent())) print("Current memory usage: {:.2f} MiB" .format(proc.memory_info().rss / 2**20)) return list_of_arrays
[docs] def correct_intermittency_1d(a, intermittency, inplace=True, debug=False): """ Correct for intermittent behavior of discrete variables stored in a 1-dimensional array. Fill gaps between same values with a gap size smaller or equal to a given maximum gap size with the enclosing values. For instance, 0,1,1,0,1,1,2 is replaced by 0,0,0,0,1,1,2, when `intermittency` is set to ``2``. Parameters ---------- a : array_like 1-dimensional array, in which to fill gaps between same values with the enclosing values. For higher dimensional arrays use :func:`mdtools.dynamics.correct_intermittency`. intermittency : int The maximum allowed gap size. All gaps with a size smaller or equal to the maximum gap size are considered to be negligible and are filled with their enclosing values. If `intermittency` is less than one, `a` will be returned without any changes. inplace : bool, optional If ``True`` (default), modify `a` inplace instead of creating a copy of `a`. Works only, if `a` is a :class:`numpy.ndarray`. debug : bool, optional If ``True``, check the input arguments. .. deprecated:: 0.0.0.dev0 This argument is without use and will be removed in a future release. Returns ------- a : numpy.ndarray The input array as :class:`numpy.ndarray` with gaps of size ``<=intermittency`` between same values filled with the enclosing values. See Also -------- :func:`mdtools.dynamics.correct_intermittency` : Correct for intermittent behavior of discrete variables stored in a sequence of arrays :func:`mdtools.dynamics.replace_short_sequences` : Replace consecutive occurrences of the same value that are shorter than a given minimum length by a given value :func:`mdtools.dynamics.replace_short_sequences_global` : TODO Notes ----- This is a faster version of :func:`mdtools.dynamics.correct_intermittency` for 1-dimensional arrays. See there for more details. Examples -------- >>> a = np.array([1, 2, 2, 1, 3, 3, 3, 1, 3, 3, 3]) >>> mdt.dyn.correct_intermittency_1d(a, ... intermittency=1, ... inplace=False) array([1, 2, 2, 1, 3, 3, 3, 3, 3, 3, 3]) >>> mdt.dyn.correct_intermittency_1d(a, ... intermittency=2, ... inplace=False) array([1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3]) >>> mdt.dyn.correct_intermittency_1d(a, ... intermittency=3, ... inplace=False) array([1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3]) An `intermittency` value larger than ``len(a)-2`` leads to the same result as an `intermittency` value of ``len(a)-2``. The first and the last elements are never changed: >>> mdt.dyn.correct_intermittency_1d(a, ... intermittency=len(a)-2, ... inplace=False) array([1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3]) >>> mdt.dyn.correct_intermittency_1d(a, ... intermittency=len(a)-1, ... inplace=False) array([1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3]) If intermittency is less than one, the input array is returned as :class:`numpy.ndarray`: >>> mdt.dyn.correct_intermittency_1d(a, ... intermittency=0, ... inplace=False) array([1, 2, 2, 1, 3, 3, 3, 1, 3, 3, 3]) Inplace modification: >>> b = mdt.dyn.correct_intermittency_1d(a, ... intermittency=3, ... inplace=True) >>> a is b True """ a = np.array(a, copy=(not inplace)) if a.ndim > 1: raise ValueError("The input array must be 1-dimensional") if intermittency <= 0: return a for i, init_val in enumerate(a): stop = min(intermittency + 2, len(a) - i) for j, final_val in enumerate(a[i + 2:i + stop][::-1]): if final_val == init_val: a[i + 1:i + stop - 1 - j] = init_val break return a
[docs] def replace_short_sequences( list_of_arrays, min_len, val=0, inplace=True, verbose=False, debug=False): """ Replace consecutive occurrences of the same value that are shorter than a given minimum length by a given value. For instance, the sequence 1,3,3,3,2,2,1 is replaced by 0,3,3,3,0,0,0, when `min_len` is set to ``3`` and `val` is set to ``0``. Parameters ---------- list_of_arrays : tuple or list or numpy.ndarray Sequence of arrays (usually one array per trajectory frame). All arrays in the sequence must have the same shape and all arrays must be :class:`NumPy arrays <numpy.ndarray>`. min_len : int All consecutive occurrences **shorter** than `min_len` will be replaced by `val`. If `min_len` is less than two, `list_of_arrays` will be returned without any changes. val : scalar, optional The value used to replace the values in consecutive occurrences shorter than `min_len`. Be aware that the arrays in `list_of_arrays` must have a suitable dtype to be able to hold `val`. inplace : bool, optional If ``True``, modify `list_of_arrays` inplace instead of creating a copy of `list_of_arrays`. verbose : bool, optional If ``True``, print progress information to standard output. debug : bool, optional If ``True``, check the input arguments. .. deprecated:: 0.0.0.dev0 This argument is without use and will be removed in a future release. Returns ------- list_of_arrays : tuple or list or numpy.ndarray List of arrays with consecutive occurrences shorter than `min_len` replaced by `val`. See Also -------- :func:`mdtools.dynamics.correct_intermittency` : Correct for intermittent behavior of discrete variables stored in a sequence of arrays :func:`mdtools.dynamics.correct_intermittency_1d` : Correct for intermittent behavior of discrete variables stored in a 1-dimensional array :func:`mdtools.dynamics.replace_short_sequences_global` : TODO Notes ----- If you want to replace consecutive occurrences in a 1-dimensional array, parse the array as ``np.expand_dims(array, axis=0).T`` and convert the output back to a 1-dimensional array via ``np.squeeze(result.T)``. `min_len` must not exceed 65535 (``np.uint16(-1)``). If you really need a higher value for `min_len`, change the dtype of ``seen_nframes_ago`` and ``seen_nframes_ahead`` in the source code of this function. Be aware that high values of `min_len` increase the computational cost. Examples -------- >>> a = np.array([[3, 1, 2], ... [1, 2, 3]]) >>> b = np.array([[3, 2, 2], ... [2, 2, 3]]) >>> c = np.array([[3, 2, 1], ... [2, 1, 3]]) >>> x = (a, b, c) >>> y = mdt.dyn.replace_short_sequences(x, ... min_len=2, ... val=0, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[3 0 2] [0 2 3]] [[3 2 2] [2 2 3]] [[3 2 0] [2 0 3]] >>> y = mdt.dyn.replace_short_sequences(x, ... min_len=3, ... val=0, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[3 0 0] [0 0 3]] [[3 0 0] [0 0 3]] [[3 0 0] [0 0 3]] If `min_len` is greater than ``len(list_of_arrays)``, all elements of all arrays are replaced by `val`: >>> y = mdt.dyn.replace_short_sequences(x, ... min_len=len(x)+1, ... val=0, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 0 0] [0 0 0]] [[0 0 0] [0 0 0]] [[0 0 0] [0 0 0]] If `min_len` is less than two, the input `list_of_arrays` is returned without any changes: >>> y = mdt.dyn.replace_short_sequences(x, ... min_len=1, ... val=0, ... inplace=False, ... verbose=False) >>> all(np.array_equal(x[i], arr) for i, arr in enumerate(y)) True Inplace modification: >>> y = mdt.dyn.replace_short_sequences(x, ... min_len=3, ... val=0, ... inplace=True, ... verbose=False) >>> x is y True """ if verbose: timer = datetime.now() proc = psutil.Process() proc.cpu_percent() # Initiate monitoring of CPU usage print("Running mdtools.dynamics.replace_short_sequences()...") if min_len > np.uint16(-1): raise ValueError("min_len must be less than 65535, which is" " np.uint16(-1). If you really need a higher" " value for 'min_len', change the dtype of" " 'seen_nframes_ago' and 'seen_nframes_ahead'" " in the source code of this function") mdt.check.list_of_cms(list_of_arrays) if not inplace: list_of_arrays = deepcopy(list_of_arrays) if min_len < 2: if verbose: print("mdtools.dynamics.replace_short_sequences() done") print("Elapsed time: {}".format(datetime.now() - timer)) print("CPU time: {}" .format(timedelta(seconds=sum(proc.cpu_times()[:4])))) print("CPU usage: {:.2f} %" .format(proc.cpu_percent())) print("Current memory usage: {:.2f} MiB" .format(proc.memory_info().rss / 2**20)) return list_of_arrays if verbose: print("Arrays to process: {}".format(len(list_of_arrays))) print("Shape of the arrays: {}".format(list_of_arrays[0].shape)) i = 0 a = list_of_arrays[i] seen_nframes_ago = np.ones(a.shape, dtype=np.uint16) seen_nframes_ahead = np.zeros(a.shape, dtype=np.uint16) seen_before = np.ones(a.shape, dtype=bool) for j in range(1, min(min_len, len(list_of_arrays) - i)): seen = (list_of_arrays[i + j] == a) seen_nframes_ahead += (seen & seen_before) seen_before = seen mask = ((seen_nframes_ago + seen_nframes_ahead) < min_len) a[mask] = val seen_nframes_ago += 1 if verbose: loa = mdt.rti.ProgressBar(list_of_arrays[1:], initial=1, unit="arrays") else: loa = list_of_arrays[1:] for i, a in enumerate(loa, start=1): seen_nframes_ago[a != list_of_arrays[i - 1]] = 1 seen_nframes_ahead = np.zeros(a.shape, dtype=np.uint16) seen_before = np.ones(a.shape, dtype=bool) for j in range(1, min(min_len, len(list_of_arrays) - i)): seen = (list_of_arrays[i + j] == a) seen_nframes_ahead += (seen & seen_before) seen_before = seen mask = ((seen_nframes_ago + seen_nframes_ahead) < min_len) a[mask] = val seen_nframes_ago += 1 if verbose: progress_bar_mem = proc.memory_info().rss / 2**20 loa.set_postfix_str("{:>7.2f}MiB".format(progress_bar_mem), refresh=False) if verbose: loa.close() print("mdtools.dynamics.replace_short_sequences() done") print("Elapsed time: {}".format(datetime.now() - timer)) print("CPU time: {}" .format(timedelta(seconds=sum(proc.cpu_times()[:4])))) print("CPU usage: {:.2f} %" .format(proc.cpu_percent())) print("Current memory usage: {:.2f} MiB" .format(proc.memory_info().rss / 2**20)) return list_of_arrays
[docs] def replace_short_sequences_global( list_of_arrays, min_len, val=0, inplace=True, verbose=False, debug=False): """ Replace consecutive occurrences of the same value that are shorter than a given minimum length by a given value at any location of the arrays. Parameters ---------- list_of_arrays : tuple or list or numpy.ndarray Sequence of arrays (usually one array per trajectory frame). All arrays in the sequence must have the same shape and all arrays must be :class:`NumPy arrays <numpy.ndarray>`. min_len : int All consecutive occurrences **shorter** than `min_len` will be replaced by `val`. If `min_len` is less than two, `list_of_arrays` will be returned without any changes. val : scalar, optional The value used to replace the values in consecutive occurrences shorter than `min_len`. Be aware that the arrays in `list_of_arrays` must have a suitable dtype to be able to hold `val`. inplace : bool, optional If ``True``, modify `list_of_arrays` inplace instead of creating a copy of `list_of_arrays`. verbose : bool, optional If ``True``, print progress information to standard output. debug : bool, optional If ``True``, check the input arguments. .. deprecated:: 0.0.0.dev0 This argument is without use and will be removed in a future release. See Also -------- :func:`mdtools.dynamics.correct_intermittency` : Correct for intermittent behavior of discrete variables stored in a sequence of arrays :func:`mdtools.dynamics.correct_intermittency_1d` : Correct for intermittent behavior of discrete variables stored in a 1-dimensional array :func:`mdtools.dynamics.replace_short_sequences` : Replace consecutive occurrences of the same value that are shorter than a given minimum length by a given value Notes ----- This function is very similar to :func:`mdtools.dynamics.replace_short_sequences`, but here consecutive occurrences of the same value are not checked between elements at the same location of the arrays in `list_of_arrays` but all elements of the arrays in `list_of_arrays` are taken into account. `min_len` must not exceed 65535 (``np.uint16(-1)``). If you really need a higher value for `min_len`, change the dtype of ``seen_nframes_ago`` and ``seen_nframes_ahead`` in the source code of this function. Be aware that high values of `min_len` increase the computational cost. Examples -------- >>> a = np.array([[1, 2, 3], ... [1, 2, 3]]) >>> b = np.array([[2, 2, 3], ... [2, 2, 3]]) >>> c = np.array([[3, 3, 3], ... [3, 3, 3]]) >>> x = (a, b, c) >>> y = mdt.dyn.replace_short_sequences_global(x, ... min_len=2, ... val=0, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 2 3] [0 2 3]] [[2 2 3] [2 2 3]] [[3 3 3] [3 3 3]] >>> y = mdt.dyn.replace_short_sequences_global(x, ... min_len=3, ... val=0, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 0 3] [0 0 3]] [[0 0 3] [0 0 3]] [[3 3 3] [3 3 3]] If `min_len` is greater than ``len(list_of_arrays)``, all elements of all arrays are replaced by `val`: >>> y = mdt.dyn.replace_short_sequences_global(x, ... min_len=len(x)+1, ... val=0, ... inplace=False, ... verbose=False) >>> for arr in y: ... print(arr) [[0 0 0] [0 0 0]] [[0 0 0] [0 0 0]] [[0 0 0] [0 0 0]] If `min_len` is less than two, the input `list_of_arrays` is returned without any changes: >>> y = mdt.dyn.replace_short_sequences_global(x, ... min_len=1, ... val=0, ... inplace=False, ... verbose=False) >>> all(np.array_equal(x[i], arr) for i, arr in enumerate(y)) True Inplace modification: >>> y = mdt.dyn.replace_short_sequences_global(x, ... min_len=3, ... val=0, ... inplace=True, ... verbose=False) >>> x is y True """ if verbose: timer = datetime.now() proc = psutil.Process() proc.cpu_percent() # Initiate monitoring of CPU usage print("Running mdtools.dynamics.replace_short_sequences_global()...") if min_len > np.uint16(-1): raise ValueError("min_len must be less than 65535, which is" " np.uint16(-1). If you really need a higher" " value for 'min_len', change the dtype of" " 'seen_nframes_ago' and 'seen_nframes_ahead'" " in the source code of this function") mdt.check.list_of_cms(list_of_arrays) if not inplace: list_of_arrays = deepcopy(list_of_arrays) if min_len < 2: if verbose: print("mdtools.dynamics.replace_short_sequences_global()" " done") print("Elapsed time: {}".format(datetime.now() - timer)) print("CPU time: {}" .format(timedelta(seconds=sum(proc.cpu_times()[:4])))) print("CPU usage: {:.2f} %" .format(proc.cpu_percent())) print("Current memory usage: {:.2f} MiB" .format(proc.memory_info().rss / 2**20)) return list_of_arrays if verbose: print("Arrays to process: {}".format(len(list_of_arrays))) print("Shape of the arrays: {}".format(list_of_arrays[0].shape)) i = 0 a = list_of_arrays[i] init_values = np.unique(a) seen_continuously = init_values seen_nframes_ago = np.ones(init_values.shape, dtype=np.uint16) seen_nframes_ahead = np.zeros(init_values.shape, dtype=np.uint16) for j in range(1, min(min_len, len(list_of_arrays) - i)): seen_values = np.unique(list_of_arrays[i + j]) seen_continuously = np.intersect1d(seen_continuously, seen_values) mask = np.isin(init_values, test_elements=seen_continuously, assume_unique=True) seen_nframes_ahead += mask mask = ((seen_nframes_ago + seen_nframes_ahead) < min_len) a[np.isin(a, init_values[mask])] = val seen_nframes_ago += 1 if verbose: loa = mdt.rti.ProgressBar(list_of_arrays[1:], initial=1, unit="arrays") else: loa = list_of_arrays[1:] for i, a in enumerate(loa, start=1): init_values_prev = init_values seen_nframes_ago_prev = seen_nframes_ago init_values = np.unique(a) seen_continuously = init_values seen_nframes_ago = np.ones(init_values.shape, dtype=np.uint16) seen_nframes_ahead = np.zeros(init_values.shape, dtype=np.uint16) mask_prev = np.isin(init_values_prev, test_elements=init_values, assume_unique=True) mask = np.isin(init_values, test_elements=init_values_prev, assume_unique=True) seen_nframes_ago[mask] = seen_nframes_ago_prev[mask_prev] for j in range(1, min(min_len, len(list_of_arrays) - i)): seen_values = np.unique(list_of_arrays[i + j]) seen_continuously = np.intersect1d(seen_continuously, seen_values) mask = np.isin(init_values, test_elements=seen_continuously, assume_unique=True) seen_nframes_ahead += mask mask = ((seen_nframes_ago + seen_nframes_ahead) < min_len) a[np.isin(a, init_values[mask])] = val seen_nframes_ago += 1 if verbose: progress_bar_mem = proc.memory_info().rss / 2**20 loa.set_postfix_str("{:>7.2f}MiB".format(progress_bar_mem), refresh=False) if verbose: print("mdtools.dynamics.replace_short_sequences_global()" " done") print("Elapsed time: {}".format(datetime.now() - timer)) print("CPU time: {}" .format(timedelta(seconds=sum(proc.cpu_times()[:4])))) print("CPU usage: {:.2f} %" .format(proc.cpu_percent())) print("Current memory usage: {:.2f} MiB" .format(proc.memory_info().rss / 2**20)) return list_of_arrays