Source code for eradiate.data.io

"""
Data loading, conversion and output components.
"""

from __future__ import annotations

import warnings
from typing import Literal

import numpy as np
import pint
import pinttrs
import xarray as xr

from ._file_resolver import fresolver
from ..typing import PathLike
from ..units import unit_context_config as ucc
from ..units import unit_registry as ureg


def _convert_units(value: str):
    if value == "per cent":
        return "percent"
    return value


def _get_units(ds, var, fallback_units=None):
    if "units" in ds[var].attrs:
        return ureg.Unit(_convert_units(ds[var].attrs["units"]))
    elif fallback_units is not None and var in fallback_units:
        return ureg.Unit(_convert_units(fallback_units[var]))
    else:
        raise ValueError(
            "load_aerosol_libradtran(): The input dataset specifies no units "
            f"for variable '{var}'; this can be addressed by passing them through "
            "the 'fallback_units' parameter."
        )


[docs] def load_aerosol_libradtran( data: PathLike | xr.Dataset, particle_shape: Literal["spherical", "spheroidal"] | None = None, tolerance: dict[str, pint.Quantity | float] | None = None, wbounds: tuple = (None, None), fallback_units: dict[str, str] | None = None, **kwargs, ) -> xr.Dataset: """ Convert a libRadtran NetCDF aerosol file to the Eradiate aerosol data format. Both effective radius and humidity-indexed data are supported. Parameters ---------- data : Dataset or path-like A libRadtran NetCDF aerosol dataset. If a path is passed, it will be resolved by the file resolver and tentatively loaded into memory. particle_shape : {"spherical", "spheroidal"}, optional The expected shape of the particle (this will tell the phase matrix coefficients it should expect). If unset, the shape is inferred from the input dataset. reff, hum : float or quantity For datasets with a humidity or effective radius dimension, the coordinate point to select. By default, the nearest data point is selected; tolerance can be adjusted using the ``tolerance`` parameter. Quantities are accepted (see Notes for expected dimensions and defaults). These parameters are optional if the dataset has only one point on these dimensions. wbounds : tuple, optional Bounds to restrict the spectral domain where conversion is performed. This parameter accepts a tuple with the minimum and maximum values (use ``None`` to leave a bound open). Examples: ``(300, 3000)``, ``(None, 3000)``, ``(0.3, 3) * ureg.micron``, ``(None, 3 * ureg.micron)``. tolerance : dict A mapping that allows to specify a tolerance for nearest neighbour lookup for relevant parameters. Units are applied with the same rules as for the ``reff`` and ``hum`` parameters. fallback_units : dict, optional A mapping that specifies units to apply to variables that are missing them. Returns ------- Dataset Notes ----- .. list-table:: :widths: 1 2 2 :header-rows: 1 * - Parameter - Dimension - Default units * - ``reff`` - Length - μm * - ``hum`` - Dimensionless (fraction) - percent * - ``wbounds`` - Wavelength - ``ucc.get("wavelength")`` (usually, nm) * The current implementation resamples the angular dimension at the highest resolution to minimize the loss of information on phase matrix coefficients. * All conversion is done in memory: very large dataset might result in massive converted data. In such case, an easy way to split the conversion is to chunk it on the spectral dimension. """ VARS_TO_DIMS = {"wavelen": "nlam", "reff": "nreff", "hum": "nhum"} KWARG_TO_DEFAULT_UNITS = { "w": ucc.get("wavelength"), "hum": ureg.Unit("percent"), "reff": ureg.Unit("micrometer"), } # Load aerosol component dataset if not isinstance(data, xr.Dataset): data = fresolver.load_dataset(data) # Extract required variables vars = ["phase", "ext", "ssa", "theta", "wavelen"] for var in ["reff", "hum"]: if var in data: vars.append(var) data = data[vars] # Select on humidity and effective radius if tolerance is None: tolerance = {} for kwarg in ["hum", "reff"]: var = kwarg if var not in data: continue units = _get_units(data, var, fallback_units) da = data[var] default_units = KWARG_TO_DEFAULT_UNITS[kwarg] if len(da) > 1: if kwarg not in kwargs: raise TypeError( f"load_aerosol_libradtran() is missing keyword argument '{kwarg}' " f"(allowed: {da.values})" ) else: kwarg_value = pinttrs.converters.ensure_units( np.atleast_1d(kwargs.pop(kwarg)), default_units=default_units ).to(units) else: if kwarg not in kwargs: kwarg_value = da.values * units else: kwarg_value = pinttrs.converters.ensure_units( np.atleast_1d(kwargs.pop(kwarg)), default_units=default_units ).to(units) sel_kwargs = {var: kwarg_value.m, "method": "nearest"} if kwarg in tolerance: sel_kwargs["tolerance"] = tolerance[kwarg] data = data.swap_dims({VARS_TO_DIMS[var]: var}) data = data.sel(**sel_kwargs) data = data.squeeze(var, drop=True) if kwargs: warnings.warn( "load_aerosol_libradtran() got unexpected keyword arguments " f"{list(kwargs.keys())}, which were not used" ) # Filter wavelengths if requested wmin, wmax = wbounds units = _get_units(data, "wavelen", fallback_units) default_units = KWARG_TO_DEFAULT_UNITS["w"] if wmin is not None: wmin = pinttrs.converters.ensure_units(wmin, default_units=default_units).m_as( units ) data = data.where(data["wavelen"] >= wmin).dropna("nlam", how="all") if wmax is not None: wmax = pinttrs.converters.ensure_units(wmax, default_units=default_units).m_as( units ) data = data.where(data["wavelen"] <= wmax).dropna("nlam", how="all") wavelength = data["wavelen"].values * units # Phase function if particle_shape is None: if len(data["nphamat"] == 4): particle_shape = "spherical" elif len(data["nphamat"] == 6): particle_shape = "spheroidal" else: raise ValueError("Could not detect particle shape type") if particle_shape == "spherical": ij_to_nphamat = { (0, 0): 0, (1, 1): 0, (0, 1): 1, (1, 0): 1, (2, 2): 2, (3, 3): 2, (2, 3): 3, (3, 2): 3, } elif particle_shape == "spheroidal": ij_to_nphamat = { (0, 0): 0, (0, 1): 1, (1, 0): 1, (1, 1): 4, (2, 2): 2, (2, 3): 3, (3, 2): 3, (3, 3): 5, } else: raise NotImplementedError(f"Unknown particle shape '{particle_shape}'") # -- Create angular grid (highest resolution possible) mus = np.cos(np.deg2rad(data["theta"].values.ravel())) mus = mus[~np.isnan(mus)] mus = np.unique(mus) # -- Resample all phase matrix components and fill data array with shape # [wavelength, theta, i, j] n_wavelength = len(wavelength) n_theta = len(mus) phase_np = np.zeros((n_wavelength, n_theta, 4, 4)) for i_wavelength in range(n_wavelength): for (i, j), nphamat in ij_to_nphamat.items(): data_selected = data.isel(nlam=i_wavelength, nphamat=nphamat).dropna( "nthetamax" ) x = mus xp = np.cos(np.deg2rad(data_selected["theta"].values)).ravel() fp = data_selected["phase"].values p = np.interp(x, xp, fp) phase_np[i_wavelength, :, i, j] = p # Populate Eradiate dataset with correct format phase_eradiate = xr.Dataset( data_vars={ "sigma_t": (["w"], data["ext"].values, {"units": "1/km"}), "albedo": (["w"], data["ssa"].values, {"units": ""}), "phase": (["w", "mu", "i", "j"], phase_np), }, coords={ "w": ("w", wavelength.m_as("nm"), {"units": "nm"}), "mu": ("mu", mus), "i": ("i", range(4)), "j": ("j", range(4)), }, ) return phase_eradiate