Source code for eradiate.radprops.absorption

"""
Functions to compute atmospheric absorption properties.
"""
from __future__ import annotations

import logging
import warnings

import numpy as np
import pint
import xarray as xr

import eradiate

from ..exceptions import (
    InterpolationError,
    MissingCoordinateError,
    OutOfBoundsCoordinateError,
    ScalarCoordinateError,
    UnsupportedModeError,
)
from ..units import to_quantity
from ..units import unit_context_config as ucc
from ..units import unit_registry as ureg

logger = logging.getLogger(__name__)

# ------------------------------------------------------------------------------
#                       Interpolation errors handling
# ------------------------------------------------------------------------------

DEFAULT_X_ERROR_HANDLER_CONFIG = {
    "missing": "ignore",
    "scalar": "ignore",
    "bounds": "raise",
}

DEFAULT_P_ERROR_HANDLER_CONFIG = {
    "missing": "raise",
    "scalar": "raise",
    "bounds": "ignore",  # atmospheric pressure easily goes very low, but at
    # these altitude, the air is very thin and so is the
    # absorption coefficient
}

DEFAULT_T_ERROR_HANDLER_CONFIG = {
    "missing": "raise",
    "scalar": "raise",
    "bounds": "ignore",  # atmospheric temperature climbs fast above 80-100 km,
    # but at these altitude, the air is very thin and so is
    # the absorption coefficient
}

#: Default interpolation error handling configuration
DEFAULT_HANDLER_CONFIG = {
    "x": DEFAULT_X_ERROR_HANDLER_CONFIG,
    "p": DEFAULT_P_ERROR_HANDLER_CONFIG,
    "t": DEFAULT_T_ERROR_HANDLER_CONFIG,
}

ERROR_TYPE_TO_STR = {
    MissingCoordinateError: "missing",
    ScalarCoordinateError: "scalar",
    OutOfBoundsCoordinateError: "bounds",
}


def handle_error(error: InterpolationError, config):
    action = config[ERROR_TYPE_TO_STR[type(error)]]
    if action == "raise":
        raise error
    elif action == "warn":
        warnings.warn(str(error), UserWarning)
    elif action == "ignore":
        pass
    else:
        raise NotImplementedError(f"Action {action} not implemented.")


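# Illustrative usage sketch (not part of the original module): handle_error
# maps the error type to an action ("raise", "warn" or "ignore") through a
# configuration dictionary, and a partial override can be merged with the
# defaults. The names 'override' and '_example_handle_out_of_bounds' are
# hypothetical.
def _example_handle_out_of_bounds(error: OutOfBoundsCoordinateError) -> None:
    override = {"bounds": "warn"}
    config = {**DEFAULT_X_ERROR_HANDLER_CONFIG, **override}
    handle_error(error, config)  # emits a UserWarning instead of raising

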
# ------------------------------------------------------------------------------
#                           Interpolation functions
# ------------------------------------------------------------------------------

def xcoords(k: xr.DataArray) -> list[str]:
    return [c for c in k.coords if c.startswith("x_")]


def xrange(k: xr.DataArray) -> dict[str, pint.Quantity]:
    _xrange = {}
    for c in xcoords(k):
        if k[c].size > 1:
            x = to_quantity(k[c])
            _xrange[c] = np.stack([x.min(), x.max()])
    return _xrange


def xcoords_scalar(k: xr.DataArray) -> list[str]:
    return [c for c in xcoords(k) if k[c].size == 1]


def prange(k: xr.DataArray) -> pint.Quantity:
    p = to_quantity(k["p"])
    return np.stack([p.min(), p.max()])


def trange(k: xr.DataArray) -> pint.Quantity:
    t = to_quantity(k["t"])
    return np.stack([t.min(), t.max()])


def check_missing_x_coords(x_ds: list[str], x_thermoprops: list[str]):
    if not set(x_thermoprops).issubset(set(x_ds)):
        msg = (
            "The absorption dataset does not contain all the volume fraction "
            "coordinates present in the thermophysical properties dataset."
        )
        msg += f"\nVolume fraction coordinates in absorption data set: {x_ds}"
        msg += (
            f"\nVolume fraction coordinates in thermophysical properties data "
            f"set: {x_thermoprops}"
        )
        msg += (
            f"\nMissing volume fraction coordinates: "
            f"{list(set(x_thermoprops) - set(x_ds))}"
        )
        raise MissingCoordinateError(msg)


def check_scalar_x_coords(
    x_absorption_scalar: list[str],
    thermoprops: xr.Dataset,
    rtol=1e-3,
):
    x_thermoprops = [f"x_{m}" for m in thermoprops.joseki.molecules]
    if set(x_absorption_scalar).issubset(set(x_thermoprops)):
        x_intersection = set(x_absorption_scalar).intersection(set(x_thermoprops))
        _x_nonconstant = []
        for _x in x_intersection:
            values = thermoprops[_x].values
            if not np.all(np.isclose(values, values[0], rtol=rtol)):
                _x_nonconstant.append(_x)
        if _x_nonconstant:
            msg = (
                f"These volume fraction coordinates show more than {rtol:.2%} "
                f"variation in the atmosphere's thermophysical properties "
                f"but are scalar coordinates in the absorption dataset: "
                f"{_x_nonconstant}"
            )
            raise ScalarCoordinateError(msg)


def check_x_ranges(k: xr.DataArray, thermoprops: xr.Dataset) -> None:
    xrange_ds = xrange(k)
    out_of_bounds = {}
    for c in xrange_ds:
        x_thermoprops = to_quantity(thermoprops[c])
        xrange_t = np.stack([x_thermoprops.min(), x_thermoprops.max()])
        if xrange_t[0] < xrange_ds[c][0] or xrange_t[1] > xrange_ds[c][1]:
            out_of_bounds[c] = {
                "thermoprops": xrange_t,
                "absorption": xrange_ds[c],
            }
    if out_of_bounds:
        msg = "The following volume fraction coordinates are out of bounds."
        for c in out_of_bounds:
            msg += f"\n{c}: range in thermoprops {out_of_bounds[c]['thermoprops']}, "
            msg += f"range in absorption dataset {out_of_bounds[c]['absorption']}"
        raise OutOfBoundsCoordinateError(msg)


def interp_x(
    k: xr.DataArray,
    thermoprops: xr.Dataset,
    error_handler_config: dict[str, str] | None = None,
) -> xr.DataArray:
    """
    Interpolate absorption coefficient data along volume fraction dimensions.

    Parameters
    ----------
    k : xr.DataArray
        Absorption coefficient data array.

    thermoprops : xr.Dataset
        Atmosphere's thermophysical properties.

    error_handler_config : dict[str, str], optional
        Configuration of the error handler, by default
        ``DEFAULT_X_ERROR_HANDLER_CONFIG``.

    Returns
    -------
    xr.DataArray
        Interpolated absorption coefficient data array.
    """
    # update missing keys in error_handler_config with the default configuration
    if error_handler_config is None:
        error_handler_config = DEFAULT_X_ERROR_HANDLER_CONFIG
    else:
        error_handler_config = {
            **DEFAULT_X_ERROR_HANDLER_CONFIG,
            **error_handler_config,
        }

    x_thermoprops = [f"x_{m}" for m in thermoprops.joseki.molecules]
    logger.debug("x_thermoprops: %s", x_thermoprops)

    x_absorption = xcoords(k)
    x_absorption_scalar = xcoords_scalar(k)
    x_absorption_array = set(x_absorption) - set(x_absorption_scalar)
    x_absorption_interp = x_absorption_array  # interpolation along these coords
    x_absorption_sel = x_absorption_scalar  # selection along these coords
    logger.debug("x_absorption_interp: %s", x_absorption_interp)
    logger.debug("x_absorption_sel: %s", x_absorption_sel)

    # -------------------------------------------------------------------------
    # Check coordinates for interpolation errors
    # -------------------------------------------------------------------------

    # Unspecified coordinates (always raise)
    # TODO: or set this coord to zero?
    # a coordinate is present in 'k' but not in 'thermoprops'
    if not set(x_absorption).issubset(set(x_thermoprops)):
        msg = (
            f"The absorption dataset contains array coordinates "
            f"({list(set(x_absorption) - set(x_thermoprops))}) that are "
            f"not specified in the thermophysical properties dataset."
        )
        logger.critical(msg)
        raise ValueError(msg)

    # Missing coordinates
    # a coordinate is specified in 'thermoprops' but not in 'k'
    try:
        check_missing_x_coords(x_absorption, x_thermoprops)
    except MissingCoordinateError as e:
        handle_error(e, error_handler_config)

    # Scalar coordinates
    # a coordinate is present (and varying) in 'thermoprops' that corresponds
    # to a scalar (one-value) coordinate in 'k'
    try:
        check_scalar_x_coords(x_absorption_scalar, thermoprops)
    except ScalarCoordinateError as e:
        handle_error(e, error_handler_config)

    # Out of bounds coordinates
    # a coordinate value in 'thermoprops' is out of the range of the
    # corresponding coordinate in 'k'
    try:
        check_x_ranges(k, thermoprops)
        bounds_error = True
    except OutOfBoundsCoordinateError as e:
        handle_error(e, error_handler_config)
        bounds_error = False

    # -------------------------------------------------------------------------
    # Select and/or interpolate
    # -------------------------------------------------------------------------

    # Select
    k_selected = k.isel(**{x: 0 for x in x_absorption_sel}, drop=True)

    # Interpolate
    fill_value = None if bounds_error else 0.0  # TODO: use 2-element tuple?
    k_interp = k_selected.interp(
        **{x: thermoprops[x] for x in x_absorption_interp},
        kwargs={
            "bounds_error": bounds_error,
            "fill_value": fill_value,
        },
    ).drop_vars(x_absorption_interp)

    return k_interp


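# Illustrative usage sketch (not part of the original module): interpolate an
# absorption coefficient array along its volume fraction coordinates, warning
# instead of raising when the profile's volume fractions exceed the tabulated
# range. 'k_mono' and 'thermoprops' are hypothetical inputs.
def _example_interp_x(
    k_mono: xr.DataArray, thermoprops: xr.Dataset
) -> xr.DataArray:
    return interp_x(k_mono, thermoprops, error_handler_config={"bounds": "warn"})

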
def check_p_range(k: xr.Dataset, thermoprops: xr.Dataset):
    p_thermoprops = to_quantity(thermoprops.p)
    p_thermoprops_range = np.stack([p_thermoprops.min(), p_thermoprops.max()])
    p_k = to_quantity(k.p)
    p_k_range = np.stack([p_k.min(), p_k.max()])
    if p_thermoprops.min() < p_k.min() or p_thermoprops.max() > p_k.max():
        raise OutOfBoundsCoordinateError(
            f"The pressure range of the thermophysical properties dataset "
            f"({p_thermoprops_range}) is outside the pressure range of the "
            f"absorption dataset ({p_k_range})."
        )


def interp_p(
    k: xr.DataArray,
    thermoprops: xr.Dataset,
    error_handler_config: dict[str, str] | None = None,
) -> xr.DataArray:
    """
    Interpolate absorption coefficient data along the pressure dimension.

    Parameters
    ----------
    k : xr.DataArray
        Absorption coefficient data array.

    thermoprops : xr.Dataset
        Atmosphere's thermophysical properties.

    error_handler_config : dict[str, str], optional
        Configuration of the error handler, by default
        ``DEFAULT_P_ERROR_HANDLER_CONFIG``.

    Returns
    -------
    xr.DataArray
        Interpolated absorption coefficient data array.
    """
    try:
        check_p_range(k, thermoprops)
        bounds_error = True
    except OutOfBoundsCoordinateError as e:
        if error_handler_config is None:
            error_handler_config = DEFAULT_P_ERROR_HANDLER_CONFIG
        else:
            error_handler_config = {
                **DEFAULT_P_ERROR_HANDLER_CONFIG,
                **error_handler_config,
            }
        handle_error(e, error_handler_config)
        bounds_error = False

    fill_value = None if bounds_error else 0.0  # TODO: use 2-element tuple?
    k_interp = k.interp(
        p=thermoprops.p,
        kwargs={"bounds_error": bounds_error, "fill_value": fill_value},
    ).drop_vars("p")

    return k_interp


def check_t_range(k: xr.DataArray, thermoprops: xr.Dataset):
    t_thermoprops = to_quantity(thermoprops.t)
    t_thermoprops_range = np.stack([t_thermoprops.min(), t_thermoprops.max()])
    t_ds = to_quantity(k.t)
    t_ds_range = np.stack([t_ds.min(), t_ds.max()])
    if t_thermoprops.min() < t_ds.min() or t_thermoprops.max() > t_ds.max():
        raise OutOfBoundsCoordinateError(
            f"The temperature range of the thermophysical properties dataset "
            f"({t_thermoprops_range}) is outside the temperature range of the "
            f"absorption dataset ({t_ds_range})."
        )


def interp_t(
    k: xr.DataArray,
    thermoprops: xr.Dataset,
    error_handler_config: dict[str, str] | None = None,
) -> xr.DataArray:
    """
    Interpolate absorption coefficient data along the temperature dimension.

    Parameters
    ----------
    k : xr.DataArray
        Absorption coefficient data array.

    thermoprops : xr.Dataset
        Atmosphere's thermophysical properties.

    error_handler_config : dict[str, str], optional
        Configuration of the error handler, by default
        ``DEFAULT_T_ERROR_HANDLER_CONFIG``.

    Returns
    -------
    xr.DataArray
        Interpolated absorption coefficient data array.
    """
    try:
        check_t_range(k, thermoprops)
        bounds_error = True
    except OutOfBoundsCoordinateError as e:
        if error_handler_config is None:
            error_handler_config = DEFAULT_T_ERROR_HANDLER_CONFIG
        else:
            error_handler_config = {
                **DEFAULT_T_ERROR_HANDLER_CONFIG,
                **error_handler_config,
            }
        handle_error(e, error_handler_config)
        bounds_error = False

    fill_value = None if bounds_error else 0.0  # TODO: use 2-element tuple?
    k_interp = k.interp(
        t=thermoprops.t,
        kwargs={"bounds_error": bounds_error, "fill_value": fill_value},
    ).drop_vars("t")

    return k_interp


# ------------------------------------------------------------------------------
#        Absorption coefficient evaluation implementation in mono modes
# ------------------------------------------------------------------------------

def wrange_mono(ds: xr.Dataset) -> tuple[pint.Quantity, pint.Quantity]:
    wds = to_quantity(ds.w)
    wunits = ucc.get("wavelength")
    if wds.check("[length]^-1"):
        return ((1.0 / wds.max()).to(wunits), (1.0 / wds.min()).to(wunits))
    elif wds.check("[length]"):
        return (wds.min().to(wunits), wds.max().to(wunits))
    else:
        raise ValueError(
            f"Spectral coordinate of absorption dataset has unexpected units "
            f"({ds.w.units})."
        )


def check_w_range_mono(ds: xr.Dataset, w: pint.Quantity):
    wmin, wmax = wrange_mono(ds=ds)
    if np.any((w < wmin) | (w > wmax)):
        msg = (
            f"Requested w coordinate ({w}) is outside the range of the "
            f"absorption data set ({wmin} to {wmax})."
        )
        raise OutOfBoundsCoordinateError(msg)


def interp_w(ds: xr.Dataset, w: pint.Quantity) -> xr.DataArray:
    """
    Interpolate absorption data set to a given wavelength.

    Parameters
    ----------
    ds : xr.Dataset
        Absorption data set.

    w : pint.Quantity
        Wavelength.

    Returns
    -------
    xr.DataArray
        Wavelength-interpolated absorption data.
    """
    # convert to wavenumber/wavelength
    wunits = ds.w.units
    if ureg(wunits).check("[length]^-1"):
        # interpolation according to wavenumber
        wm = (1 / w).m_as(wunits)
    elif ureg(wunits).check("[length]"):
        # interpolation according to wavelength
        wm = w.m_as(wunits)
    else:
        raise ValueError(f"Cannot interpret units {wunits}")

    # check wavelength range and interpolate
    check_w_range_mono(ds, w)

    return ds.sigma_a.interp(
        w=wm,
        method="linear",
    )


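# Illustrative usage sketch (not part of the original module): interpolate at
# the midpoint of a mono dataset's spectral range; interp_w handles the unit
# conversion whether 'ds.w' stores wavelengths or wavenumbers.
def _example_interp_w(ds: xr.Dataset) -> xr.DataArray:
    wmin, wmax = wrange_mono(ds)
    return interp_w(ds, w=0.5 * (wmin + wmax))

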
def eval_sigma_a_mono_impl_ds(
    ds: xr.Dataset,
    thermoprops: xr.Dataset,
    w: pint.Quantity,
    error_handler_config: dict[str, dict[str, str]] | None = None,
) -> xr.DataArray:
    """
    Evaluate absorption coefficient in given spectral and thermophysical
    conditions.

    Parameters
    ----------
    ds : Dataset
        Absorption coefficient dataset.

    thermoprops : Dataset
        Atmospheric thermophysical properties data set.

    w : quantity
        Wavelength.

    error_handler_config : dict
        Error handler configuration.

    Returns
    -------
    xr.DataArray
        Absorption coefficient values.
    """
    if error_handler_config is None:
        error_handler_config = DEFAULT_HANDLER_CONFIG

    kw = interp_w(ds, w)
    kwx = interp_x(kw, thermoprops, error_handler_config["x"])
    kwxp = interp_p(kwx, thermoprops, error_handler_config["p"])
    kwxpt = interp_t(kwxp, thermoprops, error_handler_config["t"])

    return kwxpt


def eval_sigma_a_mono_impl(
    absorption_data: dict[tuple[pint.Quantity], xr.Dataset],
    thermoprops: xr.Dataset,
    w: pint.Quantity,
    error_handler_config: dict[str, dict[str, str]] | None = None,
) -> pint.Quantity:
    # NOTE: it is assumed that wavelength intervals do not intersect, namely
    # that there is at most one dataset relevant to compute the absorption
    # coefficient at a given wavelength
    # TODO: check that in absorption_data validator
    # NOTE: if the evaluation wavelengths span only one dataset, the evaluation
    # is vectorized. Otherwise, the evaluation is vectorized only within each
    # individual dataset.
    if error_handler_config is None:
        error_handler_config = DEFAULT_HANDLER_CONFIG

    w = np.atleast_1d(w)
    wargsort = np.argsort(w)
    ws = w[wargsort]
    wargunsort = np.argsort(wargsort)
    # assume that w might be a large array

    das = []
    upper_max = max([i[1] for i in absorption_data.keys()])
    for interval, dataset in absorption_data.items():
        # we assume that this list is not too long (<10 elements) so the
        # performance cost of this loop is not too high.
        # find the wavelengths that are contained in the current wavelength
        # interval:
        if interval[1] != upper_max:
            w_where = ws[
                (ws >= interval[0]) & (ws < interval[1])  # mind the strict inequality
            ]
        else:
            w_where = ws[(ws >= interval[0]) & (ws <= interval[1])]
        if w_where.size > 0:
            da = eval_sigma_a_mono_impl_ds(
                ds=dataset,
                thermoprops=thermoprops,
                w=w_where,
                error_handler_config=error_handler_config,
            )
            das.append(da)

    concatenated = xr.concat(das, dim="w")

    return to_quantity(concatenated.isel(w=wargunsort).transpose("w", "z"))


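# Illustrative usage sketch (not part of the original module): single-dataset
# case, where the dictionary key is the wavelength interval covered by 'ds'
# and the evaluation wavelengths are sampled inside that interval. 'ds' and
# 'thermoprops' are hypothetical inputs.
def _example_eval_sigma_a_mono(
    ds: xr.Dataset, thermoprops: xr.Dataset
) -> pint.Quantity:
    wmin, wmax = wrange_mono(ds)
    w = np.linspace(wmin.m, wmax.m_as(wmin.units), 5) * wmin.units
    return eval_sigma_a_mono_impl({(wmin, wmax): ds}, thermoprops=thermoprops, w=w)

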
# ------------------------------------------------------------------------------
#        Absorption coefficient evaluation implementation in CKD modes
# ------------------------------------------------------------------------------

def wrange_ckd(ds: xr.Dataset) -> tuple[pint.Quantity, pint.Quantity]:
    wbounds = to_quantity(ds.wbounds.squeeze())
    wunits = ucc.get("wavelength")
    if wbounds.check("[length]^-1"):
        return tuple(np.sort((1 / wbounds).to(wunits)))
    elif wbounds.check("[length]"):
        return tuple(np.sort(wbounds).to(wunits))
    else:
        raise ValueError(
            f"Spectral coordinate of absorption dataset has unexpected units "
            f"({ds.w.units})."
        )


def check_w_range_ckd(ds: xr.Dataset, w: pint.Quantity, rtol: float = 1e-3):
    # This check is specific to CKD absorption datasets.
    # Check that the evaluation wavelength is amongst the absorption dataset
    # spectral coordinate values (to a relative tolerance of 'rtol').
    wds = to_quantity(ds.w)
    if wds.check("[length]^-1"):
        w_k = (1 / wds).to(w.units)
    elif wds.check("[length]"):
        w_k = wds
    else:
        raise ValueError(
            f"Spectral coordinate of absorption dataset has unexpected units "
            f"({ds.w.units})."
        )
    if not np.any(np.isclose(w_k, w, rtol=rtol)):
        msg = (
            f"Requested w coordinate ({w}) is not amongst the spectral "
            f"coordinates of the absorption data set ({w_k})."
        )
        raise OutOfBoundsCoordinateError(msg)


def interp_wg(ds: xr.Dataset, w: pint.Quantity, g: float) -> xr.DataArray:
    """
    Interpolate absorption data set to given w and g coordinates.

    Parameters
    ----------
    ds : xr.Dataset
        Absorption data set.

    w : pint.Quantity
        Wavelength corresponding to the CKD bin to select.

    g : float
        g-point to interpolate at.

    Returns
    -------
    xr.DataArray
        Spectral-interpolated absorption data.
    """
    # 1. bin selection
    wunits = ds.w.units
    if ureg(wunits).check("[length]^-1"):
        # select bin according to wavenumber
        wm = (1 / w).m_as(wunits)
    elif ureg(wunits).check("[length]"):
        # select bin according to wavelength
        wm = w.m_as(wunits)
    else:
        raise ValueError(f"Cannot interpret units {wunits}")

    check_w_range_ckd(ds, w, rtol=1e-3)

    # select the bin
    kw = ds.sigma_a.sel(
        w=wm,
        method="nearest",
    ).expand_dims("w")

    # 2. interpolation along g-point (pseudo-spectral coordinate)
    return kw.interp(g=g).drop_vars("g")


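# Illustrative usage sketch (not part of the original module): select the CKD
# bin at the dataset's first spectral coordinate value and interpolate to the
# g-point 0.5 (assumes 'ds.w' is one-dimensional).
def _example_interp_wg(ds: xr.Dataset) -> xr.DataArray:
    return interp_wg(ds, w=w_dataset(ds)[0], g=0.5)

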
def eval_sigma_a_ckd_impl_ds(
    ds: xr.Dataset,
    thermoprops: xr.Dataset,
    w: pint.Quantity,
    g: float,
    error_handler_config: dict[str, dict[str, str]] | None = None,
) -> xr.DataArray:
    """
    Evaluate absorption coefficient in given spectral and thermophysical
    conditions.

    Parameters
    ----------
    ds : Dataset
        Absorption coefficient dataset.

    thermoprops : Dataset
        Atmospheric thermophysical properties data set.

    w : quantity
        Wavelength.

    g : float
        g-point.

    error_handler_config : dict
        Error handler configuration.

    Returns
    -------
    xr.DataArray
        Absorption coefficient values.
    """
    if error_handler_config is None:
        error_handler_config = DEFAULT_HANDLER_CONFIG

    kw = interp_wg(ds, w, g)
    kwx = interp_x(kw, thermoprops, error_handler_config["x"])
    kwxp = interp_p(kwx, thermoprops, error_handler_config["p"])
    kwxpt = interp_t(kwxp, thermoprops, error_handler_config["t"])

    return kwxpt


def _isclose(a, b, **kwargs):
    """
    Return values in b that are close to one value in a (different shapes
    supported).
    """
    out = []
    for value in a:
        isclosed = b[np.isclose(b, value, **kwargs)]
        if len(isclosed) > 0:
            out.append(isclosed)
    if len(out) > 0:
        return np.stack(out).squeeze()
    else:
        return np.empty(0)


def w_dataset(ds: xr.Dataset) -> pint.Quantity:
    w = to_quantity(ds.w)
    wunits = ucc.get("wavelength")
    if w.check("[length]"):
        return w.to(wunits)
    elif w.check("[length]^-1"):
        return (1.0 / w).to(wunits)
    else:
        raise ValueError(
            f"Spectral coordinate of absorption dataset has unexpected units "
            f"({ds.w.units})."
        )


def eval_sigma_a_ckd_impl(
    absorption_data,
    thermoprops: xr.Dataset,
    w: pint.Quantity,
    g: float,
    error_handler_config: dict[str, dict[str, str]] | None = None,
) -> pint.Quantity:
    # NOTE: it is assumed that only one dataset corresponds to each wavelength
    # value in 'w'
    if error_handler_config is None:
        error_handler_config = DEFAULT_HANDLER_CONFIG

    w = np.atleast_1d(w)
    wargsort = np.argsort(w)
    ws = w[wargsort]
    wargunsort = np.argsort(wargsort)

    das = []
    for _, dataset in absorption_data.items():
        # we assume that this list is not too long (~10 elements) so the
        # performance cost of this loop is not too high.
        # find the wavelength values that match the current dataset
        w_ds = w_dataset(dataset)
        w_where = _isclose(ws, w_ds, rtol=1e-3)
        if w_where.size > 0:
            da = eval_sigma_a_ckd_impl_ds(
                ds=dataset,
                thermoprops=thermoprops,
                w=w_where,
                g=g,
                error_handler_config=error_handler_config,
            )
            das.append(da)

    if len(das) == 0:
        w_dss = []
        w_wheres = []
        for _, dataset in absorption_data.items():
            w_ds = w_dataset(dataset)
            w_where = _isclose(ws, w_ds, rtol=1e-3)
            w_dss.append(w_ds)
            w_wheres.append(w_where)
        raise ValueError(
            "No absorption dataset matches the requested wavelengths: "
            f"{w_dss=}, {w_wheres=}"
        )

    concatenated = xr.concat(das, dim="w")

    return to_quantity(concatenated.isel(w=wargunsort).transpose("w", "z"))


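# Illustrative usage sketch (not part of the original module): single-dataset
# case, evaluated at the first wavelength of the dataset's spectral grid for
# the g-point 0.5 (assumes 'ds.w' is one-dimensional). The dictionary key is
# not used by the implementation.
def _example_eval_sigma_a_ckd(
    ds: xr.Dataset, thermoprops: xr.Dataset
) -> pint.Quantity:
    w = w_dataset(ds)[0]
    return eval_sigma_a_ckd_impl({"bin": ds}, thermoprops=thermoprops, w=w, g=0.5)

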
# ------------------------------------------------------------------------------
#                    Wavelength range based on active mode
# ------------------------------------------------------------------------------

def wrange(ds: xr.Dataset):
    if eradiate.mode().is_mono:
        return wrange_mono(ds)
    elif eradiate.mode().is_ckd:
        return wrange_ckd(ds)
    else:
        raise UnsupportedModeError