Source code for eradiate.data.io

"""
Data loading, conversion and output components.
"""

from __future__ import annotations

import warnings
from typing import Literal

import numpy as np
import pint
import pinttrs
import xarray as xr

from ..data import fresolver
from ..typing import PathLike
from ..units import unit_context_config as ucc
from ..units import unit_registry as ureg


def _convert_units(value: str):
    if value == "per cent":
        return "percent"
    return value


def _get_units(ds, var, fallback_units=None):
    if "units" in ds[var].attrs:
        return ureg.Unit(_convert_units(ds[var].attrs["units"]))
    elif fallback_units is not None and var in fallback_units:
        return ureg.Unit(_convert_units(fallback_units[var]))
    else:
        raise ValueError(
            "load_aerosol_libradtran(): The input dataset specifies no units "
            f"for variable '{var}'; this can be addressed by passing them through "
            "the 'fallback_units' parameter."
        )


[docs] def load_aerosol_libradtran( data: PathLike | xr.Dataset, particle_shape: Literal["spherical", "spheroidal"] | None = None, tolerance: dict[str, pint.Quantity | float] | None = None, wbounds: tuple = (None, None), fallback_units: dict[str, str] | None = None, **kwargs, ) -> xr.Dataset: """ Convert a libRadtran NetCDF aerosol file to the Eradiate aerosol data format. Both effective radius and humidity-indexed data are supported. Parameters ---------- data : Dataset or path-like A libRadtran NetCDF aerosol dataset. If a path is passed, it will be resolved by the file resolver and tentatively loaded into memory. particle_shape : {"spherical", "spheroidal"}, optional The expected shape of the particle (this will tell the phase matrix coefficients it should expect). If unset, the shape is inferred from the input dataset. reff, hum : float or quantity For datasets with a humidity or effective radius dimension, the coordinate point to select. By default, the nearest data point is selected; tolerance can be adjusted using the ``tolerance`` parameter. Quantities are accepted (see Notes for expected dimensions and defaults). These parameters are optional if the dataset has only one point on these dimensions. wbounds : tuple, optional Bounds to restrict the spectral domain where conversion is performed. This parameter accepts a tuple with the minimum and maximum values (use ``None`` to leave a bound open). Examples: ``(300, 3000)``, ``(None, 3000)``, ``(0.3, 3) * ureg.micron``, ``(None, 3 * ureg.micron)``. tolerance : dict A mapping that allows to specify a tolerance for nearest neighbour lookup for relevant parameters. Units are applied with the same rules as for the ``reff`` and ``hum`` parameters. fallback_units : dict, optional A mapping that specifies units to apply to variables that are missing them. Returns ------- Dataset Notes ----- .. list-table:: :widths: 1 2 2 :header-rows: 1 * - Parameter - Dimension - Default units * - ``reff`` - Length - μm * - ``hum`` - Dimensionless (fraction) - percent * - ``wbounds`` - Wavelength - ``ucc.get("wavelength")`` (usually, nm) * The current implementation resamples the angular dimension at the highest resolution to minimize the loss of information on phase matrix coefficients. * All conversion is done in memory: very large dataset might result in massive converted data. In such case, an easy way to split the conversion is to chunk it on the spectral dimension. """ VARS_TO_DIMS = {"wavelen": "nlam", "reff": "nreff", "hum": "nhum"} KWARG_TO_DEFAULT_UNITS = { "w": ucc.get("wavelength"), "hum": ureg.Unit("percent"), "reff": ureg.Unit("micrometer"), } # Load aerosol component dataset if not isinstance(data, xr.Dataset): data = fresolver.load_dataset(data) # Extract required variables vars = ["phase", "ext", "ssa", "theta", "wavelen"] for var in ["reff", "hum"]: if var in data: vars.append(var) data = data[vars] # Select on humidity and effective radius if tolerance is None: tolerance = {} for kwarg in ["hum", "reff"]: var = kwarg if var not in data: continue units = _get_units(data, var, fallback_units) da = data[var] default_units = KWARG_TO_DEFAULT_UNITS[kwarg] if len(da) > 1: if kwarg not in kwargs: raise TypeError( f"load_aerosol_libradtran() is missing keyword argument '{kwarg}' " f"(allowed: {da.values})" ) else: kwarg_value = pinttrs.converters.ensure_units( np.atleast_1d(kwargs.pop(kwarg)), default_units=default_units ).to(units) else: if kwarg not in kwargs: kwarg_value = da.values * units else: kwarg_value = pinttrs.converters.ensure_units( np.atleast_1d(kwargs.pop(kwarg)), default_units=default_units ).to(units) sel_kwargs = {var: kwarg_value.m, "method": "nearest"} if kwarg in tolerance: sel_kwargs["tolerance"] = tolerance[kwarg] data = data.swap_dims({VARS_TO_DIMS[var]: var}) data = data.sel(**sel_kwargs) data = data.squeeze(var, drop=True) if kwargs: warnings.warn( "load_aerosol_libradtran() got unexpected keyword arguments " f"{list(kwargs.keys())}, which were not used" ) # Filter wavelengths if requested wmin, wmax = wbounds units = _get_units(data, "wavelen", fallback_units) default_units = KWARG_TO_DEFAULT_UNITS["w"] if wmin is not None: wmin = pinttrs.converters.ensure_units(wmin, default_units=default_units).m_as( units ) data = data.where(data["wavelen"] >= wmin).dropna("nlam", how="all") if wmax is not None: wmax = pinttrs.converters.ensure_units(wmax, default_units=default_units).m_as( units ) data = data.where(data["wavelen"] <= wmax).dropna("nlam", how="all") wavelength = data["wavelen"].values * units # Phase function if particle_shape is None: if len(data["nphamat"] == 4): particle_shape = "spherical" elif len(data["nphamat"] == 6): particle_shape = "spheroidal" else: raise ValueError("Could not detect particle shape type") if particle_shape == "spherical": ij_to_nphamat = { (0, 0): 0, (1, 1): 0, (0, 1): 1, (1, 0): 1, (2, 2): 2, (3, 3): 2, (2, 3): 3, (3, 2): 3, } elif particle_shape == "spheroidal": ij_to_nphamat = { (0, 0): 0, (0, 1): 1, (1, 0): 1, (1, 1): 4, (2, 2): 2, (2, 3): 3, (3, 2): 3, (3, 3): 5, } else: raise NotImplementedError(f"Unknown particle shape '{particle_shape}'") # -- Create angular grid (highest resolution possible) mus = np.cos(np.deg2rad(data["theta"].values.ravel())) mus = mus[~np.isnan(mus)] mus = np.unique(mus) # -- Resample all phase matrix components and fill data array with shape # [wavelength, theta, i, j] n_wavelength = len(wavelength) n_theta = len(mus) phase_np = np.zeros((n_wavelength, n_theta, 4, 4)) for i_wavelength in range(n_wavelength): for (i, j), nphamat in ij_to_nphamat.items(): data_selected = data.isel(nlam=i_wavelength, nphamat=nphamat).dropna( "nthetamax" ) x = mus xp = np.cos(np.deg2rad(data_selected["theta"].values)).ravel() fp = data_selected["phase"].values p = np.interp(x, xp, fp) phase_np[i_wavelength, :, i, j] = p # Populate Eradiate dataset with correct format phase_eradiate = xr.Dataset( data_vars={ "sigma_t": (["w"], data["ext"].values, {"units": "1/km"}), "albedo": (["w"], data["ssa"].values, {"units": ""}), "phase": (["w", "mu", "i", "j"], phase_np), }, coords={ "w": ("w", wavelength.m_as("nm"), {"units": "nm"}), "mu": ("mu", mus), "i": ("i", range(4)), "j": ("j", range(4)), }, ) return phase_eradiate