import logging
import re
from functools import partial, singledispatch
from itertools import chain, compress
from types import MappingProxyType
from typing import (
TYPE_CHECKING,
Callable,
Generator,
Hashable,
Iterable,
Iterator,
Optional,
Sequence,
SupportsInt,
Union,
)
from warnings import warn
from .attrseries import AttrSeries
from .sparsedataarray import SparseDataArray
if TYPE_CHECKING:
from .quantity import AnyQuantity
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
#: Regular expression for valid key strings, i.e. ``name``, ``name:dims`` or
#: ``name:dims:tag``.
#:
#: - ``name``: one or more characters other than ":".
#: - ``dims``: optional; "-"-separated segments, of which the last must be non-empty
#:   and none may contain ":".
#: - ``tag``: optional; zero or more characters other than ":".
EXPR = re.compile(r"^(?P<name>[^:]+)(:(?P<dims>([^:-]*-)*[^:-]+)?(:(?P<tag>[^:]*))?)?$")
#: Regular expression for non-keylike strings: a single name that contains no ":",
#: optionally surrounded by whitespace. Leading whitespace is excluded from the
#: ``name`` group, but because that group is greedy any trailing whitespace is
#: captured as part of it.
BARE_STR = re.compile(r"^\s*(?P<name>[^:]+)\s*$")
@singledispatch
def _name_dims_tag(value) -> tuple[str, tuple[str, ...], Optional[str]]:
    """Convert `value` into a (name, dims, tag) tuple.

    Helper for :meth:`.Key.__init__`. This is the :func:`functools.singledispatch`
    fallback, reached only for a `value` whose type has no registered
    implementation.

    Raises
    ------
    TypeError
        Always. The sole exception argument is the type of `value`.
    """
    unsupported = type(value)
    raise TypeError(unsupported)
@_name_dims_tag.register
def _(value: str):
    """Parse a string that may contain a Key expression.

    Returns
    -------
    tuple
        (name, dims, tag), in which dims is an empty tuple if the expression names
        no dimensions, and tag is whatever the ``tag`` group of :data:`EXPR`
        captured (:obj:`None` if that part is absent).

    Raises
    ------
    ValueError
        If `value` does not match :data:`EXPR`.
    """
    parsed = EXPR.match(value)
    if parsed is None:
        raise ValueError(f"Invalid key expression: {value!r}")

    name, dims, tag = parsed.group("name", "dims", "tag")
    # The "dims" group is None when the expression has no dimensions part
    dim_names = tuple(dims.split("-")) if dims else tuple()
    return name, dim_names, tag
@_name_dims_tag.register(AttrSeries)
@_name_dims_tag.register(SparseDataArray)
def _(value: "AnyQuantity"):  # register() only handles bare AnyQuantity in Python ≥3.11
    """Describe an existing Quantity as (name, dims, tag).

    The name and every dimension are converted with :class:`str`; the tag is always
    :obj:`None`.
    """
    name = str(value.name)
    dim_names = tuple(str(dim) for dim in value.dims)
    return name, dim_names, None
[docs]
class Key:
"""A hashable key for a quantity that includes its dimensionality."""
_name: str
_dims: tuple[str, ...]
_tag: Optional[str]
def __init__(
self,
name_or_value: Union[str, "Key", "AnyQuantity"],
dims: Iterable[str] = [],
tag: Optional[str] = None,
_fast: bool = False,
):
if _fast:
# Fast path: don't handle arguments
assert isinstance(name_or_value, str)
self._name = name_or_value
self._dims = tuple(dims)
self._tag = tag or None
else:
# Convert various values into a (name, dims, tags)
self._name, _dims, _tag = _name_dims_tag(name_or_value)
# Check for conflicts between dims inferred from name_or_value and any
# direct argument
# TODO handle resolveable combinations without raising exceptions
if bool(_dims) and bool(dims):
raise ValueError(
f"Conflict: {dims = } argument vs. {_dims!r} from {name_or_value!r}"
)
elif bool(_tag) and bool(tag):
raise ValueError(
f"Conflict: {tag = } argument vs. {_tag!r} from {name_or_value!r}"
)
self._dims = _dims or tuple(dims)
self._tag = _tag or tag
# Pre-compute string representation and hash
self._str = (
self._name
+ ":"
+ "-".join(self._dims)
+ (f":{self._tag}" if self._tag else "")
)
self._hash = hash(self._str)
# Class methods
[docs]
@classmethod
def bare_name(cls, value) -> Optional[str]:
"""If `value` is a bare name (no dims or tags), return it; else :obj:`None`."""
if not isinstance(value, str):
return None
match = BARE_STR.match(value)
return match.group("name") if match else None
[docs]
@classmethod
def from_str_or_key(
cls,
value: Union[str, "Key", "AnyQuantity"],
drop: Union[Iterable[str], bool] = [],
append: Iterable[str] = [],
tag: Optional[str] = None,
) -> "Key":
"""Return a new Key from *value*.
.. versionchanged:: 1.18.0
Calling :meth:`from_str_or_key` with a single argument is no longer
necessary; simply give the same `value` as an argument to :class:`Key`.
The class method is retained for convenience when calling with multiple
arguments. However, the following are equivalent and may be more readable:
.. code-block:: python
k1 = Key("foo:a-b-c:t1", drop="b", append="d", tag="t2")
k2 = Key("foo:a-b-c:t1").drop("b").append("d)"
Parameters
----------
value : str or .Key
Value to use to generate a new Key.
drop : list of str or :obj:`True`, optional
Existing dimensions of *value* to drop. See :meth:`drop`.
append : list of str, optional
New dimensions to append to the returned Key. See :meth:`append`.
tag : str, optional
Tag for returned Key. If *value* has a tag, the two are joined
using a '+' character. See :meth:`add_tag`.
Returns
-------
:class:`Key`
"""
base = cls(value)
# Return quickly if no further manipulations are required
if not any([drop, append, tag]):
warn(
"Calling Key.from_str_or_key(value) with no other arguments is no "
"longer necessary; simply use Key(value)",
FutureWarning,
stacklevel=2,
)
return base
# mypy is fussy here
drop_args: tuple[Union[str, bool], ...] = tuple(
[drop] if isinstance(drop, bool) else drop
)
# Drop and append dimensions; add tag
return base.drop(*drop_args).append(*tuple(append)).add_tag(tag)
[docs]
@classmethod
def product(cls, new_name: str, *keys, tag: Optional[str] = None) -> "Key":
"""Return a new Key that has the union of dimensions on *keys*.
Dimensions are ordered by their first appearance:
1. First, the dimensions of the first of the *keys*.
2. Next, any additional dimensions in the second of the *keys* that
were not already added in step 1.
3. etc.
Parameters
----------
new_name : str
Name for the new Key. The names of *keys* are discarded.
"""
# Iterable of dimension names from all keys, in order, with repetitions
dims = chain(*map(lambda k: cls(k).dims, keys))
# Return new key. Use dict to keep only unique *dims*, in same order
return cls(new_name, dict.fromkeys(dims).keys()).add_tag(tag)
def __add__(self, other: str) -> "Key":
if not isinstance(other, str):
raise TypeError(type(other))
return self.add_tag(other)
def __sub__(self, other: Union[str, Iterable[str]]) -> "Key":
return self.remove_tag(*((other,) if isinstance(other, str) else other))
def __mul__(self, other: Union[str, "Key", Sequence[str]]) -> "Key":
if isinstance(other, str):
other_dims: Sequence[str] = (other,)
elif isinstance(other, Key):
other_dims = other.dims
elif isinstance(other, Sequence):
other_dims = other
else:
raise TypeError(type(other))
return self.append(*other_dims)
def __truediv__(self, other: Union[str, "Key", Sequence[str]]) -> "Key":
if isinstance(other, str):
other_dims: Sequence[str] = (other,)
elif isinstance(other, Key):
other_dims = other.dims
elif isinstance(other, Sequence):
other_dims = other
else:
raise TypeError(type(other))
return self.drop(*other_dims)
def __repr__(self) -> str:
"""Representation of the Key, e.g. '<name:dim1-dim2-dim3:tag>."""
return f"<{self._str}>"
def __str__(self) -> str:
"""String equivalent of the Key, e.g. 'name:dim1-dim2-dim3:tag'."""
return self._str # Return the pre-computed value
def __hash__(self):
"""Key hashes the same as :py:`str(Key)`."""
return self._hash
def __eq__(self, other) -> bool:
"""Key is equal to :py:`str(Key)`."""
try:
other = Key(other)
except TypeError:
return NotImplemented
return (
(self.name == other.name)
and (set(self.dims) == set(other.dims))
and (self.tag == other.tag)
)
# Less-than and greater-than operations, for sorting
def __lt__(self, other) -> bool:
if isinstance(other, Key):
return str(self.sorted) < str(other.sorted)
elif isinstance(other, str):
return str(self.sorted) < other
else:
return NotImplemented
def __gt__(self, other) -> bool:
if isinstance(other, Key):
return str(self.sorted) > str(other.sorted)
elif isinstance(other, str):
return str(self.sorted) > other
else:
return NotImplemented
@property
def name(self) -> str:
"""Name of the quantity, :class:`str`."""
return self._name
@property
def dims(self) -> tuple[str, ...]:
"""Dimensions of the quantity, :class:`tuple` of :class:`str`."""
return self._dims
@property
def tag(self) -> Optional[str]:
"""Quantity tag, :class:`str` or :obj:`None`."""
return self._tag
@property
def sorted(self) -> "Key":
"""A version of the Key with its :attr:`.dims` :func:`sorted`."""
return Key(self._name, sorted(self._dims), self._tag, _fast=True)
[docs]
def rename(self, name: str) -> "Key":
"""Return a Key with a replaced `name`."""
return Key(name, self._dims, self._tag, _fast=True)
[docs]
def drop(self, *dims: Union[str, bool]) -> "Key":
"""Return a new Key with `dims` dropped."""
return Key(
self._name,
[] if dims == (True,) else filter(lambda d: d not in dims, self._dims),
self._tag,
_fast=True,
)
[docs]
def drop_all(self) -> "Key":
"""Return a new Key with all dimensions dropped / zero dimensions."""
return Key(self._name, tuple(), self._tag, _fast=True)
[docs]
def append(self, *dims: str) -> "Key":
"""Return a new Key with additional dimensions `dims`."""
return Key(self._name, list(self._dims) + list(dims), self._tag, _fast=True)
[docs]
def add_tag(self, tag: Optional[str]) -> "Key":
"""Return a new Key with `tag` appended."""
return Key(
self._name, self._dims, "+".join(filter(None, [self._tag, tag])), _fast=True
)
[docs]
def iter_sums(self) -> Generator[tuple["Key", Callable, "Key"], None, None]:
"""Generate (key, task) for all possible partial sums of the Key."""
from genno.operator import sum
for agg_dims, others in combo_partition(self.dims):
yield (
Key(self._name, agg_dims, self.tag, _fast=True),
partial(sum, dimensions=others, weights=None),
self,
)
[docs]
def remove_tag(self, *tags: str) -> "Key":
"""Return a key with any of `tags` dropped.
Raises
------
ValueError
If none of `tags` are in :attr:`.tags`.
"""
new_tags = tuple(filter(lambda t: t not in tags, (self.tag or "").split("+")))
new_tag = "+".join(new_tags) if new_tags else None
if new_tag == self.tag:
raise ValueError(f"No existing tags {tags!r} to remove")
return Key(self._name, self._dims, new_tag, _fast=True)
@_name_dims_tag.register
def _(value: Key):
    """Copy (name, dims, tag) from an existing Key, reading its private attributes."""
    name, dims, tag = value._name, value._dims, value._tag
    return name, dims, tag
[docs]
class KeySeq:
    """Utility class for generating similar :class:`Keys <.Key>`.

    All arguments to the constructor are passed to :class:`Key` to create the
    :attr:`base` Key. Each generated Key is the base Key with one further tag added;
    the tag is either an :class:`int` or a :class:`str`.
    """

    #: Base :class:`.Key` of the sequence.
    base: Key

    # Keys that have been created, by the int or str tag used to create them.
    _keys: dict[Hashable, Key]

    def __init__(self, *args, **kwargs):
        self.base = Key(*args, **kwargs)
        self._keys = {}

    def _next_int_tag(self) -> int:
        """Return one more than the largest integer tag used so far, but at least 0."""
        highest = -1
        for used in self._keys:
            if isinstance(used, int) and used > highest:
                highest = used
        return highest + 1

    def __next__(self) -> Key:
        """Create and return the Key for the next unused integer tag."""
        return self[self._next_int_tag()]

    def __call__(self, value: Optional[Hashable] = None) -> Key:
        """Same as ``next(self)`` if `value` is :obj:`None`; else ``self[value]``."""
        if value is None:
            return next(self)
        return self[value]

    def __getitem__(self, value: Hashable) -> Key:
        """Create, record, and return the Key for the tag `value`.

        A `value` that supports :class:`int` conversion is converted to :class:`int`;
        anything else is converted to :class:`str`.
        """
        if isinstance(value, SupportsInt):
            tag = int(value)
        else:
            tag = str(value)
        key = self.base + str(tag)
        self._keys[tag] = key
        return key

    def __repr__(self) -> str:
        return f"<KeySeq from '{self.base!s}'>"

    @property
    def keys(self) -> MappingProxyType:
        """Read-only view of previously-created :class:`Keys <.Key>`.

        In the form of a :class:`dict` mapping tags (:class:`int` or :class:`str`) to
        :class:`.Key` values.
        """
        return MappingProxyType(self._keys)

    @property
    def prev(self) -> Key:
        """The most recently created :class:`.Key`.

        More precisely: the Key for the tag most recently used *for the first time*.
        Requesting an already-used tag again does not change which Key is returned
        here. Raises :class:`StopIteration` if no Key has been created yet.
        """
        newest_tag = next(reversed(self._keys))
        return self._keys[newest_tag]

    # Access to Key properties

    @property
    def name(self) -> str:
        """Name of the :attr:`.base` Key."""
        return self.base.name

    @property
    def dims(self) -> tuple[str, ...]:
        """Dimensions of the :attr:`.base` Key."""
        return self.base.dims

    @property
    def tag(self) -> Optional[str]:
        """Tag of the :attr:`.base` Key."""
        return self.base.tag

    # Operators: each applies the same operator to the base Key and wraps the result
    # in a new KeySeq; previously-created keys are not carried over.

    def __add__(self, other: str) -> "KeySeq":
        return KeySeq(self.base + other)

    def __mul__(self, other) -> "KeySeq":
        return KeySeq(self.base * other)

    def __sub__(self, other: Union[str, Iterable[str]]) -> "KeySeq":
        return KeySeq(self.base - other)

    def __truediv__(self, other) -> "KeySeq":
        return KeySeq(self.base / other)
#: Type shorthand for :class:`Key` or any other value that can be used as a key;
#: that is, either a :class:`Key` or a :class:`str`.
KeyLike = Union[Key, str]
def combo_partition(iterable):
    """Yield pairs of lists that split the items of *iterable* in two.

    *iterable* must support :func:`len`. Every pair is (selected, remainder), in
    which each item of *iterable* appears in exactly one of the two lists, in its
    original order. All possible splits are generated except the one in which
    *selected* holds every item; the first pair has an empty *selected*. Nothing is
    yielded for an empty *iterable*.
    """
    size = len(iterable)

    # The binary digits of n, zero-padded to `size` places, mark the selected items
    for n in range(2**size - 1):
        mask = [digit == "1" for digit in format(n, f"0{size}b")]
        selected = [item for item, keep in zip(iterable, mask) if keep]
        remainder = [item for item, keep in zip(iterable, mask) if not keep]
        yield selected, remainder
[docs]
def iter_keys(value: Union[KeyLike, tuple[KeyLike, ...]]) -> Iterator[Key]:
    """Yield :class:`Keys <Key>` from `value`.

    A single :class:`Key` or :class:`str` is converted with :class:`Key` and yielded
    alone. Any other `value` is iterated, and its elements are yielded unchanged.

    Raises
    ------
    TypeError
        `value` is not an iterable of :class:`Key`. Because this is a generator, the
        exception is raised only when the offending element is reached.

    See also
    --------
    .Computer.add
    """
    if isinstance(value, (Key, str)):
        yield Key(value)
        return

    for element in value:
        # Elements of an iterable must already be Key; str is not converted here
        if not isinstance(element, Key):
            raise TypeError(type(element))
        yield element
[docs]
def single_key(value: Union[KeyLike, tuple[KeyLike, ...], Iterator]) -> Key:
    """Ensure `value` is a single :class:`Key`.

    A :class:`Key` or :class:`str` is converted with :class:`Key`. Any other `value`
    is iterated; it must contain exactly one element, which must be a :class:`Key`.

    Raises
    ------
    TypeError
        `value` is not a :class:`Key` or 1-tuple of :class:`Key`.

    See also
    --------
    .Computer.add
    """
    if isinstance(value, (Key, str)):
        return Key(value)

    elements = iter(value)

    try:
        first = next(elements)
    except StopIteration:
        raise TypeError("Empty iterable")

    # Any second element means the iterable is too long
    exhausted = object()
    if next(elements, exhausted) is not exhausted:
        raise TypeError("Iterable of length >1")

    if not isinstance(first, Key):
        raise TypeError(type(first))
    return first