Source code for eradiate.radprops._array

"""
Array radiative profile.
"""

from __future__ import annotations

import typing as t
import warnings

import attrs
import numpy as np
import pint
import xarray as xr

from ._core import RadProfile, ZGrid, make_dataset
from ..attrs import define, documented
from ..units import to_quantity
from ..units import unit_registry as ureg


[docs] @define(eq=False) class ArrayRadProfile(RadProfile): """ Array radiative profile. This class provides an interface to generate vertical profiles of atmospheric volume radiative properties (also sometimes referred to as collision coefficients). The array radiative profile is built from absorption and scattering coefficient data provided as input. This can be useful for debugging and benchmarking. """ sigma_a: xr.DataArray | None = documented( attrs.field( default=None, ), doc="``DataArray`` of absorption coefficients. " "The ``DataArray`` is composed of two dimensions ``{w, z}`` representing the " "wavelength and altitude respectively. Note that ``w`` must be of length " "2 minimum to be correctly interpolated.", type="DataArray or None", init_type="DataArray or None", default="None", ) sigma_s: xr.DataArray | None = documented( attrs.field( default=None, ), doc="``DataArray`` of scattering coefficients. " "The ``DataArray`` is composed of two dimensions ``{w, z}`` representing the " "wavelength and altitude respectively. Note that ``w`` must be of length " "2 minimum to be correctly interpolated.", type="DataArray or None", init_type="DataArray or None", default="None", ) has_absorption: bool = documented( attrs.field( default=True, converter=bool, validator=attrs.validators.instance_of(bool), ), doc="Absorption switch. If ``True``, the absorption coefficient is " "computed. Else, the absorption coefficient is not computed and " "instead set to zero.", type="bool", default="True", ) has_scattering: bool = documented( attrs.field( default=True, converter=bool, validator=attrs.validators.instance_of(bool), ), doc="Scattering switch. If ``True``, the scattering coefficient is " "computed. Else, the scattering coefficient is not computed and " "instead set to zero.", type="bool", default="True", ) rayleigh_depolarization: np.ndarray = documented( attrs.field( converter=lambda x: np.array(x, dtype=np.float64), kw_only=True, factory=lambda: np.array(0.0), ), type="ndarray", doc="Depolarization factor of the rayleigh phase function. " "A ``ndarray`` will be interpreted as a description of the depolarization " "factor at different levels of the atmosphere. Must be shaped (N,) with " "N the number of layers.", init_type="array-like, optional", default="[0]", ) interpolation_method: t.Literal["nearest", "linear"] = documented( attrs.field( default="nearest", converter=str, validator=attrs.validators.in_(["nearest", "linear"]), ), doc="Method of interpolation of the absorption and scattering coefficients.", type="str", init_type="{'nearest', 'linear'}", default="nearest", ) interpolation_kwargs: dict[str, t.Any] = documented( attrs.field( factory=dict, converter=dict, validator=attrs.validators.instance_of(dict), ), doc="Keyword arguments passed to :meth:`xarray.DataArray.interp` when called.", type="dict", init_type="dict, optional", ) def __attrs_post_init__(self): self.update() def update(self) -> None: pass @property def zbounds(self) -> tuple[pint.Quantity, pint.Quantity]: z_max = np.inf z_min = -1.0 if self.sigma_s is not None and self.has_scattering: z_sigma_s = to_quantity(self.sigma_s.z) z_max = pint.Quantity(z_max, z_sigma_s.units) z_min = pint.Quantity(z_min, z_sigma_s.units) z_max = min(z_max, z_sigma_s.max()) z_min = max(z_min, z_sigma_s.min()) if self.sigma_a is not None and self.has_absorption: z_sigma_a = to_quantity(self.sigma_a.z) z_max = pint.Quantity(z_max, z_sigma_a.units) z_min = pint.Quantity(z_min, z_sigma_a.units) z_max = min(z_max, z_sigma_a.max()) z_min = max(z_min, z_sigma_a.min()) return z_min, z_max @property def levels(self) -> pint.Quantity: NotImplementedError @property def zgrid(self) -> ZGrid: NotImplementedError def eval_albedo_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: # Inherit docstring sigma_s = self.eval_sigma_s_mono(w, zgrid) sigma_t = self.eval_sigma_t_mono(w, zgrid) return np.divide( sigma_s, sigma_t, where=sigma_t != 0.0, out=np.zeros_like(sigma_s) ).to("dimensionless") def eval_albedo_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: sigma_s = self.eval_sigma_s_ckd(w=w, g=g, zgrid=zgrid) sigma_t = self.eval_sigma_t_ckd(w=w, g=g, zgrid=zgrid) return np.divide( sigma_s, sigma_t, where=sigma_t != 0.0, out=np.zeros_like(sigma_s) ).to("dimensionless")
[docs] def eval_sigma_a_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: # NOTE: this method accepts 'w'-arrays and is vectorized as far as # each individual absorption dataset is concerned, namely when the # wavelengths span multiple datasets we for-loop over them. w = np.atleast_1d(w) if self.has_absorption and self.sigma_a is not None: values = self.sigma_a.interp( coords={ "z": zgrid.layers.m_as(self.sigma_a.z.attrs["units"]), "w": w.m_as(self.sigma_a.w.attrs["units"]), }, method=self.interpolation_method, kwargs=self.interpolation_kwargs, ) values = to_quantity(values) return values.squeeze() else: return np.zeros((w.size, zgrid.n_layers)).squeeze() / ureg.km
[docs] def eval_sigma_a_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: warnings.warn( "You are using ArrayRadProps in CKD mode, this is not standard " "behaviour and might provide erroneous results." ) return self.eval_sigma_a_mono(w=w, zgrid=zgrid)
def eval_sigma_s_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: w = np.atleast_1d(w) if self.has_scattering and self.sigma_s is not None: sigma_s = self.sigma_s.interp( coords={ "z": zgrid.layers.m_as(self.sigma_s.z.attrs["units"]), "w": w.m_as(self.sigma_s.w.attrs["units"]), }, method=self.interpolation_method, kwargs=self.interpolation_kwargs, ) sigma_s = to_quantity(sigma_s) return sigma_s.squeeze() else: return np.zeros((1, zgrid.n_layers)) / ureg.km def eval_sigma_s_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: return self.eval_sigma_s_mono(w=w, zgrid=zgrid) def eval_sigma_t_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: sigma_a = self.eval_sigma_a_mono(w=w, zgrid=zgrid) sigma_s = self.eval_sigma_s_mono(w=w, zgrid=zgrid) return sigma_a + sigma_s def eval_sigma_t_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> pint.Quantity: sigma_a = self.eval_sigma_a_ckd(w=w, g=g, zgrid=zgrid) sigma_s = self.eval_sigma_s_ckd(w=w, g=g, zgrid=zgrid) return sigma_a + sigma_s def eval_dataset_mono(self, w: pint.Quantity, zgrid: ZGrid) -> xr.Dataset: return make_dataset( wavelength=w, z_level=zgrid.levels, z_layer=zgrid.layers, sigma_a=self.eval_sigma_a_mono(w, zgrid), sigma_s=self.eval_sigma_s_mono(w, zgrid), ).squeeze() def eval_dataset_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> xr.Dataset: return make_dataset( wavelength=w, z_level=zgrid.levels, z_layer=zgrid.layers, sigma_a=self.eval_sigma_a_ckd(w=w, g=g, zgrid=zgrid), sigma_s=self.eval_sigma_s_ckd(w=w, g=g, zgrid=zgrid), ).squeeze() def eval_depolarization_factor_mono( self, w: pint.Quantity, zgrid: ZGrid ) -> pint.Quantity: if self.has_scattering: if isinstance(self.rayleigh_depolarization, np.ndarray): return np.atleast_1d(self.rayleigh_depolarization) * ureg.dimensionless else: raise NotImplementedError else: return np.atleast_1d(0.0) * ureg.dimensionless def eval_depolarization_factor_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> pint.Quantity: return self.eval_depolarization_factor_mono(w=w, zgrid=zgrid)