Source code for eradiate.pipelines._aggregate
import itertools
import logging
import typing as t
from collections import OrderedDict
import attrs
import numpy as np
import xarray as xr
from ._core import PipelineStep
from ..attrs import documented, parse_docs
from ..scenes.measure import Measure
from ..spectral.ckd import BinSet
from ..units import symbol
from ..units import unit_context_config as ucc
from ..units import unit_context_kernel as uck
logger = logging.getLogger(__name__)
[docs]@parse_docs
@attrs.define
class AggregateCKDQuad(PipelineStep):
"""
Compute CKD quadrature.
In CKD modes, this pipeline step aggregates spectral data and
evaluates the selected quadrature rule. The following updates to the input
data are expected:
* the ``g`` dimension is dropped;
* the ``bin`` dimension is renamed ``w``;
* the ``bin`` coordinate persists and is indexed by ``w``;
* a ``w`` coordinate is created and contains the central wavelength of each
bin;
* a ``bin_wmin`` (resp. ``bin_wmax``) coordinate is created and contains the
lower (resp. upper) spectral bound of each bin;
* the dataset is reordered by ascending ``w`` values.
Notes
-----
* The ``spp`` variable is averaged on the ``g`` dimension.
* In non-CKD modes, this step is a no-op.
"""
measure: Measure = documented(
attrs.field(
validator=attrs.validators.instance_of(Measure),
repr=lambda self: f"{self.__class__.__name__}(id='{self.id}', ...)",
),
doc="Measure from which the processed data was obtained.",
type=":class:`.Measure`",
)
binset: BinSet = documented(
attrs.field(validator=attrs.validators.instance_of(BinSet)),
doc="Bin set.",
type=":class:`.BinSet`",
)
var: str = documented(
attrs.field(default="img", validator=attrs.validators.instance_of(str)),
doc="Name of the variable for which CKD quadrature computation "
"is to be performed.",
type="str",
default='"img"',
)
[docs] def transform(self, x: t.Any) -> t.Any:
logger.debug("aggregate_ckd_quad: begin")
# Compute quadrature spectrum-indexed variables and turn spp
# into a per-bin average
var = self.var
# Get dimensions of current variable
img = x.data_vars[var]
dims = OrderedDict((y, len(img.coords[y])) for y in img.dims)
if "w" not in dims:
raise ValueError(f"variable '{var}' is missing dimension 'w'")
if "g" not in dims:
raise ValueError(f"variable '{var}' is missing dimension 'g'")
# Init storage
del dims["w"]
del dims["g"]
# Collect wavelengths associated with each bin
wunit = ucc.get("wavelength")
wavelengths = x.w.values
bin_wmins = []
bin_wmaxs = []
for bin in self.binset.bins:
wmin, wmax = bin.wmin, bin.wmax
bin_wmins.append(wmin.m_as(wunit))
bin_wmaxs.append(wmax.m_as(wunit))
result = x
aggregated = xr.DataArray(
np.zeros([wavelengths.size] + list(dims.values())),
coords={"w": img.w, **{dim: img.coords[dim] for dim in dims}},
)
# For each bin and each pixel, compute quadrature and store the result
for i in range(wavelengths.size):
bin_i = self.binset.bins[i]
values_at_nodes = img.isel(w=i).values
# Rationale: Avoid using xarray's indexing in this loop for
# performance reasons (wrong data indexing method will result in
# 10x+ speed reduction)
indexes_product = itertools.product(
*[list(range(n)) for n in dims.values()]
)
for indexes in indexes_product:
aggregated.values[(i, *indexes)] = bin_i.quad.integrate(
values_at_nodes[(slice(None), *indexes)],
interval=np.array([0.0, 1.0]),
)
result = result.assign({var: aggregated})
result[var].attrs = x[var].attrs
# Average the 'spp' variable over the 'index' dimension
with xr.set_options(keep_attrs=True):
result["spp"] = x.spp.mean(dim="g")
# Add spectral coordinates
result = result.assign_coords(
{
"w": (
"w",
wavelengths,
{
"standard_name": "radiation_wavelength",
"long_name": "wavelength",
"units": symbol(wunit),
},
),
}
)
# Remove the 'g' dimension
result = result.drop_dims("g")
# Reorder by ascending "w"
result = result.sortby("w")
logger.debug("aggregate_ckd_quad: end")
return result
[docs]@parse_docs
@attrs.define
class AggregateRadiosity(PipelineStep):
"""
Aggregate flux density field.
"""
sector_radiosity_var: str = documented(
attrs.field(
default="sector_radiosity", validator=attrs.validators.instance_of(str)
),
doc="Name of the variable containing radiosity values for the "
"hemisphere sector corresponding to each film pixel. This quantity is "
"expressed in flux units (typically W/m²) and, when summed over the "
"entire film, aggregates into a radiosity.",
type="str",
default='"sector_radiosity"',
)
radiosity_var: str = documented(
attrs.field(default="radiosity", validator=attrs.validators.instance_of(str)),
doc="Name of the variable storing the computed radiosity "
"(leaving flux) value.",
type="str",
default='"radiosity"',
)
[docs] def transform(self, x: t.Any) -> t.Any:
result = x.copy(deep=False)
result[self.radiosity_var] = result[self.sector_radiosity_var].sum(
dim=("x_index", "y_index")
)
result[self.radiosity_var].attrs = {
"standard_name": "toa_outgoing_flux_density_per_unit_wavelength",
"long_name": "top-of-atmosphere outgoing spectral flux density",
"units": symbol(uck.get("irradiance")),
}
return result