Source code for genno.compat.pyam.operator

import logging
import re
from functools import partial
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Callable,
    Collection,
    Iterable,
    Mapping,
    Optional,
    Union,
)
from warnings import warn

import pyam

import genno
import genno.operator
from genno.core.key import Key, KeyLike
from genno.core.operator import Operator

from . import util

if TYPE_CHECKING:
    import pandas

    from genno.core.computer import Computer
    from genno.core.quantity import AnyQuantity

log = logging.getLogger(__name__)


__all__ = [
    "as_pyam",
    "quantity_from_iamc",
]


[docs] @Operator.define() def as_pyam( scenario, quantity: "AnyQuantity", *, rename: Optional[Mapping[str, str]] = None, collapse: Optional[Callable] = None, replace=dict(), drop: Union[Collection[str], str] = "auto", unit=None, prepend_name: bool = True, model_name: Optional[str] = None, scenario_name: Optional[str] = None, ): """Return a :class:`pyam.IamDataFrame` containing the data from `quantity`. Warnings are logged if the arguments result in additional, unhandled columns in the resulting data frame that are not part of the IAMC spec. The conversion has the following steps: 1. `quantity` is converted to a temporary :class:`pandas.DataFrame`. 2. Labels for the following IAMC dimensions are filled: - ``model``, ``scenario``: from attributes of the `scenario`, `model_name`, and/or `scenario_name` argument(s). - ``variable``: from the :attr:`~.Quantity.name` of `quantity`, if any. - ``unit``: from the :attr:`~.Quantity.units` of `quantity`, if any. 3. The actions specified by the optional arguments `rename`, `collapse`, `replace`, `drop`, and `unit`, if any, are applied in that order. 4. The resulting data frame is converted to :class:`pyam.IamDataFrame`. Parameters ---------- scenario : Any object with :py:`model` and :py:`scenario` attributes of type :class:`str`, for instance an :class:`ixmp.Scenario` or :class:`~message_ix_models.util.scenarioinfo.ScenarioInfo`; **or** a :class:`str`, which is equivalent to `scenario_name`. quantity : .Quantity Quantity to convert to IAMC data structure. rename : dict, optional Mapping from dimension names in `quantity` (:class:`str`) to column names (:class:`str`); either IAMC dimension names, or others that are consumed by `collapse`. collapse : callable, optional Function that takes a :class:`pandas.DataFrame` and returns the same type. This function **may** collapse 2 or more dimensions, for example to construct labels for the IAMC ``variable`` dimension, or any other. replace : optional Values to be replaced and their replaced. Passed directly to :meth:`pandas.DataFrame.replace`. drop : str or collection of str, optional Columns to drop. Passed to :func:`.util.drop`, so if not given, all non-IAMC columns are dropped. unit : str, optional Label for the IAMC ``unit`` dimension. Passed to :func:`~.pyam.util.clean_units`. prepend_name : bool, optional If :any:`True`, the :attr:`.Quantity.name` of `quantity` is prepended to the IAMC ``variable`` dimension. model_name : str, optional Value for the IAMC ``model`` dimension. scenario_name : str, optional Value for the IAMC ``scenario`` dimension. Raises ------ ValueError If the resulting data frame has duplicate keys in the IAMC dimensions. :class:`pyam.IamDataFrame` cannot handle such data. TypeError If both `scenario` and `scenario_name` are non-empty :class:`str`. """ import pyam # Values to assign on all rows assign = dict(unit=quantity.units) if prepend_name: assign.update(variable=quantity.name) try: assign.update(model=scenario.model, scenario=scenario.scenario) except AttributeError: if scenario and scenario_name: raise TypeError(f"Both {scenario=!r} and {scenario_name=!r} given") assign.update(model=model_name or "", scenario=scenario or scenario_name or "") # - Convert to pd.DataFrame # - Rename one dimension to 'year' or 'time' # - Fill variable, unit, model, and scenario columns # - Replace values # - Apply the collapse callback, if given # - Drop any unwanted columns # - Clean units df = ( quantity.to_series() .rename("value") .reset_index() .assign(**assign) .rename(columns=rename or dict()) .pipe(collapse or util.collapse) .replace(replace, regex=True) .pipe(util.drop, columns=drop) .pipe(util.clean_units, unit) ) # Raise exception for non-unique data duplicates = df.duplicated(subset=set(df.columns) - {"value"}) if duplicates.any(): raise ValueError( "Duplicate IAMC indices cannot be converted:\n" + str(df[duplicates].drop(columns=["model", "scenario"])) ) return pyam.IamDataFrame(df)
[docs] @as_pyam.helper def add_as_pyam( func, c: "Computer", quantities: Union[KeyLike, Iterable[KeyLike]], tag="iamc", /, **kwargs, ): """:meth:`.Computer.add` helper for :func:`.as_pyam`. Add conversion of one or more `quantities` to the IAMC data structure. Parameters ---------- quantities : str or .Key or list of str or .Key Keys for quantities to transform. tag : str, optional Tag to append to new Keys. Other parameters ---------------- kwargs : Any keyword arguments accepted by :func:`.as_pyam`. Returns ------- list of .Key Each task converts a :class:`.Quantity` into a :class:`pyam.IamDataFrame`. """ # Handle single vs. iterable of inputs if isinstance(quantities, (str, Key)): quantities = [quantities] multi_arg = False else: multi_arg = True if len(kwargs.get("replace", {})) and not isinstance( next(iter(kwargs["replace"].values())), dict ): kwargs["replace"] = dict(variable=kwargs.pop("replace")) warn( f"replace must be nested dict(), e.g. {repr(kwargs['replace'])}", DeprecationWarning, ) # Check keys quantities = c.check_keys(*quantities) # The callable for the task. If pyam is not available, require_compat() above will # fail; so this will never be None comp = partial(func, **kwargs) keys = [] for qty in quantities: # Key for the input quantity, e.g. foo:x-y-z key = Key(qty) # Key for the task/output, e.g. foo::iamc keys.append(Key(key.name, tag=tag)) # Add the task and store the key c.add_single(keys[-1], (comp, "scenario", key)) return tuple(keys) if multi_arg else keys[0]
@genno.operator.concat.register def _(*args: pyam.IamDataFrame, **kwargs) -> "pyam.IamDataFrame": """Concatenate `args`, which must all be :class:`pyam.IamDataFrame`. Otherwise, equivalent to :func:`genno.operator.concat`. """ # Use pyam.concat() top-level function return pyam.concat(args, **kwargs)
[docs] def quantity_from_iamc( qty: Union["AnyQuantity", "pyam.IamDataFrame", "pandas.DataFrame"], variable: str, *, fail: Union[int, str] = "warning", ) -> "AnyQuantity": """Extract data for a single measure from `qty` with IAMC-like structure. Parameters ---------- qty : Must have at least 2 dimensions named ‘v’ (or ‘variable’, any case) and ‘u’ (or ‘unit’, any case). variable : str Regular expression to match full labels on the ``v`` dimension of `qty`. If the expression contains match groups, they are used to rewrite ``v`` labels: only the contents of the first match group are kept. This may be used to discard a portion of the label. Returns ------- .Quantity The ‘variable’ dimension contains reduced labels. The :attr:`.Quantity.units` attribute contains the unique units for the subset of data. See also -------- unique_units_from_dim """ import pandas as pd from genno.operator import relabel, select, unique_units_from_dim from .util import IAMC_DIMS if isinstance(qty, pd.DataFrame): # Convert pandas.DataFrame to pyam.IamDataFrame qty = pyam.IamDataFrame(qty) if isinstance(qty, pyam.IamDataFrame): # Convert IamDataFrame to Quantity df = qty.as_pandas() qty = genno.Quantity(df.set_index(list(IAMC_DIMS & set(df.columns)))["value"]) # Identify a dimension whose name is in `targets` def identify_dim(targets: Collection[str]) -> str: result = list(filter(lambda d: d.lower() in targets, qty.dims)) if len(result) != 1: raise ValueError( f"cannot identify 1 unique dimension for {targets!r} among " f"{qty.dims!r}; found {result!r}" ) return result[0] v_dim = identify_dim(("v", "variable")) u_dim = identify_dim(("u", "unit")) # Compile expression expr = re.compile(variable) has_group = expr.groups > 0 # Process each label along v_dim variables, replacements = [], {} for var in qty.coords[v_dim].data: if match := expr.fullmatch(var): variables.append(match.group(0)) replacements.update({match.group(0): match.group(1)} if has_group else {}) if not variables: log.warning( f"0 of {len(qty.coords[v_dim])} labels on dimension {v_dim!r} were a full " f"match for {expr!r}" ) return ( qty.pipe(select, {v_dim: variables}) .pipe(relabel, {v_dim: replacements}) .pipe(unique_units_from_dim, u_dim, fail=fail) )
@genno.operator.write_report.register def _(quantity: pyam.IamDataFrame, path, kwargs=None) -> None: """Write `obj` to the file at `path`. If `obj` is a :class:`pyam.IamDataFrame` and `path` ends with ".csv" or ".xlsx", use :mod:`pyam` methods to write the file to CSV or Excel format, respectively. Otherwise, equivalent to :func:`genno.operator.write_report`. """ path = Path(path) if kwargs is not None and len(kwargs): raise NotImplementedError( "Keyword arguments to write_report(pyam.IamDataFrame, …)" ) if path.suffix == ".csv": quantity.to_csv(path) elif path.suffix == ".xlsx": quantity.to_excel(path) else: raise ValueError( f"pyam.IamDataFrame can be written to .csv or .xlsx, not {path.suffix}" ) def __getattr__(name: str): if name in ("concat", "write_report"): warn( f"Importing {name!r} from genno.compat.pyam.operator; import from " "genno.operator instead.", DeprecationWarning, 2, ) return getattr(genno.operator, name) else: raise AttributeError(name)