Source code for eradiate.scenes.atmosphere._particle_layer

"""
Particle layers.
"""
from __future__ import annotations

from functools import singledispatchmethod

import attrs
import numpy as np
import pint
import pinttr
import xarray as xr

from ._core import AbstractHeterogeneousAtmosphere
from ._particle_dist import ParticleDistribution, particle_distribution_factory
from ..core import traverse
from ..phase import TabulatedPhaseFunction
from ... import converters, data
from ...attrs import documented, parse_docs
from ...contexts import KernelContext
from ...kernel import UpdateParameter
from ...radprops import ZGrid
from ...spectral.ckd import BinSet, QuadSpec
from ...spectral.index import (
    CKDSpectralIndex,
    MonoSpectralIndex,
    SpectralIndex,
)
from ...spectral.mono import WavelengthSet
from ...units import to_quantity
from ...units import unit_context_config as ucc
from ...units import unit_registry as ureg
from ...util.misc import cache_by_id, summary_repr
from ...validators import is_positive


def _particle_layer_distribution_converter(value):
    if isinstance(value, str):
        if value == "uniform":
            return particle_distribution_factory.convert({"type": "uniform"})
        elif value == "gaussian":
            return particle_distribution_factory.convert({"type": "gaussian"})
        elif value == "exponential":
            return particle_distribution_factory.convert({"type": "exponential"})

    return particle_distribution_factory.convert(value)


[docs] @parse_docs @attrs.define(eq=False, slots=False) class ParticleLayer(AbstractHeterogeneousAtmosphere): """ Particle layer scene element [``particle_layer``]. The particle layer has a vertical extension specified by a bottom altitude (set by ``bottom``) and a top altitude (set by ``top``). Inside the layer, the particles number is distributed according to a distribution (set by ``distribution``). See :mod:`~eradiate.scenes.atmosphere.particle_dist` for the available distribution types and corresponding parameters. The particle layer is itself divided into a number of (sub-)layers (``n_layers``) to allow to describe the variations of the particles number with altitude. The particle density in the layer is adjusted so that the particle layer's optical thickness at a specified reference wavelength (``w_ref``) meets a specified value (``tau_ref``). The particles radiative properties are specified by a data set (``dataset``). """ bottom: pint.Quantity = documented( pinttr.field( default=ureg.Quantity(0.0, ureg.km), validator=[ is_positive, pinttr.validators.has_compatible_units, ], units=ucc.deferred("length"), ), doc="Bottom altitude of the particle layer.\n" "\n" "Unit-enabled field (default: ucc[length])", type="quantity", init_type="float or quantity", default="0 km", ) top: pint.Quantity = documented( pinttr.field( units=ucc.deferred("length"), default=ureg.Quantity(1.0, ureg.km), validator=[ is_positive, pinttr.validators.has_compatible_units, ], ), doc="Top altitude of the particle layer.\n" "\n" "Unit-enabled field (default: ucc[length]).", type="quantity", init_type="float or quantity", default="1 km.", ) @bottom.validator @top.validator def _bottom_top_validator(self, attribute, value): if self.bottom >= self.top: raise ValueError( f"while validating '{attribute.name}': bottom altitude must be " "lower than top altitude " f"(got bottom={self.bottom}, top={self.top})" ) distribution: ParticleDistribution = documented( attrs.field( default="uniform", converter=_particle_layer_distribution_converter, validator=attrs.validators.instance_of(ParticleDistribution), ), doc="Particle distribution. Simple defaults can be set using a string: " '``"uniform"`` (resp. ``"gaussian"``, ``"exponential"``) is converted to ' ":class:`UniformParticleDistribution() <.UniformParticleDistribution>` " "(resp. :class:`GaussianParticleDistribution() <.GaussianParticleDistribution>`, " ":class:`ExponentialParticleDistribution() <.ExponentialParticleDistribution>`).", init_type=":class:`.ParticleDistribution` or dict or " '{"uniform", "gaussian", "exponential"}, optional', type=":class:`.ParticleDistribution`", default='"uniform"', ) w_ref: pint.Quantity = documented( pinttr.field( units=ucc.deferred("wavelength"), default=550 * ureg.nm, validator=[ is_positive, pinttr.validators.has_compatible_units, ], ), doc="Reference wavelength at which the extinction optical thickness is " "specified.\n" "\n" "Unit-enabled field (default: ucc['wavelength']).", type="quantity", init_type="quantity or float", default="550.0", ) tau_ref: pint.Quantity = documented( pinttr.field( units=ucc.deferred("dimensionless"), default=ureg.Quantity(0.2, ureg.dimensionless), validator=[ is_positive, pinttr.validators.has_compatible_units, ], ), doc="Extinction optical thickness at the reference wavelength.\n" "\n" "Unit-enabled field (default: ucc[dimensionless]).", type="quantity", init_type="quantity or float", default="0.2", ) dataset: xr.Dataset = documented( attrs.field( default="govaerts_2021-continental", converter=converters.to_dataset( load_from_id=lambda x: data.load_dataset(f"spectra/particles/{x}.nc") ), validator=attrs.validators.instance_of(xr.Dataset), repr=summary_repr, ), doc="Particle radiative property data set." "If an xarray dataset is passed, the dataset is used as is " "(refer to the data guide for the format requirements of this dataset)." "If a path is passed, the converter tries to open the corresponding " "file on the hard drive; should that fail, it queries the Eradiate data" "store with that path." "If a string is passed, it is interpreted as an identifier for a " "particle radiative property dataset in the Eradiate data store.", type="Dataset", init_type="Dataset or path-like or str, optional", default="govaerts_2021-continental", ) has_absorption: bool = documented( attrs.field( default=True, converter=bool, ), doc="Absorption bypass switch. If ``True``, the absorption coefficient " "is computed. Else, the absorption coefficient is not computed and " "instead set to zero.", type="bool", default="True", ) has_scattering: bool = documented( attrs.field( default=True, converter=bool, ), doc="Scattering bypass switch. If ``True``, the scattering coefficient " "is computed. Else, the scattering coefficient is not computed and " "instead set to zero.", type="bool", default="True", ) @has_absorption.validator @has_scattering.validator def _switch_validator(self, attribute, value): if not self.has_absorption and not self.has_scattering: raise ValueError( f"while validating {attribute.name}: at least one of " "'has_absorption' and 'has_scattering' must be True" ) _phase: TabulatedPhaseFunction | None = attrs.field(default=None, init=False)
[docs] def spectral_set( self, quad_spec: QuadSpec | None = None ) -> None | BinSet | WavelengthSet: return None
[docs] def update(self) -> None: self._phase = TabulatedPhaseFunction(id=self.phase_id, data=self.dataset.phase)
# -------------------------------------------------------------------------- # Spatial and thermophysical properties # --------------------------------------------------------------------------
[docs] def eval_fractions(self, zgrid: ZGrid) -> np.ndarray: """ Compute the particle number fraction in the particle layer. Returns ------- ndarray Particle number fractions as a (n_layers,)-shaped array. """ x = (zgrid.layers - self.bottom) / (self.top - self.bottom) fractions = self.distribution(x.m_as(ureg.dimensionless)) fractions /= np.sum(fractions) return fractions
[docs] def eval_mfp(self, ctx: KernelContext) -> pint.Quantity: min_sigma_s = self.eval_sigma_s(ctx.si).min() return 1.0 / min_sigma_s if min_sigma_s != 0.0 else np.inf * ureg.m
# -------------------------------------------------------------------------- # Radiative properties # -------------------------------------------------------------------------- @cache_by_id def _eval_albedo_impl(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: # Return albedo from dataset (without accounting for bypass switches) # This routine is vectorized and returns an array of shape # (n_wavelengths, n_layers) ds = self.dataset wavelengths = w.m_as(ds.w.attrs["units"]) interpolated = to_quantity(ds.albedo.interp(w=wavelengths)) where_present = np.reshape(self.eval_fractions(zgrid) > 0, (1, -1)) return interpolated * where_present @cache_by_id def _eval_sigma_t_impl(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: # Return extinction coefficient from dataset (without accounting # for bypass switches). This routine is vectorized and returns an # array of shape (n_wavelengths, n_layers) # Collect input data ds = self.dataset ds_w_units = ureg(ds.w.attrs["units"]) wavelengths = np.atleast_1d(w.m_as(ds_w_units)) sigma_t_star = to_quantity(ds.sigma_t.interp(w=wavelengths)) sigma_t_star_ref = to_quantity(ds.sigma_t.interp(w=self.w_ref.m_as(ds_w_units))) # Compute target optical thickness value tau = self.tau_ref * sigma_t_star / sigma_t_star_ref # Scatter this total OT to all layers # TODO: Make sure that axis order is consistent with other vectorized # routines tau_layers = np.broadcast_to( np.reshape(tau, (-1, 1)), (len(wavelengths), zgrid.n_layers) ) * np.reshape(self.eval_fractions(zgrid), (1, -1)) # Compute corresponding average coefficient sigma_t = tau_layers / zgrid.layer_height return sigma_t def _eval_sigma_a_impl(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: # Return absorption coefficient from dataset (without accounting for # bypass switches). This routine is vectorized and returns an array of # shape (n_wavelengths, n_layers) albedo = self._eval_albedo_impl(w, zgrid) return self._eval_sigma_t_impl(w, zgrid) * (1.0 - albedo.m) def _eval_sigma_s_impl(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: # Return scattering coefficient from dataset (without accounting for # bypass switches). This routine is vectorized and returns an array of # shape (n_wavelengths, n_layers) albedo = self._eval_albedo_impl(w, zgrid) return self._eval_sigma_t_impl(w, zgrid) * albedo.m
[docs] @singledispatchmethod def eval_albedo( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: # Inherit docstring raise NotImplementedError
@eval_albedo.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_albedo_mono( w=si.w, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) @eval_albedo.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_albedo_ckd( w=si.w, g=si.g, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) def eval_albedo_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: if self.has_absorption and self.has_scattering: albedo = self._eval_albedo_impl(w, zgrid).squeeze() elif self.has_absorption and not self.has_scattering: albedo = 0.0 * ureg.dimensionless elif self.has_scattering and not self.has_absorption: albedo = 1.0 * ureg.dimensionless else: raise RuntimeError # Albedo is constant vs spatial dimension return np.full_like(zgrid.layers, albedo) def eval_albedo_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: return self.eval_albedo_mono(w=w, zgrid=zgrid)
[docs] @singledispatchmethod def eval_sigma_t( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> pint.Quantity: # Inherit docstring raise NotImplementedError
@eval_sigma_t.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_sigma_t_mono( w=si.w, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) @eval_sigma_t.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_sigma_t_ckd( w=si.w, g=si.g, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) def eval_sigma_t_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: result = self._eval_sigma_t_impl(w, zgrid).squeeze() if self.has_absorption and self.has_scattering: return result elif not self.has_absorption and self.has_scattering: return result - self._eval_sigma_a_impl(w, zgrid) elif self.has_absorption and not self.has_scattering: return result - self._eval_sigma_s_impl(w, zgrid) raise RuntimeError def eval_sigma_t_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid, ) -> pint.Quantity: return self.eval_sigma_t_mono(w=w, zgrid=zgrid)
[docs] @singledispatchmethod def eval_sigma_a( self, si: SpectralIndex, zgrid: ZGrid | None = None, ) -> pint.Quantity: # Inherit docstring raise NotImplementedError
@eval_sigma_a.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_sigma_a_mono( w=si.w, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) @eval_sigma_a.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_sigma_a_ckd( w=si.w, g=si.g, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) def eval_sigma_a_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: value = self._eval_sigma_a_impl(w, zgrid).squeeze() return value if self.has_absorption else np.zeros_like(value) * value.units def eval_sigma_a_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: return self.eval_sigma_a_mono(w, zgrid)
[docs] @singledispatchmethod def eval_sigma_s( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: # Inherit docstring raise NotImplementedError
@eval_sigma_s.register(MonoSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_sigma_s_mono( w=si.w, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) @eval_sigma_s.register(CKDSpectralIndex) def _(self, si, zgrid: ZGrid | None = None) -> pint.Quantity: return self.eval_sigma_s_ckd( w=si.w, g=si.g, zgrid=self.geometry.zgrid if zgrid is None else zgrid, ) def eval_sigma_s_mono(self, w: pint.Quantity, zgrid: ZGrid) -> pint.Quantity: value = self._eval_sigma_s_impl(w, zgrid).squeeze() return value if self.has_scattering else np.zeros_like(value) * value.units def eval_sigma_s_ckd( self, w: pint.Quantity, g: float, zgrid: ZGrid ) -> pint.Quantity: return self.eval_sigma_s_mono(w, zgrid) # -------------------------------------------------------------------------- # Kernel dictionary generation # -------------------------------------------------------------------------- @property def phase(self) -> TabulatedPhaseFunction: # Inherit docstring return self._phase @property def _template_phase(self) -> dict: result, _ = traverse(self.phase) return result.data @property def _params_phase(self) -> dict[str, UpdateParameter]: _, result = traverse(self.phase) return result.data