Source code for veros.tools.assets

import os
import json
import shutil
import hashlib
import urllib.parse as urlparse

import requests

from veros.tools.filelock import FileLock
from veros import logger, runtime_state


ASSET_DIRECTORY = os.environ.get("VEROS_ASSET_DIR") or os.path.join(os.path.expanduser("~"), ".veros", "assets")


class AssetError(Exception):
    pass


class AssetStore:
    def __init__(self, asset_dir, asset_config, skip_md5=False):
        self._asset_dir = asset_dir
        self._asset_config = asset_config
        self._stored_assets = {}
        self._skip_md5 = skip_md5

    def _get_asset(self, key):
        url = self._asset_config[key]["url"]
        md5 = self._asset_config[key].get("md5")
        skip_md5 = self._skip_md5

        target_filename = os.path.basename(urlparse.urlparse(url).path)
        target_path = os.path.join(self._asset_dir, target_filename)
        target_lock = target_path + ".lock"

        with FileLock(target_lock):
            if not os.path.isfile(target_path):
                logger.info("Downloading asset {} ...", target_filename)
                _download_file(url, target_path)
                # always validate freshly downloaded files
                skip_md5 = False

            check_md5 = not skip_md5 and md5 is not None and runtime_state.proc_rank == 0
            if check_md5:
                if _filehash(target_path) != md5:
                    raise AssetError(f"Mismatching MD5 checksum on asset {target_filename}")

        return target_path

    def keys(self):
        return self._asset_config.keys()

    def __contains__(self, key):
        return key in self.keys()

    def __getitem__(self, key):
        if key not in self:
            raise KeyError(f"unknown asset {key}")

        if key not in self._stored_assets:
            self._stored_assets[key] = self._get_asset(key)

        return self._stored_assets[key]

    def __repr__(self):
        out = f"{self.__class__.__name__}(asset_dir={self._asset_dir}, asset_config={self._asset_config})"
        return out


[docs] def get_assets(asset_id, asset_file, skip_md5=False): """Handles automatic download and verification of external assets (such as forcing files). By default, assets are stored in ``$HOME/.veros/assets`` (can be overwritten by setting ``VEROS_ASSET_DIR`` environment variable to the desired location). Arguments: asset_id (str): Identifier of the collection of assets. Should be unique for each setup. asset_file (str): JSON file containing URLs and (optionally) MD5 hashsums of each asset. skip_md5 (bool): Whether to skip MD5 checksum validation (useful for huge asset files) Returns: A ``dict``-like mapping of each asset to file name on disk. Assets are downloaded lazily. Example: >>> assets = get_assets('mysetup', 'assets.json') >>> assets['forcing'] "/home/user/.veros/assets/mysetup/mysetup_forcing.h5", "initial_conditions": "/home/user/.veros/assets/mysetup/initial.h5" } In this case, ``assets.json`` contains:: { "forcing": { "url": "https://mywebsite.com/veros_assets/mysetup_forcing.h5", "md5": "ef3be0a58782771c8ee5a6d0206b87f6" }, "initial_conditions": { "url": "https://mywebsite.com/veros_assets/initial.h5", "md5": "d1b4e0e199d7a5883cf7c88d3d6bcb28" } } """ with open(asset_file, "r") as f: assets = json.load(f) asset_dir = os.path.join(ASSET_DIRECTORY, asset_id) if not os.path.isdir(asset_dir): try: # possible race-condition os.makedirs(asset_dir) except OSError: if os.path.isdir(asset_dir): pass return AssetStore(asset_dir, assets, skip_md5)
def _download_file(url, target_path, timeout=10): """Download a file and save it to a folder""" tmpfile = f"{target_path}.incomplete" with requests.get(url, stream=True, timeout=timeout) as response: response.raise_for_status() response.raw.decode_content = True try: with open(tmpfile, "wb") as dst: shutil.copyfileobj(response.raw, dst) except: # noqa: E722 os.remove(tmpfile) raise shutil.move(tmpfile, target_path) return target_path def _filehash(path): hash_md5 = hashlib.md5() with open(path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest()