import logging
from collections.abc import Callable, Iterable, Mapping, MutableMapping, Sequence
from copy import copy
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from typing import Any, Optional, Union
from warnings import warn
from genno import operator
from genno.core.computer import Computer
from genno.core.exceptions import KeyExistsError, MissingKeyError
from genno.core.key import Key, iter_keys
from genno.util import REPLACE_UNITS
log = logging.getLogger(__name__)
#: Registry of configuration section handlers.
HANDLERS: dict[str, "ConfigHandler"] = {}
#: .. deprecated:: 1.25.0
#: Instead, use:
#:
#: .. code-block:: python
#:
#: from genno.config import handles, store
#:
#: handles("section_name", False, False)(store)
#:
#: Configuration sections/keys to be stored with no action.
STORE: set[str] = set()
def _convert_deprecated_store_global():
if len(STORE):
warn(
'genno.config.STORE; use @handles("section_name", False, False)(store)',
DeprecationWarning,
stacklevel=2,
)
for name in STORE:
handles(name, iterate=False, discard=False)(store)
STORE.clear()
[docs]
@dataclass
class ConfigHandler:
"""Class for a configuration key/section handler."""
#: Configuration key or section handled.
key: str
#: Callable
callback: Callable[[Optional[Computer], Union[Mapping, Sequence]], Any]
#: If :any:`True`, apply :attr:`callback` iteratively to each member of the
#: value/section.
iterate: bool
#: If :any:`True`, discard the configuration contents after handling.
discard: bool
def handle(self, data: Union[Mapping, Sequence], c: Optional[Computer]):
if self.iterate:
if isinstance(data, Mapping):
iterator: Iterable = data.items()
elif isinstance(data, Sequence):
iterator = data
else: # pragma: no cover
raise NotImplementedError(type(data))
yield from [
(("apply", self.callback), dict(info=item)) for item in iterator
]
else:
self.callback(c, data)
class PathHandler(ConfigHandler):
"""Special :class:`ConfigHandler` that reads from a YAML file."""
def __init__(self):
pass
def handle(self, data, c):
path = data.pop("path", None)
if path is None:
return data
import yaml
# Load configuration from file
path = Path(path)
with open(path, "r") as f:
new_data = yaml.safe_load(f)
# Overwrite the file content with direct configuration values
new_data.update(data)
data = new_data
# Also store the directory where the configuration file was located
data.update(config_dir=path.parent)
return data
[docs]
def handles(section_name: str, iterate: bool = True, discard: bool = True):
"""Decorator to register a configuration section handler in :data:`HANDLERS`.
Parameters
----------
section_name: str
The name of the configuration section to handle. Using a name already present
in :data:`HANDLERS` overrides that handler.
iterate : bool, optional
If :obj:`True`, the handler is called once for each item (either list item, or
(key, value) tuple) in the section. If :obj:`False`, the entire section
contents, of whatever type, are passed to tha handler.
discard : bool, optional
If :obj:`True`, configuration section data is discarded after the handler is
called. If :obj:`False`, the data is retained and stored on the Computer.
"""
def wrapper(f: Callable):
ch = ConfigHandler(section_name, f, iterate, discard)
try:
log.debug(f"Override {HANDLERS[ch.key]!r}")
except KeyError:
pass
finally:
HANDLERS[section_name] = ch
return f
return wrapper
def parse_config(
c: Optional[Computer],
data: MutableMapping[str, Any],
fail: Optional[Union[str, int]] = None,
):
_convert_deprecated_store_global()
# Handle configuration from a file
data = PathHandler().handle(data, c)
# Assemble a queue of (args, kwargs) for Computer.add_queue()
queue: list[tuple[tuple, dict]] = []
# Sections to discard, e.g. with handler._store = False
discard = set()
for section_name, section_data in data.items():
handler = HANDLERS.get(section_name)
if not handler:
log.info(f"No handler for configuration section '{section_name}:'; ignored")
continue
if handler.discard:
discard.add(section_name)
# Allow the handler to extend the queue of computations to be added
queue.extend(handler.handle(section_data, c))
# Discard sections so marked
[data.pop(section_name) for section_name in discard]
if c:
# Store certain keys in the "config" dictionary of the graph itself
c.graph["config"].update(data)
# Process the entries
c.add_queue(queue, max_tries=2, fail=fail)
elif len(queue):
raise RuntimeError("Cannot apply non-global configuration without a Computer")
[docs]
@handles("aggregate")
def aggregate(c: Computer, info):
"""Handle one entry from the ``aggregate:`` config section."""
# Copy for destructive .pop()
info = copy(info)
# Unpack `info`
quantities = c.infer_keys(info.pop("_quantities"))
tag = info.pop("_tag")
# Keyword arguments for add()
kw = dict(
fail=info.pop("_fail", None),
groups={info.pop("_dim"): info},
keep=True,
strict=True,
sums=True,
)
def _log_or_raise(exc: Exception, default_level: str, message: str):
"""Either raise `exc` if ``kw["fail"]`` > `default_level`, or log `message`."""
fail_level = getattr(logging, (kw["fail"] or default_level).upper())
if fail_level >= logging.ERROR:
raise exc
else:
log.log(fail_level, message)
try:
quantities = c.check_keys(*quantities)
except MissingKeyError as e:
# Default to fail="error" here: stricter
_log_or_raise(e, "error", f"No key(s) {e.args!r} to aggregate")
# Iterate over quantities to be aggregated
for qty in map(Key, quantities):
try:
result = c.add(qty.add_tag(tag), "aggregate", qty, **kw)
except KeyExistsError:
pass
except MissingKeyError as e:
# Default to fail="warning": more permissive
_log_or_raise(e, "warning", repr(e))
else:
if keys := list(iter_keys(result)):
log.info(f"Add {repr(keys[0])} + {len(keys) - 1} partial sums")
[docs]
@handles("alias")
def alias(c: Computer, info):
"""Handle one entry from the ``alias:`` config section."""
c.add(info[0], info[1])
[docs]
@handles("combine")
def combine(c: Computer, info):
"""Handle one entry from the ``combine:`` config section."""
# Split inputs into three lists
quantities, select, weights = [], [], []
# Key for the new quantity
key = Key(info["key"])
# Loop over inputs to the combination
for i in info["inputs"]:
# Required dimensions for this input: output key's dims, plus any
# dims that must be selected on
selector = i.get("select", {})
dims = set(key.dims) | set(selector.keys())
quantities.append(c.infer_keys(i["quantity"], dims))
select.append(selector)
weights.append(i.get("weight", 1))
# Check for malformed input
assert len(quantities) == len(select) == len(weights)
# Computation
task = tuple(
[partial(operator.combine, select=select, weights=weights)] + quantities
)
added = iter_keys(c.add(key, task, strict=True, sums=True))
log.info(f"Add {repr(key)} + {len(list(added)) - 1} partial sums")
log.debug(" as combination of")
log.debug(f" {repr(quantities)}")
[docs]
@handles("default", iterate=False)
def default(c: Computer, info):
"""Handle the ``default:`` config section."""
c.default_key = info
[docs]
@handles("files")
def files(c: Computer, info):
"""Handle one entry from the ``files:`` config section."""
# Files with exogenous data
path = Path(info["path"])
if not path.is_absolute():
# Resolve relative paths relative to the directory containing the configuration
# file
path = c.graph["config"].get("config_dir", Path.cwd()) / path
info["path"] = path
c.add("load_file", **info)
[docs]
@handles("general")
def general(c: Computer, info):
"""Handle one entry from the ``general:`` config section."""
# Inputs
# TODO allow to specify a more narrow key and *not* have infer_keys applied; perhaps
# using "*"
inputs = c.infer_keys(info.get("inputs", []))
if info["comp"] in ("mul", "product"):
result = c.add(info["key"], "mul", *inputs)
log.info(f"Add {repr(result)} using .add_product()")
else:
# The resulting key
key = info["key"]
key = key if Key.bare_name(key) else Key(key)
# Infer the dimensions of the resulting key if ":*:" is given for the dims
if isinstance(key, Key) and key.dims == ("*",):
key = Key.product(key.name, *inputs, tag=key.tag)
# log.debug(f"Inferred dimensions ({', '.join(key.dims)}) for '*'")
# If info["comp"] is None, the task is a list that collects other keys
_seq: type = list
task = []
if info["comp"] is not None:
_seq = tuple # Task is a computation
# Retrieve the function for the computation
f = c.get_operator(info["comp"])
if f is None:
raise ValueError(info["comp"])
task = [partial(f, **info.get("args", {}))]
log.info(f"Add {repr(key)} using {f.__module__}.{f.__name__}(…)")
task.extend(inputs)
added = c.add(key, _seq(task), strict=True, sums=info.get("sums", False))
if isinstance(added, tuple):
log.info(f" + {len(added) - 1} partial sums")
[docs]
@handles("report")
def report(c: Computer, info):
"""Handle one entry from the ``report:`` config section."""
log.info(f"Add report {info['key']} with {len(info['members'])} table(s)")
# Concatenate pyam data structures
c.add(info["key"], tuple([c.get_operator("concat")] + info["members"]), strict=True)
@handles("cache_path", iterate=False, discard=False)
@handles("cache_skip", iterate=False, discard=False)
@handles("config_dir", iterate=False, discard=False)
def store(c: Computer, info):
"""Config sections/keys to be stored with no modification."""
pass
[docs]
@handles("units", iterate=False)
def units(c: Computer, info):
"""Handle the ``units:`` config section."""
import pint
# Define units
registry = pint.get_application_registry()
try:
defs = info["define"].strip()
registry.define(defs)
except KeyError:
pass
except (TypeError, pint.DefinitionSyntaxError, pint.RedefinitionError) as e:
log.warning(e)
else:
log.info(f"Apply global unit definitions {defs}")
# Add replacements
for old, new in info.get("replace", {}).items():
log.info(f"Replace unit {repr(old)} with {repr(new)}")
REPLACE_UNITS[old] = new