"""
Data loading, conversion and output components.
"""
from __future__ import annotations
import warnings
from typing import Literal
import numpy as np
import pint
import pinttrs
import xarray as xr
from ._file_resolver import fresolver
from ..typing import PathLike
from ..units import unit_context_config as ucc
from ..units import unit_registry as ureg
def _convert_units(value: str):
if value == "per cent":
return "percent"
return value
def _get_units(ds, var, fallback_units=None):
if "units" in ds[var].attrs:
return ureg.Unit(_convert_units(ds[var].attrs["units"]))
elif fallback_units is not None and var in fallback_units:
return ureg.Unit(_convert_units(fallback_units[var]))
else:
raise ValueError(
"load_aerosol_libradtran(): The input dataset specifies no units "
f"for variable '{var}'; this can be addressed by passing them through "
"the 'fallback_units' parameter."
)
[docs]
def load_aerosol_libradtran(
data: PathLike | xr.Dataset,
particle_shape: Literal["spherical", "spheroidal"] | None = None,
tolerance: dict[str, pint.Quantity | float] | None = None,
wbounds: tuple = (None, None),
fallback_units: dict[str, str] | None = None,
**kwargs,
) -> xr.Dataset:
"""
Convert a libRadtran NetCDF aerosol file to the Eradiate aerosol data format.
Both effective radius and humidity-indexed data are supported.
Parameters
----------
data : Dataset or path-like
A libRadtran NetCDF aerosol dataset. If a path is passed, it will be
resolved by the file resolver and tentatively loaded into memory.
particle_shape : {"spherical", "spheroidal"}, optional
The expected shape of the particle (this will tell the phase matrix
coefficients it should expect). If unset, the shape is inferred from the
input dataset.
reff, hum : float or quantity
For datasets with a humidity or effective radius dimension, the
coordinate point to select. By default, the nearest data point is
selected; tolerance can be adjusted using the ``tolerance`` parameter.
Quantities are accepted (see Notes for expected dimensions and
defaults). These parameters are optional if the dataset has only one
point on these dimensions.
wbounds : tuple, optional
Bounds to restrict the spectral domain where conversion is performed.
This parameter accepts a tuple with the minimum and maximum values
(use ``None`` to leave a bound open). Examples:
``(300, 3000)``, ``(None, 3000)``, ``(0.3, 3) * ureg.micron``,
``(None, 3 * ureg.micron)``.
tolerance : dict
A mapping that allows to specify a tolerance for nearest neighbour
lookup for relevant parameters. Units are applied with the same rules as
for the ``reff`` and ``hum`` parameters.
fallback_units : dict, optional
A mapping that specifies units to apply to variables that are missing them.
Returns
-------
Dataset
Notes
-----
.. list-table::
:widths: 1 2 2
:header-rows: 1
* - Parameter
- Dimension
- Default units
* - ``reff``
- Length
- μm
* - ``hum``
- Dimensionless (fraction)
- percent
* - ``wbounds``
- Wavelength
- ``ucc.get("wavelength")`` (usually, nm)
* The current implementation resamples the angular dimension at the
highest resolution to minimize the loss of information on phase matrix
coefficients.
* All conversion is done in memory: very large dataset might result in
massive converted data. In such case, an easy way to split the conversion
is to chunk it on the spectral dimension.
"""
VARS_TO_DIMS = {"wavelen": "nlam", "reff": "nreff", "hum": "nhum"}
KWARG_TO_DEFAULT_UNITS = {
"w": ucc.get("wavelength"),
"hum": ureg.Unit("percent"),
"reff": ureg.Unit("micrometer"),
}
# Load aerosol component dataset
if not isinstance(data, xr.Dataset):
data = fresolver.load_dataset(data)
# Extract required variables
vars = ["phase", "ext", "ssa", "theta", "wavelen"]
for var in ["reff", "hum"]:
if var in data:
vars.append(var)
data = data[vars]
# Select on humidity and effective radius
if tolerance is None:
tolerance = {}
for kwarg in ["hum", "reff"]:
var = kwarg
if var not in data:
continue
units = _get_units(data, var, fallback_units)
da = data[var]
default_units = KWARG_TO_DEFAULT_UNITS[kwarg]
if len(da) > 1:
if kwarg not in kwargs:
raise TypeError(
f"load_aerosol_libradtran() is missing keyword argument '{kwarg}' "
f"(allowed: {da.values})"
)
else:
kwarg_value = pinttrs.converters.ensure_units(
np.atleast_1d(kwargs.pop(kwarg)), default_units=default_units
).to(units)
else:
if kwarg not in kwargs:
kwarg_value = da.values * units
else:
kwarg_value = pinttrs.converters.ensure_units(
np.atleast_1d(kwargs.pop(kwarg)), default_units=default_units
).to(units)
sel_kwargs = {var: kwarg_value.m, "method": "nearest"}
if kwarg in tolerance:
sel_kwargs["tolerance"] = tolerance[kwarg]
data = data.swap_dims({VARS_TO_DIMS[var]: var})
data = data.sel(**sel_kwargs)
data = data.squeeze(var, drop=True)
if kwargs:
warnings.warn(
"load_aerosol_libradtran() got unexpected keyword arguments "
f"{list(kwargs.keys())}, which were not used"
)
# Filter wavelengths if requested
wmin, wmax = wbounds
units = _get_units(data, "wavelen", fallback_units)
default_units = KWARG_TO_DEFAULT_UNITS["w"]
if wmin is not None:
wmin = pinttrs.converters.ensure_units(wmin, default_units=default_units).m_as(
units
)
data = data.where(data["wavelen"] >= wmin).dropna("nlam", how="all")
if wmax is not None:
wmax = pinttrs.converters.ensure_units(wmax, default_units=default_units).m_as(
units
)
data = data.where(data["wavelen"] <= wmax).dropna("nlam", how="all")
wavelength = data["wavelen"].values * units
# Phase function
if particle_shape is None:
if len(data["nphamat"] == 4):
particle_shape = "spherical"
elif len(data["nphamat"] == 6):
particle_shape = "spheroidal"
else:
raise ValueError("Could not detect particle shape type")
if particle_shape == "spherical":
ij_to_nphamat = {
(0, 0): 0,
(1, 1): 0,
(0, 1): 1,
(1, 0): 1,
(2, 2): 2,
(3, 3): 2,
(2, 3): 3,
(3, 2): 3,
}
elif particle_shape == "spheroidal":
ij_to_nphamat = {
(0, 0): 0,
(0, 1): 1,
(1, 0): 1,
(1, 1): 4,
(2, 2): 2,
(2, 3): 3,
(3, 2): 3,
(3, 3): 5,
}
else:
raise NotImplementedError(f"Unknown particle shape '{particle_shape}'")
# -- Create angular grid (highest resolution possible)
mus = np.cos(np.deg2rad(data["theta"].values.ravel()))
mus = mus[~np.isnan(mus)]
mus = np.unique(mus)
# -- Resample all phase matrix components and fill data array with shape
# [wavelength, theta, i, j]
n_wavelength = len(wavelength)
n_theta = len(mus)
phase_np = np.zeros((n_wavelength, n_theta, 4, 4))
for i_wavelength in range(n_wavelength):
for (i, j), nphamat in ij_to_nphamat.items():
data_selected = data.isel(nlam=i_wavelength, nphamat=nphamat).dropna(
"nthetamax"
)
x = mus
xp = np.cos(np.deg2rad(data_selected["theta"].values)).ravel()
fp = data_selected["phase"].values
p = np.interp(x, xp, fp)
phase_np[i_wavelength, :, i, j] = p
# Populate Eradiate dataset with correct format
phase_eradiate = xr.Dataset(
data_vars={
"sigma_t": (["w"], data["ext"].values, {"units": "1/km"}),
"albedo": (["w"], data["ssa"].values, {"units": ""}),
"phase": (["w", "mu", "i", "j"], phase_np),
},
coords={
"w": ("w", wavelength.m_as("nm"), {"units": "nm"}),
"mu": ("mu", mus),
"i": ("i", range(4)),
"j": ("j", range(4)),
},
)
return phase_eradiate