import logging
from copy import copy
from functools import partial
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
import pint
import yaml
import genno.computations as computations
from genno.core.computer import Computer
from genno.core.exceptions import KeyExistsError
from genno.core.key import Key
from genno.util import REPLACE_UNITS
log = logging.getLogger(__name__)
#: Registry of configuration section handlers.
HANDLERS: Dict[str, Any] = {}
#: Configuration sections/keys to be stored with no action.
STORE = set(["cache_path", "cache_skip"])
[docs]def handles(section_name: str, iterate: bool = True, discard: bool = True):
"""Decorator to register a configuration section handler in :data:`HANDLERS`.
Parameters
----------
section_name: str
The name of the configuration section to handle. Using a name already present
in :data:`HANDLERS` overrides that handler.
iterate : bool, optional
If :obj:`True`, the handler is called once for each item (either list item, or
(key, value) tuple) in the section. If :obj:`False`, the entire section
contents, of whatever type, are passed to tha handler.
discard : bool, optional
If :obj:`True`, configuration section data is discarded after the handler is
called. If :obj:`False`, the data is retained and stored on the Computer.
"""
def wrapper(f: Callable):
if section_name in HANDLERS:
log.debug(
f"Override handler {repr(HANDLERS[section_name])} for "
f" '{section_name}:'"
)
HANDLERS[section_name] = f
setattr(f, "_iterate", iterate)
setattr(f, "_discard", discard)
return f
return wrapper
def parse_config(
c: Optional[Computer],
data: dict,
fail: Optional[Union[str, int]] = None,
):
# Assemble a queue of (args, kwargs) for Computer.add_queue()
queue: List[Tuple[Tuple, Dict]] = []
try:
path = data.pop("path")
except KeyError:
pass
else:
# Load configuration from file
path = Path(path)
with open(path, "r") as f:
new_data = yaml.safe_load(f)
# Overwrite the file content with direct configuration values
new_data.update(data)
data = new_data
# Also store the directory where the configuration file was located
if c is None:
data["config_dir"] = path.parent
else:
# Early add to the graph
c.graph["config"]["config_dir"] = path.parent
# Sections to discard, e.g. with handler._store = False
discard = set()
for section_name, section_data in data.items():
handler = HANDLERS.get(section_name)
if not handler:
if section_name not in STORE:
log.info(
f"No handler for configuration section '{section_name}:'; ignored"
)
continue
if handler._discard:
discard.add(section_name)
if handler._iterate:
if isinstance(section_data, dict):
iterator: Iterable = section_data.items()
elif isinstance(section_data, list):
iterator = section_data
else: # pragma: no cover
raise NotImplementedError(handler.expected_type)
queue.extend((("apply", handler), dict(info=item)) for item in iterator)
else:
handler(c, section_data)
for section_name in discard:
data.pop(section_name)
if c:
# Process the entries
c.add_queue(queue, max_tries=2, fail=fail)
# Store configuration in the graph itself
c.graph["config"].update(data)
elif len(queue):
raise RuntimeError("Cannot apply non-global configuration without a Computer")
[docs]@handles("aggregate")
def aggregate(c: Computer, info):
"""Handle one entry from the ``aggregate:`` config section."""
# Copy for destructive .pop()
info = copy(info)
quantities = c.infer_keys(info.pop("_quantities"))
tag = info.pop("_tag")
fail = info.pop("_fail", None)
groups = {info.pop("_dim"): info}
for qty in quantities:
try:
keys = c.aggregate(qty, tag, groups, sums=True, fail=fail)
except KeyExistsError:
pass
else:
if len(keys):
log.info(f"Add {repr(keys[0])} + {len(keys)-1} partial sums")
[docs]@handles("alias")
def alias(c: Computer, info):
"""Handle one entry from the ``alias:`` config section."""
c.add(info[0], info[1])
[docs]@handles("combine")
def combine(c: Computer, info):
"""Handle one entry from the ``combine:`` config section."""
# Split inputs into three lists
quantities, select, weights = [], [], []
# Key for the new quantity
key = Key.from_str_or_key(info["key"])
# Loop over inputs to the combination
for i in info["inputs"]:
# Required dimensions for this input: output key's dims, plus any
# dims that must be selected on
selector = i.get("select", {})
dims = set(key.dims) | set(selector.keys())
quantities.append(c.infer_keys(i["quantity"], dims))
select.append(selector)
weights.append(i.get("weight", 1))
# Check for malformed input
assert len(quantities) == len(select) == len(weights)
# Computation
task = tuple(
[partial(computations.combine, select=select, weights=weights)] + quantities
)
added = c.add(key, task, strict=True, sums=True)
log.info(f"Add {repr(key)} + {len(added)-1} partial sums")
log.debug(" as combination of")
log.debug(f" {repr(quantities)}")
[docs]@handles("default", iterate=False)
def default(c: Computer, info):
"""Handle the ``default:`` config section."""
c.default_key = info
[docs]@handles("files")
def files(c: Computer, info):
"""Handle one entry from the ``files:`` config section."""
# Files with exogenous data
path = Path(info["path"])
if not path.is_absolute():
# Resolve relative paths relative to the directory containing the configuration
# file
path = c.graph["config"].get("config_dir", Path.cwd()) / path
info["path"] = path
c.add_file(**info)
[docs]@handles("general")
def general(c: Computer, info):
"""Handle one entry from the ``general:`` config section."""
# Inputs
inputs = c.infer_keys(info.get("inputs", []))
if info["comp"] == "product":
key = c.add_product(info["key"], *inputs)
log.info(f"Add {repr(key)} using .add_product()")
else:
# The resulting key
key = info["key"]
key = key if Key.bare_name(key) else Key.from_str_or_key(key)
# Infer the dimensions of the resulting key if ":*:" is given for the dims
if set(getattr(key, "dims", {})) == {"*"}:
key = Key.product(key.name, *inputs, tag=key.tag)
# log.debug(f"Inferred dimensions ({', '.join(key.dims)}) for '*'")
# If info["comp"] is None, the task is a list that collects other keys
_seq: Type = list
task = []
if info["comp"] is not None:
_seq = tuple # Task is a computation
# Retrieve the function for the computation
f = c.get_comp(info["comp"])
if f is None:
raise ValueError(info["comp"])
task = [partial(f, **info.get("args", {}))]
log.info(f"Add {repr(key)} using {f.__module__}.{f.__name__}(…)")
task.extend(inputs)
added = c.add(key, _seq(task), strict=True, sums=info.get("sums", False))
if isinstance(added, tuple):
log.info(f" + {len(added)-1} partial sums")
[docs]@handles("report")
def report(c: Computer, info):
"""Handle one entry from the ``report:`` config section."""
log.info(f"Add report {info['key']} with {len(info['members'])} table(s)")
# Concatenatqe pyam data structures
c.add(info["key"], tuple([c.get_comp("concat")] + info["members"]), strict=True)
[docs]@handles("units", iterate=False)
def units(c: Computer, info):
"""Handle the ``units:`` config section."""
# Define units
registry = pint.get_application_registry()
try:
defs = info["define"].strip()
registry.define(defs)
except KeyError:
pass
except pint.DefinitionSyntaxError as e:
log.warning(e)
else:
log.info(f"Apply global unit definitions {defs}")
# Add replacements
for old, new in info.get("replace", {}).items():
log.info(f"Replace unit {repr(old)} with {repr(new)}")
REPLACE_UNITS[old] = new