"""
Atmosphere's radiative profile.
"""
from __future__ import annotations
import attrs
import joseki
import numpy as np
import pint
import xarray as xr
from joseki.profiles.core import interp
from ._absorption import AbsorptionDatabase
from ._core import RadProfile, ZGrid, make_dataset
from .rayleigh import compute_sigma_s_air
from ..attrs import define, documented
from ..converters import convert_thermoprops
from ..units import to_quantity
from ..units import unit_registry as ureg
from ..util.misc import cache_by_id, summary_repr
[docs]
@define(eq=False)
class AtmosphereRadProfile(RadProfile):
"""
Atmospheric radiative profile.
This class provides an interface to generate vertical profiles of
atmospheric volume radiative properties (also sometimes referred to as
collision coefficients).
The atmospheric radiative profile is built from a thermophysical profile,
which provides the temperature, pressure and species concentrations as a
function of altitude, and an absorption coefficient database indexed by
those thermophysical variables.
"""
absorption_data: AbsorptionDatabase = documented(
attrs.field(
factory=AbsorptionDatabase.default,
converter=AbsorptionDatabase.convert,
validator=attrs.validators.instance_of(AbsorptionDatabase),
),
doc="Absorption coefficient data. The passed value is pre-processed by "
":meth:`.AbsorptionDatabase.convert`.",
type="AbsorptionDatabase",
init_type="str or path-like or dict or .AbsorptionDatabase",
default=":meth:`AbsorptionDatabase.default() <.AbsorptionDatabase.default>`",
)
thermoprops: xr.Dataset = documented(
attrs.field(
factory=lambda: joseki.make(
identifier="afgl_1986-us_standard",
z=np.linspace(0.0, 120.0, 121) * ureg.km,
additional_molecules=False,
),
converter=convert_thermoprops,
repr=summary_repr,
),
doc="Atmosphere's thermophysical properties.",
type="Dataset",
default=(
"`joseki.make <https://rayference.github.io/joseki/latest/reference/#src.joseki.core.make>`_"
' with ``identifier`` set to "afgl_1986-us_standard" and '
'``z`` set to "np.linspace(0.0, 120.0, 121) * ureg.km" and '
'``additional_molecules`` set to "False".'
),
)
@thermoprops.validator
def _check_thermoprops(self, attribute, value):
if not value.joseki.is_valid:
raise ValueError(
"Invalid thermophysical properties dataset." # TODO: explain what is invalid
)
has_absorption: bool = documented(
attrs.field(
default=True,
converter=bool,
validator=attrs.validators.instance_of(bool),
),
doc="Absorption switch. If ``True``, the absorption coefficient is "
"computed. Else, the absorption coefficient is not computed and "
"instead set to zero.",
type="bool",
default="True",
)
has_scattering: bool = documented(
attrs.field(
default=True,
converter=bool,
validator=attrs.validators.instance_of(bool),
),
doc="Scattering switch. If ``True``, the scattering coefficient is "
"computed. Else, the scattering coefficient is not computed and "
"instead set to zero.",
type="bool",
default="True",
)
_zgrid: ZGrid | None = attrs.field(default=None, init=False)
def __attrs_post_init__(self):
self.update()
def update(self) -> None:
self._zgrid = ZGrid(levels=self.levels)
@property
def levels(self) -> pint.Quantity:
return to_quantity(self.thermoprops.z)
@property
def zgrid(self) -> ZGrid:
# Inherit docstring
return self._zgrid
@cache_by_id
def _thermoprops_interp(self, zgrid: ZGrid) -> xr.Dataset:
# Interpolate thermophysical profile on specified altitude grid
# Note: this value is cached so that repeated calls with the same zgrid
# won't trigger an unnecessary computation.
return interp(
self.thermoprops,
z_new=zgrid.levels.m * ureg(str(zgrid.levels.units)),
method={"default": "nearest"}, # TODO: revisit
)
def eval_albedo_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity:
# Inherit docstring
sigma_s = self.eval_sigma_s_mono(w, zgrid)
sigma_t = self.eval_sigma_t_mono(w, zgrid)
return np.divide(
sigma_s, sigma_t, where=sigma_t != 0.0, out=np.zeros_like(sigma_s)
).to("dimensionless")
def eval_albedo_ckd(
self, w: pint.Quantity, g: float, zgrid: ZGrid
) -> pint.Quantity:
sigma_s = self.eval_sigma_s_ckd(w=w, g=g, zgrid=zgrid)
sigma_t = self.eval_sigma_t_ckd(w=w, g=g, zgrid=zgrid)
return np.divide(
sigma_s, sigma_t, where=sigma_t != 0.0, out=np.zeros_like(sigma_s)
).to("dimensionless")
[docs]
def eval_sigma_a_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity:
# NOTE: this method accepts 'w'-arrays and is vectorized as far as
# each individual absorption dataset is concerned, namely when the
# wavelengths span multiple datasets we for-loop over them.
w = np.atleast_1d(w)
if self.has_absorption:
thermoprops = self._thermoprops_interp(zgrid)
values = self.absorption_data.eval_sigma_a_mono(w, thermoprops).transpose(
"w", "z"
)
values = to_quantity(values)
# We evaluated the
# project on altitude layers
return 0.5 * (values[:, 1:] + values[:, :-1]).squeeze()
else:
return np.zeros((w.size, zgrid.n_layers)).squeeze() / ureg.km
[docs]
def eval_sigma_a_ckd(
self, w: pint.Quantity, g: float, zgrid: ZGrid
) -> pint.Quantity:
w = np.atleast_1d(w)
if self.has_absorption:
values = self.absorption_data.eval_sigma_a_ckd(
w, g, self._thermoprops_interp(zgrid)
) # axis order = (w, z)
values = to_quantity(values)
# Interpolate on altitude layers
return 0.5 * (values[:, 1:] + values[:, :-1]).squeeze()
else:
return np.zeros((w.size, zgrid.n_layers)).squeeze() / ureg.km
def eval_sigma_s_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity:
if self.has_scattering:
thermoprops = self._thermoprops_interp(zgrid)
sigma_s = compute_sigma_s_air(
wavelength=w,
number_density=to_quantity(thermoprops.n),
)
# project on altitude layers
return 0.5 * (sigma_s[1:] + sigma_s[:-1])
else:
return np.zeros((1, zgrid.n_layers)) / ureg.km
def eval_sigma_s_ckd(
self, w: pint.Quantity, g: float, zgrid: ZGrid
) -> pint.Quantity:
return self.eval_sigma_s_mono(w=w, zgrid=zgrid)
def eval_sigma_t_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity:
sigma_a = self.eval_sigma_a_mono(w=w, zgrid=zgrid)
sigma_s = self.eval_sigma_s_mono(w=w, zgrid=zgrid)
return sigma_a + sigma_s
def eval_sigma_t_ckd(
self,
w: pint.Quantity,
g: float,
zgrid: ZGrid,
) -> pint.Quantity:
sigma_a = self.eval_sigma_a_ckd(w=w, g=g, zgrid=zgrid)
sigma_s = self.eval_sigma_s_ckd(w=w, g=g, zgrid=zgrid)
return sigma_a + sigma_s
def eval_dataset_mono(self, w: pint.Quantity, zgrid: ZGrid) -> xr.Dataset:
return make_dataset(
wavelength=w,
z_level=zgrid.levels,
z_layer=zgrid.layers,
sigma_a=self.eval_sigma_a_mono(w, zgrid),
sigma_s=self.eval_sigma_s_mono(w, zgrid),
).squeeze()
def eval_dataset_ckd(
self,
w: pint.Quantity,
g: float,
zgrid: ZGrid,
) -> xr.Dataset:
return make_dataset(
wavelength=w,
z_level=zgrid.levels,
z_layer=zgrid.layers,
sigma_a=self.eval_sigma_a_ckd(w=w, g=g, zgrid=zgrid),
sigma_s=self.eval_sigma_s_ckd(w=w, g=g, zgrid=zgrid),
).squeeze()