"""
Post-processing pipeline builder definitions.
This module contains components to imperatively assemble a
:class:`.Pipeline` from a configuration dictionary.
"""
from __future__ import annotations
from . import logic
from .engine import Pipeline
from .._mode import modes
_MODE_IDS_CKD = set(modes(lambda x: x.is_ckd))
_FINAL_DATA = {"final": True, "kind": "data"}
_FINAL_COORD = {"final": True, "kind": "coord"}
[docs]
def build_pipeline(config: dict) -> Pipeline:
"""
Build a post-processing pipeline from a configuration dictionary.
Parameters
----------
config : dict
Pipeline configuration dictionary. Expected keys:
``mode_id`` : str
Eradiate mode identifier.
``measure_distant`` : bool
Whether the measure is a distant measure.
``add_viewing_angles`` : bool
Whether to compute viewing angles.
``var_name`` : str
Name of the processed physical variable.
``var_metadata`` : dict
Metadata to attach to the variable data array.
``apply_spectral_response`` : bool
Whether to apply SRF weighting.
``calculate_variance`` : bool
Whether to compute variance.
``calculate_stokes`` : bool
Whether to compute the full Stokes vector.
Returns
-------
.Pipeline
A configured pipeline ready for execution.
"""
pipeline = Pipeline()
mode_id = config["mode_id"]
measure_distant = config["measure_distant"]
add_viewing_angles = config["add_viewing_angles"]
var_name = config["var_name"]
apply_srf = config["apply_spectral_response"]
calc_var = config["calculate_variance"]
calc_stokes = config["calculate_stokes"]
is_ckd = mode_id in _MODE_IDS_CKD
# ------------------------------------------------------------------
# viewing_angles node (optional)
# When False, 'viewing_angles' is a virtual input set to None.
# ------------------------------------------------------------------
if add_viewing_angles:
pipeline.add_node(
"viewing_angles",
func=lambda angles: logic.viewing_angles(angles),
dependencies=["angles"],
description="Compute viewing angles dataset",
metadata=_FINAL_COORD,
)
# ------------------------------------------------------------------
# spectral_response (optional)
# ------------------------------------------------------------------
if apply_srf:
pipeline.add_node(
"spectral_response",
func=lambda srf: logic.spectral_response(srf),
dependencies=["srf"],
description="Evaluate spectral response function",
)
# ------------------------------------------------------------------
# extract_irradiance — expands into irradiance + solar_angles
# ------------------------------------------------------------------
pipeline.add_node(
"_extract_irradiance",
func=lambda mode_id, illumination, spectral_grid: logic.extract_irradiance(
mode_id, illumination, spectral_grid
),
dependencies=["mode_id", "illumination", "spectral_grid"],
description="Extract irradiance and solar angles",
outputs={"irradiance": "irradiance", "solar_angles": "solar_angles"},
)
pipeline.get_node("irradiance").metadata.update(_FINAL_DATA)
pipeline.get_node("solar_angles").metadata.update(_FINAL_COORD)
# ------------------------------------------------------------------
# gather_bitmaps — expands into spp, weights_raw, <var>_raw [+ m2_raw]
# viewing_angles is either a real node (add_viewing_angles=True) or a
# virtual input set to None via inputs dict.
# ------------------------------------------------------------------
gather_outputs = ["spp", "weights_raw", f"{var_name}_raw"]
if calc_var:
gather_outputs.append(f"{var_name}_m2_raw")
def _gather_bitmaps_func(
mode_id,
var_name,
var_metadata,
calculate_variance,
calculate_stokes,
bitmaps,
solar_angles,
viewing_angles,
):
return logic.gather_bitmaps(
mode_id,
var_name,
var_metadata,
calculate_variance,
calculate_stokes,
bitmaps,
viewing_angles,
solar_angles,
)
pipeline.add_node(
"_gather_bitmaps",
func=_gather_bitmaps_func,
dependencies=[
"mode_id",
"var_name",
"var_metadata",
"calculate_variance",
"calculate_stokes",
"bitmaps",
"solar_angles",
"viewing_angles",
],
description="Gather raw bitmaps into xarray arrays",
outputs=gather_outputs,
)
# ------------------------------------------------------------------
# moment2_to_variance → <var>_var_raw (optional)
# ------------------------------------------------------------------
if calc_var:
_vn = var_name # capture for closure
def _m2_to_var(**kwargs):
return logic.moment2_to_variance(
kwargs[f"{_vn}_raw"],
kwargs[f"{_vn}_m2_raw"],
kwargs["spp"],
kwargs["calculate_stokes"],
)
pipeline.add_node(
f"{var_name}_var_raw",
func=_m2_to_var,
dependencies=[
f"{var_name}_raw",
f"{var_name}_m2_raw",
"spp",
"calculate_stokes",
],
description="Compute variance from 2nd moment",
)
# ------------------------------------------------------------------
# aggregate_ckd_quad → <var> (always — no-op in mono)
# ------------------------------------------------------------------
_vn = var_name # capture for closure
def _aggregate_main(**kwargs):
return logic.aggregate_ckd_quad(
kwargs["mode_id"],
kwargs[f"{_vn}_raw"],
kwargs["spectral_grid"],
kwargs["ckd_quads"],
False,
)
pipeline.add_node(
var_name,
func=_aggregate_main,
dependencies=["mode_id", f"{var_name}_raw", "spectral_grid", "ckd_quads"],
description=f"Aggregate CKD quadrature → {var_name}",
metadata=_FINAL_DATA,
)
if calc_var:
def _aggregate_var(**kwargs):
return logic.aggregate_ckd_quad(
kwargs["mode_id"],
kwargs[f"{_vn}_var_raw"],
kwargs["spectral_grid"],
kwargs["ckd_quads"],
True,
)
pipeline.add_node(
f"{var_name}_var",
func=_aggregate_var,
dependencies=[
"mode_id",
f"{var_name}_var_raw",
"spectral_grid",
"ckd_quads",
],
description=f"Aggregate CKD quadrature → {var_name}_var",
metadata=_FINAL_DATA,
)
# ------------------------------------------------------------------
# radiosity (sector_radiosity only)
# Must be added before radiosity_srf which depends on it.
# ------------------------------------------------------------------
if var_name == "sector_radiosity":
pipeline.add_node(
"radiosity",
func=lambda sector_radiosity: logic.radiosity(sector_radiosity),
dependencies=["sector_radiosity"],
description="Aggregate sector radiosity",
metadata=_FINAL_DATA,
)
# ------------------------------------------------------------------
# SRF nodes (CKD + apply_srf only)
# ------------------------------------------------------------------
if is_ckd and apply_srf:
def _make_srf_node(src_name):
def _srf_func(**kwargs):
return logic.apply_spectral_response(kwargs[src_name], kwargs["srf"])
return _srf_func
pipeline.add_node(
f"{var_name}_srf",
func=_make_srf_node(var_name),
dependencies=[var_name, "srf"],
description=f"Apply SRF → {var_name}_srf",
metadata=_FINAL_DATA,
)
if var_name == "sector_radiosity":
pipeline.add_node(
"radiosity_srf",
func=_make_srf_node("radiosity"),
dependencies=["radiosity", "srf"],
description="Apply SRF → radiosity_srf",
metadata=_FINAL_DATA,
)
if measure_distant:
pipeline.add_node(
"irradiance_srf",
func=_make_srf_node("irradiance"),
dependencies=["irradiance", "srf"],
description="Apply SRF → irradiance_srf",
metadata=_FINAL_DATA,
)
# ------------------------------------------------------------------
# albedo (sector_radiosity + distant)
# ------------------------------------------------------------------
if var_name == "sector_radiosity" and measure_distant:
pipeline.add_node(
"albedo",
func=lambda radiosity, irradiance: logic.compute_albedo(
radiosity, irradiance
),
dependencies=["radiosity", "irradiance"],
description="Compute surface albedo",
metadata=_FINAL_DATA,
)
if is_ckd and apply_srf:
pipeline.add_node(
"albedo_srf",
func=lambda radiosity_srf, irradiance_srf: logic.compute_albedo(
radiosity_srf, irradiance_srf
),
dependencies=["radiosity_srf", "irradiance_srf"],
description="Compute surface albedo (SRF-weighted)",
metadata=_FINAL_DATA,
)
# ------------------------------------------------------------------
# bidirectional_reflectance → brdf + brf (radiance + distant)
# ------------------------------------------------------------------
if var_name == "radiance" and measure_distant:
pipeline.add_node(
"_brdf_brf",
func=lambda radiance, irradiance, calculate_stokes: (
logic.compute_bidirectional_reflectance(
radiance, irradiance, calculate_stokes
)
),
dependencies=["radiance", "irradiance", "calculate_stokes"],
description="Compute BRDF and BRF",
outputs=["brdf", "brf"],
)
pipeline.get_node("brdf").metadata.update(_FINAL_DATA)
pipeline.get_node("brf").metadata.update(_FINAL_DATA)
if is_ckd and apply_srf:
def _brdf_brf_srf(radiance_srf, irradiance_srf, calculate_stokes):
result = logic.compute_bidirectional_reflectance(
radiance_srf, irradiance_srf, calculate_stokes
)
return {"brdf_srf": result["brdf"], "brf_srf": result["brf"]}
pipeline.add_node(
"_brdf_brf_srf",
func=_brdf_brf_srf,
dependencies=["radiance_srf", "irradiance_srf", "calculate_stokes"],
description="Compute BRDF and BRF (SRF-weighted)",
outputs=["brdf_srf", "brf_srf"],
)
pipeline.get_node("brdf_srf").metadata.update(_FINAL_DATA)
pipeline.get_node("brf_srf").metadata.update(_FINAL_DATA)
# ------------------------------------------------------------------
# dlp (Stokes only)
# ------------------------------------------------------------------
if calc_stokes:
pipeline.add_node(
"dlp",
func=lambda radiance: logic.degree_of_linear_polarization(radiance),
dependencies=["radiance"],
description="Compute degree of linear polarization",
metadata=_FINAL_DATA,
)
if apply_srf:
pipeline.add_node(
"dlp_srf",
func=lambda radiance_srf: logic.degree_of_linear_polarization(
radiance_srf
),
dependencies=["radiance_srf"],
description="Compute DLP (SRF-weighted)",
metadata=_FINAL_DATA,
)
return pipeline