Source code for genno.core.sparsedataarray

from typing import Any, Dict, Hashable, Mapping, Optional, Sequence, Tuple, Union
from warnings import filterwarnings

import numpy as np
import pandas as pd

try:
    import sparse

    HAS_SPARSE = True
except ImportError:  # pragma: no cover
    HAS_SPARSE = False

import xarray as xr
from xarray.core import dtypes
from xarray.core.utils import either_dict_or_kwargs

from genno.core.quantity import Quantity, possible_scalar

# sparse.COO raises this warning when the data is 0-D / length-1; self.coords.size is
# then 0 (no dimensions = no coordinates)
filterwarnings(
    "ignore",
    "coords should be an ndarray.*",
    DeprecationWarning,
    "sparse._coo.core",
)


def _binop(name: str, swap: bool = False):
    """Create a method for binary operator `name`."""

    def method(self, other):
        # Handle the case where `other` is scalar
        other = possible_scalar(other)

        # For __r*__ methods
        left, right = (other, self) if swap else (self, other)

        # Invoke an xr.DataArray method like .__mul__()
        result = getattr(super(xr.DataArray, left), f"__{name}__")(right)

        # Determine resulting units
        result.units = left._binop_units(name, right)

        return result

    return method


@xr.register_dataarray_accessor("_sda")
class SparseAccessor:
    """:mod:`xarray` accessor to help :class:`SparseDataArray`.

    See the xarray accessor documentation, e.g.
    :func:`~xarray.register_dataarray_accessor`.
    """

    def __init__(self, obj):
        self.da = obj

    def convert(self):
        """Return a :class:`SparseDataArray` instance."""
        if not self.da._sda.COO_data:
            # Dense (numpy.ndarray) data; convert to sparse
            data = sparse.COO.from_numpy(self.da.data, fill_value=np.nan)
        elif not np.isnan(self.da.data.fill_value):
            # sparse.COO with non-NaN fill value; copy and change
            data = self.da.data.copy(deep=False)
            data.fill_value = data.dtype.type(np.nan)
        else:
            # No change
            data = self.da.data

        if isinstance(self.da, SparseDataArray):
            # Replace the variable, returning a copy
            variable = self.da.variable._replace(data=data)
            return self.da._replace(variable=variable)
        else:
            # Construct
            return SparseDataArray(
                data=data,
                coords=self.da.coords,
                dims=self.da.dims,
                name=self.da.name,
                attrs=self.da.attrs,
            )

    @property
    def COO_data(self):
        """:obj:`True` if the DataArray has :class:`sparse.COO` data."""
        return isinstance(self.da.data, sparse.COO)

    @property
    def dense(self):
        """Return a copy with dense (:class:`numpy.ndarray`) data."""
        try:
            # Use existing method xr.Variable._to_dense()
            return self.da._replace(variable=self.da.variable._to_dense())
        except TypeError:
            # da.variable was already dense
            return self.da

    @property
    def dense_super(self):
        """Return a proxy to a :class:`numpy.ndarray`-backed :class:`xarray.DataArray`."""
        return super(SparseDataArray, self.dense)

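# Illustrative sketch (not part of the module): once the "_sda" accessor is
# registered above, it is available on any DataArray, for instance:
#
#     da = xr.DataArray([1.0, 2.0], coords={"x": ["a", "b"]}, dims="x")
#     sda = da._sda.convert()   # SparseDataArray backed by sparse.COO
#     sda._sda.COO_data         # True
#     sda._sda.dense            # copy with dense numpy.ndarray data again
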

class OverrideItem:
    """Override :meth:`xarray.DataArray.item`.

    The :meth:`item` method is set dynamically by
    :class:`xarray.ops.IncludeNumpySameMethods`, a parent of
    :class:`xarray.arithmetic.DataArrayArithmetic` and thus of DataArray. That has
    the effect of overriding an ordinary :meth:`item` method defined on
    :class:`SparseDataArray`.

    This class, placed higher in the MRO for SparseDataArray, cancels out that
    effect.
    """

    __slots__ = ()

    def __init_subclass__(cls, **kwargs):
        setattr(cls, "item", cls._item)

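# Illustrative note (not part of the module): because SparseDataArray (below)
# subclasses OverrideItem, __init_subclass__ runs when the class is created and
# rebinds SparseDataArray.item to the ordinary method SparseDataArray._item, so
# that the item() injected dynamically by xarray does not shadow it.
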

class SparseDataArray(OverrideItem, xr.DataArray, Quantity):
    """:class:`~xarray.DataArray` with sparse data.

    SparseDataArray uses :class:`sparse.COO` for storage with :data:`numpy.nan` as
    its :attr:`sparse.SparseArray.fill_value`. Some methods of
    :class:`~xarray.DataArray` are overridden to ensure data is in sparse, or dense,
    format as necessary, to provide expected functionality not currently supported
    by :mod:`sparse`, and to avoid exhausting memory for some operations that
    require dense data.
    """

    __slots__: Tuple[str, ...] = tuple()

    def __init__(
        self,
        data: Any = dtypes.NA,
        coords: Union[Sequence[Tuple], Mapping[Hashable, Any], None] = None,
        dims: Union[str, Sequence[Hashable], None] = None,
        name: Hashable = None,
        attrs: Optional[Mapping] = None,
        # internal parameters
        indexes: Optional[Dict[Hashable, pd.Index]] = None,
        fastpath: bool = False,
        **kwargs,
    ):
        if fastpath:
            return xr.DataArray.__init__(
                self, data, coords, dims, name, attrs, indexes, fastpath
            )

        attrs = Quantity._collect_attrs(data, attrs, kwargs)
        assert 0 == len(
            kwargs
        ), f"Unrecognized kwargs {kwargs.keys()} to SparseDataArray()"

        if isinstance(data, int):
            data = float(data)

        data, name = Quantity._single_column_df(data, name)

        if isinstance(data, pd.Series):
            # Possibly converted from pd.DataFrame, above
            if data.dtype == int:
                # Ensure float data
                data = data.astype(float)
            data = xr.DataArray.from_series(data, sparse=True)

        if isinstance(data, xr.DataArray):
            # Possibly converted from pd.Series, above
            coords = data._coords
            name = name or data.name
            data = data.variable

        # Invoke the xr.DataArray constructor
        xr.DataArray.__init__(self, data, coords, dims, name, attrs)

        if not isinstance(self.variable.data, sparse.COO):
            # Dense (numpy.ndarray) data; convert to sparse
            data = sparse.COO.from_numpy(self.variable.data, fill_value=np.nan)
        elif not np.isnan(self.variable.data.fill_value):
            # sparse.COO with non-NaN fill value; copy and change
            data = self.variable.data.copy(deep=False)
            data.fill_value = data.dtype.type(np.nan)
        else:
            # No change
            return

        # Replace the variable
        self._variable = self._variable._replace(data=data)

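    # Illustrative sketch (not part of the module): the constructor accepts, for
    # example, a pandas.Series with a named index; the values are stored as
    # sparse.COO with a NaN fill value:
    #
    #     s = pd.Series([1.0, 2.0], index=pd.Index(["a", "b"], name="x"))
    #     q = SparseDataArray(s)
    #     isinstance(q.variable.data, sparse.COO)   # True
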
    @classmethod
    def from_series(cls, obj, sparse=True):
        """Convert a pandas.Series into a SparseDataArray."""
        # Call the parent method always with sparse=True, then re-wrap
        return xr.DataArray.from_series(obj, sparse=True)._sda.convert()

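    # Illustrative sketch (not part of the module): from_series() accepts a Series
    # with a MultiIndex; each named level becomes a dimension:
    #
    #     idx = pd.MultiIndex.from_product([["a", "b"], [1, 2]], names=["x", "y"])
    #     q = SparseDataArray.from_series(pd.Series([1.0, 2.0, 3.0, 4.0], index=idx))
    #     q.dims                          # ("x", "y")
    #     isinstance(q.data, sparse.COO)  # True
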
    # Binary operations
    __mul__ = _binop("mul")
    __rtruediv__ = _binop("truediv", swap=True)
    __truediv__ = _binop("truediv")

    def ffill(self, dim: Hashable, limit: Optional[int] = None):
        """Override :meth:`~xarray.DataArray.ffill` to auto-densify."""
        return self._sda.dense_super.ffill(dim, limit)._sda.convert()

    def _item(self, *args):
        """Like :meth:`~xarray.DataArray.item`."""
        # See OverrideItem
        if len(args):  # pragma: no cover
            super().item(*args)
        elif len(self.data.shape) == 0:
            return (
                self.data.data[0]
                if isinstance(self.data, sparse.COO)
                else self.data.item()
            )
        else:
            raise ValueError("can only convert an array of size 1 to a Python scalar")

    def sel(
        self,
        indexers: Optional[Mapping[Any, Any]] = None,
        method: Optional[str] = None,
        tolerance=None,
        drop: bool = False,
        **indexers_kwargs: Any,
    ) -> "SparseDataArray":
        """Return a new array by selecting labels along the specified dim(s).

        Overrides :meth:`~xarray.DataArray.sel` to handle >1-D indexers with sparse
        data.
        """
        indexers = either_dict_or_kwargs(indexers, indexers_kwargs, "sel")
        if isinstance(indexers, dict) and len(indexers) > 1:
            result = self
            for k, v in indexers.items():
                result = result.sel(
                    {k: v}, method=method, tolerance=tolerance, drop=drop
                )
            return result
        else:
            return (
                super()
                .sel(indexers=indexers, method=method, tolerance=tolerance, drop=drop)
                ._sda.convert()
            )

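    # Illustrative sketch (not part of the module): indexers for more than one
    # dimension are applied one at a time, so for a quantity like ``q`` in the
    # from_series() sketch above:
    #
    #     q.sel(x="a", y=2)   # equivalent to q.sel(x="a").sel(y=2)
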
    def squeeze(self, dim=None, drop=False, axis=None):
        # Auto-densify, squeeze as a regular DataArray, then convert back to sparse
        return self._sda.dense_super.squeeze(
            dim=dim, drop=drop, axis=axis
        )._sda.convert()

    def to_dataframe(
        self,
        name: Optional[Hashable] = None,
        dim_order: Optional[Sequence[Hashable]] = None,
    ) -> pd.DataFrame:
        """Convert this array and its coords into a :class:`pandas.DataFrame`.

        Overrides :meth:`~xarray.DataArray.to_dataframe`.
        """
        if dim_order is not None:
            raise NotImplementedError("dim_order arg to to_dataframe()")

        return self.to_series().to_frame(name or self.name or "value")

    def to_series(self) -> pd.Series:
        """Convert this array into a :class:`~pandas.Series`.

        Overrides :meth:`~xarray.DataArray.to_series` to create the series without
        first converting to a potentially very large :class:`numpy.ndarray`.
        """
        # Use SparseArray.coords and .data (each already 1-D) to construct the
        # pd.Series

        # Construct a pd.MultiIndex without using .from_product
        if self.dims:
            index = pd.MultiIndex.from_arrays(
                self.data.coords, names=self.dims
            ).set_levels([self.coords[d].values for d in self.dims])
        else:
            index = pd.MultiIndex.from_arrays([[0]], names=[None])

        return pd.Series(self.data.data, index=index, name=self.name)
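
# Illustrative sketch (not part of the module): to_series() and to_dataframe()
# build pandas objects directly from the sparse.COO coordinates and data, without
# first densifying, e.g. for ``q`` from the from_series() sketch above:
#
#     q.to_series()            # pd.Series with a MultiIndex over q.dims
#     q.to_dataframe("value")  # single-column pd.DataFrame named "value"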