from __future__ import annotations
import logging
import os
import shutil
import time
import typing as t
from pathlib import Path
import attrs
import pooch
from requests import RequestException
from ._core import DataStore, expand_rules
from ..attrs import define, documented
from ..exceptions import DataError
from ..typing import PathLike
from ..util.misc import LoggingContext
# Module-level logger; the data store below uses it to report failed download
# attempts.
logger = logging.getLogger(__name__)
@define
class BlindOnlineDataStore(DataStore):
    """
    Serve data downloaded from a remote source without integrity check.

    Files are retrieved with :func:`pooch.retrieve` using ``known_hash=None``
    (*i.e.* no checksum verification) and cached under :attr:`path`. This data
    store has no registry: any file name can be requested, and the request
    fails only if the corresponding URL cannot be downloaded.
    """

    _base_url: str = documented(
        attrs.field(converter=lambda x: x + "/" if not x.endswith("/") else x),
        type="str",
        doc="URL to the online storage location.",
    )

    path: Path = documented(
        attrs.field(converter=lambda x: Path(x).absolute()),
        type="Path",
        init_type="path-like",
        doc="Path to the local cache location.",
    )

    attempts: int = documented(
        attrs.field(default=3, converter=int),
        type="int",
        doc="Number of download attempts to make before giving up because of "
        "connection errors.",
    )

    @property
    def base_url(self) -> str:
        # Inherit docstring
        return self._base_url

    @property
    def registry(self) -> dict:
        """
        Raises :class:`NotImplementedError` (this data store has no registry).
        """
        raise NotImplementedError

    def registry_files(
        self, filter: t.Callable[[t.Any], bool] | None = None
    ) -> list[str]:
        """
        Returns an empty list (this data store has no registry).
        """
        return []

    def fetch(self, filename: PathLike, downloader: t.Callable | None = None) -> Path:
        """
        Fetch a file from the data store. This method wraps
        :func:`pooch.retrieve` and automatically selects compressed files
        when they are available.

        Parameters
        ----------
        filename : path-like
            File name to fetch from the local storage, relative to the storage
            root.

        downloader : callable, optional
            A callable that will be called to download a given URL to a provided
            local file name. This is mostly useful to
            `display progress bars <https://www.fatiando.org/pooch/latest/progressbars.html>`_
            during download. It is forwarded to :func:`pooch.retrieve`.

        Returns
        -------
        path : Path
            Absolute path where the retrieved resource is located.

        Raises
        ------
        DataError
            If the file could not be retrieved after :attr:`attempts`
            download attempts. The last request error, if any, is attached as
            the exception's cause.

        Notes
        -----
        If a compressed resource exists, it will be served automatically.
        For instance, if ``"foo.nc"`` is requested and ``"foo.nc.gz"`` is
        available at the remote location, the latter will be downloaded,
        decompressed and served as ``"foo.nc"``.
        """
        fname = Path(filename).as_posix()
        url = self.base_url + fname
        max_wait = 10  # Upper bound, in seconds, of the pause between attempts
        last_error: RequestException | None = None

        # Download candidates, by order of preference: the gzip-compressed
        # variant (decompressed next to the archive under the requested file
        # name), then the file itself.
        candidates = (
            {
                "url": url + ".gz",
                "fname": fname + ".gz",
                "processor": pooch.processors.Decompress(
                    name=os.path.basename(fname)
                ),
            },
            {"url": url, "fname": fname},
        )

        with LoggingContext(
            pooch.get_logger(), level="WARNING"
        ):  # Silence pooch messages temporarily
            for i in range(self.attempts):
                for kwargs in candidates:
                    try:
                        return Path(
                            pooch.retrieve(
                                known_hash=None,
                                path=self.path,
                                downloader=downloader,
                                **kwargs,
                            )
                        )
                    except RequestException as e:
                        # Remember the failure and move on to the next
                        # candidate (or to the next attempt)
                        last_error = e

                # If we get here, it means download failed
                retries_left = self.attempts - (i + 1)
                if retries_left <= 0:
                    # Last attempt: nothing to announce, no point in waiting
                    break

                logger.info(
                    "Failed to download '%s'. "
                    "Will attempt the download again %d more time%s.",
                    fname,
                    retries_left,
                    "s" if retries_left > 1 else "",
                )
                # Wait a little longer after each failure, up to max_wait
                time.sleep(min(i + 1, max_wait))

        raise DataError(
            f"file '{fname}' could not be retrieved from {self.base_url}"
        ) from last_error

    def purge(self, keep: None | str | list[str] = None) -> None:
        """
        Purge local storage location. The default behaviour is very aggressive
        and will wipe out the entire directory contents.

        Parameters
        ----------
        keep : str or list of str, optional
            A list of exclusion rules (paths relative to the store's local
            storage root, shell wildcards allowed) defining files which should
            be excluded from the purge process. A single string is interpreted
            as a single rule.

        Warnings
        --------
        This is a destructive operation, make sure you know what you're doing!
        """
        # Normalize exclusion rules: a lone string is one rule, not a
        # sequence of one-character rules
        if isinstance(keep, str):
            keep = [keep]
        elif keep is not None:
            keep = list(keep)

        # Nothing to purge if the cache directory was never created
        if not self.path.is_dir():
            return

        # Fast track for simple case
        if not keep:
            # Snapshot the directory listing before deleting anything
            for entry in list(self.path.iterdir()):
                # Symlinks are unlinked, never followed
                if entry.is_symlink() or entry.is_file():
                    entry.unlink()
                elif entry.is_dir():
                    shutil.rmtree(entry)
            return

        # List files to keep and delete
        included = expand_rules(rules=["**/*"], prefix=self.path)
        excluded = expand_rules(rules=keep, prefix=self.path)

        for file in sorted(included - excluded):
            os.remove(file)

        # Clean up empty directories, deepest first. Only subdirectories are
        # visited, so the storage root itself is never removed.
        for dirpath, dirnames, _ in os.walk(self.path, topdown=False):
            for dirname in dirnames:
                try:
                    os.rmdir(os.path.join(dirpath, dirname))
                except OSError:
                    # Directory still holds kept files (or is a symlink)
                    pass