from collections.abc import Hashable, Iterable, Mapping
from typing import Optional, Union
import genno
from genno import Quantity
try:
import sdmx
except ModuleNotFoundError: # pragma: no cover
HAS_SDMX = False
else:
HAS_SDMX = True
from . import util
__all__ = [
"codelist_to_groups",
"dataset_to_quantity",
"quantity_to_dataset",
"quantity_to_message",
]
[docs]
def codelist_to_groups(
codes: Union["sdmx.model.common.Codelist", Iterable["sdmx.model.common.Code"]],
dim: Optional[str] = None,
) -> Mapping[str, Mapping[str, list[str]]]:
"""Convert `codes` into a mapping from parent items to their children.
The returned value is suitable for use with :func:`~.operator.aggregate`.
Parameters
----------
codes
Either a :class:`sdmx.Codelist <sdmx.model.common.Codelist>` object or any
iterable of :class:`sdmx.Code <sdmx.model.common.Code>`.
dim : str, optional
Dimension to aggregate. If `codes` is a code list and `dim` is not given, the
ID of the code list is used; otherwise `dim` must be supplied.
"""
from sdmx.model.common import Codelist
if isinstance(codes, Codelist):
items: Iterable["sdmx.model.common.Code"] = codes.items.values()
dim = dim or codes.id
else:
items = codes
if dim is None:
raise ValueError("Must provide a dimension ID for aggregation")
groups = dict()
for code in filter(lambda c: len(c.child), items):
groups[code.id] = list(map(str, code.child))
return {dim: groups}
[docs]
def dataset_to_quantity(ds: "sdmx.model.common.BaseDataSet") -> Quantity:
"""Convert :class:`DataSet <sdmx.model.common.BaseDataSet>` to :class:`.Quantity`.
Returns
-------
.Quantity
The quantity may have the attributes:
- "dataflow_urn": :attr:`urn <sdmx.model.common.IdentifiableArtefact.urn>` of
the :class:`Dataflow <sdmx.model.common.BaseDataflow>` referenced by the
:attr:`described_by <sdmx.model.common.BaseDataSet.described_by>` attribute of
`ds`, if any.
- "structure_urn": :attr:`urn <sdmx.model.common.IdentifiableArtefact.urn>` of
the :class:`DataStructureDefinition
<sdmx.model.common.BaseDataStructureDefinition>` referenced by the
:attr:`structured_by <sdmx.model.common.BaseDataSet.structured_by>` attribute
of `ds`, if any.
"""
# Assemble attributes
attrs: dict[str, str] = {}
if ds.described_by: # pragma: no cover
attrs.update(dataflow_urn=util.urn(ds.described_by))
if ds.structured_by:
attrs.update(structure_urn=util.urn(ds.structured_by))
return Quantity(sdmx.to_pandas(ds), attrs=attrs)
[docs]
def quantity_to_dataset(
qty: Quantity,
structure: "sdmx.model.common.BaseDataStructureDefinition",
*,
observation_dimension: Optional[str] = None,
version: Union["sdmx.format.Version", str, None] = None,
) -> "sdmx.model.common.BaseDataSet":
"""Convert :class:`.Quantity` to :class:`DataSet <sdmx.model.common.BaseDataSet>`.
The resulting data set is structure-specific.
Parameters
----------
observation_dimension : str or sdmx.model.common.DimensionComponent, optional
If given, the resulting data set is arranged in series, with the
`observation_dimension` varying across observations within each series. If not
given, the data set is flat, with all dimensions specified for each observation.
version : str or sdmx.format.Version, optional
SDMX data model version to use; default 2.1.
"""
# Handle `version` argument, identify classes
_, DataSet, Observation = util.handle_version(version)
Key = sdmx.model.common.Key
SeriesKey = sdmx.model.common.SeriesKey
# Narrow type
# NB This is necessary because BaseDataStructureDefinition.measures is not defined
# TODO Remove once addressed upstream
assert isinstance(
structure,
(
sdmx.model.v21.DataStructureDefinition,
sdmx.model.v30.DataStructureDefinition,
),
)
try:
# URN of DSD stored on `qty` matches `structure`
assert qty.attrs["structure_urn"] == util.urn(structure)
except KeyError:
pass # No such attribute
# Dimensions; should be equivalent to the IDs of structure.dimensions
dims = qty.dims
# Create data set
ds = DataSet(structured_by=structure)
measure = structure.measures[0]
if od := util.handle_od(observation_dimension, structure):
# Index of `observation_dimension`
od_index = dims.index(od.id)
# Group data / construct SeriesKey all *except* the observation_dimension
series_dims = list(dims[:od_index] + dims[od_index + 1 :])
grouped: Iterable = qty.to_series().groupby(series_dims)
# For as_obs()
obs_dims: tuple[Hashable, ...] = (od.id,)
key_slice = slice(od_index, od_index + 1)
else:
# Pseudo-groupby object
grouped = [(None, qty.to_series())]
obs_dims, key_slice = dims, slice(None)
def as_obs(key, value):
"""Convert a single pd.Series element to an sdmx Observation."""
return Observation(
# Select some or all elements of the SeriesGroupBy key
dimension=structure.make_key(Key, dict(zip(obs_dims, key[key_slice]))),
value_for=measure,
value=value,
)
for series_key, data in grouped:
if series_key:
sk = structure.make_key(SeriesKey, dict(zip(series_dims, series_key)))
else:
sk = None
# - Convert each item to an sdmx Observation.
# - Add to `ds`, associating with sk
ds.add_obs([as_obs(key, value) for key, value in data.items()], series_key=sk)
return ds
[docs]
def quantity_to_message(
qty: Quantity, structure: "sdmx.model.v21.DataStructureDefinition", **kwargs
) -> "sdmx.message.DataMessage":
"""Convert :class:`.Quantity` to :class:`DataMessage <sdmx.message.DataMessage>`.
Parameters
----------
kwargs :
`observation_dimension` and `version` parameters are both used and passed on
to :func:`.quantity_to_dataset`.
"""
kwargs.update(
version=util.handle_version(kwargs.get("version"))[0],
observation_dimension=util.handle_od(
kwargs.get("observation_dimension"), structure
),
)
ds = quantity_to_dataset(
qty,
structure,
observation_dimension=kwargs["observation_dimension"],
version=kwargs["version"],
)
return sdmx.message.DataMessage(data=[ds], **kwargs)
@genno.operator.write_report.register
def _(obj: "sdmx.message.DataMessage", path, kwargs=None) -> None:
"""Write `obj` to the file at `path`.
If `obj` is a :class:`sdmx.message.DataMessage` and `path` ends with ".xml", use
use :mod:`sdmx` methods to write the file to SDMX-ML. Otherwise, equivalent to
:func:`genno.operator.write_report`.
"""
import genno.compat.sdmx.operator # noqa: F401
assert path.suffix.lower() == ".xml"
kwargs = kwargs or {}
kwargs.setdefault("pretty_print", True)
path.write_bytes(sdmx.to_xml(obj, **kwargs))