Source code for genno.core.quantity
import operator
from functools import update_wrapper
from numbers import Number
from typing import Any, Hashable, Optional
import pandas as pd
import pint
from genno.compat.xarray import DataArrayLike
#: Name of the class used to implement :class:`.Quantity`: either ``"AttrSeries"`` or
#: ``"SparseDataArray"``. :meth:`Quantity._get_class` raises :class:`ValueError` for
#: any other value.
CLASS = "AttrSeries"
# Alternative implementation; uncomment to select it instead of the line above:
# CLASS = "SparseDataArray"
class Quantity(DataArrayLike["Quantity"]):
    """A sparse data structure that behaves like :class:`xarray.DataArray`.

    Depending on the value of :data:`.CLASS`, Quantity is either :class:`.AttrSeries` or
    :class:`.SparseDataArray`.
    """

    # Backing storage for the :attr:`name` property
    _name: Optional[Hashable]

    def __new__(cls, *args, **kwargs):
        """Create an instance of the implementation class, not of Quantity itself."""
        # Use _get_class() to retrieve either AttrSeries or SparseDataArray. If `cls` is
        # already some other class, _get_class() hands it back unchanged.
        return object.__new__(Quantity._get_class(cls))

    @classmethod
    def from_series(cls, series, sparse=True):
        """Convert `series` to the Quantity class given by :data:`.CLASS`.

        Parameters
        ----------
        series :
            Passed through to the ``from_series()`` method of the implementation class.
        sparse : bool, optional
            Must be true; any false value fails the assertion below.
        """
        # NB signature is the same as xr.DataArray.from_series(); except sparse=True
        assert sparse
        return cls._get_class().from_series(series, sparse)

    @property
    def name(self) -> Optional[Hashable]:
        """The name of this quantity."""
        return self._name  # pragma: no cover

    @name.setter
    def name(self, value: Optional[Hashable]) -> None:
        self._name = value  # pragma: no cover

    @property
    # def units(self) -> pint.Unit:  # NB can't do this currently; see python/mypy#3004
    def units(self):
        """Retrieve or set the units of the Quantity.

        The units are kept in ``attrs["_unit"]``. If that key is missing when the
        property is read, the application registry's ``dimensionless`` is stored
        there and returned.

        Examples
        --------
        Create a quantity without units:

        >>> qty = Quantity(...)

        Set using a string; automatically converted to pint.Unit:

        >>> qty.units = "kg"
        >>> qty.units
        <Unit('kilogram')>
        """
        return self.attrs.setdefault(
            "_unit", pint.get_application_registry().dimensionless
        )

    @units.setter
    def units(
        self,
        value,
        # value: Union[pint.Unit, str],  # NB ditto re: python/mypy#3004
    ):
        self.attrs["_unit"] = pint.get_application_registry().Unit(value)

    # Internal methods

    @staticmethod
    def _get_class(cls=None):
        """Get :class:`.AttrSeries` or :class:`.SparseDataArray`, per :data:`.CLASS`.

        If `cls` is anything other than :class:`Quantity` or :obj:`None`, it is
        returned unchanged.

        Raises
        ------
        ValueError
            if :data:`.CLASS` names neither implementation.
        """
        if cls in (Quantity, None):
            # Import lazily, so only the selected implementation module is loaded
            if CLASS == "AttrSeries":
                from .attrseries import AttrSeries as cls
            elif CLASS == "SparseDataArray":
                from .sparsedataarray import SparseDataArray as cls
            else:  # pragma: no cover
                raise ValueError(CLASS)

        return cls

    @staticmethod
    def _single_column_df(data, name):
        """Handle `data` and `name` arguments to Quantity constructors.

        Returns
        -------
        tuple
            ``(data, name)``. If `data` is a :class:`pandas.DataFrame` with exactly one
            column, that column is returned in its place, and its label is used as the
            name when `name` is falsy. Any other `data` is passed through untouched.

        Raises
        ------
        TypeError
            if `data` is a data frame without exactly one column.
        """
        if isinstance(data, pd.DataFrame):
            if len(data.columns) != 1:
                raise TypeError(
                    f"Cannot instantiate Quantity from {len(data.columns)}-D data frame"
                )

            # Unpack a single column; use its name if not overridden by `name`
            return data.iloc[:, 0], (name or data.columns[0])
        else:
            return data, name

    @staticmethod
    def _collect_attrs(data, attrs_arg, kwargs):
        """Handle `attrs` and 'units' `kwargs` to Quantity constructors.

        Parameters
        ----------
        data :
            Object whose ``attrs`` mapping, if it has one, provides the initial values.
        attrs_arg : dict or None
            Explicit attributes; these override values taken from `data`.
        kwargs : dict
            Keyword arguments of the constructor. The "units" item, if any, is
            **removed** from this dict.

        Returns
        -------
        dict
            A new mapping; ``data.attrs`` itself is not modified.
        """
        # Use attrs, if any, from an existing object, if any
        new_attrs = getattr(data, "attrs", dict()).copy()

        # Overwrite with values from an explicit attrs argument
        new_attrs.update(attrs_arg or dict())

        # Store the "units" keyword argument as an attr
        units = kwargs.pop("units", None)
        if units is not None:
            # Construct through the application registry, exactly as the `units`
            # setter does, so that every code path stores the same Unit class
            new_attrs["_unit"] = pint.get_application_registry().Unit(units)

        return new_attrs

    def _binop_units(self, name: str, other) -> pint.Unit:
        """Determine result units for a binary operation between `self` and `other`.

        Parameters
        ----------
        name : str
            Name of a function in the :mod:`operator` module, or "pow".
        other :
            Second operand; its ``units`` attribute is read unless `name` is "pow".
        """
        own_units = self.units

        if name == "pow":
            # Currently handled by operator.pow()
            return own_units

        # Retrieve units of `other`
        other_units = other.units

        # Ensure there is not a mix of pint.Unit and pint.registry.Unit; this throws
        # off pint's internal logic
        if other_units.__class__ is not own_units.__class__:
            other_units = own_units.__class__(other_units)

        # Allow pint to determine the output units
        return getattr(operator, name)(own_units, other_units)
def assert_quantity(*args):
    """Check that every positional argument is a :class:`Quantity` object.

    Arguments are examined in order; the check stops at the first one that fails.

    Raises
    ------
    TypeError
        with an indicative message, giving the 1-based position and the ``repr()`` of
        the offending argument.
    """
    for position, candidate in enumerate(args, start=1):
        if isinstance(candidate, Quantity):
            continue

        raise TypeError(
            f"arg #{position} ({candidate!r}) is not Quantity; likely an incorrect key"
        )
def maybe_densify(func):
    """Wrapper for operations that densifies :class:`.SparseDataArray` input.

    When :data:`.CLASS` is "SparseDataArray" at call time, every positional argument
    that is a :class:`Quantity` is replaced by its ``_sda.dense`` form before `func`
    is called, and the result is passed through ``_sda.convert()``. Otherwise, the
    positional arguments and the result pass through untouched. Keyword arguments are
    never converted.
    """

    def _passthrough(obj):
        return obj

    def _to_dense(arg):
        if isinstance(arg, Quantity):
            return arg._sda.dense
        return arg

    def _to_sparse(result):
        return result._sda.convert()

    def wrapped(*args, **kwargs):
        # CLASS is looked up on every call, not once at decoration time
        if CLASS == "SparseDataArray":
            densify, sparsify = _to_dense, _to_sparse
        else:
            densify = sparsify = _passthrough

        dense_args = [densify(arg) for arg in args]
        return sparsify(func(*dense_args, **kwargs))

    # update_wrapper() returns `wrapped` after copying the metadata of `func` onto it
    return update_wrapper(wrapped, func)
def possible_scalar(value) -> Quantity:
    """Wrap `value` in :class:`Quantity` if it is a :class:`numbers.Number`.

    Any other `value` is returned as-is.
    """
    if isinstance(value, Number):
        return Quantity(value)

    return value
def unwrap_scalar(qty: Quantity) -> Any:
    """Return the bare value of `qty` if it has no dimensions, else `qty` itself.

    The bare value is obtained with ``qty.item()``.
    """
    if len(qty.dims) > 0:
        # At least one dimension: not a scalar
        return qty

    return qty.item()