"""Source code for :mod:`genno.core.sparsedataarray`."""

import logging
from typing import Any, Dict, Hashable, Mapping, Optional, Sequence, Tuple, Union
from warnings import filterwarnings

import numpy as np
import pandas as pd

try:
    import sparse

    HAS_SPARSE = True
except ImportError:  # pragma: no cover
    HAS_SPARSE = False

import xarray as xr

from genno.compat.xarray import dtypes, either_dict_or_kwargs

from .base import BaseQuantity, collect_attrs, rank, single_column_df

log = logging.getLogger(__name__)

# Silence a DeprecationWarning raised from within :mod:`sparse` when
# SparseDataArray.squeeze() (below) densifies and re-sparsifies data.
filterwarnings(
    action="ignore",
    message="Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in "
    "future.",
    category=DeprecationWarning,
    module="sparse",
)


@xr.register_dataarray_accessor("_sda")
class SparseAccessor:
    """:mod:`xarray` accessor to help :class:`SparseDataArray`.

    See the xarray accessor documentation, e.g.
    :func:`~xarray.register_dataarray_accessor`.
    """

    def __init__(self, obj):
        # `obj` is the DataArray (or SparseDataArray) this accessor is attached to.
        self.da = obj

    def convert(self):
        """Return a :class:`SparseDataArray` instance.

        The data are coerced to :class:`sparse.COO` with a NaN fill value if they are
        not already in that form.
        """
        if not self.da._sda.COO_data:
            # Dense (numpy.ndarray) data; convert to sparse
            data = sparse.COO.from_numpy(self.da.data, fill_value=np.nan)
        elif not np.isnan(self.da.data.fill_value):
            # sparse.COO with non-NaN fill value; copy and change
            data = self.da.data.copy(deep=False)
            data.fill_value = data.dtype.type(np.nan)
        else:
            # No change
            data = self.da.data

        if isinstance(self.da, SparseDataArray):
            # Replace the variable, returning a copy
            variable = self.da.variable._replace(data=data)
            return self.da._replace(variable=variable)
        else:
            # Construct a new SparseDataArray, carrying over coords/dims/name/attrs
            return SparseDataArray(
                data=data,
                coords=self.da.coords,
                dims=self.da.dims,
                name=self.da.name,
                attrs=self.da.attrs,
            )

    @property
    def COO_data(self):
        """:obj:`True` if the DataArray has :class:`sparse.COO` data."""
        return isinstance(self.da.data, sparse.COO)

    @property
    def dense(self):
        """Return a copy with dense (:class:`numpy.ndarray`) data."""
        try:
            # Use existing method xr.Variable._to_dense()
            return self.da._replace(variable=self.da.variable._to_dense())
        except TypeError:
            # self.da.variable was already dense
            return self.da

    @property
    def dense_super(self):
        """Return a proxy to a :class:`numpy.ndarray`-backed :class:`xarray.DataArray`.

        Method calls on the proxy dispatch to the plain xr.DataArray implementations,
        bypassing the SparseDataArray overrides.
        """
        return super(SparseDataArray, self.dense)
class OverrideItem:
    """Override :meth:`xarray.DataArray.item`.

    The :meth:`item` method is set dynamically by
    :class:`xarray.ops.IncludeNumpySameMethods`, a parent of
    :class:`xarray.arithmetic.DataArrayArithmetic` and thus of DataArray. That has the
    effect of overriding an ordinary :meth:`item` method defined on
    :class:`SparseDataArray`. This class, placed higher in the MRO for SparseDataArray,
    cancels out that effect.
    """

    __slots__ = ()

    def __init_subclass__(cls, **kwargs):
        # Re-point ``item`` at the subclass's own ``_item`` implementation, undoing
        # the dynamic assignment performed by xarray.
        cls.item = cls._item
class SparseDataArray(BaseQuantity, OverrideItem, xr.DataArray):
    """:class:`~xarray.DataArray` with sparse data.

    SparseDataArray uses :class:`sparse.COO` for storage with :data:`numpy.nan` as its
    :attr:`sparse.SparseArray.fill_value`. Some methods of :class:`~xarray.DataArray`
    are overridden to ensure data is in sparse, or dense, format as necessary, to
    provide expected functionality not currently supported by :mod:`sparse`, and to
    avoid exhausting memory for some operations that require dense data.
    """

    __slots__: Tuple[str, ...] = tuple()

    def __init__(
        self,
        data: Any = dtypes.NA,
        coords: Union[Sequence[Tuple], Mapping[Hashable, Any], None] = None,
        dims: Union[str, Sequence[Hashable], None] = None,
        name: Hashable = None,
        attrs: Optional[Mapping] = None,
        # internal parameters
        indexes: Optional[Dict[Hashable, pd.Index]] = None,
        fastpath: bool = False,
        **kwargs,
    ):
        if fastpath:
            # Skip all conversion below; delegate directly to the parent constructor
            return xr.DataArray.__init__(
                self, data, coords, dims, name, attrs, indexes, fastpath
            )

        attrs = collect_attrs(data, attrs, kwargs)
        assert 0 == len(
            kwargs
        ), f"Unrecognized kwargs {kwargs.keys()} to SparseDataArray()"

        # Plain int scalars are promoted to float so NaN can serve as fill value
        if isinstance(data, int):
            data = float(data)

        data, name = single_column_df(data, name)

        if isinstance(data, pd.Series):
            # Possibly converted from pd.DataFrame, above
            if data.dtype == int:
                # Ensure float data
                data = data.astype(float)
            data = xr.DataArray.from_series(data, sparse=True)

        if isinstance(data, xr.DataArray):
            # Possibly converted from pd.Series, above
            coords = data._coords
            name = name or data.name
            data = data.variable

        # Invoke the xr.DataArray constructor
        xr.DataArray.__init__(self, data, coords, dims, name, attrs)

        # Post-construction: ensure the stored data is sparse.COO with NaN fill value
        if not isinstance(self.variable.data, sparse.COO):
            dtype = self.variable.data.dtype
            if issubclass(dtype.type, np.integer):
                log.warning(f"Force dtype {self.variable.data.dtype} → float")
                dtype = float
            # Dense (numpy.ndarray) data; convert to sparse
            data = sparse.COO.from_numpy(
                self.variable.data.astype(dtype), fill_value=np.nan
            )
        elif not np.isnan(self.variable.data.fill_value):
            # sparse.COO with non-NaN fill value; copy and change
            data = self.variable.data.copy(deep=False)
            data.fill_value = data.dtype.type(np.nan)
        else:
            # No change
            return

        # Replace the variable
        self._variable = self._variable._replace(data=data)
[docs] @classmethod def from_series(cls, obj, sparse=True): """Convert a pandas.Series into a SparseDataArray.""" # Call the parent method always with sparse=True, then re-wrap return xr.DataArray.from_series(obj, sparse=True)._sda.convert()
    @staticmethod
    def _perform_binary_op(
        op, left: "SparseDataArray", right: "SparseDataArray", factor: float
    ) -> "SparseDataArray":
        # Apply the binary operation `op` to `left` and `right`.
        # xr.DataArray-specific: outer join
        if rank(op) == 1:
            # Additive-rank operation: align on the union of labels, filling with 0.0
            left, right = xr.align(left, right, join="outer", fill_value=0.0)

        # super() `left` if this hasn't already happened, so the plain xr.DataArray
        # implementation (not a SparseDataArray override) is invoked
        left_ = left if isinstance(left, super) else super(xr.DataArray, left)

        # Invoke an xr.DataArray method like .__mul__()
        return getattr(left_, f"__{op.__name__}__")(right)

    def __len__(self) -> int:
        """Length along the first dimension; 0 if no values are stored."""
        v = self.variable
        # `nnz` exists only on sparse data; dense data (getattr default 1) uses len()
        return 0 if getattr(v.data, "nnz", 1) == 0 else len(v)

    @property
    def size(self) -> int:
        """Total number of elements; 0 if no values are stored."""
        return 0 if getattr(self.variable.data, "nnz", 1) == 0 else self.variable.size
[docs] def clip(self, min=None, max=None, *, keep_attrs=None): """Override :meth:`~xarray.DataArray.clip` to return SparseDataArray.""" return super().clip(min, max, keep_attrs=keep_attrs)._sda.convert()
[docs] def ffill(self, dim: Hashable, limit: Optional[int] = None): """Override :meth:`~xarray.DataArray.ffill` to auto-densify.""" return self._sda.dense_super.ffill(dim, limit)._sda.convert()
[docs] def interp( self, coords=None, method="linear", assume_sorted=False, kwargs=None, **coords_kwargs: Any, ): """Override :meth:`~xarray.DataArray.interp` to auto-densify.""" return self._sda.dense_super.interp( coords, method, assume_sorted, kwargs, **coords_kwargs )._sda.convert()
def _item(self, *args): """Like :meth:`~xarray.DataArray.item`.""" # See OverrideItem d = self.data if args: raise NotImplementedError("item() with args") elif d.size > 1: raise ValueError("can only convert an array of size 1 to a Python scalar") elif isinstance(d, sparse.COO): # sparse.COO.item() does not exist return d.fill_value if d.nnz == 0 else d.data.tolist()[0] else: # numpy.ndarray or something else return d.item()
[docs] def sel( self, indexers: Optional[Mapping[Any, Any]] = None, method: Optional[str] = None, tolerance=None, drop: bool = False, **indexers_kwargs: Any, ) -> "SparseDataArray": """Return a new array by selecting labels along the specified dim(s). Overrides :meth:`~xarray.DataArray.sel` to handle >1-D indexers with sparse data. """ indexers = either_dict_or_kwargs(indexers, indexers_kwargs, "sel") if isinstance(indexers, dict) and len(indexers) > 1: result = self for k, v in indexers.items(): result = result.sel( {k: v}, method=method, tolerance=tolerance, drop=drop ) else: result = ( super() .sel(indexers=indexers, method=method, tolerance=tolerance, drop=drop) ._sda.convert() ) return self._keep(result, name=True, attrs=True)
[docs] def squeeze(self, dim=None, drop=False, axis=None): return self._sda.dense_super.squeeze( dim=dim, drop=drop, axis=axis )._sda.convert()
[docs] def to_dataframe( self, name: Optional[Hashable] = None, dim_order: Optional[Sequence[Hashable]] = None, ) -> pd.DataFrame: """Convert this array and its coords into a :class:`pandas.DataFrame`. Overrides :meth:`~xarray.DataArray.to_dataframe`. """ if dim_order is not None: raise NotImplementedError("dim_order arg to to_dataframe()") return self.to_series().to_frame(name or self.name or "value")
    def to_series(self) -> pd.Series:
        """Convert this array into a :class:`~pandas.Series`.

        Overrides :meth:`~xarray.DataArray.to_series` to create the series without
        first converting to a potentially very large :class:`numpy.ndarray`.
        """
        # Use SparseArray.coords and .data (each already 1-D) to construct the pd.Series
        # Construct a pd.MultiIndex without using .from_product
        if self.dims:
            # set_levels() restores the full coordinate labels for each dimension
            index = pd.MultiIndex.from_arrays(
                self.data.coords, names=self.dims
            ).set_levels([self.coords[d].values for d in self.dims])
        else:
            # No dimensions (scalar): a single-entry index with an unnamed level
            index = pd.MultiIndex.from_arrays([[0]], names=[None])

        return pd.Series(self.data.data, index=index, name=self.name)
[docs] def where(self, cond: Any, other: Any = dtypes.NA, drop: bool = False): """Override :meth:`~xarray.DataArray.where` to auto-densify.""" return self._sda.dense_super.where(cond, other, drop)._sda.convert()