Source code for veros.diagnostics.base

import abc

import os

from veros.io_tools import netcdf as nctools
from veros.signals import do_not_disturb
from veros.state import VerosVariables
from veros import distributed, runtime_settings, time


[docs]class VerosDiagnostic(metaclass=abc.ABCMeta): """Base class for diagnostics. Provides an interface and wrappers for common I/O. Any diagnostic needs to implement the 5 interface methods and set some attributes. """ name = None #: Name that identifies the current diagnostic sampling_frequency = 0.0 output_frequency = 0.0 output_path = None output_variables = None var_meta = None #: Metadata of internal variables extra_dimensions = None #: Dict of extra dimensions used in var_meta def __init__(self, state): pass
[docs] @abc.abstractmethod def initialize(self, state): """Called at the end of setup. Use this to process user settings and handle setup.""" pass
[docs] @abc.abstractmethod def diagnose(self, state): """Called with frequency ``sampling_frequency``.""" pass
[docs] @abc.abstractmethod def output(self, state): """Called with frequency ``output_frequency``.""" pass
def initialize_variables(self, state): if self.var_meta is None: self.variables = None return dimensions = dict(state.dimensions) if self.extra_dimensions is not None: dimensions.update(self.extra_dimensions) self.variables = VerosVariables(self.var_meta, dimensions) # we leave diagnostic variables unlocked self.variables.__locked__ = False def get_output_file_name(self, state): statedict = dict(state.variables.items()) statedict.update(state.settings.items()) return self.output_path.format(**statedict) @do_not_disturb def initialize_output(self, state): inactive = not self.output_frequency and not self.sampling_frequency no_output = not self.output_path or not self.output_variables if runtime_settings.diskless_mode or inactive or no_output: return output_path = self.get_output_file_name(state) if os.path.isfile(output_path) and not runtime_settings.force_overwrite: raise IOError( f'output file {output_path} for diagnostic "{self.name}" exists ' "(change output path or enable force_overwrite runtime setting)" ) # possible race condition ahead! distributed.barrier() with nctools.threaded_io(output_path, "w") as outfile: nctools.initialize_file(state, outfile, extra_dimensions=self.extra_dimensions) for key in self.output_variables: var = self.var_meta[key] if key not in outfile.variables: nctools.initialize_variable(state, key, var, outfile) if not var.time_dependent: var_data = self.variables.get(key) nctools.write_variable(state, key, var, var_data, outfile) @do_not_disturb def write_output(self, state): vs = state.variables if runtime_settings.diskless_mode: return with nctools.threaded_io(self.get_output_file_name(state), "r+") as outfile: current_days = time.convert_time(vs.time, "seconds", "days") nctools.advance_time(current_days, outfile) for key in self.output_variables: var = self.var_meta[key] var_data = self.variables.get(key) nctools.write_variable(state, key, var, var_data, outfile)