Source code for eradiate.scenes.atmosphere._core

from __future__ import annotations

from abc import ABC, abstractmethod

import attrs
import mitsuba as mi
import numpy as np
import pint
import xarray as xr

import eradiate

from ..core import (
    CompositeSceneElement,
    SceneElement,
    traverse,
)
from ..geometry import PlaneParallelGeometry, SceneGeometry, SphericalShellGeometry
from ..phase import PhaseFunction
from ..shapes import Shape
from ..._factory import Factory
from ...attrs import documented, get_doc, parse_docs
from ...cfconventions import ATTRIBUTES
from ...contexts import KernelContext
from ...exceptions import UnsupportedModeError
from ...kernel import (
    InitParameter,
    TypeIdLookupStrategy,
    UpdateParameter,
)
from ...quad import Quad
from ...radprops import ZGrid
from ...spectral.ckd import BinSet, QuadSpec
from ...spectral.index import SpectralIndex
from ...spectral.mono import WavelengthSet
from ...units import symbol
from ...units import unit_context_config as ucc
from ...units import unit_context_kernel as uck
from ...units import unit_registry as ureg
from ...util.misc import flatten

atmosphere_factory = Factory()
atmosphere_factory.register_lazy_batch(
    [
        (
            "_heterogeneous.HeterogeneousAtmosphere",
            "heterogeneous",
            {},
        ),
        (
            "_homogeneous.HomogeneousAtmosphere",
            "homogeneous",
            {},
        ),
        (
            "_molecular.MolecularAtmosphere",
            "molecular",
            {},
        ),
        (
            "_particle_layer.ParticleLayer",
            "particle_layer",
            {},
        ),
    ],
    cls_prefix="eradiate.scenes.atmosphere",
)


[docs] @parse_docs @attrs.define(eq=False, slots=False) class Atmosphere(CompositeSceneElement, ABC): """ Abstract base class defining common facilities for all atmospheres. An atmosphere expands as a :class:`mitsuba.PhaseFunction`, a :class:`mitsuba.Medium` and a :class:`mitsuba.Shape`. """ id: str | None = documented( attrs.field( default="atmosphere", validator=attrs.validators.optional(attrs.validators.instance_of(str)), ), doc=get_doc(SceneElement, "id", "doc"), type=get_doc(SceneElement, "id", "type"), init_type=get_doc(SceneElement, "id", "init_type"), default='"atmosphere"', ) geometry: SceneGeometry = documented( attrs.field( default="plane_parallel", converter=SceneGeometry.convert, validator=attrs.validators.instance_of(SceneGeometry), ), doc="Parameters defining the basic geometry of the scene. " "Note if the atmosphere is used in a simulation, the experiment has " "all control over the atmosphere's geometry and is going to set it. " "Therefore, in such a case it is best to define the geometry at the " "experiment level. When the atmosphere is used standalone, care must " "be taken to ensure that the geometry is consistent with the vertical " "extent of the atmosphere.", type=".SceneGeometry", init_type=".SceneGeometry or dict or str, optional", default='"plane_parallel"', ) # -------------------------------------------------------------------------- # Properties # -------------------------------------------------------------------------- @property def bottom_altitude(self) -> pint.Quantity: """ Returns ------- quantity Atmosphere bottom altitude. """ return self.geometry.ground_altitude @property def top_altitude(self) -> pint.Quantity: """ Returns ------- quantity Atmosphere top altitude. """ return self.geometry.toa_altitude @property def height(self) -> pint.Quantity: """ Returns ------- quantity Atmosphere height. """ return self.top_altitude - self.bottom_altitude @property @abstractmethod def phase(self) -> PhaseFunction: """ Returns ------- .PhaseFunction Phase function associated with the atmosphere. """ pass # -------------------------------------------------------------------------- # Spectral set # --------------------------------------------------------------------------
[docs] @abstractmethod def spectral_set(self) -> None | BinSet | WavelengthSet: """ The spectral set emitted by the atmosphere (optional). Notes ----- Typically, absorbing molecular atmosphere are characterized by an absorption dataset which tabulates the absorption coefficient over some spectral set, e.g. wavelengths, CKD bin, etc. This property returns the spectral set associated with the absorption dataset. In experiments, the spectral set emitted by the atmosphere is given the highest priority when creating the experiment's spectral set. """ pass
# -------------------------------------------------------------------------- # Spatial and thermophysical properties # --------------------------------------------------------------------------
[docs] @abstractmethod def eval_mfp(self, ctx: KernelContext) -> pint.Quantity: """ Compute a typical scattering mean free path. This rough estimate can be used *e.g.* to compute a distance guaranteeing that the medium can be considered optically thick. Parameters ---------- ctx : :class:`.KernelContext` A context data structure containing parameters relevant for kernel dictionary generation. Returns ------- mfp : quantity Mean free path estimate. """ pass
# -------------------------------------------------------------------------- # Kernel dictionary generation # -------------------------------------------------------------------------- @property def shape(self) -> Shape: """ Returns ------- .Shape Shape associated with this atmosphere, based on the scene geometry. """ return self.geometry.atmosphere_shape @property def shape_id(self): """ Returns ------- str ID of the shape associated with the atmosphere in the Mitsuba scene tree. """ return f"shape_{self.id}" @property def medium_id(self): """ Returns ------- str ID of the medium associated with the atmosphere in the Mitsuba scene tree. """ return f"medium_{self.id}" @property def phase_id(self): """ Returns ------- str ID of the phase function associated with the atmosphere in the Mitsuba scene tree. """ return f"phase_{self.id}" @property @abstractmethod def _template_phase(self) -> dict: """ Returns ------- dict The phase function-related contribution to the kernel scene dictionary template for the atmosphere. """ pass @property @abstractmethod def _template_medium(self) -> dict: """ Returns ------- dict The medium-related contribution to the kernel scene dictionary template for the atmosphere. """ pass @property def _template_shape(self) -> dict: """ Returns ------- dict The shape-related contribution to the kernel scene dictionary template for the atmosphere. """ template, _ = traverse(self.shape) return template.data @property def template(self) -> dict: # Inherit docstring result = flatten( { self.phase_id: self._template_phase, self.medium_id: self._template_medium, self.shape_id: self._template_shape, } ) result.update( { f"{self.medium_id}.phase.type": "ref", f"{self.medium_id}.phase.id": self.phase_id, f"{self.shape_id}.bsdf.type": "null", f"{self.shape_id}.interior.type": "ref", f"{self.shape_id}.interior.id": self.medium_id, } ) return result @property @abstractmethod def _params_phase(self) -> dict[str, UpdateParameter]: """ Returns ------- dict The phase function-related contribution to the parameter update map template for the atmosphere. """ pass @property @abstractmethod def _params_medium(self) -> dict[str, UpdateParameter]: """ Returns ------- dict The medium-related contribution to the parameter update map template for the atmosphere. """ pass @property def _params_shape(self) -> dict[str, UpdateParameter]: """ Returns ------- dict The shape-related contribution to the parameter update map template for the atmosphere. """ _, umap = traverse(self.shape) return umap.data @property def params(self) -> dict[str, UpdateParameter]: # Inherit docstring return flatten( { self.medium_id: { **self._params_medium, "phase_function": self._params_phase, }, self.shape_id: self._params_shape, } )
[docs] @parse_docs @attrs.define(eq=False, slots=False) class AbstractHeterogeneousAtmosphere(Atmosphere, ABC): """ Abstract base class for heterogeneous atmospheres. """ scale: float | None = documented( attrs.field( default=None, kw_only=True, converter=attrs.converters.optional(float), validator=attrs.validators.optional(attrs.validators.instance_of(float)), ), doc="If set, the extinction coefficient is scaled by the corresponding " "amount during computation.", type="float or None", init_type="float, optional", )
[docs] def update(self) -> None: """ Update internal state. """ pass
# -------------------------------------------------------------------------- # Spatial and thermophysical properties # -------------------------------------------------------------------------- # Nothing at the moment # -------------------------------------------------------------------------- # Radiative properties # --------------------------------------------------------------------------
[docs] def eval_radprops( self, si: SpectralIndex, zgrid: ZGrid | None = None, optional_fields: bool = False, ) -> xr.Dataset: """ Evaluate the extinction coefficients and albedo profiles. Parameters ---------- si : .SpectralIndex Spectral index. zgrid : .ZGrid, optional Altitude grid on which evaluation is performed. If unset, an instance-specific default is used (see :meth:`zgrid <.AbstractHeterogeneousAtmosphere.zgrid>`). optional_fields : bool, optional, default: False If ``True``, also output the absorption and scattering coefficients, not required for scene setup but useful for analysis and debugging. Returns ------- Dataset A dataset with the following variables: * ``sigma_t``: extinction coefficient; * ``albedo``: albedo; * ``sigma_a``: absorption coefficient (optional); * ``sigma_s``: scattering coefficient (optional). and coordinates: * ``z``: altitude. """ if zgrid is None: zgrid = self.geometry.zgrid sigma_units = ucc.get("collision_coefficient") sigma_t = self.eval_sigma_t(si, zgrid).reshape(zgrid.layers.shape) albedo = self.eval_albedo(si, zgrid).m_as(ureg.dimensionless) data_vars = { "sigma_t": ( "z_layer", sigma_t.m_as(sigma_units), { "units": f"{symbol(sigma_units)}", "standard_name": "extinction_coefficient", "long_name": "extinction coefficient", }, ), "albedo": ( "z_layer", albedo, { "standard_name": "albedo", "long_name": "albedo", "units": "", }, ), } if optional_fields: data_vars.update( { "sigma_a": ( "z_layer", sigma_t.m_as(sigma_units) * (1.0 - albedo), { "units": f"{symbol(sigma_units)}", "standard_name": "absorption_coefficient", "long_name": "absorption coefficient", }, ), "sigma_s": ( "z_layer", sigma_t.m_as(sigma_units) * albedo, { "units": f"{symbol(sigma_units)}", "standard_name": "scattering_coefficient", "long_name": "scattering coefficient", }, ), } ) return xr.Dataset( data_vars, coords={ "z_layer": ( "z_layer", zgrid.layers.magnitude, { "units": f"{symbol(zgrid.layers.units)}", "standard_name": "layer_altitude", "long_name": "layer altitude", }, ) }, )
[docs] @abstractmethod def eval_albedo( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: """ Evaluate albedo spectrum based on a spectral context. This method dispatches evaluation to specialized methods depending on the active mode. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional Altitude grid on which evaluation is performed. If unset, an instance-specific default is used (see :meth:`zgrid <.AbstractHeterogeneousAtmosphere.zgrid>`). Returns ------- quantity Evaluated spectrum as an array with length equal to the number of layers. """ pass
[docs] @abstractmethod def eval_sigma_t( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: """ Evaluate extinction coefficient given a spectral context. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional Altitude grid on which evaluation is performed. If unset, an instance-specific default is used (see :meth:`zgrid <.AbstractHeterogeneousAtmosphere.zgrid>`). Returns ------- quantity Particle layer extinction coefficient. """ pass
[docs] @abstractmethod def eval_sigma_a( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: """ Evaluate absorption coefficient given a spectral context. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional Altitude grid on which evaluation is performed. If unset, an instance-specific default is used (see :meth:`zgrid <.AbstractHeterogeneousAtmosphere.zgrid>`). Returns ------- quantity Particle layer extinction coefficient. """ pass
[docs] @abstractmethod def eval_sigma_s( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: """ Evaluate scattering coefficient given a spectral context. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. zgrid : .ZGrid, optional Altitude grid on which evaluation is performed. If unset, an instance-specific default is used (see :meth:`zgrid <.AbstractHeterogeneousAtmosphere.zgrid>`). Returns ------- quantity Particle layer scattering coefficient. """ pass
[docs] def eval_transmittance( self, si: SpectralIndex, interaction: str = "extinction", ) -> pint.Quantity: """ Evaluate the atmosphere's transmittance with respect to specific interaction. Parameters ---------- si : :class:`.SpectralIndex` Spectral index. interaction : str, optional, default: "extinction" Interaction type. One of ``"extinction"``, ``"absorption"``, ``"scattering"``. Returns ------- quantity Total atmosphere transmittance. """ eval_sigma = { "extinction": self.eval_sigma_t, "absorption": self.eval_sigma_a, "scattering": self.eval_sigma_s, } try: sigma = eval_sigma[interaction](si=si) except KeyError: raise ValueError( f"invalid interaction type '{interaction}', " f"supported: {list(eval_sigma.keys())}" ) dz = np.diff(self.geometry.zgrid.levels) tau = np.sum((sigma * dz).to("1")) return np.exp(-tau)
def eval_transmittance_t(self, si: SpectralIndex) -> pint.Quantity: return self.eval_transmittance(si=si, interaction="extinction") def eval_transmittance_a(self, si: SpectralIndex) -> pint.Quantity: return self.eval_transmittance(si=si, interaction="absorption") def eval_transmittance_s(self, si: SpectralIndex) -> pint.Quantity: return self.eval_transmittance(si=si, interaction="scattering")
[docs] def eval_transmittance_accross_spectral_set( self, interaction: str = "extinction", quad_spec: QuadSpec | None = None, ) -> xr.DataArray: """ Evaluate the atmosphere's transmittance with respect to extinction accross the spectral set emitted by the atmosphere. Parameters ---------- quad_spec : :class:`.QuadSpec`, optional Quadrature specifications. Returns ------- DataArray Atmosphere's transmittance with respect to extinction tabulated against the spectral coordinate. """ if eradiate.mode().is_mono: return self.eval_transmittance_accross_spectral_set_mono( interaction=interaction, quad_spec=quad_spec, ) elif eradiate.mode().is_ckd: if quad_spec is None: quad_spec = QuadSpec.default() return self.eval_transmittance_accross_spectral_set_ckd( interaction=interaction, quad_spec=quad_spec, ) else: raise UnsupportedModeError(supported=["mono", "ckd"])
def eval_transmittance_accross_spectral_set_mono( self, interaction: str = "extinction", quad_spec: QuadSpec | None = None, ) -> xr.DataArray: if quad_spec is None: quad_spec = QuadSpec.default() w = self.spectral_set.wavelengths wunits = symbol(ucc.get("wavelength")) transmittance = np.full(w.size, np.nan) spectral_set = self.spectral_set(quad_spec=quad_spec) for i, si in enumerate(spectral_set.spectral_indices()): transmittance[i] = self.eval_transmittance( si=si, interaction=interaction, ) return xr.DataArray( transmittance, dims=["w"], coords={ "w": ( "w", w.m_as(wunits), ATTRIBUTES["radiation_wavelength"], ) }, attrs={"units": "1", "long_name": "transmittance"}, ) def eval_transmittance_accross_spectral_set_ckd( self, interaction: str = "extinction", quad_spec: QuadSpec | None = None, ) -> xr.DataArray: # compute transmittance at each spectral index transmittance = {} if quad_spec is None: quad_spec = QuadSpec.default() for si in self.spectral_set(quad_spec=quad_spec).spectral_indices(): transmittance[si.as_hashable] = self.eval_transmittance( si=si, interaction=interaction, ) # gather transmittance values that belong to the same CKD bin and # compute the quadrature t_values = np.stack(list(transmittance.values())).m_as("1") i = 0 integrated = [] binset = self.spectral_set(quad_spec=quad_spec) for _bin in binset.bins: ng = _bin.quad.weights.size # number of quadrature g-points integrated.append( Quad.new("gauss_legendre", n=ng).integrate( t_values[i : i + ng], interval=[0, 1], # range of g ) ) i = i + ng w = np.stack([bin.wcenter for bin in self.spectral_set().bins]) wunits = symbol(ucc.get("wavelength")) return xr.DataArray( integrated, dims=["w"], coords={ "w": ( "w", w.m_as(wunits), ATTRIBUTES["radiation_wavelength"], ) }, attrs={"units": "1", "long_name": "transmittance"}, ) # -------------------------------------------------------------------------- # Kernel dictionary generation # -------------------------------------------------------------------------- @property def _template_medium(self) -> dict: # Inherit docstring if isinstance(self.geometry, PlaneParallelGeometry): to_world = self.geometry.atmosphere_volume_to_world volumes = { "albedo": { "type": "gridvolume", "grid": InitParameter( lambda ctx: mi.VolumeGrid( np.reshape( self.eval_albedo(ctx.si).m_as(ureg.dimensionless), (-1, 1, 1), ).astype(np.float32) ), ), "to_world": to_world, }, "sigma_t": { "type": "gridvolume", "grid": InitParameter( lambda ctx: mi.VolumeGrid( np.reshape( self.eval_sigma_t(ctx.si).m_as( uck.get("collision_coefficient") ), (-1, 1, 1), ).astype(np.float32) ), ), "to_world": to_world, }, } elif isinstance(self.geometry, SphericalShellGeometry): volume_rmin = self.geometry.atmosphere_volume_rmin to_world = self.geometry.atmosphere_volume_to_world volumes = { "albedo": { "type": "sphericalcoordsvolume", "volume": { "type": "gridvolume", "grid": InitParameter( lambda ctx: mi.VolumeGrid( np.reshape( self.eval_albedo(ctx.si).m_as(ureg.dimensionless), (1, 1, -1), ).astype(np.float32) ), ), }, "to_world": to_world, "rmin": volume_rmin, }, "sigma_t": { "type": "sphericalcoordsvolume", "volume": { "type": "gridvolume", "grid": InitParameter( lambda ctx: mi.VolumeGrid( np.reshape( self.eval_sigma_t(ctx.si).m_as( uck.get("collision_coefficient") ), (1, 1, -1), ).astype(np.float32) ), ), }, "to_world": to_world, "rmin": volume_rmin, }, } else: raise ValueError( f"unhandled scene geometry type '{type(self.geometry).__name__}'" ) # Create medium dictionary result = { "type": "heterogeneous", **volumes # Note: "phase" is deliberately unset, this is left to the # Atmosphere.template property } if self.scale is not None: result["scale"] = self.scale return result @property def _params_medium(self) -> dict[str, UpdateParameter]: # Inherit docstring if isinstance(self.geometry, PlaneParallelGeometry): return { "albedo.data": UpdateParameter( lambda ctx: np.reshape( self.eval_albedo(ctx.si).m_as(ureg.dimensionless), (-1, 1, 1, 1), ).astype(np.float32), UpdateParameter.Flags.SPECTRAL, lookup_strategy=TypeIdLookupStrategy( node_type=mi.Medium, node_id=self.medium_id, parameter_relpath="albedo.data", ), ), "sigma_t.data": UpdateParameter( lambda ctx: np.reshape( self.eval_sigma_t(ctx.si).m_as( uck.get("collision_coefficient") ), (-1, 1, 1, 1), ).astype(np.float32), UpdateParameter.Flags.SPECTRAL, lookup_strategy=TypeIdLookupStrategy( node_type=mi.Medium, node_id=self.medium_id, parameter_relpath="sigma_t.data", ), ), } elif isinstance(self.geometry, SphericalShellGeometry): return { "albedo.volume.data": UpdateParameter( lambda ctx: np.reshape( self.eval_albedo(ctx.si).m_as(ureg.dimensionless), (1, 1, -1, 1), ).astype(np.float32), UpdateParameter.Flags.SPECTRAL, lookup_strategy=TypeIdLookupStrategy( node_type=mi.Medium, node_id=self.medium_id, parameter_relpath="albedo.volume.data", ), ), "sigma_t.volume.data": UpdateParameter( lambda ctx: np.reshape( self.eval_sigma_t(ctx.si).m_as( uck.get("collision_coefficient") ), (1, 1, -1, 1), ).astype(np.float32), UpdateParameter.Flags.SPECTRAL, lookup_strategy=TypeIdLookupStrategy( node_type=mi.Medium, node_id=self.medium_id, parameter_relpath="sigma_t.volume.data", ), ), } else: # Shouldn't happen, prevented by validator raise ValueError( f"unhandled scene geometry type '{type(self.geometry).__name__}'" )