Source code for eradiate.radprops._core

from __future__ import annotations

import datetime
from abc import ABC, abstractmethod
from functools import singledispatchmethod

import attrs
import numpy as np
import pint
import pinttr
import xarray as xr
from pinttr.util import ensure_units

import eradiate

from .._factory import Factory
from ..attrs import documented, parse_docs
from ..spectral.index import CKDSpectralIndex, MonoSpectralIndex, SpectralIndex
from ..units import unit_context_config as ucc
from ..units import unit_registry as ureg

rad_profile_factory = Factory()
rad_profile_factory.register_lazy_batch(
    [
        ("_atmosphere.AtmosphereRadProfile", "atmosphere", {}),
    ],
    cls_prefix="eradiate.radprops",
)


@ureg.wraps(
    ret=None, args=("nm", "km", "km", "km^-1", "km^-1", "km^-1", ""), strict=False
)
def make_dataset(
    wavelength: pint.Quantity | float,
    z_level: pint.Quantity | float,
    z_layer: pint.Quantity | float | None = None,
    sigma_a: pint.Quantity | float | None = None,
    sigma_s: pint.Quantity | float | None = None,
    sigma_t: pint.Quantity | float | None = None,
    albedo: pint.Quantity | float | None = None,
) -> xr.Dataset:
    """
    Makes an atmospheric radiative properties data set.

    Parameters
    ----------
    wavelength : float
        Wavelength [nm].

    z_level : array
        Level altitudes [km].

    z_layer : array
        Layer altitudes [km].

        If ``None``, the layer altitudes are computed automatically, so that
        they are halfway between the adjacent altitude levels.

    sigma_a : array
        Absorption coefficient values [km^-1].

    sigma_s : array
        Scattering coefficient values [km^-1].

    sigma_t : array
        Extinction coefficient values [km^-1].

    albedo : array
        Albedo values [/].

    Returns
    -------
    Dataset
        Atmosphere radiative properties data set.
    """
    if z_layer is None:
        z_layer = 0.5 * (z_level[1:] + z_level[:-1])

    if sigma_a is not None and sigma_s is not None:
        sigma_t = sigma_a + sigma_s
        albedo = np.divide(
            sigma_s, sigma_t, where=sigma_t != 0.0, out=np.zeros_like(sigma_s)
        )
    elif sigma_t is not None and albedo is not None:
        sigma_s = albedo * sigma_t
        sigma_a = sigma_t - sigma_s
    else:
        raise ValueError(
            "You must provide either one of the two pairs of arguments "
            "('sigma_a', 'sigma_s') or ('sigma_t', 'albedo')."
        )

    return xr.Dataset(
        data_vars={
            "sigma_a": (
                ("w", "z_layer"),
                sigma_a.reshape(1, z_layer.size),
                dict(
                    standard_name="absorption_coefficient",
                    units="km^-1",
                    long_name="absorption coefficient",
                ),
            ),
            "sigma_s": (
                ("w", "z_layer"),
                sigma_s.reshape(1, z_layer.size),
                dict(
                    standard_name="scattering_coefficient",
                    units="km^-1",
                    long_name="scattering coefficient",
                ),
            ),
            "sigma_t": (
                ("w", "z_layer"),
                sigma_t.reshape(1, z_layer.size),
                dict(
                    standard_name="extinction_coefficient",
                    units="km^-1",
                    long_name="extinction coefficient",
                ),
            ),
            "albedo": (
                ("w", "z_layer"),
                albedo.reshape(1, z_layer.size),
                dict(
                    standard_name="albedo",
                    units="",
                    long_name="albedo",
                ),
            ),
        },
        coords={
            "z_level": (
                "z_level",
                z_level,
                dict(
                    standard_name="level_altitude",
                    units="km",
                    long_name="level altitude",
                ),
            ),
            "z_layer": (
                "z_layer",
                z_layer,
                dict(
                    standard_name="layer_altitude",
                    units="km",
                    long_name="layer altitude",
                ),
            ),
            "w": (
                "w",
                [wavelength],
                dict(
                    standard_name="radiation_wavelength",
                    units="nm",
                    long_name="wavelength",
                ),
            ),
        },
        attrs={
            "convention": "CF-1.8",
            "title": "Atmospheric monochromatic radiative properties",
            "history": f"{datetime.datetime.utcnow().replace(microsecond=0)} - "
            f"data set creation - "
            f"{__name__}.make_dataset",
            "source": f"eradiate, version {eradiate.__version__}",
            "references": "",
        },
    )


[docs] @parse_docs @attrs.frozen(eq=False, init=False) class ZGrid: """ A container for a regular altitude grid. Notes ----- * Instances are immutable. * Instances are hashable by ID. This is required to allow for using them as an argument of an LRU-cached function. * This class is used as the argument of the ``eval()`` family of methods. """ levels: pint.Quantity = documented( pinttr.field( units=ucc.deferred("length"), on_setattr=None, # frozen instance: on_setattr must be disabled ), type="pint.Quantity", init_type="quantity or array-like", doc="Grid node altitudes.\n" "\n" "Unit-enabled field (default: ``ucc['length']``).", ) _layers: pint.Quantity = pinttr.field( units=ucc.deferred("length"), on_setattr=None, # frozen instance: on_setattr must be disabled ) _layer_height: pint.Quantity = pinttr.field( units=ucc.deferred("length"), on_setattr=None, # frozen instance: on_setattr must be disabled ) @_layer_height.validator def _layer_height_validator(self, attribute, value): if not np.isscalar(value.magnitude): raise ValueError("layer height must be a scalar") _total_height: pint.Quantity = pinttr.field( units=ucc.deferred("length"), on_setattr=None, # frozen instance: on_setattr must be disabled ) def __init__(self, levels: np.typing.ArrayLike): levels = ensure_units(levels, ucc.get("length")) layer_height = np.diff(levels) if not np.allclose(layer_height, layer_height[0]): raise ValueError("levels must be regularly spaced") layers = levels[:-1] + 0.5 * layer_height self.__attrs_init__( levels=levels, layers=layers, layer_height=layer_height[0], total_height=(levels.m[-1] - levels.m[0]) * levels.u, ) @property def layers(self) -> pint.Quantity: """ Returns ------- quantity Vector of altitudes of layer centres. """ return self._layers @property def layer_height(self) -> pint.Quantity: """ Returns ------- quantity Layer height. """ return self._layer_height @property def n_levels(self) -> int: """ Returns ------- int Number of levels. """ return len(self.levels) @property def n_layers(self) -> int: """ Returns ------- int Number of layers. """ return len(self.layers) @property def total_height(self) -> pint.Quantity: """ Returns ------- quantity Total height covered by the altitude grid. """ return self._total_height
[docs] @attrs.define(eq=False) class RadProfile(ABC): """ An abstract base class for radiative property profiles. Classes deriving from this one must implement methods which return the albedo and collision coefficients as Pint-wrapped Numpy arrays. See Also -------- :data:`.rad_profile_factory` """ @property @abstractmethod def zgrid(self) -> ZGrid: """ Default altitude grid used for profile evaluation. """ pass
[docs] @singledispatchmethod def eval_albedo( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> pint.Quantity: """ Evaluate albedo at given spectral index. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional The altitude grid for which the albedo is evaluated. If unset, a profile-specific default is used. Returns ------- quantity Evaluated spectrum as an array with length equal to the number of layers. """ raise NotImplementedError
@eval_albedo.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_albedo_mono(w=si.w, zgrid=zgrid).squeeze() @eval_albedo.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_albedo_ckd(w=si.w, g=si.g, zgrid=zgrid).squeeze() def eval_albedo_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: raise NotImplementedError def eval_albedo_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> pint.Quantity: raise NotImplementedError
[docs] @singledispatchmethod def eval_sigma_t( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> pint.Quantity: """ Evaluate extinction coefficient at given spectral index. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional The altitude grid for which the extinction coefficient is evaluated. If unset, a profile-specific default is used. Returns ------- quantity Evaluated spectrum as an array with length equal to the number of layers. """ raise NotImplementedError
@eval_sigma_t.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_sigma_t_mono(w=si.w, zgrid=zgrid).squeeze() @eval_sigma_t.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_sigma_t_ckd(w=si.w, g=si.g, zgrid=zgrid).squeeze() def eval_sigma_t_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: raise NotImplementedError def eval_sigma_t_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> pint.Quantity: raise NotImplementedError
[docs] @singledispatchmethod def eval_sigma_a( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> pint.Quantity: """ Evaluate absorption coefficient at given spectral index. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional The altitude grid for which the absorption coefficient is evaluated. If unset, a profile-specific default is used. Returns ------- quantity Evaluated spectrum as an array with length equal to the number of layers. """ raise NotImplementedError
@eval_sigma_a.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_sigma_a_mono(w=si.w, zgrid=zgrid).squeeze() @eval_sigma_a.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_sigma_a_ckd(w=si.w, g=si.g, zgrid=zgrid).squeeze()
[docs] def eval_sigma_a_mono( self, w: pint.Quantity, zgrid: ZGrid, ) -> pint.Quantity: """ Evaluate absorption coefficient spectrum in monochromatic mode. """ raise NotImplementedError
[docs] def eval_sigma_a_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> pint.Quantity: """ Evaluate absorption coefficient spectrum in CKD modes. """ raise NotImplementedError
[docs] @singledispatchmethod def eval_sigma_s( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> pint.Quantity: """ Evaluate scattering coefficient at given spectral index. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : :class:`.ZGrid`, optional The altitude grid for which the scattering coefficient is evaluated. If unset, a profile-specific default is used. Returns ------- quantity Evaluated spectrum as an array with length equal to the number of layers. """ raise NotImplementedError
@eval_sigma_s.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_sigma_s_mono(w=si.w, zgrid=zgrid).squeeze() @eval_sigma_s.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid) -> pint.Quantity: return self.eval_sigma_s_ckd(w=si.w, g=si.g, zgrid=zgrid).squeeze() def eval_sigma_s_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: raise NotImplementedError def eval_sigma_s_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: raise NotImplementedError
[docs] @singledispatchmethod def eval_dataset( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> xr.Dataset: """ Evaluate radiative properties at given spectral index. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional The altitude grid for which the radiative profile is evaluated. If unset, a profile-specific default is used. Returns ------- Dataset Radiative property dataset. """ raise NotImplementedError
@eval_dataset.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid) -> xr.Dataset: return self.eval_dataset_mono(w=si.w, zgrid=zgrid) @eval_dataset.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid) -> xr.Dataset: return self.eval_dataset_ckd(w=si.w, g=si.g, zgrid=zgrid) def eval_dataset_mono(self, w: pint.Quantity, zgrid: ZGrid) -> xr.Dataset: raise NotImplementedError def eval_dataset_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> xr.Dataset: raise NotImplementedError