Source code for eradiate.pipelines._compute

from __future__ import annotations

import logging
import typing as t

import attrs
import numpy as np
from pinttr.util import always_iterable

from ._core import PipelineStep
from ..attrs import documented, parse_docs
from ..scenes.measure import Measure
from ..scenes.spectra import InterpolatedSpectrum, UniformSpectrum
from ..units import symbol, to_quantity
from ..units import unit_registry as ureg

logger = logging.getLogger(__name__)


[docs]@parse_docs @attrs.define class ApplySpectralResponseFunction(PipelineStep): r""" Apply spectral response function to specified variables. This post-processing pipeline step applies the spectral response function to specified variables. It creates new corresponding data variables with no dependency against the wavelength dimension. Notes ----- The processed dataset is expected to have a ``bin`` coordinate, associated with bounds ``bin_wmin`` and ``bin_wmax``. If not, :meth:`transform` will raise an exception. A common nomenclature practice refers to this process as "convolution" :cite:`Burggraaff2020BiasesIncorrectReflectance`, but we prefer to avoid this term as we consider it to refer to another mathematical operation defined by: .. math:: (f \ast g) \, (t) := \int_{-\infty}^{+\infty} f(\tau) \, g(t - \tau) \, d\tau Instead, we adopt the term "spectral response weighted average" since the operation is defined by: .. math:: \overline{x} = \frac{\int x(\lambda) \, w(\lambda) \, d\lambda} {\int w(\lambda) \, d\lambda} where * :math:`x` is the variable to be weighted, * :math:`w` is the spectral response function, and which effectively translates into a weighted average in numerical form, with the weights being the spectral response function values. """ measure: Measure = documented( attrs.field( validator=attrs.validators.instance_of(Measure), repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)", ), doc="A :class:`.Measure` instance from which the processed data originates.", type=":class:`.Measure`", ) vars: list[str] = documented( attrs.field( factory=list, converter=lambda x: list(always_iterable(x)), validator=attrs.validators.deep_iterable( member_validator=attrs.validators.instance_of(str) ), ), doc="List of variables to which the spectral response function is to be " "applied.", type="list of str", init_type="str or list of str", default="[]", )
[docs] def transform(self, x: t.Any) -> t.Any: result = x.copy(deep=False) if not {"bin_wmin", "bin_wmax"}.issubset(set(result.coords.keys())): raise ValueError( "input data is missing 'bin_wmin' and/or 'bin_wmax' coordinates" ) measure = self.measure # Evaluate integral of spectral response function within selected interval wmin = to_quantity(result.bin_wmin).min() wmax = to_quantity(result.bin_wmax).max() srf = measure.srf srf_int = srf.integral(wmin, wmax) if isinstance(srf, InterpolatedSpectrum): srf_w = srf.wavelengths elif isinstance(srf, UniformSpectrum): srf_w = np.array([wmin.m_as(ureg.nm), wmax.m_as(ureg.nm)]) * ureg.nm else: raise TypeError(f"unhandled SRF type '{srf.__class__.__name__}'") for var in self.vars: # Evaluate integral of product of variable and SRF within selected interval data_w = to_quantity(result.w) # Spectral grid is the finest between data and SRF grids w_units = data_w.units w_m = np.array(sorted(set(data_w.m_as(w_units)) | set(srf_w.m_as(w_units)))) # If data var has length 1 on spectral dimension, directly select # the value instead of using interpolation (it's a known scipy issue) if len(result[var].w) == 1: # Note: The tricky thing is to recreate and extend the 'w' # dimension with the same axis index as in the original data var_values = ( result[var] .isel(w=0, drop=True) .expand_dims(w=w_m, axis=result[var].get_axis_num("w")) ) # Otherwise, use nearest neighbour interpolation (we assume that var # is constant over each spectral bin) else: var_values = result[var].interp( w=w_m, method="nearest", kwargs={"fill_value": "extrapolate"} ) srf_values = ( srf.eval_mono(w_m * w_units) .reshape([-1 if dim == "w" else 1 for dim in var_values.dims]) .magnitude ) assert isinstance(srf_values, np.ndarray) # Check for leftover bugs var_srf_int = (var_values * srf_values).integrate("w") # Apply SRF to variable and store result dims = list(result[var].dims) dims.remove("w") result[f"{var}_srf"] = (dims, var_srf_int.values / srf_int.m_as(w_units)) attrs = result[var].attrs.copy() if "standard_name" in attrs: attrs["standard_name"] += "_srf" if "long_name" in attrs: attrs["long_name"] += " (SRF applied)" result[f"{var}_srf"].attrs = attrs logger.debug("ApplySpectralResponseFunction pipeline step: end") return result
[docs]@parse_docs @attrs.define class ComputeReflectance(PipelineStep): """ Derive reflectance from radiance and irradiance values. """ radiance_var: str = documented( attrs.field(default="radiance", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing leaving radiance values.", type="str", default='"radiance"', ) irradiance_var: str = documented( attrs.field(default="irradiance", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing irradiance (incoming flux) values.", type="str", default='"irradiance"', ) brdf_var: str = documented( attrs.field(default="brdf", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing BRDF values.", type="str", default='"brdf"', ) brf_var: str = documented( attrs.field(default="brf", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing BRF values.", type="str", default='"brf"', )
[docs] def transform(self, x: t.Any) -> t.Any: logger.debug("ComputeReflectance pipeline step: begin") # Compute BRDF and BRF result = x.copy(deep=False) # We assume that all quantities are stored in kernel units result[self.brdf_var] = result[self.radiance_var] / result[self.irradiance_var] result[self.brdf_var].attrs = { "standard_name": "brdf", "long_name": "bi-directional reflection distribution function", "units": symbol("1/sr"), } result[self.brf_var] = result[self.brdf_var] * np.pi result[self.brf_var].attrs = { "standard_name": "brf", "long_name": "bi-directional reflectance factor", "units": symbol("dimensionless"), } logger.debug("ComputeReflectance pipeline step: end") return result
[docs]@parse_docs @attrs.define class ComputeAlbedo(PipelineStep): """ Derive the albedo from radiosity and irradiance fields. """ radiosity_var: str = documented( attrs.field(default="radiosity", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing the radiosity (leaving flux) value.", type="str", default='"radiosity"', ) irradiance_var: str = documented( attrs.field(default="irradiance", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing the irradiance (incoming flux) value.", type="str", default='"irradiance"', ) albedo_var: str = documented( attrs.field(default="albedo", validator=attrs.validators.instance_of(str)), doc="Name of the variable storing the albedo value.", type="str", default='"albedo"', )
[docs] def transform(self, x: t.Any) -> t.Any: logger.debug("ComputeAlbedo pipeline step: begin") # Compute albedo result = x.copy(deep=False) # We assume that all quantities are stored in kernel units result[self.albedo_var] = ( result[self.radiosity_var] / result[self.irradiance_var] ) result[self.albedo_var].attrs = { "standard_name": "albedo", "long_name": "surface albedo", "units": "", } logger.debug("ComputeAlbedo pipeline step: end") return result