Source code for genno.core.quantity

from functools import update_wrapper
from numbers import Number
from typing import Any, Hashable, Optional

import pandas as pd
import pint

from genno.compat.xarray import DataArrayLike

#: Name of the class used to implement :class:`.Quantity`.
CLASS = "AttrSeries"
# CLASS = "SparseDataArray"


[docs]class Quantity(DataArrayLike["Quantity"]): """A sparse data structure that behaves like :class:`xarray.DataArray`. Depending on the value of :data:`CLASS`, Quantity is either :class:`.AttrSeries` or :class:`.SparseDataArray`. """ _name: Optional[Hashable] def __new__(cls, *args, **kwargs): # Use _get_class() to retrieve either AttrSeries or SparseDataArray return object.__new__(Quantity._get_class(cls))
[docs] @classmethod def from_series(cls, series, sparse=True): """Convert `series` to the Quantity class given by :data:`.CLASS`.""" # NB signature is the same as xr.DataArray.from_series(); except sparse=True assert sparse return cls._get_class().from_series(series, sparse)
@property def name(self) -> Optional[Hashable]: """The name of this quantity.""" return self._name # pragma: no cover @name.setter def name(self, value: Optional[Hashable]) -> None: self._name = value # pragma: no cover @property def units(self): """Retrieve or set the units of the Quantity. Examples -------- Create a quantity without units: >>> qty = Quantity(...) Set using a string; automatically converted to pint.Unit: >>> qty.units = "kg" >>> qty.units <Unit('kilogram')> """ return self.attrs.setdefault( "_unit", pint.get_application_registry().dimensionless ) @units.setter def units(self, value): self.attrs["_unit"] = pint.get_application_registry().Unit(value) # Internal methods @staticmethod def _get_class(cls=None): """Get :class:`.AttrSeries` or :class:`.SparseDataArray`, per :data:`.CLASS`.""" if cls in (Quantity, None): if CLASS == "AttrSeries": from .attrseries import AttrSeries as cls elif CLASS == "SparseDataArray": from .sparsedataarray import SparseDataArray as cls else: # pragma: no cover raise ValueError(CLASS) return cls @staticmethod def _single_column_df(data, name): """Handle `data` and `name` arguments to Quantity constructors.""" if isinstance(data, pd.DataFrame): if len(data.columns) != 1: raise TypeError( f"Cannot instantiate Quantity from {len(data.columns)}-D data frame" ) # Unpack a single column; use its name if not overridden by `name` return data.iloc[:, 0], (name or data.columns[0]) else: return data, name @staticmethod def _collect_attrs(data, attrs_arg, kwargs): """Handle `attrs` and 'units' `kwargs` to Quantity constructors.""" # Use attrs, if any, from an existing object, if any new_attrs = getattr(data, "attrs", dict()).copy() # Overwrite with values from an explicit attrs argument new_attrs.update(attrs_arg or dict()) # Store the "units" keyword argument as an attr units = kwargs.pop("units", None) if units is not None: new_attrs["_unit"] = pint.Unit(units) return new_attrs
[docs]def assert_quantity(*args): """Assert that each of `args` is a Quantity object. Raises ------ TypeError with a indicative message. """ for i, arg in enumerate(args): if not isinstance(arg, Quantity): raise TypeError( f"arg #{i+1} ({repr(arg)}) is not Quantity; likely an incorrect key" )
[docs]def maybe_densify(func): """Wrapper for computations that densifies :class:`.SparseDataArray` input.""" def wrapped(*args, **kwargs): if CLASS == "SparseDataArray": def densify(arg): return arg._sda.dense if isinstance(arg, Quantity) else arg def sparsify(result): return result._sda.convert() else: def densify(arg): return arg sparsify = densify return sparsify(func(*map(densify, args), **kwargs)) update_wrapper(wrapped, func) return wrapped
def possible_scalar(value) -> Quantity: """Convert `value`, possibly a scalar, to :class:`Quantity`.""" return Quantity(value) if isinstance(value, Number) else value def unwrap_scalar(qty: Quantity) -> Any: """Unwrap `qty` to a scalar, if it is one.""" return qty if len(qty.dims) else qty.item()