# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown copyright. The Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""init for cli and clize"""
import pathlib
import shlex
import time
from collections import OrderedDict
from functools import partial
import clize
from clize import parameters
from clize.help import ClizeHelp, HelpForAutodetectedDocstring
from clize.parser import value_converter
from clize.runner import Clize
from sigtools.wrappers import decorator
# Imports are done in their functions to make calls to -h quicker.
# selected clize imports/constants
IGNORE = clize.Parameter.IGNORE
LAST_OPTION = clize.Parameter.LAST_OPTION
REQUIRED = clize.Parameter.REQUIRED
UNDOCUMENTED = clize.Parameter.UNDOCUMENTED
# help helpers
[docs]def docutilize(obj):
"""Convert Numpy or Google style docstring into reStructuredText format.
Args:
obj (str or object):
Takes an object and changes it's docstrings to a reStructuredText
format.
Returns:
str or object:
A converted string or an object with replaced docstring depending
on the type of the input.
"""
from inspect import cleandoc, getdoc
from sphinx.ext.napoleon.docstring import GoogleDocstring, NumpyDocstring
if isinstance(obj, str):
doc = cleandoc(obj)
else:
doc = getdoc(obj)
doc = str(NumpyDocstring(doc))
doc = str(GoogleDocstring(doc))
doc = doc.replace(":exc:", "")
doc = doc.replace(":data:", "")
doc = doc.replace(":keyword", ":param")
doc = doc.replace(":kwtype", ":type")
if isinstance(obj, str):
return doc
obj.__doc__ = doc
return obj
[docs]class HelpForNapoleonDocstring(HelpForAutodetectedDocstring):
"""Subclass to add support for google style docstrings"""
[docs] def add_docstring(self, docstring, *args, **kwargs):
"""Adds the updated docstring."""
docstring = docutilize(docstring)
super().add_docstring(docstring, *args, **kwargs)
[docs]class DocutilizeClizeHelp(ClizeHelp):
"""Subclass to build Napoleon docstring from subject."""
def __init__(self, subject, owner, builder=HelpForNapoleonDocstring.from_subject):
super().__init__(subject, owner, builder)
# input handling
[docs]class ObjectAsStr(str):
"""Hide object under a string to pass it through Clize parser."""
__slots__ = ("original_object",)
def __new__(cls, obj, name=None):
if isinstance(obj, cls): # pass object through if already wrapped
return obj
if name is None:
name = cls.obj_to_name(obj)
self = str.__new__(cls, name)
self.original_object = obj
return self
def __hash__(self):
# make sure our hash doesn't clash with normal string hash
return super().__hash__(self) ^ hash(type(self))
[docs] @staticmethod
def obj_to_name(obj, cls=None):
"""Helper function to create the string."""
if isinstance(obj, str):
return obj
if cls is None:
cls = type(obj)
try:
obj_id = hash(obj)
except TypeError:
obj_id = id(obj)
return "<%s.%s@%i>" % (cls.__module__, cls.__name__, obj_id)
[docs]def maybe_coerce_with(converter, obj, **kwargs):
"""Apply converter if str, pass through otherwise."""
obj = getattr(obj, "original_object", obj)
return converter(obj, **kwargs) if isinstance(obj, str) else obj
[docs]@value_converter
def comma_separated_list(to_convert):
"""Converts comma separated string to list or returns passed object.
Args:
to_convert (string or list)
comma separated string or list
Returns:
list
"""
return maybe_coerce_with(lambda s: s.split(","), to_convert)
[docs]@value_converter
def comma_separated_list_of_float(to_convert):
"""Converts comma separated string to list of floats or returns passed object.
Args:
to_convert (string or list)
comma separated string or list
Returns:
list
"""
return maybe_coerce_with(
lambda string: [float(s) for s in string.split(",")], to_convert
)
# output handling
[docs]@decorator
def with_output(
wrapped,
*args,
output=None,
pass_through_output=False,
compression_level=1,
least_significant_digit: int = None,
**kwargs,
):
"""Add `output` keyword only argument.
Add `compression_level` option.
Add `least_significant_digit` option.
This is used to add extra `output`, `compression_level` and `least_significant_digit` CLI
options. If `output` is provided, it saves the result of calling `wrapped` to file and returns
None, otherwise it returns the result. If `compression_level` is provided, it compresses the
data with the provided compression level (or not, if `compression_level` 0). If
`least_significant_digit` provided, it will quantize the data to a certain number of
significant figures.
Args:
wrapped (obj):
The function to be wrapped.
output (str, optional):
Output file name. If not supplied, the output object will be
printed instead.
pass_through_output (bool):
Pass through the output object even if saved to file.
Used in pipelines of commands if intermediate output needs to be saved.
compression_level (int):
Will set the compression level (1 to 9), or disable compression (0).
least_significant_digit (int):
If specified will truncate the data to a precision given by
10**(-least_significant_digit), e.g. if least_significant_digit=2, then the data will
be quantized to a precision of 0.01 (10**(-2)). See
http://www.esrl.noaa.gov/psd/data/gridded/conventions/cdc_netcdf_standard.shtml
for details. When used with `compression level`, this will result in lossy
compression.
Returns:
Result of calling `wrapped` or None if `output` is given.
"""
from improver.utilities.save import save_netcdf
result = wrapped(*args, **kwargs)
if output and result:
save_netcdf(result, output, compression_level, least_significant_digit)
if pass_through_output:
return ObjectAsStr(result, output)
return
return result
# cli object creation
[docs]def clizefy(obj=None, helper_class=DocutilizeClizeHelp, **kwargs):
"""Decorator for creating CLI objects.
"""
if obj is None:
return partial(clizefy, helper_class=helper_class, **kwargs)
if hasattr(obj, "cli"):
return obj
if not callable(obj):
return Clize.get_cli(obj, **kwargs)
return Clize.keep(obj, helper_class=helper_class, **kwargs)
# help command
[docs]@clizefy(help_names=())
def improver_help(prog_name: parameters.pass_name, command=None, *, usage=False):
"""Show command help."""
prog_name = prog_name.split()[0]
args = filter(None, [command, "--help", usage and "--usage"])
result = execute_command(SUBCOMMANDS_DISPATCHER, prog_name, *args)
if not command and usage:
result = "\n".join(
line
for line in result.splitlines()
if not line.endswith("--help [--usage]")
)
return result
[docs]def command_executor(*argv, verbose=False, dry_run=False):
"""Common entry point for straight command execution."""
return execute_command(
SUBCOMMANDS_DISPATCHER, *argv, verbose=verbose, dry_run=dry_run
)
[docs]def _cli_items():
"""Dynamically discover CLIs."""
import importlib
import pkgutil
from improver.cli import __path__ as improver_cli_pkg_path
yield ("help", improver_help)
for minfo in pkgutil.iter_modules(improver_cli_pkg_path):
mod_name = minfo.name
if mod_name != "__main__":
mcli = importlib.import_module("improver.cli." + mod_name)
yield (mod_name, clizefy(mcli.process))
SUBCOMMANDS_TABLE = OrderedDict(sorted(_cli_items()))
# main CLI object with subcommands
SUBCOMMANDS_DISPATCHER = clizefy(
SUBCOMMANDS_TABLE,
description="""IMPROVER NWP post-processing toolbox""",
footnotes="""See also improver --help for more information.""",
)
# IMPROVER top level main
[docs]def unbracket(args):
"""Convert input list with bracketed items into nested lists.
>>> unbracket('foo [ bar a b ] [ baz c ] -o z'.split())
['foo', ['bar', 'a', 'b'], ['baz', 'c'], '-o', 'z']
"""
outargs = []
stack = []
mismatch_msg = "Mismatched bracket at position %i."
for i, arg in enumerate(args):
if arg == "[":
stack.append(outargs)
outargs = []
elif arg == "]":
if not stack:
raise ValueError(mismatch_msg % i)
stack[-1].append(outargs)
outargs = stack.pop()
else:
outargs.append(arg)
if stack:
raise ValueError(mismatch_msg % len(args))
return outargs
[docs]class TimeIt:
def __init__(self, verbose=False):
self._verbose = verbose
self._elapsed = None
self._start = None
def __enter__(self):
self._start = time.perf_counter()
return self
def __exit__(self, *args):
self._elapsed = time.perf_counter() - self._start
if self._verbose:
print(str(self))
@property
def elapsed(self):
"""Return elapsed time in seconds."""
return self._elapsed
def __str__(self):
"""Print elapsed time in seconds."""
return f"Run-time: {self._elapsed}s"
[docs]def execute_command(dispatcher, prog_name, *args, verbose=False, dry_run=False):
"""Common entry point for command execution."""
args = list(args)
for i, arg in enumerate(args):
if isinstance(arg, (list, tuple)):
# process nested commands recursively
arg = execute_command(
dispatcher, prog_name, *arg, verbose=verbose, dry_run=dry_run
)
if isinstance(arg, pathlib.PurePath):
arg = str(arg)
elif not isinstance(arg, str):
arg = ObjectAsStr(arg)
args[i] = arg
msg = " ".join([shlex.quote(x) for x in (prog_name, *args)])
if dry_run:
if verbose:
print(msg)
return args
with TimeIt() as timeit:
result = dispatcher(prog_name, *args)
if verbose:
print(f"{timeit}; {msg}")
if result is not None:
print(ObjectAsStr.obj_to_name(result))
return result
[docs]@clizefy()
def main(
prog_name: parameters.pass_name,
command: LAST_OPTION,
*args,
profile: value_converter(lambda _: _, name="FILENAME") = None, # noqa: F821
memprofile: value_converter(lambda _: _, name="FILENAME") = None, # noqa: F821
verbose=False,
dry_run=False,
):
"""IMPROVER NWP post-processing toolbox
Results from commands can be passed into file-like arguments
of other commands by surrounding them by square brackets::
improver command [ command ... ] ...
Spaces around brackets are mandatory.
Args:
prog_name:
The program name from argv[0].
command (str):
Command to execute
args (tuple):
Command arguments
profile (str):
If given, will write profiling to the file given.
To write to stdout, use a hyphen (-)
memprofile (str):
Creates 2 files by adding a suffix to the provided arguemnt -
a tracemalloc snapshot at the point of highest memory consumption
of your program (suffixed with _SNAPSHOT)
and a track of the maximum memory used by your program
over time (suffixed with _MAX_TRACKER).
verbose (bool):
Print executed commands
dry_run (bool):
Print commands to be executed
See improver help [--usage] [command] for more information
on available command(s).
"""
args = unbracket(args)
exec_cmd = execute_command
if profile is not None:
from improver.profile import profile_hook_enable
profile_hook_enable(dump_filename=None if profile == "-" else profile)
if memprofile is not None:
from improver.memprofile import memory_profile_decorator
exec_cmd = memory_profile_decorator(exec_cmd, memprofile)
result = exec_cmd(
SUBCOMMANDS_DISPATCHER,
prog_name,
command,
*args,
verbose=verbose,
dry_run=dry_run,
)
return result
[docs]def run_main(argv=None):
"""Overrides argv[0] to be 'improver' then runs main.
Args:
argv (list of str):
Arguments that were from the command line.
"""
import sys
from clize import run
# clize help shows module execution as `python -m improver.cli`
# override argv[0] and pass it explicitly in order to avoid this
# so that the help command reflects the way that we call improver.
if argv is None:
argv = sys.argv[:]
argv[0] = "improver"
run(main, args=argv)