Source code for genno.core.key

import logging
import re
from functools import partial, singledispatch
from itertools import chain, compress
from types import MappingProxyType
from typing import (
    TYPE_CHECKING,
    Callable,
    Dict,
    Generator,
    Hashable,
    Iterable,
    Iterator,
    Optional,
    Sequence,
    SupportsInt,
    Tuple,
    Union,
)
from warnings import warn

from .attrseries import AttrSeries
from .sparsedataarray import SparseDataArray

if TYPE_CHECKING:
    from .quantity import AnyQuantity


# Module-level logger, named after this module.
log = logging.getLogger(__name__)

#: Regular expression for valid key strings.
#:
#: Matches ``name``, ``name:dims``, ``name:dims:tag`` or ``name::tag``, where:
#:
#: - ``name`` is one or more characters other than ":";
#: - ``dims`` is a "-"-separated list of labels that contain neither ":" nor "-";
#:   the last label must be non-empty;
#: - ``tag`` is any run (possibly empty) of characters other than ":".
EXPR = re.compile(r"^(?P<name>[^:]+)(:(?P<dims>([^:-]*-)*[^:-]+)?(:(?P<tag>[^:]*))?)?$")

#: Regular expression for non-keylike strings.
#:
#: Matches a string that contains no ":" at all. Leading whitespace is skipped
#: before the ``name`` group (unless the string is entirely whitespace); because
#: ``[^:]+`` is greedy, any trailing whitespace is captured as part of ``name``.
BARE_STR = re.compile(r"^\s*(?P<name>[^:]+)\s*$")


@singledispatch
def _name_dims_tag(value) -> Tuple[str, Tuple[str, ...], Optional[str]]:
    """Return a (name, dims, tag) tuple describing `value`.

    Helper for :meth:`.Key.__init__`. This is the fallback implementation, used
    for any type without a registered handler.

    Raises
    ------
    TypeError
        Always; the single argument to the exception is the type of `value`.
    """
    unsupported = type(value)
    raise TypeError(unsupported)


@_name_dims_tag.register
def _(value: str):
    """Split a string key expression into its (name, dims, tag) parts.

    Raises
    ------
    ValueError
        If `value` does not match :data:`EXPR`.
    """
    parsed = EXPR.match(value)
    if parsed is None:
        raise ValueError(f"Invalid key expression: {repr(value)}")

    name, dims, tag = parsed.group("name", "dims", "tag")
    # A missing dims group means zero dimensions
    return name, (tuple(dims.split("-")) if dims else tuple()), tag


# NB register() only handles a bare AnyQuantity annotation in Python ≥3.11, so the
#    concrete classes are registered explicitly.
@_name_dims_tag.register(AttrSeries)
@_name_dims_tag.register(SparseDataArray)
def _(value: "AnyQuantity"):
    """Describe an existing Quantity as (name, dims, tag); the tag is always None."""
    name = str(value.name)
    dims = tuple(str(dim) for dim in value.dims)
    return name, dims, None


class Key:
    """A hashable key for a quantity that includes its dimensionality.

    A Key combines a name, an ordered tuple of dimension names, and an optional
    tag. Its string form is ``name:dim1-dim2:tag``; the trailing ``:tag`` is
    omitted when there is no tag.

    Parameters
    ----------
    name_or_value : str or Key or Quantity
        A name, a string key expression such as ``"foo:a-b:tag"``, or any other
        object handled by :func:`_name_dims_tag`.
    dims : iterable of str, optional
        Dimension names. May not be given if `name_or_value` already implies
        dimensions.
    tag : str, optional
        Tag. May not be given if `name_or_value` already implies a tag.
    _fast : bool, optional
        Internal use. If :obj:`True`, `name_or_value` must be a :class:`str` that
        is used verbatim as the name; no parsing or conflict checks are done.

    Raises
    ------
    TypeError
        If `name_or_value` is of an unsupported type.
    ValueError
        If `name_or_value` is an invalid key expression, or if `dims` or `tag`
        conflict with values inferred from `name_or_value`.
    """

    _name: str
    _dims: Tuple[str, ...]
    _tag: Optional[str]

    def __init__(
        self,
        name_or_value: Union[str, "Key", "AnyQuantity"],
        dims: Iterable[str] = (),
        tag: Optional[str] = None,
        _fast: bool = False,
    ):
        if _fast:
            # Fast path: don't handle arguments
            assert isinstance(name_or_value, str)
            self._name = name_or_value
            self._dims = tuple(dims)
            self._tag = tag or None
        else:
            # Convert various values into a (name, dims, tags)
            self._name, _dims, _tag = _name_dims_tag(name_or_value)

            # Check for conflicts between dims inferred from name_or_value and any
            # direct argument
            # TODO handle resolveable combinations without raising exceptions
            if bool(_dims) and bool(dims):
                raise ValueError(
                    f"Conflict: {dims = } argument vs. {_dims!r} from {name_or_value!r}"
                )
            elif bool(_tag) and bool(tag):
                raise ValueError(
                    f"Conflict: {tag = } argument vs. {_tag!r} from {name_or_value!r}"
                )

            self._dims = _dims or tuple(dims)
            # Normalize an empty tag to None, as on the fast path; otherwise a Key
            # created with tag="" would have the same str() and hash() as an
            # untagged Key, yet compare unequal to it
            self._tag = _tag or tag or None

        # Pre-compute string representation and hash
        self._str = (
            self._name
            + ":"
            + "-".join(self._dims)
            + (f":{self._tag}" if self._tag else "")
        )
        self._hash = hash(self._str)

    # Class methods

    @classmethod
    def bare_name(cls, value) -> Optional[str]:
        """If `value` is a bare name (no dims or tags), return it; else :obj:`None`.

        Any `value` that is not a :class:`str` gives :obj:`None`.
        """
        if not isinstance(value, str):
            return None
        match = BARE_STR.match(value)
        return match.group("name") if match else None

    @classmethod
    def from_str_or_key(
        cls,
        value: Union[str, "Key", "AnyQuantity"],
        drop: Union[Iterable[str], bool] = (),
        append: Iterable[str] = (),
        tag: Optional[str] = None,
    ) -> "Key":
        """Return a new Key from *value*.

        .. versionchanged:: 1.18.0

           Calling :meth:`from_str_or_key` with a single argument is no longer
           necessary; simply give the same `value` as an argument to :class:`Key`.

           The class method is retained for convenience when calling with multiple
           arguments. However, the following are equivalent and may be more
           readable:

           .. code-block:: python

              k1 = Key.from_str_or_key(
                  "foo:a-b-c:t1", drop="b", append="d", tag="t2"
              )
              k2 = Key("foo:a-b-c:t1").drop("b").append("d").add_tag("t2")

        Parameters
        ----------
        value : str or .Key
            Value to use to generate a new Key.
        drop : str or list of str or :obj:`True`, optional
            Existing dimensions of *value* to drop. A single :class:`str` is
            treated as one dimension name. See :meth:`drop`.
        append : str or list of str, optional
            New dimensions to append to the returned Key. A single :class:`str` is
            treated as one dimension name. See :meth:`append`.
        tag : str, optional
            Tag for returned Key. If *value* has a tag, the two are joined using a
            '+' character. See :meth:`add_tag`.

        Returns
        -------
        :class:`Key`
        """
        base = cls(value)

        # Return quickly if no further manipulations are required
        if not any([drop, append, tag]):
            warn(
                "Calling Key.from_str_or_key(value) with no other arguments is no "
                "longer necessary; simply use Key(value)",
                FutureWarning,
                stacklevel=2,
            )
            return base

        # A bare bool or str is one argument; iterating a str would split a
        # multi-character dimension name into single characters
        drop_args: Tuple[Union[str, bool], ...] = (
            (drop,) if isinstance(drop, (bool, str)) else tuple(drop)
        )
        append_args: Tuple[str, ...] = (
            (append,) if isinstance(append, str) else tuple(append)
        )

        # Drop and append dimensions; add tag
        return base.drop(*drop_args).append(*append_args).add_tag(tag)

    @classmethod
    def product(cls, new_name: str, *keys, tag: Optional[str] = None) -> "Key":
        """Return a new Key that has the union of dimensions on *keys*.

        Dimensions are ordered by their first appearance:

        1. First, the dimensions of the first of the *keys*.
        2. Next, any additional dimensions in the second of the *keys* that were not
           already added in step 1.
        3. etc.

        Parameters
        ----------
        new_name : str
            Name for the new Key. The names of *keys* are discarded.
        tag : str, optional
            Tag to add to the new Key. The tags of *keys* are discarded.
        """
        # Iterable of dimension names from all keys, in order, with repetitions
        dims = chain.from_iterable(cls(k).dims for k in keys)

        # Return new key. Use dict to keep only unique *dims*, in same order
        return cls(new_name, dict.fromkeys(dims).keys()).add_tag(tag)

    # Operators

    @staticmethod
    def _operand_dims(other) -> Sequence[str]:
        """Return the dimension names implied by an operand of ``*`` or ``/``."""
        # Check str first: a str is itself a Sequence, but names one dimension
        if isinstance(other, str):
            return (other,)
        elif isinstance(other, Key):
            return other.dims
        elif isinstance(other, Sequence):
            return other
        else:
            raise TypeError(type(other))

    def __add__(self, other: str) -> "Key":
        """``key + "tag"`` is equivalent to :meth:`add_tag`."""
        if not isinstance(other, str):
            raise TypeError(type(other))
        return self.add_tag(other)

    def __sub__(self, other: Union[str, Iterable[str]]) -> "Key":
        """``key - "tag"`` or ``key - ["t1", "t2"]``; see :meth:`remove_tag`."""
        return self.remove_tag(*((other,) if isinstance(other, str) else other))

    def __mul__(self, other: Union[str, "Key", Sequence[str]]) -> "Key":
        """``key * dims`` appends dimension(s); see :meth:`append`."""
        return self.append(*self._operand_dims(other))

    def __truediv__(self, other: Union[str, "Key", Sequence[str]]) -> "Key":
        """``key / dims`` drops dimension(s); see :meth:`drop`."""
        return self.drop(*self._operand_dims(other))

    def __repr__(self) -> str:
        """Representation of the Key, e.g. '<name:dim1-dim2-dim3:tag>'."""
        return f"<{self._str}>"

    def __str__(self) -> str:
        """String equivalent of the Key, e.g. 'name:dim1-dim2-dim3:tag'."""
        # Return the pre-computed value
        return self._str

    def __hash__(self):
        """Key hashes the same as :py:`str(Key)`."""
        return self._hash

    def __eq__(self, other) -> bool:
        """Compare with another Key, or with anything :class:`Key` can be made from.

        Two keys are equal if their names and tags are equal and they have the
        same *set* of dimensions, in any order. Note that the hash is computed
        from the string form, which preserves dimension order.

        Values that cannot be converted to a Key, including strings that are not
        valid key expressions, are not equal to any Key.
        """
        if not isinstance(other, Key):
            try:
                other = Key(other)
            except (TypeError, ValueError):
                # Unsupported type, or a str that is not a key expression
                return NotImplemented

        return (
            (self.name == other.name)
            and (set(self.dims) == set(other.dims))
            and (self.tag == other.tag)
        )

    # Less-than and greater-than operations, for sorting

    def __lt__(self, other) -> bool:
        if isinstance(other, Key):
            return str(self.sorted) < str(other.sorted)
        elif isinstance(other, str):
            return str(self.sorted) < other
        else:
            return NotImplemented

    def __gt__(self, other) -> bool:
        if isinstance(other, Key):
            return str(self.sorted) > str(other.sorted)
        elif isinstance(other, str):
            return str(self.sorted) > other
        else:
            return NotImplemented

    # Read-only attributes

    @property
    def name(self) -> str:
        """Name of the quantity, :class:`str`."""
        return self._name

    @property
    def dims(self) -> Tuple[str, ...]:
        """Dimensions of the quantity, :class:`tuple` of :class:`str`."""
        return self._dims

    @property
    def tag(self) -> Optional[str]:
        """Quantity tag, :class:`str` or :obj:`None`."""
        return self._tag

    @property
    def sorted(self) -> "Key":
        """A version of the Key with its :attr:`.dims` :func:`sorted`."""
        return Key(self._name, sorted(self._dims), self._tag, _fast=True)

    # Methods that return a modified copy

    def rename(self, name: str) -> "Key":
        """Return a Key with a replaced `name`."""
        return Key(name, self._dims, self._tag, _fast=True)

    def drop(self, *dims: Union[str, bool]) -> "Key":
        """Return a new Key with `dims` dropped.

        If the sole argument is :obj:`True`, all dimensions are dropped. Names in
        `dims` that are not dimensions of the Key are ignored.
        """
        if dims == (True,):
            remaining: Iterable[str] = ()
        else:
            remaining = (d for d in self._dims if d not in dims)
        return Key(self._name, remaining, self._tag, _fast=True)

    def drop_all(self) -> "Key":
        """Return a new Key with all dimensions dropped / zero dimensions."""
        return Key(self._name, tuple(), self._tag, _fast=True)

    def append(self, *dims: str) -> "Key":
        """Return a new Key with additional dimensions `dims`."""
        return Key(self._name, list(self._dims) + list(dims), self._tag, _fast=True)

    def add_tag(self, tag: Optional[str]) -> "Key":
        """Return a new Key with `tag` appended.

        An existing tag and `tag` are joined with "+". If `tag` is empty or
        :obj:`None`, the returned Key has the same tag as this one.
        """
        return Key(
            self._name, self._dims, "+".join(filter(None, [self._tag, tag])), _fast=True
        )

    def iter_sums(self) -> Generator[Tuple["Key", Callable, "Key"], None, None]:
        """Generate (key, task, base key) for all possible partial sums of the Key.

        Each item is a 3-tuple of:

        1. a Key with a subset of the dimensions of this Key,
        2. a partial application of :func:`genno.operator.sum` over the remaining
           dimensions, and
        3. this Key itself.
        """
        from genno.operator import sum

        for agg_dims, others in combo_partition(self.dims):
            yield (
                Key(self._name, agg_dims, self.tag, _fast=True),
                partial(sum, dimensions=others, weights=None),
                self,
            )

    def remove_tag(self, *tags: str) -> "Key":
        """Return a key with any of `tags` dropped.

        Raises
        ------
        ValueError
            If none of `tags` are in :attr:`.tag`, including when the Key has no
            tag at all.
        """
        # An untagged Key has no parts; "".split("+") would give [""] and so hide
        # the fact that nothing can be removed
        existing = self._tag.split("+") if self._tag else []
        kept = [t for t in existing if t not in tags]

        if len(kept) == len(existing):
            raise ValueError(f"No existing tags {tags!r} to remove")

        return Key(self._name, self._dims, "+".join(kept) or None, _fast=True)
@_name_dims_tag.register
def _(value: Key):
    """Copy the (name, dims, tag) of an existing Key."""
    name, dims, tag = value._name, value._dims, value._tag
    return name, dims, tag
class KeySeq:
    """Utility class for generating similar :class:`Keys <.Key>`.

    All positional and keyword arguments are passed to :class:`.Key` to construct
    the :attr:`base` key. Each generated key is the base plus one additional tag.
    """

    #: Base :class:`.Key` of the sequence.
    base: Key

    # Keys that have been created.
    _keys: Dict[Hashable, Key]

    def __init__(self, *args, **kwargs):
        self.base = Key(*args, **kwargs)
        self._keys = {}

    def _next_int_tag(self) -> int:
        """Return 1 more than the largest integer tag used so far, or 0 if none."""
        int_tags = (t for t in self._keys if isinstance(t, int))
        return max(int_tags, default=-1) + 1

    def __next__(self) -> Key:
        return self[self._next_int_tag()]

    def __call__(self, value: Optional[Hashable] = None) -> Key:
        if value is None:
            return next(self)
        return self[value]

    def __getitem__(self, value: Hashable) -> Key:
        # Anything convertible to int is stored under an int tag; all else as str
        if isinstance(value, SupportsInt):
            tag = int(value)
        else:
            tag = str(value)

        key = self.base + str(tag)
        self._keys[tag] = key
        return key

    def __repr__(self) -> str:
        return f"<KeySeq from '{self.base!s}'>"

    @property
    def keys(self) -> MappingProxyType:
        """Read-only view of previously-created :class:`Keys <.Key>`.

        In the form of a :class:`dict` mapping tags (:class:`int` or :class:`str`)
        to :class:`.Key` values.
        """
        return MappingProxyType(self._keys)

    @property
    def prev(self) -> Key:
        """The most recently created :class:`.Key`."""
        newest_first = reversed(self._keys.values())
        return next(newest_first)

    # Access to Key properties

    @property
    def name(self) -> str:
        """Name of the :attr:`.base` Key."""
        return self.base.name

    @property
    def dims(self) -> Tuple[str, ...]:
        """Dimensions of the :attr:`.base` Key."""
        return self.base.dims

    @property
    def tag(self) -> Optional[str]:
        """Tag of the :attr:`.base` Key."""
        return self.base.tag

    # Operators: apply the same operation to the base Key and start a new sequence

    def __add__(self, other: str) -> "KeySeq":
        new_base = self.base + other
        return KeySeq(new_base)

    def __mul__(self, other) -> "KeySeq":
        new_base = self.base * other
        return KeySeq(new_base)

    def __sub__(self, other: Union[str, Iterable[str]]) -> "KeySeq":
        new_base = self.base - other
        return KeySeq(new_base)

    def __truediv__(self, other) -> "KeySeq":
        new_base = self.base / other
        return KeySeq(new_base)
#: Type shorthand for :class:`Key` or any other value that can be used as a key.
KeyLike = Union[Key, str]


def combo_partition(iterable):
    """Yield pairs of lists that partition the items of *iterable*.

    Each yielded pair is ``(selected, others)``: `selected` is a subset of the
    items and `others` contains all remaining items; both preserve the original
    order. Every subset appears as `selected` exactly once, **except** the full
    set; that is, `others` is never empty. An empty *iterable* yields nothing.

    Parameters
    ----------
    iterable
        Any iterable, including one without a length, such as a generator.
    """
    # Materialize so that generators and other one-shot iterables are supported
    items = list(iterable)
    n_items = len(items)

    # Stop before the all-ones mask, which would select every item
    for n in range(2**n_items - 1):
        # Bits of n, zero-padded to one per item, mark the selected items
        mask = [bit == "1" for bit in format(n, f"0{n_items}b")]
        yield (
            list(compress(items, mask)),
            list(compress(items, (not selected for selected in mask))),
        )
def iter_keys(value: Union[KeyLike, Tuple[KeyLike, ...]]) -> Iterator[Key]:
    """Yield :class:`Keys <Key>` from `value`.

    A single :class:`Key` or :class:`str` is converted to :class:`Key` and yielded
    alone. Otherwise, `value` is iterated and each of its elements is yielded.

    Raises
    ------
    TypeError
        `value` is not an iterable of :class:`Key`.

    See also
    --------
    .Computer.add
    """
    # Single key-like value: exactly one item
    if isinstance(value, (Key, str)):
        yield Key(value)
        return

    # Iterable: every element must already be a Key
    for candidate in value:
        if isinstance(candidate, Key):
            yield candidate
        else:
            raise TypeError(type(candidate))
def single_key(value: Union[KeyLike, Tuple[KeyLike, ...], Iterator]) -> Key:
    """Ensure `value` is a single :class:`Key`.

    A :class:`Key` or :class:`str` is converted to :class:`Key` and returned.
    Otherwise `value` is iterated, and must contain exactly one element that is a
    :class:`Key`.

    Raises
    ------
    TypeError
        `value` is not a :class:`Key` or 1-tuple of :class:`Key`.

    See also
    --------
    .Computer.add
    """
    if isinstance(value, (Key, str)):
        return Key(value)

    # Use a sentinel rather than catching StopIteration, so that the TypeErrors
    # below are not chained to an irrelevant StopIteration in tracebacks
    missing = object()
    elements = iter(value)

    result = next(elements, missing)
    if result is missing:
        raise TypeError("Empty iterable")
    if next(elements, missing) is not missing:
        raise TypeError("Iterable of length >1")

    if not isinstance(result, Key):
        raise TypeError(type(result))
    return result