Source code for eradiate.units

from __future__ import annotations

__all__ = [
    "symbol",
    "to_quantity",
    "unit_context_config",
    "unit_context_kernel",
    "unit_registry",
    "PhysicalQuantity",
]

import enum
import importlib
import logging
import typing as t
from functools import lru_cache

import pint
import pinttr
import xarray
from importlib_resources import files
from pinttr.exceptions import UnitsError
from pinttr.util import units_compatible

logger = logging.getLogger(__name__)


# -- Global data members -------------------------------------------------------

# Joseki being a dependency of Eradiate, and given that it uses the application
# registry, we import its definitions early. The module is imported only for
# its import-time effects: the returned module object is discarded.
importlib.import_module("joseki.units")

#: Unit registry common to all Eradiate components. All units used in Eradiate
#: must be created using this registry. Aliased in :mod:`eradiate`.
#: This is the object returned by :func:`pint.get_application_registry`.
unit_registry = pint.get_application_registry()


def _parse_definitions(path):
    """
    Parse a unit definition file into a list of definition strings.

    Each line is stripped of leading and trailing whitespace. Empty lines and
    lines whose first non-blank character is ``#`` are dropped. Trailing
    (inline) comments are *not* removed.

    Parameters
    ----------
    path : path-like
        Location of the definition file. Anything accepted by :func:`open`.

    Returns
    -------
    list of str
        Remaining lines, in file order.
    """
    # The encoding is pinned to UTF-8 so that non-ASCII unit symbols are decoded
    # identically on every platform instead of following the locale default.
    with open(path, "r", encoding="utf-8") as f:
        stripped_lines = (line.strip() for line in f)
        return [
            line for line in stripped_lines if line and not line.startswith("#")
        ]


def _load_definitions(ureg: pint.UnitRegistry, definitions: list[str]) -> None:
    """
    Register extra unit definitions, skipping those that already exist.

    This is a thin wrapper around the registry's own ``define()`` method.

    Parameters
    ----------
    ureg : pint.UnitRegistry
        Registry to which definitions are added.

    definitions : list of str
        Definition strings of the form ``name = expression [= ...]``.
    """
    for entry in definitions:
        # Only the first two '='-separated fields (name and defining
        # expression) are inspected; symbols and aliases that may follow are
        # not compared, which might cause bugs in very rare cases.
        name, expression = [field.strip() for field in entry.split("=")][:2]

        # A unit that is already known with an identical value is left alone.
        if name in ureg and 1.0 * ureg(expression) == 1.0 * ureg(name):
            continue

        # Either the unit is new, or its definition differs: attempt the
        # (re)definition and let the user-controlled conflict handling policy
        # apply.
        ureg.define(entry)


# Extend the shared registry with the definitions read from the ``units.txt``
# resource of the ``eradiate`` package. This runs when the module is imported.
_load_definitions(unit_registry, _parse_definitions(files("eradiate") / "units.txt"))


class PhysicalQuantity(enum.Enum):
    """
    Enumeration of the physical quantities Eradiate knows about.

    Each member's value is the lower-case string identifier of the quantity,
    so a member can be looked up from its identifier, *e.g.*
    ``PhysicalQuantity("length")``.
    """

    ALBEDO = "albedo"
    ANGLE = "angle"
    COLLISION_COEFFICIENT = "collision_coefficient"
    DIMENSIONLESS = "dimensionless"
    INTENSITY = "intensity"
    IRRADIANCE = "irradiance"
    LENGTH = "length"
    MASS = "mass"
    RADIANCE = "radiance"
    REFLECTANCE = "reflectance"
    SPEED = "speed"
    TIME = "time"
    TRANSMITTANCE = "transmittance"
    WAVELENGTH = "wavelength"
    WAVENUMBER = "wavenumber"

    @classmethod
    @lru_cache(maxsize=32)
    def spectrum(cls):
        """
        List the quantities which may be used to initialise a
        :class:`.Spectrum`.

        The result is cached, so repeated calls return the same tuple object.

        Returns
        -------
        tuple of PhysicalQuantity
            Subset of :class:`PhysicalQuantity` members suitable for
            :class:`.Spectrum` initialisation.
        """
        spectrum_quantities = (
            cls.ALBEDO,
            cls.COLLISION_COEFFICIENT,
            cls.DIMENSIONLESS,
            cls.INTENSITY,
            cls.IRRADIANCE,
            cls.RADIANCE,
            cls.REFLECTANCE,
            cls.TRANSMITTANCE,
            cls.ANGLE,
        )
        return spectrum_quantities
def _make_unit_context():
    """
    Build a new unit context populated with default units.

    The context interprets strings, uses the module-level ``unit_registry``
    and converts keys with :class:`PhysicalQuantity`. Two groups of defaults
    are registered:

    * quantities with fixed default units;
    * quantities whose default units are computed on demand from the length
      and wavelength units currently held by the context.

    Returns
    -------
    pinttr.UnitContext
        Newly created context. Each call returns a distinct instance.
    """
    context = pinttr.UnitContext(
        interpret_str=True, ureg=unit_registry, key_converter=PhysicalQuantity
    )

    fixed_defaults = (
        # We allow for dimensionless quantities
        (PhysicalQuantity.DIMENSIONLESS, unit_registry.dimensionless),
        # Basic quantities must be named after their SI name
        # https://en.wikipedia.org/wiki/International_System_of_Units
        (PhysicalQuantity.LENGTH, unit_registry.m),
        (PhysicalQuantity.TIME, unit_registry.s),
        (PhysicalQuantity.MASS, unit_registry.kg),
        # Derived quantity names are more flexible
        (PhysicalQuantity.ALBEDO, unit_registry.dimensionless),
        (PhysicalQuantity.ANGLE, unit_registry.deg),
        (PhysicalQuantity.REFLECTANCE, unit_registry.dimensionless),
        (PhysicalQuantity.TRANSMITTANCE, unit_registry.dimensionless),
        (PhysicalQuantity.WAVELENGTH, unit_registry.nm),
        (PhysicalQuantity.WAVENUMBER, unit_registry.cm**-1),
    )
    for quantity, default_units in fixed_defaults:
        context.register(quantity, pinttr.UnitGenerator(default_units))

    # Accessors evaluated lazily, so that the derived defaults below always
    # follow the parent units held by the context at lookup time.
    def length_units():
        return context.get(PhysicalQuantity.LENGTH)

    def wavelength_units():
        return context.get(PhysicalQuantity.WAVELENGTH)

    # The following quantities will update automatically based on their parent
    # units
    derived_defaults = (
        (
            PhysicalQuantity.COLLISION_COEFFICIENT,
            lambda: length_units() ** -1,
        ),
        (
            PhysicalQuantity.INTENSITY,
            lambda: unit_registry.watt
            / unit_registry.steradian
            / wavelength_units(),
        ),
        (
            PhysicalQuantity.IRRADIANCE,
            lambda: unit_registry.watt / length_units() ** 2 / wavelength_units(),
        ),
        (
            PhysicalQuantity.RADIANCE,
            lambda: unit_registry.watt
            / length_units() ** 2
            / unit_registry.steradian
            / wavelength_units(),
        ),
    )
    for quantity, compute_units in derived_defaults:
        context.register(quantity, pinttr.UnitGenerator(compute_units))

    return context


#: Unit context used when interpreting config dictionaries.
#: Aliased in :mod:`eradiate`. unit_context_config = _make_unit_context() #: Unit context used when building kernel dictionaries. #: Aliased in :mod:`eradiate`. unit_context_kernel = _make_unit_context() # -- Public functions ----------------------------------------------------------
def symbol(units: pint.Unit | str) -> str:
    """
    Convert units, given as a string or a Pint unit object, to their symbol
    string.

    Parameters
    ----------
    units : :class:`pint.Unit` or str
        Value to convert to a symbol string.

    Returns
    -------
    str
        Symbol string (*e.g.* ``'m'`` for ``'metre'``, ``'W / m ** 2'`` for
        ``'W/m^2'``, etc.).
    """
    # The "~" format specification is the one passed to the unit's formatter.
    return f"{unit_registry.Unit(units):~}"
def to_quantity(da: xarray.DataArray) -> pint.Quantity:
    """
    Build a :class:`~pint.Quantity` from a :class:`~xarray.DataArray`.

    The units are read from the ``units`` entry of the array's ``attrs``
    metadata mapping, and the magnitude from its ``values``.

    Parameters
    ----------
    da : DataArray
        :class:`~xarray.DataArray` instance which will be converted.

    Returns
    -------
    quantity
        The corresponding Pint quantity.

    Raises
    ------
    ValueError
        If the array's metadata do not contain a ``units`` field.

    Notes
    -----
    This function can also be used on coordinate variables.
    """
    try:
        array_units = da.attrs["units"]
    except KeyError as err:
        # Surface the missing metadata as a ValueError, keeping the original
        # KeyError as the cause.
        raise ValueError("this DataArray has no 'units' metadata field") from err

    return unit_registry.Quantity(da.values, array_units)
def interpret_quantities(
    d: dict[str, t.Any],
    quantity_map: dict[str, str],
    uctx: pinttr.UnitContext,
    force: bool = False,
):
    """
    Advanced unit interpretation and wrapping for dictionaries.

    Units attached to fields are first interpreted with
    :func:`pinttr.interpret_units`. Then, every field listed in
    ``quantity_map`` is processed against the units ``uctx`` associates with
    the mapped quantity: values which already are quantities are checked (and
    optionally converted), other values are wrapped with those units.

    Parameters
    ----------
    d : dict
        Dictionary to apply unit conversion, checking and defaults.

    quantity_map : dict[str, str]
        Dictionary mapping fields to quantity identifiers (see
        :class:`.PhysicalQuantity` for valid quantity IDs). ``None`` is
        treated as an empty mapping.

    uctx : :class:`pinttr.UnitContext`
        Unit context containing quantity and default units definitions.

    force : bool, default: False
        If ``True``, fields specified as quantities will be converted to
        target units; otherwise, only units compatibility will be checked.

    Returns
    -------
    dict
        Dictionary with units interpreted and checked, and default units
        applied to relevant fields.

    Raises
    ------
    :class:`pinttr.UnitsError`:
        If a field and its mapped quantity have incompatible units.

    See Also
    --------
    :class:`.PhysicalQuantity`
    """
    ureg = uctx.ureg

    # Interpret unit fields
    result = pinttr.interpret_units(d, ureg)

    # Convert to or apply default units based on the unit map
    field_to_quantity = {} if quantity_map is None else quantity_map

    for field, quantity_id in field_to_quantity.items():
        # Each mapped field is looked up directly in the interpreted dictionary
        current = result[field]
        target_units = uctx.get(quantity_id)

        if not isinstance(current, pint.Quantity):
            # Unitless value: attach the units mapped to this quantity
            result[field] = ureg.Quantity(current, target_units)
            continue

        if not units_compatible(current.units, target_units):
            raise UnitsError(current.units, target_units)

        # Compatible quantity: convert only upon request
        result[field] = current.to(target_units) if force else current

    return result