Source code for eradiate.scenes.atmosphere._heterogeneous

"""
Heterogeneous atmospheres.
"""
from __future__ import annotations

import logging
from collections import abc as cabc

import attrs
import mitsuba as mi
import numpy as np
import pint

from ._core import AbstractHeterogeneousAtmosphere, atmosphere_factory
from ._molecular import MolecularAtmosphere
from ._particle_layer import ParticleLayer
from ..core import traverse
from ..phase import BlendPhaseFunction, PhaseFunction
from ...attrs import documented, parse_docs
from ...contexts import KernelContext
from ...kernel import TypeIdLookupStrategy
from ...radprops import ZGrid
from ...spectral.ckd import BinSet, QuadSpec
from ...spectral.index import SpectralIndex
from ...spectral.mono import WavelengthSet
from ...units import unit_context_config as ucc
from ...units import unit_registry as ureg
from ...util.misc import cache_by_id

logger = logging.getLogger(__name__)


def _molecular_converter(value):
    if isinstance(value, cabc.MutableMapping) and ("type" not in value):
        value["type"] = "molecular"
    return atmosphere_factory.convert(value, allowed_cls=MolecularAtmosphere)


def _particle_layer_converter(value):
    if not value:
        return []

    if not isinstance(value, (list, tuple)):
        return _particle_layer_converter([value])

    else:
        result = []

        for element in value:
            if isinstance(element, cabc.MutableMapping) and ("type" not in element):
                element["type"] = "particle_layer"
            result.append(
                atmosphere_factory.convert(element, allowed_cls=ParticleLayer)
            )

        return result


[docs] @parse_docs @attrs.define(eq=False, slots=False) class HeterogeneousAtmosphere(AbstractHeterogeneousAtmosphere): """ Heterogeneous atmosphere scene element [``heterogeneous``]. """ molecular_atmosphere: MolecularAtmosphere | None = documented( attrs.field( default=None, converter=attrs.converters.optional(_molecular_converter), validator=attrs.validators.optional( attrs.validators.instance_of(MolecularAtmosphere) ), ), doc="Molecular atmosphere. May be specified as a dictionary interpreted " 'by :data:`.atmosphere_factory`; in that case, the ``"type"`` parameter ' 'may be omitted and will automatically be set to ``"molecular"``.', type=".MolecularAtmosphere or None", init_type=".MolecularAtmosphere or dict, optional", default="None", ) @molecular_atmosphere.validator def _molecular_atmosphere_validator(self, attribute, value): if value is None: return if value.scale is not None: raise ValueError( f"while validating {attribute.name}: components cannot be " "scaled individually" ) particle_layers: list[ParticleLayer] = documented( attrs.field( factory=list, converter=_particle_layer_converter, validator=attrs.validators.deep_iterable( attrs.validators.instance_of(ParticleLayer) ), ), doc="List of particle layers. Elements may be specified as " "dictionaries interpreted by :data:`.atmosphere_factory`; in that " "case, the ``type`` parameter may be omitted and will automatically " 'be set to ``"particle_layer"``.', type="list of .ParticleLayer", init_type="list of .ParticleLayer, optional", default="[]", ) @particle_layers.validator def _particle_layers_validator(self, attribute, value): if not all(component.scale is None for component in value): raise ValueError( f"while validating {attribute.name}: components cannot be " "scaled individually" ) _zgrid: ZGrid = documented( attrs.field( default=None, converter=attrs.converters.optional( lambda x: ZGrid(x) if not isinstance(x, ZGrid) else x ), validator=attrs.validators.optional(attrs.validators.instance_of(ZGrid)), repr=False, ), doc="A high-resolution layer altitude mesh on which the radiative " "properties of the components are interpolated. If unset, a default grid " "with one layer per 100 m (or 10 layers if the atmosphere object " "height is less than 100 m) is used.", type=".ZGrid", init_type=".ZGrid, optional", ) @property def components(self) -> list[MolecularAtmosphere | ParticleLayer]: """ Returns ------- list of .AbstractHeterogeneousAtmosphere The list of all registered atmospheric components. """ result = [self.molecular_atmosphere] if self.molecular_atmosphere else [] result.extend(self.particle_layers) return result
[docs] def update(self): # Inherit docstring if not self.components: raise ValueError("HeterogeneousAtmosphere must have at least one component") # Force component IDs for i, component in enumerate(self.components): component.update() component.id = f"{self.id}_component_{i}" logger.debug( "Override the geometry of component '%s' to %s (was %s)", component.id, self.geometry, component.geometry, ) component.geometry = self.geometry
# -------------------------------------------------------------------------- # Spatial extension and thermophysical properties # -------------------------------------------------------------------------- @property def zgrid(self) -> ZGrid: # Inherit docstring return self._zgrid @property def bottom(self) -> pint.Quantity: # Inherit docstring return min([component.bottom for component in self.components]) @property def top(self) -> pint.Quantity: # Inherit docstring return max([component.top for component in self.components])
[docs] def spectral_set( self, quad_spec: QuadSpec | None = None, ) -> None | BinSet | WavelengthSet: if quad_spec is None: quad_spec = QuadSpec.default() components_with_spectral_set = [ component for component in self.components if component.spectral_set(quad_spec=quad_spec) is not None ] # at most one component is allowed to have a spectral set if len(components_with_spectral_set) > 1: raise ValueError( "at most one component is allowed to have a spectral set", components_with_spectral_set, ) if len(components_with_spectral_set) == 1: return components_with_spectral_set[0].spectral_set(quad_spec=quad_spec) # Fall back return None
[docs] def eval_mfp(self, ctx: KernelContext) -> pint.Quantity: # Inherit docstring mfp = [component.eval_mfp(ctx=ctx) for component in self.components] return max(mfp)
# -------------------------------------------------------------------------- # Radiative properties # --------------------------------------------------------------------------
[docs] def eval_albedo( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: # Inherit docstring if zgrid is not None and zgrid is not self.geometry.zgrid: raise ValueError("zgrid must be left unset or set to self.geometry.zgrid") units = ucc.get("collision_coefficient") sigma_s = self.eval_sigma_s(si).m_as(units) sigma_t = self.eval_sigma_t(si).m_as(units) albedo = np.zeros_like(sigma_s) np.divide(sigma_s, sigma_t, where=sigma_t != 0.0, out=albedo) return albedo * ureg.dimensionless
@cache_by_id def _eval_sigma_t_impl(self, si: SpectralIndex) -> pint.Quantity: result = np.zeros((len(self.components), self.geometry.zgrid.n_layers)) sigma_units = ucc.get("collision_coefficient") # Evaluate extinction for current component for i, component in enumerate(self.components): result[i] = component.eval_sigma_t(si, self.geometry.zgrid).m_as( sigma_units ) return result * sigma_units
[docs] def eval_sigma_t( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: # Inherit docstring if zgrid is not None and zgrid is not self.geometry.zgrid: raise ValueError("zgrid must be left unset or set to self.geometry.zgrid") return self._eval_sigma_t_impl(si).sum(axis=0)
[docs] def eval_sigma_a( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: # Inherit docstring if zgrid is not None and zgrid is not self.geometry.zgrid: raise ValueError("zgrid must be left unset or set to self.geometry.zgrid") return self.eval_sigma_t(si) - self.eval_sigma_s(si)
@cache_by_id def _eval_sigma_s_impl(self, si: SpectralIndex) -> pint.Quantity: result = np.zeros((len(self.components), self.geometry.zgrid.n_layers)) sigma_units = ucc.get("collision_coefficient") # Evaluate scattering coefficient for current component for i, component in enumerate(self.components): result[i] = component.eval_sigma_s(si, self.geometry.zgrid).m_as( sigma_units ) return result * sigma_units def _eval_sigma_s_component( self, si: SpectralIndex, n_component: int ) -> pint.Quantity: return self._eval_sigma_s_impl(si)[n_component]
[docs] def eval_sigma_s( self, si: SpectralIndex, zgrid: ZGrid | None = None ) -> pint.Quantity: # Inherit docstring if zgrid is not None and zgrid is not self.geometry.zgrid: raise ValueError("zgrid must be left unset or set to self.geometry.zgrid") return self._eval_sigma_s_impl(si).sum(axis=0)
# -------------------------------------------------------------------------- # Kernel dictionary generation # -------------------------------------------------------------------------- @property def phase(self) -> PhaseFunction: # Inherit docstring if len(self.components) == 1: return self.components[0].phase else: components, weights = [], [] sigma_units = ucc.get("collision_coefficient") for i, component in enumerate(self.components): components.append(component.phase) def eval_sigma_s(si: SpectralIndex, n_component: int = i) -> np.ndarray: return self._eval_sigma_s_component(si, n_component).m_as( sigma_units ) weights.append(eval_sigma_s) return BlendPhaseFunction( components=components, weights=weights, geometry=self.geometry ) @property def _template_phase(self) -> dict: # Inherit docstring return traverse(self.phase)[0].data @property def _params_phase(self) -> dict: # Inherit docstring umap = traverse(self.phase)[1].data # Add prefix and lookup strategy to all entries result = {} for uparam_key, uparam in umap.items(): result[uparam_key] = attrs.evolve( uparam, lookup_strategy=TypeIdLookupStrategy( node_type=mi.Medium, node_id=self.medium_id, parameter_relpath=f"phase_function.{uparam_key}", ), ) return result