Source code for eradiate.pipelines._aggregate
import itertools
import logging
import typing as t
from collections import OrderedDict
import attrs
import numpy as np
import xarray as xr
from ._core import PipelineStep
from ..attrs import documented, parse_docs
from ..scenes.measure import Measure
from ..spectral.ckd import BinSet
from ..units import symbol
from ..units import unit_context_config as ucc
from ..units import unit_context_kernel as uck
logger = logging.getLogger(__name__)
[docs]@parse_docs
@attrs.define
class AggregateCKDQuad(PipelineStep):
"""
Compute CKD quadrature.
In CKD modes, this pipeline step aggregates spectral data and
evaluates the selected quadrature rule. The following updates to the input
data are expected:
* the ``index`` dimension is dropped;
* the ``bin`` dimension is renamed ``w``;
* the ``bin`` coordinate persists and is indexed by ``w``;
* a ``w`` coordinate is created and contains the central wavelength of each
bin;
* a ``bin_wmin`` (resp. ``bin_wmax``) coordinate is created and contains the
lower (resp. upper) spectral bound of each bin;
* the dataset is reordered by ascending ``w`` values.
Notes
-----
* The ``spp`` variable is averaged on the ``index`` dimension.
* In non-CKD modes, this step is a no-op.
"""
measure: Measure = documented(
attrs.field(
validator=attrs.validators.instance_of(Measure),
repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)",
),
doc="Measure from which the processed data was obtained.",
type=":class:`.Measure`",
)
binset: BinSet = documented(
attrs.field(validator=attrs.validators.instance_of(BinSet)),
doc="Bin set.",
type=":class:`.BinSet`",
)
var: str = documented(
attrs.field(default="img", validator=attrs.validators.instance_of(str)),
doc="Name of the variable for which CKD quadrature computation "
"is to be performed.",
type="str",
default='"img"',
)
[docs] def transform(self, x: t.Any) -> t.Any:
logger.debug("aggregate_ckd_quad: begin")
# Compute quadrature spectrum-indexed variables and turn spp
# into a per-bin average
var = self.var
# Get dimensions of current variable
img = x.data_vars[var]
dims = OrderedDict((y, len(img.coords[y])) for y in img.dims)
if "bin" not in dims:
raise ValueError(f"variable '{var}' is missing dimension 'bin'")
if "index" not in dims:
raise ValueError(f"variable '{var}' is missing dimension 'index'")
# Init storage
del dims["bin"]
del dims["index"]
# Deduplicate bin list preserving order
bin_ids = list(OrderedDict.fromkeys(x.bin.to_index()))
bin_ids = [str(int(_)) for _ in x.bin.values]
logger.debug(f"{bin_ids=}")
n_bins = len(bin_ids)
# Collect wavelengths associated with each bin
wavelength_units = ucc.get("wavelength")
wavelengths = np.array(x.bin.values, dtype=float)
bin_wmins = []
bin_wmaxs = []
for bin in self.binset.bins:
wmin, wmax = bin.wmin, bin.wmax
bin_wmins.append(wmin.m_as(wavelength_units))
bin_wmaxs.append(wmax.m_as(wavelength_units))
result = x
aggregated = xr.DataArray(
np.zeros([n_bins] + list(dims.values())),
coords={"bin": img.bin, **{dim: img.coords[dim] for dim in dims}},
)
# TODO: PR#311 hack (next 3 lines)
_bins = self.binset.bins
_bins_wcenters = [b.wcenter.m_as(wavelength_units) for b in _bins]
_bins_wcenters_str = [str(int(w)) for w in _bins_wcenters]
# For each bin and each pixel, compute quadrature and store the result
for i_bin, bin in enumerate(bin_ids):
# TODO: PR#311 hack (next 2 lines)
_bin_index = _bins_wcenters_str.index(bin)
_bin = _bins[_bin_index]
values_at_nodes = img.sel(bin=bin).values
# Rationale: Avoid using xarray's indexing in this loop for
# performance reasons (wrong data indexing method will result in
# 10x+ speed reduction)
for indexes in itertools.product(*[list(range(n)) for n in dims.values()]):
aggregated.values[(i_bin, *indexes)] = _bin.quad.integrate(
values_at_nodes[(slice(None), *indexes)],
interval=np.array([0.0, 1.0]),
)
result = result.assign({var: aggregated})
result[var].attrs = x[var].attrs
# Average the 'spp' variable over the 'index' dimension
with xr.set_options(keep_attrs=True):
result["spp"] = x.spp.mean(dim="index")
# Add spectral coordinates
result = result.assign_coords(
{
"w": (
"bin",
wavelengths,
{
"standard_name": "wavelength",
"long_name": "wavelength",
"units": symbol(wavelength_units),
},
),
"bin_wmin": (
"bin",
bin_wmins,
{
"standard_name": "bin_wmin",
"long_name": "spectral bin lower bound",
"units": symbol(wavelength_units),
},
),
"bin_wmax": (
"bin",
bin_wmaxs,
{
"standard_name": "bin_wmax",
"long_name": "spectral bin upper bound",
"units": symbol(wavelength_units),
},
),
}
)
# Swap the 'bin' and 'w' dimensions
result = result.swap_dims({"bin": "w"})
# Remove the 'index' dimension
result = result.drop_dims("index")
# Reorder by ascending "w"
result = result.sortby("w")
logger.debug("aggregate_ckd_quad: end")
return result
[docs]@parse_docs
@attrs.define
class AggregateRadiosity(PipelineStep):
"""
Aggregate flux density field.
"""
sector_radiosity_var: str = documented(
attrs.field(
default="sector_radiosity", validator=attrs.validators.instance_of(str)
),
doc="Name of the variable containing radiosity values for the "
"hemisphere sector corresponding to each film pixel. This quantity is "
"expressed in flux units (typically W/m²) and, when summed over the "
"entire film, aggregates into a radiosity.",
type="str",
default='"sector_radiosity"',
)
radiosity_var: str = documented(
attrs.field(default="radiosity", validator=attrs.validators.instance_of(str)),
doc="Name of the variable storing the computed radiosity "
"(leaving flux) value.",
type="str",
default='"radiosity"',
)
[docs] def transform(self, x: t.Any) -> t.Any:
result = x.copy(deep=False)
result[self.radiosity_var] = result[self.sector_radiosity_var].sum(
dim=("x_index", "y_index")
)
result[self.radiosity_var].attrs = {
"standard_name": "toa_outgoing_flux_density_per_unit_wavelength",
"long_name": "top-of-atmosphere outgoing spectral flux density",
"units": symbol(uck.get("irradiance")),
}
return result