Source code for eradiate.pipelines._assemble

from __future__ import annotations

import logging
import typing as t
import warnings

import attrs
import numpy as np
import pint
import pinttr
import xarray as xr

import eradiate

from ._core import PipelineStep
from ..attrs import documented, parse_docs
from ..exceptions import UnsupportedModeError
from ..frame import angles_in_hplane
from ..scenes.illumination import ConstantIllumination, DirectionalIllumination
from ..scenes.measure import Measure
from ..scenes.spectra import (
    InterpolatedSpectrum,
    Spectrum,
)
from ..units import symbol, to_quantity
from ..units import unit_context_kernel as uck
from ..units import unit_registry as ureg

logger = logging.getLogger(__name__)


@parse_docs
@attrs.define
class AddIllumination(PipelineStep):
    """
    Add illumination data.

    This post-processing pipeline step adds illumination data:

    * if `illumination` is a :class:`.DirectionalIllumination` instance, then
      a data variable (holding the incoming top-of-scene flux with respect to
      a horizontal surface) is created, with dimensions ``sza`` and ``saa``;
    * if `illumination` is a :class:`.ConstantIllumination` instance, then the
      created data variable has no coordinate.
    """

    illumination: DirectionalIllumination | ConstantIllumination = documented(
        attrs.field(
            validator=attrs.validators.instance_of(
                (DirectionalIllumination, ConstantIllumination)
            ),
            repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)",
        ),
        doc="An :class:`.Illumination` instance from which the illumination "
        "data originates.",
        type=":class:`.DirectionalIllumination` or :class:`.ConstantIllumination`",
    )

    measure: Measure = documented(
        attrs.field(
            validator=attrs.validators.instance_of(Measure),
            repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)",
        ),
        doc="A :class:`.Measure` instance from which the data originates.",
        type=":class:`.Measure`",
    )

    irradiance_var: str = documented(
        attrs.field(default="irradiance", validator=attrs.validators.instance_of(str)),
        doc="Name of the variable storing irradiance (incoming flux) values.",
        type="str",
        default='"irradiance"',
    )

    def transform(self, x: t.Any) -> t.Any:
        logger.debug("add_illumination: begin")

        k_irradiance_units = uck.get("irradiance")
        illumination = self.illumination
        # measure = self.measure
        result = x.copy(deep=False)

        # Collect spectral coordinate values for verification purposes
        wavelengths_dataset = to_quantity(x.w)

        def eval_illumination_spectrum(
            field_name: str, k_units: pint.Unit
        ) -> pint.Quantity:
            # Local helper for illumination spectrum evaluation
            spectrum: Spectrum = getattr(illumination, field_name)

            results_wavelengths = to_quantity(x.w)

            if eradiate.mode().is_mono:
                wavelengths = results_wavelengths
                assert np.allclose(wavelengths, wavelengths_dataset)
                return spectrum.eval_mono(wavelengths).m_as(k_units)

            elif eradiate.mode().is_ckd:
                result = spectrum.eval_ckd(
                    w=results_wavelengths,
                    g=[0, 1],
                ).m_as(k_units)

                # Reorder data by ascending wavelengths
                # indices = results_wavelengths.argsort()
                wavelengths = results_wavelengths  # [indices]
                assert np.allclose(wavelengths, wavelengths_dataset)
                # result = result[indices]

                return result

            else:
                raise UnsupportedModeError(supported=("monochromatic", "ckd"))

        if isinstance(illumination, DirectionalIllumination):
            # Collect illumination angular data
            saa = illumination.azimuth.m_as(ureg.deg)
            sza = illumination.zenith.m_as(ureg.deg)
            cos_sza = np.cos(np.deg2rad(sza))

            # Add angular dimensions
            result = result.expand_dims({"sza": [sza], "saa": [saa]}, axis=(0, 1))
            result.coords["sza"].attrs = {
                "standard_name": "solar_zenith_angle",
                "long_name": "solar zenith angle",
                "units": symbol("deg"),
            }
            result.coords["saa"].attrs = {
                "standard_name": "solar_azimuth_angle",
                "long_name": "solar azimuth angle",
                "units": symbol("deg"),
            }

            # Collect illumination spectral data
            irradiances = eval_illumination_spectrum("irradiance", k_irradiance_units)

            # Add irradiance variable
            result[self.irradiance_var] = (
                ("sza", "saa", "w"),
                (irradiances * cos_sza).reshape((1, 1, len(irradiances))),
            )

        elif isinstance(illumination, ConstantIllumination):
            # Collect illumination spectral data
            k_radiance_units = uck.get("radiance")
            radiances = eval_illumination_spectrum("radiance", k_radiance_units)

            # Add irradiance variable
            result[self.irradiance_var] = (
                ("w",),
                np.pi * radiances.reshape((len(radiances),)),
            )

        else:
            raise TypeError(
                "field 'illumination' must be one of "
                "(DirectionalIllumination, ConstantIllumination), got a "
                f"{illumination.__class__.__name__}"
            )

        result[self.irradiance_var].attrs = {
            "standard_name": "horizontal_solar_irradiance_per_unit_wavelength",
            "long_name": "horizontal spectral irradiance",
            "units": symbol(k_irradiance_units),
        }

        logger.debug("add_illumination: end")
        return result
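
# Illustrative usage sketch (not part of the module; names and values are
# hypothetical). Assuming ``ds`` is a result dataset with a spectral
# coordinate ``w`` and ``measure`` is the originating measure, the step could
# be applied as:
#
#     step = AddIllumination(
#         illumination=DirectionalIllumination(zenith=30.0 * ureg.deg),
#         measure=measure,
#     )
#     ds = step.transform(ds)
#
# For a 30° solar zenith angle, the stored ``irradiance`` values are the
# top-of-scene spectral irradiance scaled by cos(30°) ≈ 0.866, i.e. the flux
# through a horizontal surface, indexed by the new ``sza`` and ``saa`` dims.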


@parse_docs
@attrs.define
class AddViewingAngles(PipelineStep):
    """
    Create new ``vza`` and ``vaa`` coordinate variables mapping viewing angles
    to other coordinates.
    """

    measure: Measure = documented(
        attrs.field(
            validator=attrs.validators.instance_of(Measure),
            repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)",
        ),
        doc="A :class:`.Measure` instance from which the processed data originates.",
        type=":class:`.Measure`",
    )

    def transform(self, x: t.Any) -> t.Any:
        measure = self.measure
        viewing_angles = measure.viewing_angles

        # Collect zenith and azimuth values
        theta = viewing_angles[:, :, 0]
        phi = viewing_angles[:, :, 1]

        # Attach coordinates
        with xr.set_options(keep_attrs=True):
            result = x.assign_coords(
                {
                    "vza": (
                        ("x_index", "y_index"),
                        theta.m_as(ureg.deg),
                        {
                            "standard_name": "viewing_zenith_angle",
                            "long_name": "viewing zenith angle",
                            "units": symbol("deg"),
                        },
                    ),
                    "vaa": (
                        ("x_index", "y_index"),
                        phi.m_as(ureg.deg),
                        {
                            "standard_name": "viewing_azimuth_angle",
                            "long_name": "viewing azimuth angle",
                            "units": symbol("deg"),
                        },
                    ),
                }
            )

        return result
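
# Illustrative sketch (not part of the module): the measure's
# ``viewing_angles`` property is assumed to be an (m, n, 2) angle quantity
# laid out over the film's ``x_index`` and ``y_index`` dimensions. The step
# splits it into two (m, n) coordinate arrays:
#
#     angles = measure.viewing_angles       # (m, n, 2) pint quantity
#     vza = angles[:, :, 0].m_as(ureg.deg)  # viewing zenith, degrees
#     vaa = angles[:, :, 1].m_as(ureg.deg)  # viewing azimuth, degrees
#
# so that radiance values can later be selected by viewing direction, e.g.
# ``ds.where(ds.vza == 45.0, drop=True)``.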


@ureg.wraps(ret=("rad", "rad"), args=("rad", "rad", "rad"), strict=True)
def _remap_viewing_angles_plane(
    plane: np.typing.ArrayLike,
    theta: np.typing.ArrayLike,
    phi: np.typing.ArrayLike,
) -> tuple[np.typing.ArrayLike, np.typing.ArrayLike]:
    r"""
    Remap viewing angles to a hemisphere plane cut.

    Parameters
    ----------
    plane : quantity
        Plane cut orientation (scalar value).

    theta : quantity
        List of zenith angle values with (N,) shape.

    phi : quantity
        List of azimuth angle values with (N,) shape.

    Returns
    -------
    theta : quantity
        List of zenith angle values in :math:`[-\pi, \pi]` with (N,) shape.

    phi : quantity
        List of azimuth angle values in :math:`[0, 2\pi]` with (N,) shape
        (equal to `plane` modulo :math:`\pi`).

    Warns
    -----
    When zenith angle values are not sorted in ascending order.
    """
    # Normalize all angles
    twopi = 2.0 * np.pi
    plane = plane % twopi
    theta = theta % twopi
    phi = phi % twopi

    # Check that phi values are compatible with requested plane
    in_plane_positive, in_plane_negative = angles_in_hplane(
        plane, theta, phi, raise_exc=True
    )

    # Check if any point is allocated to both half-planes (uncomment to debug)
    # assert not np.any(in_plane_positive & in_plane_negative)

    # Normalize zenith values
    theta = np.where(in_plane_positive, theta, -theta)

    # Normalize azimuth values
    phi = np.full_like(theta, plane)

    # Check ordering and warn if zenith values are not in ascending order
    if not _is_sorted(theta):
        warnings.warn(
            "Viewing zenith angle values are not sorted in ascending order, "
            "you might want to consider changing direction definitions."
        )

    return theta, phi


def _is_sorted(x):
    return np.all(x[:-1] <= x[1:])
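
# Illustrative sketch (not part of the module; values are hypothetical):
# remapping a principal-plane sweep. Directions whose azimuth is opposite to
# the requested plane get negative zenith values, and all azimuths collapse
# to the plane orientation:
#
#     theta, phi = _remap_viewing_angles_plane(
#         plane=0.0 * ureg.rad,
#         theta=np.deg2rad([60.0, 30.0, 30.0, 60.0]) * ureg.rad,
#         phi=np.deg2rad([180.0, 180.0, 0.0, 0.0]) * ureg.rad,
#     )
#     # theta ≈ [-60, -30, 30, 60] degrees, returned in radians
#     # phi   == [0, 0, 0, 0] rad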


@parse_docs
@attrs.define
class AddSpectralResponseFunction(PipelineStep):
    """
    Add spectral response function data.

    This post-processing pipeline step adds spectral response function data
    to the processed dataset.
    """

    measure: Measure = documented(
        attrs.field(
            validator=attrs.validators.instance_of(Measure),
            repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)",
        ),
        doc="A :class:`.Measure` instance from which the processed data originates.",
        type=":class:`.Measure`",
    )

    def transform(self, x: t.Any) -> t.Any:
        logger.debug("add_spectral_response_function: begin")

        result = x.copy(deep=False)

        # Evaluate SRF
        srf = self.measure.srf

        if isinstance(srf, InterpolatedSpectrum):
            srf_w = srf.wavelengths
            srf_values = pinttr.util.ensure_units(srf.values, ureg.dimensionless)
        else:
            raise TypeError(f"unsupported SRF type '{srf.__class__.__name__}'")

        # Add SRF spectral coordinates to dataset
        w_units = ureg.nm
        result = result.assign_coords(
            {
                "srf_w": (
                    "srf_w",
                    srf_w.m_as(w_units),
                    {
                        "standard_name": "radiation_wavelength",
                        "long_name": "wavelength",
                        "units": "nm",
                    },
                )
            }
        )

        # Add SRF variable to dataset
        result["srf"] = ("srf_w", srf_values.m)
        result.srf.attrs = {
            "standard_name": "spectral_response_function",
            "long_name": "spectral response function",
            "units": "",
        }

        logger.debug("add_spectral_response_function: end")
        return result
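
# Illustrative sketch (not part of the module; values are hypothetical): for a
# measure whose SRF is an InterpolatedSpectrum such as
#
#     InterpolatedSpectrum(
#         wavelengths=[540.0, 550.0, 560.0] * ureg.nm,
#         values=[0.0, 1.0, 0.0],
#     )
#
# the step attaches a ``srf_w`` coordinate (in nm) and a dimensionless ``srf``
# data variable to the dataset, leaving the radiometric variables untouched.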