"""Spectral response function (SRF) filtering algorithms."""
from __future__ import annotations
import datetime
import typing as t
import warnings
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pint
import rich
import xarray as xr
from rich.prompt import Confirm
from rich.table import Table
from eradiate import __version__
from . import converters, data
from .exceptions import DataError
from .typing import PathLike
from .units import to_quantity
from .units import unit_registry as ureg
def load_from_id(value: str) -> xr.Dataset:
    """
    Load an SRF dataset from its identifier.

    Parameters
    ----------
    value: str
        SRF identifier. If it does not end with ``-raw``, the prepared version
        is tried first, then the raw version as a fallback.

    Returns
    -------
    Dataset
        SRF dataset served by the data store.

    Raises
    ------
    DataError
        If neither the prepared nor the raw version of the SRF can be served.
    """
    # look for the prepared version of the SRF
    try:
        return data.load_dataset(f"spectra/srf/{value}.nc")
    # if it doesn't exist, fall back to the raw SRF
    except DataError as e:
        if value.endswith("-raw"):
            raise e
        else:
            # BUGFIX: a space was missing between the two sentences of this
            # warning message ("data store.Trying")
            warnings.warn(
                f"Could not serve SRF '{value}' from the data store. "
                f"Trying to serve '{value}-raw' instead."
            )
            return data.load_dataset(f"spectra/srf/{value}-raw.nc")
# Converter that resolves SRF identifiers through the data store (see load_from_id)
convert = converters.to_dataset(load_from_id=load_from_id)
# Converter that accepts only paths or datasets (identifier resolution disabled)
convert_no_id = converters.to_dataset(load_from_id=None)
def update_attrs(srf: xr.Dataset, filter_name: str, filter_attr: str) -> None:
    """
    Update data set attributes to indicate that a filtering operation occurred.

    Parameters
    ----------
    srf: Dataset
        Data set whose attributes to update.
    filter_name: str
        Filter name.
    filter_attr: str
        Content of the 'filter' attribute.

    Notes
    -----
    * The update is done inplace.
    * If the dataset has no ``filter`` attribute, it is created. Otherwise, a
      new line is appended to it.
    """
    # filter attribute: append to the existing value, if any
    _value = srf.attrs.get("filter")
    previous_filter = _value + "\n" if _value is not None else ""

    # history attribute: append a timestamped entry
    previous_history = srf.attrs["history"] + "\n"
    # NOTE: datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # timestamp produces the same formatted string
    utcnow = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    author = f"eradiate {__version__}"
    history_attr = f"{utcnow} - data set filtering ({filter_name}) - {author}"

    # update attributes
    srf.attrs.update(
        {
            "history": f"{previous_history}{history_attr}",
            "filter": f"{previous_filter}{filter_attr}",
        }
    )
def wavelength_range_width(srf: PathLike | xr.Dataset) -> pint.Quantity:
    """
    Compute the wavelength range width of a spectral response function.

    Parameters
    ----------
    srf: path-like, Dataset
        Spectral response function data set.

    Returns
    -------
    quantity
        Wavelength range width.

    Notes
    -----
    The wavelength range width is defined as the difference between the upper
    and lower wavelength bounds.
    """
    ds = convert_no_id(srf)
    # single conversion to a quantity, then take the bounds
    w = to_quantity(ds.w)
    return w.max() - w.min()
def wavelength_bandwidth(srf: PathLike | xr.Dataset) -> pint.Quantity:
    r"""
    Compute the wavelength bandwidth of a spectral response function.

    Parameters
    ----------
    srf: path-like, Dataset
        Spectral response function data set.

    Returns
    -------
    quantity
        Wavelength bandwidth.

    Notes
    -----
    The wavelength bandwidth is defined according to the following formula:

    .. math::

       \Delta \lambda = \int \phi(\lambda) \, \mathrm{d} \lambda

    where

    * :math:`\phi(\lambda)` is the spectral response function.
    * :math:`\lambda` is the wavelength.
    * :math:`\Delta \lambda` is the wavelength bandwidth.

    and the integral is performed over the wavelength range of the spectral
    response function.
    """
    # DOCFIX: the formula previously shown included a spurious factor
    # :math:`\lambda` in the integrand; the code integrates the response only.
    srf = convert_no_id(srf)
    # integrate the response over wavelength and re-attach the wavelength units
    return srf.srf.integrate(coord="w").values * ureg(srf.w.attrs["units"])
def mean_wavelength(srf: PathLike | xr.Dataset) -> pint.Quantity:
    r"""
    Compute the mean wavelength of a spectral response function.

    Parameters
    ----------
    srf: path-like, Dataset
        Spectral response function data set.

    Returns
    -------
    quantity
        Mean wavelength.

    Notes
    -----
    The mean wavelength is defined according to the following formula:

    .. math::

       \overline{\lambda} = \frac{
           \int \lambda \, \phi(\lambda) \, \mathrm{d} \lambda
       }{
           \int \phi(\lambda) \, \mathrm{d} \lambda
       }

    where

    * :math:`\phi(\lambda)` is the spectral response function.
    * :math:`\lambda` is the wavelength.
    * :math:`\overline{\lambda}` is the mean wavelength.

    and the integrals are performed over the wavelength range of the spectral
    response function.
    """
    ds = convert_no_id(srf)
    # first moment of the response divided by its integral (normalization)
    numerator = (ds.w * ds.srf).integrate(coord="w").values
    denominator = ds.srf.integrate(coord="w").values
    return (numerator / denominator) * ureg(ds.w.attrs["units"])
def filtering_summary(
    srf: PathLike | xr.Dataset, filtered: xr.Dataset
) -> t.Mapping[str, t.Mapping[str, str]]:
    """
    Tabulate characteristics of a filtering operation.

    Parameters
    ----------
    srf: path-like, Dataset
        Initial spectral response function data set.
    filtered: Dataset
        Filtered spectral response function data set.

    Returns
    -------
    mapping
        Maps each characteristic name to a mapping with ``"Initial"``,
        ``"Final"`` and ``"Difference"`` formatted-string entries.
        (The return annotation previously claimed ``int | pint.Quantity``
        values, but every entry is a formatted string.)
    """
    srf = convert_no_id(srf)

    def _row(initial: pint.Quantity, final: pint.Quantity) -> dict[str, str]:
        # format one table row; '~' selects short pint unit symbols
        return {
            "Initial": f"{initial:.1f~}",
            "Final": f"{final:.1f~}",
            "Difference": f"{(final - initial):.1f~}",
        }

    w_i = to_quantity(srf.w)
    w_f = to_quantity(filtered.w)
    ni = srf.w.size
    nf = filtered.w.size

    return {
        "Lower wavelength": _row(w_i.min(), w_f.min()),
        "Upper wavelength": _row(w_i.max(), w_f.max()),
        "# wavelength": {
            "Initial": f"{ni}",
            "Final": f"{nf}",
            "Difference": f"{(nf - ni)}",
        },
        "Wavelength range width": _row(
            wavelength_range_width(srf), wavelength_range_width(filtered)
        ),
        "Wavelength bandwidth": _row(
            wavelength_bandwidth(srf), wavelength_bandwidth(filtered)
        ),
        "Mean wavelength": _row(
            mean_wavelength(srf), mean_wavelength(filtered)
        ),
    }
def summarize(
    srf: PathLike | xr.Dataset,
    filtered: PathLike | xr.Dataset,
) -> Table:
    """
    Produce a summary table of the spectral response function filtering operation.

    Parameters
    ----------
    srf: Dataset
        Initial spectral response function data set.
    filtered: Dataset
        Filtered spectral response function data set.

    Returns
    -------
    :class:`~rich.table.Table`
        Summary table.
    """
    # convert inputs
    srf = convert_no_id(srf)
    filtered = convert_no_id(filtered)

    summary = filtering_summary(srf=srf, filtered=filtered)

    table = Table(
        title="Filtering summary",
        box=rich.box.HEAVY_HEAD,
    )
    columns = {
        "Characteristic": "left",
        "Initial": "center",
        "Final": "center",
        "Difference": "center",
    }
    for name, justification in columns.items():
        table.add_column(name, justify=justification)

    # one row per characteristic
    # NOTE: a dead `if key == "Difference": pass` branch was removed — the
    # summary keys are characteristic names and never equal "Difference"
    for key, row in summary.items():
        table.add_row(key, *row.values())

    return table
def trim(srf: PathLike | xr.Dataset) -> xr.Dataset:
    """
    Trim all leading zeros except last and all trailing zeros except first.

    Parameters
    ----------
    srf: path-like, Dataset
        Data set to trim.

    Returns
    -------
    Dataset
        Trimmed data set.
    """
    ds = convert_no_id(srf)

    # locate the zero-padding at both ends of the response
    wsize = ds.srf.values.size
    fsize = np.trim_zeros(ds.srf.values, trim="f").size  # size w/o leading zeros
    bsize = np.trim_zeros(ds.srf.values, trim="b").size  # size w/o trailing zeros
    # keep the last leading zero and the first trailing zero
    istart = wsize - fsize - 1 if wsize > fsize else 0
    istop = bsize if bsize < wsize else wsize - 1
    trimmed = ds.isel(w=range(istart, istop + 1))

    # update history attribute
    previous_history = ds.attrs["history"] + "\n"
    # NOTE: datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # timestamp produces the same formatted string
    utcnow = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    author = f"eradiate, version {__version__}"
    history_attr = f"{utcnow} - trimmed data set - {author}"
    trimmed.attrs.update({"history": f"{previous_history}{history_attr}"})

    return trimmed
def save(
    ds: xr.Dataset,
    path: PathLike,
    verbose: bool = False,
    dry_run: bool = False,
) -> None:
    """
    Save dataset to disk.

    Parameters
    ----------
    ds: Dataset
        Dataset to save.
    path: path-like
        Path to which to save the dataset.
    verbose: bool
        If ``True``, display where the dataset is saved.
    dry_run: bool
        If ``True``, display where the dataset would be saved but does not save
        it.
    """
    output_path = Path(path).absolute()

    # dry run: report the target location and stop
    if dry_run:
        rich.print(f"Would write filtered data to {output_path}")
        return

    if verbose:
        rich.print(f"Writing filtered data to {output_path}")
    ds.to_netcdf(path)
def trim_and_save(
    srf: PathLike | xr.Dataset,
    path: PathLike,
    verbose: bool = False,
    show_plot: bool = False,
    dry_run: bool = False,
    interactive: bool = False,
) -> None:
    """
    Wraps around :meth:`trim` to save the filtered dataset.

    Parameters
    ----------
    srf: path-like, Dataset
        Data set to trim.
    path: path-like
        Path to which to save the filtered dataset.
    verbose : bool
        If ``True``, display a summary of the trimming operation.
    show_plot: bool
        If ``True``, display a figure illustrating the filtered region.
    dry_run: bool
        If ``True``, displays where the trimmed data set would be saved but
        does not save it.
    interactive: bool
        If ``True``, prompt the user to proceed to saving the filtered dataset.
        This is useful in combination with ``verbose=True`` and
        ``show_plot=True``.

    See Also
    --------
    :meth:`trim`
    """
    # convert inputs
    ds = convert_no_id(srf)
    output_path = Path(path).absolute()

    # trim (pass the converted dataset to avoid a second conversion)
    trimmed = trim(srf=ds)

    # print trimming summary table
    if verbose:
        table = summarize(srf=ds, filtered=trimmed)
        rich.print(table)

    # optionally display the trimmed region
    if show_plot:
        show(
            ds=ds,
            # BUGFIX: read attributes from the converted dataset ('ds'), not
            # from 'srf', which may still be a path-like object here
            title=" ".join(
                [ds.attrs["platform"], ds.attrs["instrument"], ds.attrs["band"]]
            ),
            trim_prior=False,
            threshold=None,
            wmin=to_quantity(trimmed.w).min(),
            wmax=to_quantity(trimmed.w).max(),
            percentage=None,
        )

    # optionally ask for confirmation before saving
    if interactive:
        if not Confirm.ask("Save filtered dataset?"):
            return

    # save trimmed dataset
    save(
        ds=trimmed,
        path=output_path,
        verbose=verbose,
        dry_run=dry_run,
    )
def spectral_filter(
    srf: PathLike | xr.Dataset,
    wmin: pint.Quantity | None = None,
    wmax: pint.Quantity | None = None,
) -> xr.Dataset:
    """
    Drop points falling out of wavelength range specified by ``wmin`` and ``wmax``.

    Parameters
    ----------
    srf: path-like or Dataset
        Spectral response function data set to filter.
    wmin: quantity, optional
        Minimum wavelength. If ``None``, no lower bound is applied.
    wmax: quantity, optional
        Maximum wavelength. If ``None``, no upper bound is applied.

    Returns
    -------
    Dataset
        Filtered data set.

    Raises
    ------
    ValueError
        If the filter would result in an empty data set.
    """
    srf = convert_no_id(srf)

    # filter against each provided bound, expressed in the dataset's units
    w_units = srf.w.attrs["units"]
    if wmin is not None:
        _wmin = wmin.m_as(w_units)
        filtered = srf.where(srf.w >= _wmin, drop=True)
        _wmin = f"{_wmin:.2f}"
    else:
        filtered = srf
        _wmin = 0.0
    if wmax is not None:
        _wmax = wmax.m_as(w_units)
        # BUGFIX: condition on the already-filtered coordinate ('filtered.w'),
        # not on the original 'srf.w', whose index may no longer match
        filtered = filtered.where(filtered.w <= _wmax, drop=True)
        _wmax = f"{_wmax:.2f}"
    else:
        _wmax = "∞"

    # update filtered data set attributes
    update_attrs(
        srf=filtered,
        filter_name="spectral filter",
        filter_attr=(
            f"All points in the original data set that fell out of the "
            f"wavelength range [{_wmin}, {_wmax}] {w_units} were dropped."
        ),
    )

    # sanity check
    if filtered.w.size == 0:
        raise ValueError(
            f"Filtering this data set with wmin = {wmin} and wmax = {wmax} "
            f"would result in empty data set."
        )

    return filtered
def threshold_filter(
    srf: PathLike | xr.Dataset,
    value: float = 1e-3,
) -> xr.Dataset:
    """
    Drop data points where response is smaller than or equal to a threshold value.

    Parameters
    ----------
    srf: path-like or Dataset
        Spectral response function data set to filter.
    value: float
        Spectral response threshold value.

    Raises
    ------
    ValueError: if the threshold value is not in [0, 1[, or if filtering would
        result in an empty data set.

    Returns
    -------
    Dataset
        Filtered data set.
    """
    # Convert input
    srf = convert_no_id(srf)

    # validate input
    if value < 0.0 or value >= 1.0:
        raise ValueError(f"threshold value should be in [0, 1[ (got {value}).")

    # indices of the data points that survive the filter
    filter_indices = np.where(srf.srf.values > value)[0]

    # BUGFIX: an empty selection previously raised IndexError at
    # 'filter_indices[0]' below; raise the intended ValueError instead
    if filter_indices.size == 0:
        raise ValueError(
            f"Filtering this data set with threshold value of {value} would "
            f"result in empty data set."
        )

    # check that filtering does not disconnect the wavelength space
    consecutive = np.arange(filter_indices[0], filter_indices[0] + filter_indices.size)
    if not np.all(filter_indices == consecutive):
        warnings.warn(
            f"Filtering this data set with threshold value of {value} would "
            "disconnect the wavelength space. You probably do not want that."
        )

    # filter
    filtered = srf.where(srf.srf > value, drop=True)

    # update filtered data set attributes
    update_attrs(
        srf=filtered,
        filter_name="threshold filter",
        filter_attr=(
            f"All points in the original data set where the spectral response "
            f"function evaluated to {value} or smaller were dropped."
        ),
    )

    # sanity check (defensive; the early empty-selection check should catch this)
    if filtered.w.size == 0:
        raise ValueError(
            f"Filtering this data set with threshold value of {value} would "
            f"result in empty data set."
        )

    return filtered
def integral_filter_w_bounds(
    ds: PathLike | xr.Dataset, percentage: float = 99.0
) -> tuple[float, float]:
    """
    Compute the wavelength bounds for the integral filter.

    The integrated spectral response is computed from the cumulative sum of the
    spectral response function interpolated on a regular grid with the
    smallest wavelength step of the initial data set.

    Parameters
    ----------
    ds: path-like or Dataset
        Dataset to filter.
    percentage: float
        Keep data that contribute to this percentage of the integrated spectral
        response.

    Returns
    -------
    tuple
        Wavelength bounds ``(w0, w1)`` in the dataset's wavelength units.
    """
    # convert input
    ds = convert_no_id(ds)
    # interpolate the spectral response on a regular wavelength mesh whose step
    # is the smallest step of the original mesh
    dwmin = ds.w.diff(dim="w").values.min()
    wmin = ds.w.values.min()
    wmax = ds.w.values.max()
    wnum = int((wmax - wmin) / dwmin) + 1
    wreg = np.linspace(wmin, wmax, wnum)
    srfreg = ds.srf.interp(w=wreg)
    # compute the cumulative sums of the interpolated spectral response
    # (proportional to the integral, since the mesh is regular)
    cumsum = np.cumsum(srfreg)
    cumsum_max = cumsum.values.max()  # this is the integrated spectral response
    # this is half the fraction of data points to discard:
    halffraction = (1 - (percentage / 100)) / 2
    # wavelength points that contribute to less than half the percentage of the
    # integrated response function, starting from the smallest wavelength values
    w_left = cumsum.where(cumsum < halffraction * cumsum_max, drop=True).w.values
    # wavelength points that contribute to less than half the percentage of the
    # integrated response function, starting from the largest wavelength values
    w_right = cumsum.where(cumsum > (1 - halffraction) * cumsum_max, drop=True).w.values
    # determine the wavelength bounds to discard these wavelength points that
    # contribute to less than the specified percentage of the integrated
    # spectral response (fall back to the dataset bounds when nothing is cut):
    w0 = w_left[-1] if w_left.size > 0 else wmin
    w1 = w_right[0] if w_right.size > 0 else wmax
    return w0, w1
def integral_filter(
    srf: PathLike | xr.Dataset,
    percentage: float = 99.0,
) -> xr.Dataset:
    """
    Keep only data that contribute to the integrated spectral response value
    to the amount of the specified percentage.

    The integrated spectral response is computed from the cumulative sum of the
    spectral response function interpolated on a regular grid with the
    smallest wavelength step of the initial data set.

    Parameters
    ----------
    srf: path-like or Dataset
        Dataset to filter. (The docstring previously documented this parameter
        under the wrong name ``ds``.)
    percentage: float
        Keep data that contribute to this percentage of the integrated spectral
        response.

    Raises
    ------
    ValueError: if the percentage value is not in [0, 100], or if filtering
        would result in an empty data set.

    Returns
    -------
    Dataset
        Filtered data set.
    """
    # convert input
    srf = convert_no_id(srf)

    # validate percentage
    if percentage < 0.0 or percentage > 100.0:
        raise ValueError(f"value must be within [0, 100.0] (got {percentage})")

    # compute wavelength bounds and filter
    w0, w1 = integral_filter_w_bounds(ds=srf, percentage=percentage)
    filtered = srf.where(srf.w >= w0, drop=True)
    filtered = filtered.where(filtered.w <= w1, drop=True)

    # update filtered data set attributes
    update_attrs(
        srf=filtered,
        filter_name="integral filter",
        filter_attr=(
            f"Data points that did not contribute to {percentage} % of the "
            f"integrated spectral reponse were dropped."
        ),
    )

    # sanity check
    if filtered.w.size == 0:
        raise ValueError(
            f"Filtering this data set with {percentage=} "
            f"would result in empty data set."
        )

    return filtered
def show(
    ds: PathLike | xr.Dataset,
    trim_prior: bool = True,
    title: str | None = None,
    threshold: float | None = None,
    wmin: pint.Quantity | None = None,
    wmax: pint.Quantity | None = None,
    percentage: float | None = None,
) -> None:
    """
    Show filtered region on spectral response function plot.

    Parameters
    ----------
    ds: path-like, Dataset
        Spectral response function to be filtered.
    trim_prior: bool
        If ``True``, trim spectral response function prior to filter.
    title: str, optional
        Figure title.
    threshold: float, optional
        Threshold value for the threshold filter.
    wmin: quantity, optional
        Lower wavelength value for the spectral filter.
    wmax: quantity, optional
        Upper wavelength value for the spectral filter.
    percentage: float, optional
        Percentage value for the integral filter.

    Raises
    ------
    ValueError:
        If the threshold value is not in [0, 1[.

    Notes
    -----
    Regions kept by a filter are shaded green; dropped regions are shaded red.
    """
    # convert input
    ds = convert_no_id(ds)

    # setup figure
    plt.figure(dpi=100)
    ax = plt.gca()
    plt_params = {
        "lw": 0.6,
        "marker": ".",
        "markersize": 2,
        "yscale": "log",
    }

    # optionally trim; untrimmed data is displayed on a linear scale
    if trim_prior:
        trimmed = trim(srf=ds)
    else:
        trimmed = ds
        plt_params.update({"yscale": "linear"})

    # plot the (possibly trimmed) spectral response function
    trimmed.srf.plot(**plt_params)

    # threshold filter: horizontal cut line + keep/drop shading
    if threshold is not None:
        if threshold < 0.0 or threshold >= 1.0:
            raise ValueError(f"threshold value should be in [0, 1[ (got {threshold}).")
        # BUGFIX: the cut line was previously drawn twice; once is enough
        plt.axhline(y=threshold, color="red", lw=0.5)
        # keep region
        ax.fill_between(
            trimmed.w.values,
            0,
            trimmed.srf.values,
            where=trimmed.srf.values > threshold,
            facecolor="green",
            alpha=0.1,
        )
        # drop region
        ax.fill_between(
            trimmed.w.values,
            0,
            trimmed.srf.values,
            where=trimmed.srf.values <= threshold,
            facecolor="red",
            alpha=0.1,
        )

    # spectral filter: vertical bound lines + keep/drop shading
    if wmin is not None or wmax is not None:
        if wmin is not None:
            _wmin_value = wmin.m_as(ds.w.attrs["units"])
            plt.axvline(x=_wmin_value, color="red", lw=0.5)
            # drop region
            ax.fill_between(
                trimmed.w.values,
                0,
                trimmed.srf.values,
                where=trimmed.w.values < _wmin_value,
                facecolor="red",
                alpha=0.1,
            )
        else:
            _wmin_value = 0.0
        if wmax is not None:
            _wmax_value = wmax.m_as(ds.w.attrs["units"])
            plt.axvline(x=_wmax_value, color="red", lw=0.5)
            # drop region
            ax.fill_between(
                trimmed.w.values,
                0,
                trimmed.srf.values,
                where=trimmed.w.values > _wmax_value,
                facecolor="red",
                alpha=0.1,
            )
        else:
            _wmax_value = np.inf
        # keep region
        ax.fill_between(
            trimmed.w.values,
            0,
            trimmed.srf.values,
            where=(
                (trimmed.w.values >= _wmin_value) & (trimmed.w.values <= _wmax_value)
            ),
            facecolor="green",
            alpha=0.1,
        )

    # integral filter: shade outside the computed wavelength bounds
    if percentage is not None:
        # use fresh names so the 'wmin'/'wmax' parameters are not shadowed
        w0, w1 = integral_filter_w_bounds(ds=ds, percentage=percentage)
        # drop regions
        plt.axvline(x=w0, color="red", lw=0.5)
        ax.fill_between(
            trimmed.w.values,
            0,
            trimmed.srf.values,
            where=trimmed.w.values < w0,
            facecolor="red",
            alpha=0.1,
        )
        plt.axvline(x=w1, color="red", lw=0.5)
        ax.fill_between(
            trimmed.w.values,
            0,
            trimmed.srf.values,
            where=trimmed.w.values > w1,
            facecolor="red",
            alpha=0.1,
        )
        # keep region
        ax.fill_between(
            trimmed.w.values,
            0,
            trimmed.srf.values,
            where=((trimmed.w.values >= w0) & (trimmed.w.values <= w1)),
            facecolor="green",
            alpha=0.1,
        )

    if title is not None:
        plt.title(title)
    plt.tight_layout()
    plt.grid()
    plt.show()
def filter_srf(
    srf: PathLike | xr.Dataset,
    path: PathLike,
    verbose: bool = False,
    show_plot: bool = False,
    dry_run: bool = False,
    interactive: bool = False,
    trim_prior: bool = True,
    threshold: float | None = None,
    wmin: pint.Quantity | None = None,
    wmax: pint.Quantity | None = None,
    percentage: float | None = None,
) -> None:
    """
    Filter a spectral response function data set.

    Parameters
    ----------
    srf: path-like, Dataset
        Spectral response function data set to filter.
    path: path-like
        Path to which to save the filtered data set.
    verbose: bool
        If ``True``, display a filtering summary table and the path to which
        filtered data set is saved.
    show_plot: bool
        If ``True``, display a figure emphasizing the filtered region.
    dry_run: bool
        If ``True``, display the path to which the filtered data set would be
        saved, but does not write the data set to the disk.
    interactive: bool
        If ``True``, prompt the user to proceed to saving the filtered dataset.
        This is useful in combination with ``verbose=True`` and
        ``show_plot=True``.
    trim_prior: bool
        Trim the data set prior to filtering.
    threshold: float, optional
        Threshold value for the threshold filter.
        See :meth:`threshold_filter`.
        If ``None``, disable the threshold filter.
    wmin: quantity, optional
        Lower wavelength bound for the spectral filter.
        See :meth:`spectral_filter`.
        If both ``wmin`` and ``wmax`` are ``None``, disable the spectral filter.
    wmax: quantity, optional
        Upper wavelength bound for the spectral filter.
        See :meth:`spectral_filter`.
        If both ``wmin`` and ``wmax`` are ``None``, disable the spectral filter.
    percentage: float, optional
        Percentage value for the integral filter.
        See :meth:`integral_filter`.
        If ``None``, disable the integral filter.

    Notes
    -----
    Select the filtering algorithms corresponding to the specified parameters.
    If multiple filters are selected, they are applied in this order:

    * integral filter
    * spectral filter
    * threshold filter

    See Also
    --------
    :meth:`integral_filter`, :meth:`spectral_filter`,
    :meth:`threshold_filter`
    """
    # convert inputs
    ds = convert_no_id(srf)
    output_path = Path(path).absolute()

    # optional trimming pass
    trimmed = trim(srf=ds) if trim_prior else ds

    # apply the enabled filters, in order: integral, spectral, threshold
    filtered = trimmed
    if percentage is not None:
        filtered = integral_filter(srf=filtered, percentage=percentage)
    if wmin is not None or wmax is not None:
        filtered = spectral_filter(srf=filtered, wmin=wmin, wmax=wmax)
    if threshold is not None:
        filtered = threshold_filter(srf=filtered, value=threshold)

    # print filtering summary table
    if verbose:
        rich.print(summarize(srf=ds, filtered=filtered))

    # display the filtered regions
    if show_plot:
        figure_title = " ".join(
            [ds.attrs["platform"], ds.attrs["instrument"], ds.attrs["band"]]
        )
        show(
            ds=trimmed,
            title=figure_title,
            trim_prior=trim_prior,
            threshold=threshold,
            wmin=wmin,
            wmax=wmax,
            percentage=percentage,
        )

    # ask for confirmation before writing to disk
    if interactive and not Confirm.ask("Save filtered dataset?"):
        return

    # save filtered dataset
    save(ds=filtered, path=output_path, verbose=verbose, dry_run=dry_run)