src / pysoilmap / features.py

Source code for pysoilmap.features

"""
Computation of topographic variables from a DEM.

The DEM's axes ordering is assumed to be ``(Y, X)`` in accordance with the
same convention as for :func:`numpy.meshgrid` or
:func:`matplotlib.pyplot.imshow`.
"""

import affine
import numpy as np
import pyproj
import scipy.ndimage as ndimage
import scipy.misc as misc

import functools
import inspect
import numbers


def _cachedmethod(func: callable) -> callable:
    """Simple decorator for creating methods that cache results based on their
    hashable, positional arguments. Expects that ``self._cache`` is a dict."""
    @functools.wraps(func)
    def wrapper(self, *args):
        cache = self._cache.setdefault(func, {})
        try:
            value = cache[args]
        except KeyError:
            value = cache[args] = func(self, *args)
        return value
    wrapper.__signature__ = inspect.signature(func)
    return wrapper


[docs]def diff_finite( dem: np.ndarray, cellsize: tuple, dx: int, dy: int, ) -> np.ndarray: """ Calculate a higher-order derivative of the DEM by finite differencing successively in X and then Y direction. Uses a central differencing scheme. :param dem: DEM (z values at each pixel of a rectangular grid) :param cellsize: pixel ``(width, height)`` :param dx: order of derivative in x direction :param dy: order of derivative in y direction """ wx = misc.central_diff_weights(dx + 1 + dx % 2, dx).reshape(1, -1) wy = misc.central_diff_weights(dy + 1 + dy % 2, dy).reshape(-1, 1) image = _as_float_array(dem) image = ndimage.convolve(image, wx, mode='nearest') image = ndimage.convolve(image, wy, mode='nearest') norm = np.product(cellsize ** np.array([dy, dx])) return image / norm
[docs]def diff_gauss( dem: np.ndarray, cellsize: tuple, dx: int, dy: int, sigma: float = 1, ) -> np.ndarray: """ Applies a Gaussian derivative filter to the given DEM. This is the same as smoothing the DEM with the given lengthscale ``sigma`` and then calculating the nth-order derivatives in the given directions. :param dem: DEM (z coordinates of each pixel of a regular rectangular grid) :param cellsize: pixel ``(width, height)`` :param dx: order of derivative in x direction :param dy: order of derivative in y direction :param sigma: lengthscale in unit of pixels """ norm = np.product(cellsize ** np.array([dy, dx])) image = _as_float_array(dem) image = ndimage.gaussian_filter(image, order=[dy, dx], sigma=sigma) return image / norm
[docs]class Topography: """ Calculates various topographic indices at each pixel from on a given DEM. :param dem: DEM, i.e. z coordinates of each pixel :param cellsize: Size of a pixel given as pair ``(width, height)`` :param diff: differentiation function. Either :func:`diff_finite`, or :func:`diff_gauss` :param indexing: either ``xy`` or ``ij``, same meaning as for ``numpy.meshgrid`` :param transform: list of 6 numbers that describe the transformation. Only used for ``latitude``, ``rad_angle``, and ``sun_exposure``. :param crs: Name of the coordinate system. Only used for ``latitude``, ``rad_angle``, and ``sun_exposure``. :param kwargs: keyword arguments for ``diff``. Usually ``sigma=VALUE`` for the gaussian derivative Note that the DEM is assumed to live on a regular rectangular grid. Note that the object caches some of the calculated quantities for reuse. In order to free memory you should let go of the ``Topography`` instance after calculation of the quantities in which you are interested """ def __init__( self, dem: np.ndarray, cellsize: tuple, diff: callable = diff_finite, indexing: str = 'xy', transform: list = None, crs: str = None, **kwargs, ): self._cache = {} self._dem = _as_float_array(dem) self._diff = diff self._transform = transform self._crs = crs self._kwargs = kwargs if isinstance(cellsize, numbers.Number): self._cellsize = (cellsize, cellsize) elif isinstance(cellsize, (tuple, list)): self._cellsize = cellsize else: raise TypeError( "Expected a number for `cellsize`, but received a {!r}: {!r}" .format(type(cellsize), cellsize)) self._indexing = ''.join(indexing) if self._indexing not in ('xy', 'ij'): raise ValueError("Invalid indexing: {!r}".format(self._indexing)) self._transform = transform and affine.Affine(*transform)
[docs] @_cachedmethod def diff(self, dx: int, dy: int) -> np.ndarray: """Differentiate the DEM ``dx`` times in X direction and ``dy`` times in Y direction.""" if self._indexing == 'ij': dx, dy = dy, dx return self._diff(self._dem, self._cellsize, dx, dy, **self._kwargs)
[docs] def D1x(self) -> np.ndarray: """Calculate the first derivative (slope) of the DEM in X direction. Same as ``self.diff(1, 0)``.""" return self.diff(1, 0)
[docs] def D1y(self) -> np.ndarray: """Calculate the first derivative (slope) of the DEM in Y direction. Same as ``self.diff(0, 1)``.""" return self.diff(0, 1)
[docs] def D2x(self) -> np.ndarray: """Calculate the 2nd derivative (curvature) of the DEM in X direction. Same as ``self.diff(2, 0)``.""" return self.diff(2, 0)
[docs] def D2y(self) -> np.ndarray: """Calculate the 2nd derivative (curvature) of the DEM in Y direction. Same as ``self.diff(0, 2)``.""" return self.diff(0, 2)
@_cachedmethod def _Dxy(self) -> np.ndarray: """Differentiate the DEM with respect to both X and Y, and then multiply by the individual derivatives. Same as ``self.diff(1, 1) * D1x * D1y``.""" return self.diff(1, 1) * self.D1x() * self.D1y()
[docs] @_cachedmethod def Dx2(self) -> np.ndarray: """Return the square of the X slope. Same as ``D1x**2``.""" return self.D1x() ** 2
[docs] @_cachedmethod def Dy2(self) -> np.ndarray: """Return the square of the Y slope. Same as ``D1y**2``.""" return self.D1y() ** 2
@_cachedmethod def _p(self) -> np.ndarray: """Returns the square of the total slope. Same as ``slope_x**2 + slope_y**2``.""" return self.Dx2() + self.Dy2() @_cachedmethod def _q(self) -> np.ndarray: """Returns ``slope_x**2 + slope_y**2 + 1``.""" return self._p() + 1
[docs] @_cachedmethod def slope(self) -> np.ndarray: """Returns the absolute value of the total slope. Same as ``sqrt(D1x**2 + D1y**2)``.""" return np.sqrt(self._p())
slope_x = D1x slope_y = D1y
[docs] def curvature(self) -> np.ndarray: """Returns the absolute value of the total curvature. Same as ``sqrt(D2x**2 + D2y**2)``.""" return np.sqrt(self.D2x()**2 + self.D2y()**2)
curvature_x = D2x curvature_y = D2y
[docs] def plan_curvature(self) -> np.ndarray: """Calculate the planform curvature, i.e. the curvature of the terrain surface in the horizontal plane. See: - http://surferhelp.goldensoftware.com/gridops/plan_curvature.htm - Map Use: Reading, Analysis, Interpretation, Seventh Edition (p. 360) """ return _safe_divide( self.D2x() * self.Dy2() + self.Dx2() * self.D2y() - 2 * self._Dxy(), self._p() ** 1.5, fill=0)
[docs] def tang_curvature(self) -> np.ndarray: """Calculate the tangential curvature, i.e. the curvature of a terrain cross-section on a vertical plane perpendicular to the gradient direction. See: - http://surferhelp.goldensoftware.com/gridops/tangential_curvature.htm - Map Use: Reading, Analysis, Interpretation, Seventh Edition (p. 360) """ return _safe_divide( self.D2x() * self.Dy2() + self.Dx2() * self.D2y() - 2 * self._Dxy(), self._q()**0.5 * self._p(), fill=0)
[docs] def prof_curvature(self) -> np.ndarray: """Calculate the profile curvature., i.e. the curvature of a terrain cross-section on a vertical plane containing the gradient vector. See: - http://surferhelp.goldensoftware.com/gridops/profile_curvature.htm - Map Use: Reading, Analysis, Interpretation, Seventh Edition (p. 360) """ return _safe_divide( self.D2x() * self.Dx2() + self.Dy2() * self.D2y() + 2 * self._Dxy(), self._q()**1.5 * self._p(), fill=0)
[docs] def northness(self) -> np.ndarray: """Calculate the northness, i.e. the cosine-transformed aspect. This quantity can be understood as the projected length of the gradient direction on the north axis in the horizontal plane.""" return np.sin(self.grad_dir())
[docs] def eastness(self) -> np.ndarray: """Calculate the eastness, i.e. the sine-transformed aspect. This quantity can be understood as the projected length of the gradient direction on the east axis in the horizontal plane.""" return -np.cos(self.grad_dir())
[docs] @_cachedmethod def aspect(self) -> np.ndarray: """Returns the angle of the gradient direction in radians counted from north in clockwise direction.""" return 3 * np.pi / 2 - self.grad_dir()
[docs] @_cachedmethod def grad_dir(self) -> np.ndarray: """Returns the angle of the gradient direction in radians counted from east in counter-clockwise direction (i.e. mathematical convention).""" return np.arctan2(self.D1y(), self.D1x())
[docs] @_cachedmethod def slope_angle(self) -> np.ndarray: """Returns the slope angle, i.e. the angle between the horizontal plane and the gradient vector. This is a non-negative value.""" return np.arctan(self.slope())
[docs] @_cachedmethod def slope_angle_x(self) -> np.ndarray: """Returns the x slope angle, i.e. the angle between the x axis and the surface in the xz plane.""" return np.arctan(self.slope_x())
[docs] @_cachedmethod def slope_angle_y(self) -> np.ndarray: """Returns the y slope angle, i.e. the angle between the y axis and the surface in the yz plane.""" return np.arctan(self.slope_y())
[docs] def verticality(self) -> np.ndarray: """Sine of the slope angle.""" return np.sin(self.slope_angle())
[docs] def verticality_x(self) -> np.ndarray: """Sine of the x slope angle.""" return np.sin(self.slope_angle_x())
[docs] def verticality_y(self) -> np.ndarray: """Sine of the y slope angle.""" return np.sin(self.slope_angle_y())
[docs] @_cachedmethod def latitude(self) -> np.ndarray: """Get the latitude in radians at each pixel.""" if self._crs is None or self._transform is None: raise ValueError( "In order to use this method the `crs` and `transform` " "parameters must be defined when initializing the " "Topography() object!") if self._indexing == 'xy': ny, nx = self._dem.shape else: nx, ny = self._dem.shape xs, ys = self._transform * np.meshgrid( np.linspace(0, nx - 1, nx), np.linspace(0, ny - 1, ny), indexing=self._indexing) return get_latitude(xs, ys, self._crs)
[docs] def sun_exposure(self, declination=0) -> np.ndarray: """ Return the cosine of the angle between surface normal and the sun at midday. :param declination: sun declination angle in radians. Can be used to adjust for season. A positive value corresponds to a northward deviation. Zero means the sun is in the zenith over the equator. This is very similar to ``rad_angle`` but not exactly the same. """ return sun_exposure( self.grad_dir(), self.slope(), self.latitude(), declination)
[docs] def rad_angle(self, declination=0) -> np.ndarray: """ Return the cosine of the "radiation angle" according to Herbsta, et al 2006 [1]. :param declination: sun declination angle in radians. Can be used to adjust for seasonal changes. All input arguments must be of the same shape or scalars. The formulate is taken from Herbsta, et al. 2006 [1] who cite Moore, et al. 1988 [2] as source - which doesn't mention the formula anywhere. [1] (2006 Geostatistical co-regionalization of soil hydraulic properties in a microscale catchment using terrain attributes [Herbsta, et al.]) [2] Moore, I.D., Burch, G.J., Mackenzie, D.H., 1988. Topographic effects on the distribution of surface soil water and the location of ephemeral gullies. American Society of Agricultural Engi- neers 31 (4), 1098 – 1107. """ return rad_angle( self.aspect(), self.slope(), self.latitude(), declination)
[docs]def get_latitude( xs: np.ndarray, ys: np.ndarray, crs: str, ): """Calculate the latitude for each pair of (X, Y) coordinates in the given CRS. :param xs: array of X coordinates :param ys: array of Y coordinates :param crs: name of the coordinate system """ to_wgs84 = pyproj.Transformer.from_crs(crs, 'epsg:4326', always_xy=True) lon, lat = to_wgs84.transform(xs, ys) return lat * (np.pi / 180)
[docs]def sun_exposure(grad_dir, slope, latitude, declination=0.0): """ Return the cosine of the angle between surface normal and the sun at midday. :param grad_dir: direction of the gradient in XY plane in radians in counter-clockwise direction with respect to east :param slope: inclination of the gradient in radians :param latitude: point latitude in radians :param declination: sun declination angle in radians. Can be used to adjust for season. A positive value corresponds to a northward deviation. Zero means the sun is in the zenith over the equator. All input arguments must be of the same shape or scalar. This is very similar to ``rad_angle`` but not exactly the same. """ delta = latitude - declination sun_vector = np.array([ np.zeros_like(delta), -np.sin(delta), np.cos(delta), ]) plane_normal = np.array([ -np.cos(grad_dir) * np.sin(slope), -np.sin(grad_dir) * np.sin(slope), np.cos(slope), ]) return np.sum(sun_vector * plane_normal, axis=0)
[docs]def rad_angle(aspect, slope, latitude, declination=0.0): """ Return the cosine of the "radiation angle" according to Herbsta, et al 2006 [1]. :param aspect: direction of the gradient in XY plane in radians in clockwise direction with respect to north :param slope: inclination of the gradient in radians :param latitude: point latitude in radians :param declination: sun declination angle in radians All input arguments must be of the same shape or scalar. The formula is taken from Herbsta, et al. 2006 [1]. They cite Moore, et al. 1988 [2] as source - which doesn't seem to mention the formula anywhere. [1] (2006 Geostatistical co-regionalization of soil hydraulic properties in a microscale catchment using terrain attributes [Herbsta, et al.]) [2] Moore, I.D., Burch, G.J., Mackenzie, D.H., 1988. Topographic effects on the distribution of surface soil water and the location of ephemeral gullies. American Society of Agricultural Engi- neers 31 (4), 1098 – 1107. """ delta = latitude - declination b = 1 + np.sin(slope)**2 * np.cos(aspect)**2 c = 2 * np.sin(slope) * np.cos(slope) * np.cos(aspect) * np.sin(delta) d = np.cos(slope)**2 * np.sin(delta) - 1 return (-c + np.sqrt(c**2 - 4 * b * d)) / (2*b)
def _as_float_array(array: np.ndarray, min_float=np.float32) -> np.ndarray: """Convert array to a floating point array.""" array = np.asanyarray(array) dtype = np.result_type(array, min_float) return np.asanyarray(array, dtype=dtype) def _safe_divide(a: np.ndarray, b: np.ndarray, fill=np.nan) -> np.ndarray: """Perform division without complaining about division by zero. Fills nan/inf values by the given fill value (can be an array that broadcasts to the result shape).""" with np.errstate(divide='ignore', invalid='ignore'): result = np.divide(a, b) result = np.where(np.isfinite(result), result, fill) return result