"""
Utility functions to manipulate atmosphere thermophysical properties data
sets.
"""
from __future__ import annotations
import warnings
from datetime import datetime
import numpy as np
import pint
import scipy.constants
import xarray as xr
from .. import data
from ..units import to_quantity
from ..units import unit_registry as ureg
ATOMIC_MASS_CONSTANT = ureg.Quantity(
*scipy.constants.physical_constants["atomic mass constant"][:-1]
)
[docs]def column_number_density(ds: xr.Dataset, species: str) -> pint.Quantity:
"""
Computes the column number density of a given species in an atmospheric
profile.
The column number density is computed according to the formula:
.. math::
N = \\sum_{i=0}^{L-1} n_i \\, (z_{i+1} - z_i)
where
:math:`N` denotes the column number density,
:math:`z_i` are the level altitudes and
:math:`n_i` are the number densities of that given species inside the
:math:`L` atmospheric layers.
Parameters
----------
ds : Dataset
Atmosphere thermophysical properties data set.
species : str
Species.
Returns
-------
quantity
Column number density.
"""
mr = ds.mr.sel(species=species).values
n = to_quantity(ds.n)
n_species = mr * n
z_level = to_quantity(ds.z_level)
dz = z_level[1:] - z_level[:-1]
return (n_species * dz).sum()
[docs]def column_mass_density(ds: xr.Dataset, species: str) -> pint.Quantity:
"""
Computes the column (mass) density of a given species in an atmospheric
profile.
The column mass density is computed according to the formula:
.. math::
\\sigma = N * m_u
where
:math:`\\sigma` denotes the column mass density,
:math:`N` is the column number density,
:math:`m_u` is the atomic mass constant.
Parameters
----------
ds : Dataset
Atmosphere thermophysical property data set.
species : str
Species.
Returns
-------
quantity
Column mass density.
"""
with data.open_dataset("chemistry/molecular_masses.nc") as molecular_mass:
m = molecular_mass.m.sel(s=species).values * ATOMIC_MASS_CONSTANT
return m * column_number_density(ds=ds, species=species)
[docs]def number_density_at_surface(ds: xr.Dataset, species: str) -> pint.Quantity:
"""
Compute the number density at the surface of a given species in an
atmospheric profile.
Parameters
----------
ds : Dataset
Atmosphere thermophysical properties data set.
species : str
Species.
Returns
-------
:class:`~pint.Quantity`
Number density at the surface.
"""
surface_mr = to_quantity(ds.mr.sel(species=species))[0]
surface_n = to_quantity(ds.n)[0]
return surface_mr * surface_n
[docs]def mass_density_at_surface(ds: xr.Dataset, species: str) -> pint.Quantity:
"""Compute the mass density at the surface of a given species in an
atmospheric profile.
Parameters
----------
ds : Dataset
Atmosphere thermophysical properties data set.
species : str
Species.
Returns
-------
quantity
Mass density at the surface.
"""
with data.open_dataset("chemistry/molecular_masses.nc") as molecular_mass:
m = to_quantity(molecular_mass.m.sel(s=species)) * ATOMIC_MASS_CONSTANT
return m * number_density_at_surface(ds=ds, species=species)
[docs]def volume_mixing_ratio_at_surface(ds: xr.Dataset, species: str) -> pint.Quantity:
"""Compute the volume mixing ratio at the surface of a given species in an
atmospheric profile.
Parameters
----------
ds : Dataset
Atmosphere thermophysical properties data set.
species : str
Species.
Returns
-------
quantity
Volume mixing ratio at the surface.
"""
return to_quantity(ds.mr.sel(species=species))[0]
def _scaling_factor(
initial_amount: pint.Quantity,
target_amount: pint.Quantity,
) -> float:
"""
Compute the scaling factor to reach target amount from initial amount.
Initial and target amount should be scalar and have compatible dimensions.
Parameters
----------
initial_amount: quantity
Initial amount.
target_amount: quantity
Target quantity.
Raises
------
ValueError
When both initial and target amounts are zero.
Returns
-------
float
Scaling factor.
"""
if initial_amount.m == 0.0:
if target_amount.m == 0.0:
return 0.0
else:
raise ValueError(
f"Cannot compute scaling factor when initial amount is 0.0 and "
f"target amount is non-zero (got {target_amount})"
)
else:
return (target_amount / initial_amount).m_as(ureg.dimensionless)
[docs]def compute_scaling_factors(
ds: xr.Dataset, concentration: dict[str, pint.Quantity]
) -> dict[str, float]:
r"""
Compute the scaling factors to be applied to the mixing ratio values
of each species in an atmosphere thermophysical properties data set, so
that the integrated number/mass density and/or the number/mass density at
the surface, match given values.
Parameters
----------
ds : Dataset
Atmosphere thermophysical properties data set.
concentration : dict
Mapping of species (str) and target concentration
(:class:`~pint.Quantity`).
If the target concentration has dimensions of inverse square length
(:math:`[L^{-2}]`), the value is interpreted as a column
number density for that given species and the scaling factor, :math:`f`,
is obtained by dividing that column number density,
:math:`N_{\mathrm{target}}`,
by the initial column number density,
:math:`N_{\mathrm{initial}}`:
.. math::
f = \frac{N_{\mathrm{target}}}{N_{\mathrm{initial}}}
If the target concentration has dimensions of mass times inverse square
length (:math:`[ML^{-2}]`), the value is interpreted as a column (mass)
density for that species and the scaling factor is obtained by dividing
that column mass density,
:math:`\sigma_{\mathrm{target}}`,
by the initial column mass density,
:math:`\sigma_{\mathrm{initial}}`:
.. math::
f = \frac{\sigma_{\mathrm{target}}}{\sigma_{\mathrm{initial}}}
If the target concentration has dimensions of inverse cubic length
(:math:`[L^{-3}]`), the value is interpreted as a number density at the
surface for that given species and the scaling factor is computed by
dividing that number density at the surface,
:math:`n_{\mathrm{surface, target}}`,
by the initial number density at the surface,
:math:`n_{\mathrm{surface, initial}}`:
.. math::
f = \frac{n_{\mathrm{surface, target}}}{n_{\mathrm{surface, initial}}}
If the target concentration has dimensions of inverse cubic length
(:math:`[ML^{-3}]`), the value is interpreted as a mass density at the
surface for that given species and the scaling factor is computed by
dividing that mass density at the surface,
:math:`\sigma_{\mathrm{surface, target}}`,
by the initial mass density at the surface,
:math:`\sigma_{\mathrm{surface, initial}}`:
.. math::
f = \frac{\sigma_{\mathrm{surface, target}}}{\sigma_{\mathrm{surface, initial}}}
If the target concentration is dimensionless, the value is
interpreted as a mixing ratio at the surface for that given species
and the scaling factor is computed by dividing that mixing ratio at
the surface,
:math:`x_{\mathrm{surface, target}}`,
by the initial mixing ratio at the surface,
:math:`x_{\mathrm{surface, initial}}`:
.. math::
f = \frac{x_{\mathrm{target}}}{x_{\mathrm{initial}}}
Returns
-------
dict
Mapping of species (str) and scaling factors (float).
"""
compute_initial_amount = {
"[length]^-2": column_number_density,
"[mass] * [length]^-2": column_mass_density,
"[length]^-3": number_density_at_surface,
"[mass] * [length]^-3": mass_density_at_surface,
"": volume_mixing_ratio_at_surface,
}
factors = {}
for species in concentration:
target_amount = concentration[species]
initial_amount = None
for dimensions in list(compute_initial_amount.keys()):
if target_amount.check(dimensions):
initial_amount = compute_initial_amount[dimensions](
ds=ds, species=species
)
if initial_amount is None:
raise ValueError(
f"Invalid dimension for {species} concentration: {target_amount.units}."
f"Expected dimension: [length]^-2, [mass] * [length]^-2, "
f"[length]^-3, [mass] * [length]^-2 or no dimension."
)
factors[species] = _scaling_factor(
initial_amount=initial_amount,
target_amount=target_amount,
)
return factors
[docs]def human_readable(items: list[str]) -> str:
"""
Transforms a list into readable human text.
Example: ``["a", "b", "c"]`` -> ``"a, b and c"``
Parameters
----------
elements : list
List.
Returns
-------
str
Human readable text.
"""
x = f"{items[0]}"
for s in items[1:-1]:
x += f", {s}"
if len(items) > 1:
x += f" and {items[-1]}"
return x
[docs]def rescale_concentration(
ds: xr.Dataset, factors: dict[str, float], inplace: bool = False
) -> xr.Dataset | None:
"""
Rescale mixing ratios in an atmosphere thermophysical properties data
set by given factors for each species.
Parameters
----------
ds : Dataset
Initial atmosphere thermophysical properties data set.
factors : dict
Mapping of species (str) and scaling factors (float).
inplace : bool
If ``True``, the atmosphere thermophysical properties data set object
is modified.
Else, a new atmosphere thermophysical properties data set object is
returned.
Returns
-------
Dataset
Rescaled atmosphere thermophysical properties data set.
"""
if not inplace:
ds = ds.copy(deep=True)
mr = ds.mr.values
for i, species in enumerate(ds.species.values):
if species in factors:
mr[i] *= factors[species]
machine_epsilon = np.finfo(float).eps
if any(mr.sum(axis=0) > 1.0 + machine_epsilon):
raise ValueError(
f"Cannot rescale concentration with these factors "
f"({factors}) because the sum of mixing ratios would "
f"be larger than 1: {mr.sum(axis=0)}"
)
species = list(factors.keys())
ds.attrs["history"] += (
f"\n"
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
f"Concentration-rescaled for {human_readable(species)} - "
f"eradiate.thermoprops.util.rescale_concentration"
)
return ds
[docs]@ureg.wraps(ret=None, args=(None, "km", None, None), strict=False)
def interpolate(
ds: xr.Dataset,
z_level: np.ndarray | pint.Quantity,
method: str = "linear",
conserve_columns: bool = False,
) -> xr.Dataset:
"""
Interpolates an atmosphere thermophysical properties data set onto a
new level altitude mesh.
Parameters
----------
ds : Dataset
Initial atmosphere thermophysical properties data set.
z_level : ndarray
Level altitude mesh [km].
method : str
The method used to interpolate (same for all data variables).
Choose from ``"linear"``, ``"nearest"``, ``"zero"``, ``"slinear"``, \
``"quadratic"``, and ``"cubic"``.
Default: ``"linear"``.
conserve_columns : bool
If ``True``, multiply the number densities in the atmosphere
thermophysical properties data set so that the column number densities
in the initial and interpolated atmosphere thermophysical properties
data set are in same.
Default: ``False``.
Returns
-------
Dataset
Interpolated atmosphere thermophysical properties data set.
Notes
-----
Returns a new atmosphere thermophysical properties data set object.
"""
z_level = ureg.Quantity(z_level, "km")
z_layer = (z_level[1:] + z_level[:-1]) / 2.0
interpolated = ds.interp(
z_layer=z_layer.m_as(ds.z_layer.units),
method=method,
kwargs=dict(fill_value="extrapolate"),
)
z_level_attrs = ds.z_level.attrs
interpolated.update(
dict(z_level=("z_level", z_level.m_as(ds.z_level.units), z_level_attrs))
)
if conserve_columns:
initial_amounts = {s: column_number_density(ds, s) for s in ds.species.values}
factors = compute_scaling_factors(
ds=interpolated, concentration=initial_amounts
)
interpolated = rescale_concentration(
ds=interpolated, factors=factors, inplace=True
)
return interpolated
[docs]@ureg.wraps(ret="Pa", args="K", strict=False)
def water_vapor_saturation_pressure(t: float) -> pint.Quantity:
"""
Computes the water vapor saturation pressure over liquid water or ice, at
the given temperature.
Parameters
----------
t : float
Temperature [K].
Returns
-------
quantity
Water vapor saturation pressure.
Notes
-----
Valid for pressures larger than the triple point pressure (~611 Pa).
"""
try:
import iapws
except ModuleNotFoundError:
warnings.warn(
"To use the collision detection feature, you must install IAPWS.\n"
"See instructions on https://iapws.readthedocs.io/en/latest/modules.html#installation."
)
raise
if t >= 273.15: # water is liquid
p = ureg.Quantity(iapws.iapws97._PSat_T(t), "MPa")
else: # water is solid
p = ureg.Quantity(iapws._iapws._Sublimation_Pressure(t), "MPa")
return p.m_as("Pa")
[docs]@ureg.wraps(ret=ureg.dimensionless, args=("Pa", "K"), strict=False)
def equilibrium_water_vapor_fraction(p: float, t: float) -> pint.Quantity:
"""
Computes the water vapor volume fraction at equilibrium, i.e., when the
rate of condensation of water vapor equals the rate of evaporation of
liquid water or ice, depending on the temperature.
The water vapor volume fraction :math:`x_w` is computed with:
.. math::
x_w(p,T) = \\frac{p_w(T)}{p}
where
* :math:`p` is the pressure,
* :math:`T` is the temperature and
* :math:`p_w` is the water vapor saturation pressure at the given
temperature.
This water vapor volume fraction corresponds to a relative humidity of 100%.
Parameters
----------
p : float
Pressure [Pa].
t : float
Temperature [K].
Returns
-------
quantity
Water vapor volume fraction.
Raises
------
ValueError
If equilibrium cannot be reached in the pressure and temperature
conditions.
Notes
-----
For some values of the pressure and temperature, the equilibrium does not
exist. An exception is raised in those cases.
"""
p_water_vapor = water_vapor_saturation_pressure(t).magnitude
if p_water_vapor <= p:
return p_water_vapor / p
else:
raise ValueError(
f"Equilibrium cannot be reached in these conditions (p = "
f"{round(p, 2)} Pa, t = {round(t, 2)} K)"
)
[docs]def make_profile_regular(profile: xr.Dataset, atol: float) -> xr.Dataset:
"""
Converts the atmosphere thermophysical properties data set with an
irregular altitude mesh to a profile defined over a regular altitude mesh.
Parameters
----------
profile : Dataset
Original atmosphere thermophysical properties data set, defined over
an irregular altitude mesh.
atol : float
Absolute tolerance used in the conversion of the irregular altitude
mesh to a regular altitude mesh.
Returns
-------
Dataset
Converted atmosphere thermophysical properties data set, defined over
a regular altitude mesh.
"""
# compute the regular altitude nodes mesh
regular_z_level = _to_regular(mesh=profile.z_level.values, atol=atol)
# compute corresponding altitude centers mesh
regular_z_layer = (regular_z_level[:-1] + regular_z_level[1:]) / 2.0
# compute the atmospheric variables with the regular altitude mesh
n_z = len(regular_z_layer)
p = np.zeros(n_z)
t = np.zeros(n_z)
n = np.zeros(n_z)
n_species = profile.species.size
mr = np.zeros((n_species, n_z))
layer = 0
for i, z in enumerate(regular_z_layer):
# when altitude is larger than current layer's upper bound, jump to
# the next layer
if z >= profile.z_level.values[layer + 1]:
layer += 1
p[i] = profile.p.values[layer]
t[i] = profile.t.values[layer]
n[i] = profile.n.values[layer]
mr[:, i] = profile.mr.values[:, layer]
species = profile["species"].values
# copy attributes
attrs = profile.attrs
# update history
new_line = (
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - made "
f"profile altitude mesh regular - "
f"eradiate.thermoprops.util.make_profile_regular "
)
attrs["history"] += f"\n{new_line}"
dataset = xr.Dataset(
data_vars=dict(
p=("z_layer", p),
t=("z_layer", t),
n=("z_layer", n),
mr=(("species", "z_layer"), mr),
),
coords={
"z_layer": ("z_layer", regular_z_layer),
"z_level": ("z_level", regular_z_level),
"species": ("species", species),
},
attrs=attrs,
)
return dataset
def _to_regular(mesh: np.ndarray, atol: float) -> np.ndarray:
"""
Converts an irregular altitude mesh into a regular altitude mesh.
Parameters
----------
mesh : ndarray
Irregular altitude mesh with values sorted in increasing order.
atol : float
Absolute tolerance used in the conversion.
Returns
-------
ndarray
Regular altitude mesh.
Warnings
--------
The algorithm is not optimized to find the approximating regular mesh with
the smallest number of layers. Depending on the value of ``atol``, the
resulting mesh size can be large.
Notes
-----
The bound altitudes in the irregular altitude mesh remain the same in
the output regular altitude mesh. Only the intermediate node altitudes
are modified.
"""
n, _ = _find_regular_params_gcd(mesh, atol)
return np.linspace(start=mesh[0], stop=mesh[-1], num=n)
def _find_regular_params_gcd(
mesh: np.ndarray, unit_number: float = 1.0
) -> tuple[int, float]:
"""
Finds the parameters (number of cells, constant cell width) of the
regular mesh that approximates the irregular input mesh.
The algorithm finds the greatest common divisor (GCD) of all cells widths
in the integer representation specified by the parameter ``unit_number``.
This GCD is used to define the constant cells width of the approximating
regular mesh.
Parameters
----------
mesh : ndarray
1-D array with floating point values.
Values must be sorted by increasing order.
unit_number : float, default: 1.0
Defines the unit used to convert the floating point numbers to integer.
numbers.
Returns
-------
int
Number of points in the regular mesh.
float
Value of the constant cells width.
Warnings
--------
There are no safeguards regarding how large the number of cells in the
regular mesh can be. Use the parameter ``unit_number`` with caution.
"""
# Convert float cell widths to integer cell widths
eps = np.finfo(float).eps
if unit_number >= eps:
mesh = np.divide(mesh, unit_number).astype(int)
else:
raise ValueError(
f"Parameter unit_number ({unit_number}) must be "
f"larger than machine epsilon ({eps})."
)
widths = mesh[1:] - mesh[:-1]
# Find the greatest common divisor (GCD) of all integer cell widths
# The constant cell width in the regular mesh is given by that GCD.
from math import gcd
w = gcd(widths[0], widths[1])
for x in widths[2:]:
w = gcd(x, w)
# Compute the number of points in the regular mesh
total_width = mesh[-1] - mesh[0]
n = total_width // w + 1
return n, float(w) * unit_number
# def find_regular_params_tol(mesh, rtol=1e-3, n_cells_max=10000):
# r"""
# Finds the number of cells and constant cell width of the regular 1-D
# mesh that approximates a 1-D irregular mesh the best.
#
# Parameters
# ----------
# mesh : ndarray
# Irregular 1-D mesh. Values must be sorted in increasing order.
#
# rtol : float
# Relative tolerance on the cells widths. This parameter controls the
# accuracy of the approximation.
# The parameters of the approximating regular mesh are computed so that
# for each layer of the irregular mesh, the corresponding layer or layers
# in the regular mesh has a width or have a total width that is not larger
# than ``rtol`` times the width of the cell in the irregular mesh.
#
# Default: 1e-3
#
# n_cells_max : float
# Maximum number of cells in the regular mesh. This parameter controls the
# size of the resulting regular mesh.
#
# Default: 10000
#
# Returns
# -------
# int
# Number of cells in the approximating regular mesh.
# float
# Constant cells width in the approximating regular mesh.
# """
#
# raise NotImplemented
# TODO: implement this function