import logging
from copy import copy
from functools import partial
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
MutableMapping,
Optional,
Tuple,
Type,
Union,
)
import pint
import yaml
import genno.computations as computations
from genno.core.computer import Computer
from genno.core.exceptions import KeyExistsError
from genno.core.key import Key
from genno.util import REPLACE_UNITS
log = logging.getLogger(__name__)

#: Registry of configuration section handlers, keyed by section name.
HANDLERS: Dict[str, Any] = {}

#: Configuration sections/keys to be stored with no action.
STORE = {"cache_path", "cache_skip"}
def handles(section_name: str, iterate: bool = True, discard: bool = True):
    """Return a decorator that registers a function in :data:`HANDLERS`.

    Parameters
    ----------
    section_name : str
        Name of the configuration section handled by the decorated function. If a
        handler is already registered under this name, it is replaced.
    iterate : bool, optional
        If :obj:`True`, the handler is called once for each item (either list item,
        or (key, value) tuple) in the section. If :obj:`False`, the entire section
        contents, of whatever type, are passed to the handler.
    discard : bool, optional
        If :obj:`True`, configuration section data is discarded after the handler is
        called. If :obj:`False`, the data is retained and stored on the Computer.
    """

    def register(func: Callable):
        if section_name in HANDLERS:
            log.debug(
                f"Override handler {repr(HANDLERS[section_name])} for "
                f" '{section_name}:'"
            )

        HANDLERS[section_name] = func

        # Attach the flags that parse_config() reads from each handler
        for attr, value in (("_iterate", iterate), ("_discard", discard)):
            setattr(func, attr, value)

        # Return the function itself, so the decorated name stays usable
        return func

    return register
def parse_config(
    c: Optional[Computer],
    data: MutableMapping[str, Any],
    fail: Optional[Union[str, int]] = None,
):
    """Handle configuration `data`, optionally applying it to the Computer `c`.

    If `data` has a "path" entry, it is removed and the YAML file it names is loaded.
    The remaining entries of `data` override values read from the file, and the
    directory containing the file is recorded under "config_dir".

    Every section with a handler registered in :data:`HANDLERS` is then processed:

    - If the handler's ``_iterate`` flag is set, one call per item of the section is
      queued; the queue is applied at the end through ``c.add_queue()``.
    - Otherwise the handler is called immediately with `c` (which may be
      :obj:`None`) and the entire section contents.

    Sections without a handler are left untouched; a message is logged for those not
    listed in :data:`STORE`. Sections whose handler has ``_discard`` set are removed,
    and whatever remains is stored in ``c.graph["config"]`` when `c` is given.

    Parameters
    ----------
    c : Computer or None
        Computer to configure. If :obj:`None`, only sections whose handlers do not
        iterate can be applied.
    data : mutable mapping
        Configuration sections, keyed by section name. This mapping may be modified:
        the "path" entry is popped and, when no file is loaded, discarded sections
        are removed from it.
    fail : str or int, optional
        Passed through to ``c.add_queue()``.

    Raises
    ------
    NotImplementedError
        If an iterating handler is given section contents that are neither a
        :class:`dict` nor a :class:`list`.
    RuntimeError
        If `c` is :obj:`None` but some section requires a Computer.
    """
    # Assemble a queue of (args, kwargs) for Computer.add_queue()
    queue: List[Tuple[Tuple, Dict]] = []

    try:
        path = data.pop("path")
    except KeyError:
        pass
    else:
        # Load configuration from file
        path = Path(path)
        with open(path, "r") as f:
            # yaml.safe_load() gives None for an empty file; use an empty mapping
            # so the direct configuration values can still be applied
            new_data = yaml.safe_load(f) or {}

        # Overwrite the file content with direct configuration values
        new_data.update(data)
        data = new_data

        # Also store the directory where the configuration file was located
        if c is None:
            data["config_dir"] = path.parent
        else:
            # Early add to the graph, so handlers can resolve relative paths
            c.graph["config"]["config_dir"] = path.parent

    # Sections to discard, i.e. those whose handler has _discard = True
    discard = set()

    for section_name, section_data in data.items():
        handler = HANDLERS.get(section_name)

        if not handler:
            if section_name not in STORE:
                log.info(
                    f"No handler for configuration section '{section_name}:'; ignored"
                )
            continue

        if handler._discard:
            discard.add(section_name)

        if not handler._iterate:
            # The handler takes the whole section at once; call it right away
            handler(c, section_data)
            continue

        if isinstance(section_data, dict):
            iterator: Iterable = section_data.items()
        elif isinstance(section_data, list):
            iterator = section_data
        else:  # pragma: no cover
            raise NotImplementedError(
                f"Cannot iterate over {type(section_data).__name__} contents of "
                f"configuration section '{section_name}:'"
            )

        queue.extend((("apply", handler), dict(info=item)) for item in iterator)

    # Remove after the loop; the mapping cannot change size while being iterated
    for section_name in discard:
        data.pop(section_name)

    if c is not None:
        # Process the entries
        c.add_queue(queue, max_tries=2, fail=fail)

        # Store configuration in the graph itself
        c.graph["config"].update(data)
    elif queue:
        raise RuntimeError("Cannot apply non-global configuration without a Computer")
@handles("aggregate")
def aggregate(c: Computer, info):
    """Handle one entry from the ``aggregate:`` config section.

    The entry must have "_quantities", "_tag" and "_dim" items and may have a
    "_fail" item. Once these are removed, the remaining items are used as the groups
    for dimension "_dim" in a call to ``c.aggregate()`` for each quantity.
    """
    # Work on a shallow copy, because the special items are removed with .pop()
    spec = copy(info)

    quantities = c.infer_keys(spec.pop("_quantities"))
    tag = spec.pop("_tag")
    fail = spec.pop("_fail", None)
    dim = spec.pop("_dim")

    # Whatever is left in the entry describes the groups along `dim`
    groups = {dim: spec}

    for qty in quantities:
        try:
            keys = c.aggregate(qty, tag, groups, sums=True, fail=fail)
        except KeyExistsError:
            # The aggregate already exists; move on to the next quantity
            continue

        if len(keys):
            log.info(f"Add {repr(keys[0])} + {len(keys)-1} partial sums")
@handles("alias")
def alias(c: Computer, info):
    """Handle one entry from the ``alias:`` config section.

    `info` is indexed as a pair; its first two elements are passed, in order, to
    ``c.add()``.
    """
    first, second = info[0], info[1]
    c.add(first, second)
@handles("combine")
def combine(c: Computer, info):
    """Handle one entry from the ``combine:`` config section.

    ``info["key"]`` gives the key of the new quantity and ``info["inputs"]`` the
    quantities to combine. Each input has a "quantity" item and, optionally, "select"
    (default: empty) and "weight" (default: 1) items.
    """
    # Key for the new quantity
    key = Key.from_str_or_key(info["key"])

    # Three parallel lists, with one element per input to the combination
    quantities, select, weights = [], [], []

    for entry in info["inputs"]:
        selector = entry.get("select", {})

        # Required dimensions for this input: output key's dims, plus any dims that
        # must be selected on
        dims = set(key.dims).union(selector.keys())

        quantities.append(c.infer_keys(entry["quantity"], dims))
        select.append(selector)
        weights.append(entry.get("weight", 1))

    # Check for malformed input
    assert len(quantities) == len(select) == len(weights)

    # Computation: the combining function, followed by the keys of its inputs
    func = partial(computations.combine, select=select, weights=weights)
    added = c.add(key, (func, *quantities), strict=True, sums=True)

    log.info(f"Add {repr(key)} + {len(added)-1} partial sums")
    log.debug(" as combination of")
    log.debug(f" {repr(quantities)}")
@handles("default", iterate=False)
def default(c: Computer, info):
    """Handle the ``default:`` config section.

    The handler is registered with ``iterate=False``, so `info` is the entire
    section contents; it is assigned to ``c.default_key`` as-is.
    """
    c.default_key = info
@handles("files")
def files(c: Computer, info):
    """Handle one entry from the ``files:`` config section.

    ``info["path"]`` locates a file with exogenous data. A relative path is resolved
    against the "config_dir" entry of ``c.graph["config"]`` or, if there is none,
    against the current working directory. All items of `info`, with the resolved
    path, are passed as keyword arguments to ``c.add_file()``.

    `info` itself is not modified.
    """
    path = Path(info["path"])
    if not path.is_absolute():
        # Resolve relative paths relative to the directory containing the
        # configuration file
        path = c.graph["config"].get("config_dir", Path.cwd()) / path

    # Build separate keyword arguments rather than writing the resolved path back
    # into `info`, which belongs to the caller's configuration data
    kwargs = {**info, "path": path}

    c.add_file(**kwargs)
@handles("general")
def general(c: Computer, info):
    """Handle one entry from the ``general:`` config section.

    ``info["comp"]`` selects what is added at ``info["key"]``:

    - "mul" or "product": a product of the inputs, via ``c.add_product()``.
    - :obj:`None`: a list that collects the input keys.
    - any other value: the computation of that name, looked up with
      ``c.get_comp()`` and called with the inputs and the keyword arguments in
      ``info["args"]``.

    Raises
    ------
    ValueError
        If ``c.get_comp()`` returns :obj:`None` for ``info["comp"]``.
    """
    # Inputs
    inputs = c.infer_keys(info.get("inputs", []))
    comp = info["comp"]

    if comp in ("mul", "product"):
        key = c.add_product(info["key"], *inputs)
        log.info(f"Add {repr(key)} using .add_product()")
        return

    # The resulting key
    key = info["key"]
    if not Key.bare_name(key):
        key = Key.from_str_or_key(key)

    # Infer the dimensions of the resulting key if ":*:" is given for the dims
    if set(getattr(key, "dims", {})) == {"*"}:
        key = Key.product(key.name, *inputs, tag=key.tag)

    if comp is None:
        # No computation: the task is a list that collects other keys
        task = list(inputs)
    else:
        # Retrieve the function for the computation
        f = c.get_comp(comp)
        if f is None:
            raise ValueError(comp)

        # Task is a computation: a tuple of the function, then its inputs
        task = (partial(f, **info.get("args", {})), *inputs)

        log.info(f"Add {repr(key)} using {f.__module__}.{f.__name__}(…)")

    added = c.add(key, task, strict=True, sums=info.get("sums", False))

    if isinstance(added, tuple):
        log.info(f" + {len(added)-1} partial sums")
@handles("report")
def report(c: Computer, info):
    """Handle one entry from the ``report:`` config section.

    Adds, at ``info["key"]``, a task that applies the "concat" computation to the
    keys listed in ``info["members"]``.
    """
    log.info(f"Add report {info['key']} with {len(info['members'])} table(s)")

    # Concatenate pyam data structures
    concat = c.get_comp("concat")
    task = tuple([concat] + info["members"])
    c.add(info["key"], task, strict=True)
@handles("units", iterate=False)
def units(c: Computer, info):
    """Handle the ``units:`` config section.

    Two optional items of `info` are used:

    - "define": text with unit definitions, added to the pint application registry.
      Errors in the definitions are logged as warnings, not raised.
    - "replace": mapping from unit expressions to their replacements, stored in
      ``REPLACE_UNITS``.

    `c` is not used.
    """
    # Define units
    registry = pint.get_application_registry()
    try:
        defs = info["define"].strip()
        registry.define(defs)
    except KeyError:
        # Nothing to define
        pass
    except (TypeError, pint.DefinitionSyntaxError, pint.RedefinitionError) as e:
        log.warning(e)
    else:
        log.info(f"Apply global unit definitions {defs}")

    # Add replacements
    replacements = info.get("replace", {})
    for old, new in replacements.items():
        log.info(f"Replace unit {repr(old)} with {repr(new)}")
        REPLACE_UNITS[old] = new