Source code for genno.compat.pyam.util

import logging
from typing import Collection, Mapping, Sequence, Union

import pandas as pd
import pint
import pyam

log = logging.getLogger(__name__)

IAMC_IDX = frozenset(pyam.IAMC_IDX + ["year", "time"])

[docs]def clean_units(df: pd.DataFrame, unit=None) -> pd.DataFrame: """Convert magnitudes and units of `df` to `unit` in :class:`str`. Raises ------ ValueError if there is more than one unit. """ # Convert units if len(df) == 0: return df if unit: from_unit = df["unit"].unique() if len(from_unit) > 1: raise ValueError(f"cannot convert non-unique units {list(from_unit)}") q = pint.Quantity(df["value"].values, from_unit[0]).to(unit) df["value"] = q.magnitude df["unit"] = unit # pyam requires units are str, not pint.Unit if not isinstance(df.loc[0, "unit"], str): df["unit"] = f"{df.loc[0, 'unit']:~}" return df
[docs]def collapse( df: pd.DataFrame, columns: Mapping[str, Sequence[str]] = dict(), sep="|" ) -> pd.DataFrame: """Collapse `columns` into the IAMC columns of `df`.""" to_drop = set() for target_col, values in columns.items(): if target_col not in IAMC_IDX: raise ValueError(f"non-IAMC column {repr(target_col)}") entries = [] for v in values: if v in df: entries.append(df[v]) to_drop.add(v) else: entries.append(pd.Series(str(v), index=df.index)) if target_col not in df or df[target_col].isna().all(): # df doesn't contain the column, or all entries are None, e.g. if # was not set. Initialize with first entry. df[target_col] = entries.pop(0) df[target_col] = df[target_col], sep=sep) return df.drop(to_drop, axis=1)
def _extra(obj): """Extra columns in `obj`.""" return sorted(set(obj.columns) - IAMC_IDX - {"value"})
[docs]def drop(df: pd.DataFrame, columns: Union[Collection[str], str]) -> pd.DataFrame: """Drop `columns` if given, or all non-IAMC columns.""" if isinstance(columns, str): if columns != "auto": raise ValueError(columns) # Drop all non-IAMC columns return df.drop(_extra(df), axis=1) else: result = df.drop(columns, axis=1) extra = _extra(result) if extra:"Extra columns {repr(extra)} when converting to IAMC format") return result