Skip to content

API Reference: CrossRegistry

A data registry to interact with the CROSS data platform.

Source code in src/crosscontract/registry/registry.py
class CrossRegistry:
    """A data registry to interact with the CROSS data platform."""

    def __init__(
        self,
        username: str | None = None,
        password: str | None = None,
        client: CrossClient | None = None,
    ):
        """Set up the registry, building a client from credentials if needed.

        Args:
            username (str | None): The username or email to connect to CROSS platform.
            password (str | None): The password to connect to CROSS platform.
            client (CrossClient | None): An optional CrossClient instance. When
                given, it is used as-is; otherwise a fresh client is constructed
                from the username/password pair.
        """
        if client is None:
            if username is None or password is None:
                raise ValueError(
                    "Either a CrossClient instance or both username and password must "
                    "be provided."
                )
            client = CrossClient(username=username, password=password)

        self._client = client
        # Loaded variables keyed by contract name.
        self._variables: dict[str, CrossDataVariable | CrossDimension] = {}
        # Names currently being resolved; guards against FK recursion cycles.
        self._loading: set[str] = set()

    def __getattr__(self, name: str) -> CrossDataVariable | CrossDimension:
        """Dot-notation access with lazy loading of unknown variables."""
        # Leading-underscore names are never looked up remotely; this keeps
        # IDE introspection and Python internals from triggering API calls.
        if name.startswith("_"):
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{name}'"
            )

        try:
            # Route through get_variable so missing variables are auto-loaded.
            return self.get_variable(name)
        except KeyError as exc:
            # Re-raise as AttributeError so hasattr()/getattr() behave normally.
            raise AttributeError(str(exc)) from exc

    def __getitem__(self, name: str) -> CrossDataVariable | CrossDimension:
        """Dictionary-style access, e.g. ``registry["my_variable_name"]``."""
        return self.get_variable(name)

    def __dir__(self) -> list[str]:
        """Extend dir() with the dynamically loaded variable names so that
        IDE autocomplete menus (like Jupyter tab-completion) can offer them."""
        return [*super().__dir__(), *self._variables]

    @property
    def contract_overview(self) -> pd.DataFrame:
        """Overview of available (non-dimension) contracts from the CROSS
        platform as a pandas DataFrame."""
        overview = self._client.contracts.overview()
        contracts = overview[~overview.name.str.startswith("dim_")]
        return contracts[["name", "title", "description"]]

    def add_variable(
        self,
        name: str,
        filters: dict[str, Any] | None = None,
        overwrite: bool = False,
    ) -> CrossDataVariable | CrossDimension:
        """Fetch a contract from the CROSS platform and register it.

        Args:
            name (str): The name of the data contract; also the attribute name
                under which the variable becomes reachable on the registry.
            filters (dict[str, Any] | None): Additional filters to apply when
                fetching data (optional).
            overwrite (bool): Whether to overwrite an existing variable with
                the same name. Defaults to False.

        Returns:
            CrossDataVariable | CrossDimension: The loaded variable instance.
        """
        if name in self._variables:
            if isinstance(self._variables[name], CrossDimension):
                raise ValueError(
                    f"Variable '{name}' is a Dimension and cannot be overwritten."
                )
            if not overwrite:
                raise ValueError(
                    f"Variable '{name}' already exists in the registry. "
                    "Set overwrite=True to replace it."
                )

        # todo: make dimensions identifiable by contract
        if name.startswith("dim_"):
            dimension = CrossDimension.from_client(self._client, name)
            self._variables[name] = dimension
            return dimension

        self._variables[name] = CrossDataVariable.from_client(
            self._client, name, filters=filters
        )

        # Resolve foreign-key references, fetch the referenced contracts, and
        # hydrate them into the variable.
        # NOTE: circular foreign key references are assumed to be prevented
        # upstream by the CROSS platform upon contract injection; the guard
        # below is purely defensive.
        self._loading.add(name)
        try:
            for fk in self._variables[name].foreign_keys or []:
                target = fk.reference.resource
                if target is None:
                    continue  # skip self-reference
                # todo: temporary fix for scenario references
                # fix will be provided upstream with clear dimension definition.
                if target.startswith("dim_scenario"):
                    continue
                if target not in self._variables:
                    if target in self._loading:
                        warnings.warn(
                            f"Circular foreign key reference detected: "
                            f"'{target}' is already being loaded while "
                            f"resolving '{name}'. Skipping.",
                            stacklevel=2,
                        )
                        continue  # skip circular reference
                    self.add_variable(target)
                # Only dimensions are supported as FK targets for now;
                # any other reference type is silently skipped.
                variable = self._variables[name]
                referenced = self._variables[target]
                if isinstance(variable, CrossDataVariable) and isinstance(
                    referenced, CrossDimension
                ):
                    variable.add_dimension(referenced)
        finally:
            self._loading.remove(name)
        return self._variables[name]

    def get_variable(self, name: str) -> CrossDataVariable | CrossDimension:
        """Return a registered variable, auto-loading it on first access."""
        if name in self._variables:
            return self._variables[name]

        try:
            self.add_variable(name)
        except Exception as exc:
            # Chain with 'from exc' so the user can still see whether the
            # underlying failure was a network/auth error from the client.
            raise KeyError(
                f"Could not load variable '{name}' into registry. "
                f"Original error: {str(exc)}"
            ) from exc
        return self._variables[name]

contract_overview property

Fetch an overview of available contracts from the CROSS platform as pandas DataFrame

__dir__()

Overrides the built-in dir() function to include your dynamic variables in IDE autocomplete menus (like Jupyter tab-completion).

Source code in src/crosscontract/registry/registry.py
def __dir__(self) -> list[str]:
    """Extend the result of dir() with the dynamically loaded variable names
    so that IDE autocomplete menus (like Jupyter tab-completion) offer them."""
    return [*super().__dir__(), *self._variables]

__getattr__(name)

Magic method to allow dot notation access with lazy loading.

Source code in src/crosscontract/registry/registry.py
def __getattr__(self, name: str) -> CrossDataVariable | CrossDimension:
    """Dot-notation access with lazy loading of unknown variables."""
    # Leading-underscore names are never looked up remotely; this keeps IDE
    # introspection and Python internals from triggering API calls.
    if name.startswith("_"):
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{name}'"
        )

    try:
        # Route through get_variable so missing variables are auto-loaded.
        return self.get_variable(name)
    except KeyError as exc:
        # Re-raise as AttributeError so hasattr()/getattr() behave normally.
        raise AttributeError(str(exc)) from exc

__getitem__(name)

Magic method to allow dictionary-style access. Usage: registry["my_variable_name"]

Source code in src/crosscontract/registry/registry.py
def __getitem__(self, name: str) -> CrossDataVariable | CrossDimension:
    """Dictionary-style access, e.g. ``registry["my_variable_name"]``."""
    return self.get_variable(name)

__init__(username=None, password=None, client=None)

Initialize the CrossRegistry with either a CrossClient instance or username/password.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `username` | `str \| None` | The username or email to connect to CROSS platform. | `None` |
| `password` | `str \| None` | The password to connect to CROSS platform. | `None` |
| `client` | `CrossClient \| None` | An optional CrossClient instance. If provided, it will be used directly. If not, a new client will be created using the provided username and password. | `None` |
Source code in src/crosscontract/registry/registry.py
def __init__(
    self,
    username: str | None = None,
    password: str | None = None,
    client: CrossClient | None = None,
):
    """Set up the registry, building a client from credentials if needed.

    Args:
        username (str | None): The username or email to connect to CROSS platform.
        password (str | None): The password to connect to CROSS platform.
        client (CrossClient | None): An optional CrossClient instance. When
            given, it is used as-is; otherwise a fresh client is constructed
            from the username/password pair.
    """
    if client is None:
        if username is None or password is None:
            raise ValueError(
                "Either a CrossClient instance or both username and password must "
                "be provided."
            )
        client = CrossClient(username=username, password=password)

    self._client = client
    # Loaded variables keyed by contract name.
    self._variables: dict[str, CrossDataVariable | CrossDimension] = {}
    # Names currently being resolved; guards against FK recursion cycles.
    self._loading: set[str] = set()

add_variable(name, filters=None, overwrite=False)

Add a variable to the registry by fetching it from the CROSS platform.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the data contract. It is also used as the attribute name under which the variable will be accessible in the registry. | required |
| `filters` | `dict[str, Any] \| None` | Additional filters to apply when fetching data (optional). | `None` |
| `overwrite` | `bool` | Whether to overwrite an existing variable with the same name. Defaults to False. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `CrossDataVariable \| CrossDimension` | The loaded variable instance. |
Source code in src/crosscontract/registry/registry.py
def add_variable(
    self,
    name: str,
    filters: dict[str, Any] | None = None,
    overwrite: bool = False,
) -> CrossDataVariable | CrossDimension:
    """Fetch a contract from the CROSS platform and register it.

    Args:
        name (str): The name of the data contract; also the attribute name
            under which the variable becomes reachable on the registry.
        filters (dict[str, Any] | None): Additional filters to apply when
            fetching data (optional).
        overwrite (bool): Whether to overwrite an existing variable with the
            same name. Defaults to False.

    Returns:
        CrossDataVariable | CrossDimension: The loaded variable instance.
    """
    if name in self._variables:
        if isinstance(self._variables[name], CrossDimension):
            raise ValueError(
                f"Variable '{name}' is a Dimension and cannot be overwritten."
            )
        if not overwrite:
            raise ValueError(
                f"Variable '{name}' already exists in the registry. "
                "Set overwrite=True to replace it."
            )

    # todo: make dimensions identifiable by contract
    if name.startswith("dim_"):
        dimension = CrossDimension.from_client(self._client, name)
        self._variables[name] = dimension
        return dimension

    self._variables[name] = CrossDataVariable.from_client(
        self._client, name, filters=filters
    )

    # Resolve foreign-key references, fetch the referenced contracts, and
    # hydrate them into the variable.
    # NOTE: circular foreign key references are assumed to be prevented
    # upstream by the CROSS platform upon contract injection; the guard
    # below is purely defensive.
    self._loading.add(name)
    try:
        for fk in self._variables[name].foreign_keys or []:
            target = fk.reference.resource
            if target is None:
                continue  # skip self-reference
            # todo: temporary fix for scenario references
            # fix will be provided upstream with clear dimension definition.
            if target.startswith("dim_scenario"):
                continue
            if target not in self._variables:
                if target in self._loading:
                    warnings.warn(
                        f"Circular foreign key reference detected: "
                        f"'{target}' is already being loaded while "
                        f"resolving '{name}'. Skipping.",
                        stacklevel=2,
                    )
                    continue  # skip circular reference
                self.add_variable(target)
            # Only dimensions are supported as FK targets for now; any
            # other reference type is silently skipped.
            variable = self._variables[name]
            referenced = self._variables[target]
            if isinstance(variable, CrossDataVariable) and isinstance(
                referenced, CrossDimension
            ):
                variable.add_dimension(referenced)
    finally:
        self._loading.remove(name)
    return self._variables[name]

get_variable(name)

Explicit getter method for retrieving a variable (with lazy loading).

Source code in src/crosscontract/registry/registry.py
def get_variable(self, name: str) -> CrossDataVariable | CrossDimension:
    """Return a registered variable, auto-loading it on first access."""
    if name in self._variables:
        return self._variables[name]

    try:
        self.add_variable(name)
    except Exception as exc:
        # Chain with 'from exc' so the user can still see whether the
        # underlying failure was a network/auth error from the client.
        raise KeyError(
            f"Could not load variable '{name}' into registry. "
            f"Original error: {str(exc)}"
        ) from exc
    return self._variables[name]

Bases: CrossBaseVariable

A variable obtained from the CROSS data platform

Source code in src/crosscontract/registry/data_variable.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
class CrossDataVariable(CrossBaseVariable):
    """A variable obtained from the CROSS data platform"""

    def __init__(
        self,
        contract_resource: ContractResource,
        filters: dict[str, Any] | None = None,
    ):
        """Initialize a data variable from a contract resource and filters.

        Args:
            contract_resource: The contract resource backing this variable.
            filters: Additional filters to apply when fetching data (optional).
        """
        super().__init__(contract_resource=contract_resource)
        self._filters = filters
        # Registered dimensions keyed by the referring column name (i.e. the
        # foreign-key column name in this variable's data).
        self._dimensions: dict[str, CrossDimension] = {}

    @classmethod
    def from_client(
        cls,
        client: CrossClient,
        contract_name: str,
        filters: dict[str, Any] | None = None,
    ) -> Self:
        """Construct the variable from a contract fetched through the client.

        Filters supplied here are applied to all data fetched for the
        resulting variable.

        Args:
            client: An instance of CrossClient to fetch contract details.
            contract_name: The name of the contract to fetch.
            filters: Additional filters to apply when fetching data (optional).
        """
        resource = client.contracts.get(contract_name)
        return cls(contract_resource=resource, filters=filters)

    def __repr__(self) -> str:
        """Return a debug representation with the variable's name and filters."""
        return f"CrossDataVariable(name={self.name}, filters={self._filters})"

    @property
    def dimensions(self) -> dict[str, CrossDimension]:
        """Mapping of foreign-key column names to their dimensions.

        Keys are the columns in this variable that refer to dimensions
        (the foreign-key column names); values are the corresponding
        CrossDimension variables. A shallow copy is returned so callers
        cannot mutate internal state.

        Returns:
            dict[str, CrossDimension]: Foreign-key column name -> dimension.
        """
        return dict(self._dimensions)

    def add_dimension(self, item: CrossDimension):
        """Register a dimension under its referring foreign-key column name(s).

        Args:
            item (CrossDimension): The CrossDimension variable to add.

        Raises:
            ValueError: If no foreign key references the dimension, if a
                referencing foreign key spans multiple columns, or if the
                referring column is already mapped to a different dimension.
        """
        if item in self._dimensions.values():
            return  # already registered

        referring = [
            fk for fk in self.foreign_keys if fk.reference.resource == item.name
        ]
        if not referring:
            raise ValueError(
                f"No foreign key in '{self.name}' references dimension '{item.name}'. "
                f"Available foreign keys: {self.foreign_keys}"
            )
        for fk in referring:
            if len(fk.fields) != 1:
                raise ValueError(
                    f"Foreign key referencing '{item.name}' has multiple fields: "
                    f"{fk.fields}. Only single-column foreign keys are supported."
                )
            # Key the dimension by the name of the referring column.
            column = fk.fields[0]
            current = self._dimensions.get(column)
            if current is not None and current is not item:
                raise ValueError(
                    "Ambiguous foreign key mapping for column "
                    f"'{column}' in '{self.name}': already mapped to "
                    f"dimension '{current.name}', cannot also map to "
                    f"'{item.name}'."
                )
            self._dimensions[column] = item

    def _fetch_data(self) -> pd.DataFrame:
        """Fetch this variable's data from the CROSS platform, applying the
        filters that were specified when the variable was created.

        Returns:
            pd.DataFrame: A DataFrame containing the results for the specified contract.
        """
        return self.contract_resource.get_data(filters=self._filters)

    @staticmethod
    def _get_filter_mask(df: pd.DataFrame, **filters: list[Any] | None) -> pd.Series:
        """Construct a boolean mask for filtering the DataFrame based on the
        provided filters.

        Args:
            df (pd.DataFrame): The DataFrame to filter.
            **filters (list[Any] | None): Keyword arguments with column name as the
                argument name and a list of allowed values for that column as the value.
                For example, `year=[2020, 2021]` will filter the DataFrame to include
                only rows where the 'year' column has values 2020 or 2021. If a filter
                value is None, it will be ignored (i.e., no filtering will be applied
                for that column).

        Returns:
            pd.Series: A boolean Series that can be used to filter the DataFrame.
        """
        # check that all filter keys are valid columns in the dataframe
        invalid = set(filters) - set(df.columns)
        if invalid:
            raise KeyError(
                f"Invalid filter columns: {invalid}. Valid columns are: "
                f"{df.columns.tolist()}"
            )

        mask = pd.Series(True, index=df.index)
        for column, values in filters.items():
            if values is not None:
                mask &= df[column].isin(values)
        return mask

    def _relabel_column_with_title(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Swap the ids in ``column`` for the titles defined by its dimension.

        Args:
            df: The DataFrame containing the column to relabel.
            column: The name of the column to relabel.

        Returns:
            A copy of ``df`` with the column relabeled with titles; ``df``
            itself (unchanged) if the column has no registered dimension.
        """
        dimension = self.dimensions.get(column)
        if dimension is None:
            return df
        relabeled = df.copy()
        # Ids missing from the label map keep their original value.
        relabeled[column] = relabeled[column].map(dimension.label_map).fillna(
            relabeled[column]
        )
        return relabeled

    @staticmethod
    def _aggregate(
        df: pd.DataFrame,
        dimension_col: str,
        dimension_map: dict[Any, Any],
        value_col: str = "value",
        agg_func: str = "sum",
    ) -> pd.DataFrame:
        """Aggregate data using a dimension value mapping.

        This helper is used by the public `get_data` method to aggregate rows after
        remapping values in ``dimension_col`` according to ``dimension_map``. The
        mapping can represent arbitrary groupings (for example, hierarchy levels,
        explicit ID lists, or custom value-to-group mappings).

        Args:
            df (pd.DataFrame): The DataFrame containing the data to aggregate.
            dimension_col (str): The name of the dimension column whose values will
                be remapped before grouping.
            dimension_map (dict[Any, Any]): Mapping from original dimension values
                to target aggregation values. Values not present in the mapping
                are left unchanged.
            value_col: The name of the column containing the values to aggregate.
            agg_func: The aggregation function to use (e.g., "sum", "mean").
        """
        agg_cols = [c for c in df.columns if c != value_col]
        df_out: pd.DataFrame = (
            df.assign(  # is already copy
                **{
                    dimension_col: df[dimension_col]
                    .map(dimension_map)
                    .fillna(df[dimension_col])
                }
            )
            .groupby(agg_cols, as_index=False)[value_col]
            .agg(agg_func)  # type: ignore[assignment]
        )
        return df_out

    def _get_level_mapping(self, col: str, level: int) -> dict[Any, Any]:
        """Mapping of dimension ids in ``col`` to their ancestors at ``level``.

        Args:
            col (str): The name of the dimension column to aggregate.
            level (int): The hierarchy level to aggregate to.

        Returns:
            dict[Any, Any]: Original id -> aggregated id at the given level.
            An empty dict when ``level`` is beyond the dimension's maximum
            depth (meaning: no aggregation).
        """
        dim = self.dimensions.get(col)
        if dim is None:  # pragma: no cover
            # defensive: already checked in _get_aggregation_mapping
            raise KeyError(
                f"Aggregation specified for column '{col}', but it is not a "
                "registered dimension foreign key. Available dimensions: "
                f"{list(self.dimensions.keys())}"
            )
        level_map = dim.ancestor_maps.get(level)
        # A missing level means it exceeds the hierarchy's depth.
        return {} if level_map is None else level_map

    def _get_ids_mapping(self, col: str, target_ids: list[Any]) -> dict[Any, Any]:
        """Mapping of dimension ids in ``col`` onto a target set of ids.

        Each original id is mapped to its nearest ancestor among
        ``target_ids``; ids already in the list map to themselves; ids with
        no ancestor in the list are left unmapped.

        Args:
            col (str): The name of the dimension column to aggregate.
            target_ids (list[Any]): The dimension ids to aggregate to.

        Returns:
            dict[Any, Any]: Original id -> aggregated id.
        """
        if col not in self.dimensions:  # pragma: no cover
            # defensive: already checked in _get_aggregation_mapping
            raise KeyError(
                f"Aggregation specified for column '{col}', but it is not a "
                "registered dimension foreign key. Available dimensions: "
                f"{list(self.dimensions.keys())}"
            )
        return self.dimensions[col].get_ancestor_map_by_ids(target_ids)

    def _get_aggregation_mapping(
        self,
        aggregation_spec: dict[str, int | list[Any] | dict[Any, Any]],
    ) -> dict[str, dict[Any, Any]]:
        """Build aggregation mappings from a user-facing aggregation specification.

        Translates the per-column aggregation specifications into concrete
        dictionaries that map original dimension values to their aggregated
        counterparts.  The returned mappings are consumed by ``_aggregate``.

        Each column in *aggregation_spec* accepts one of four forms:

        ``int`` - Aggregate to a hierarchy level.
            Every node is mapped to its ancestor at the given level via
            the dimension's precomputed ancestor maps.

            Example::

                {"region": 1}

        ``list[Any]`` - Aggregate to a target set of IDs.
            Every node is mapped to its nearest ancestor that appears in
            the provided list.  Nodes already in the list map to
            themselves.  Nodes with no ancestor in the set are left
            unmapped (kept as-is by ``_aggregate`` via ``fillna``).

            Example::

                {"region": ["cat_a", "cat_b"]}

        ``dict`` **with spec keys** (``"level"``, ``"keep"``) -
            Level-based aggregation with exceptions.

            * ``"level"`` (``int``, required): the hierarchy level to
            aggregate to.
            * ``"keep"`` (``list[Any]``, optional): IDs that are exempt
            from the level-based roll-up and map to themselves.

            Example::

                {"region": {"level": 0, "keep": ["cat_a"]}}

        ``dict`` **without spec keys** - Raw mapping passthrough.
            The dictionary is used as-is.  Unmapped IDs are kept
            unchanged by ``_aggregate`` (via ``fillna``).

            Example::

                {"region": {"leaf_1": "group_x", "leaf_2": "group_x"}}

        Args:
            aggregation_spec: Per-column aggregation specifications.  Keys
                are column names that correspond to registered dimension
                foreign keys (for level / ids modes).
                Values are one of the four forms described above.

        Returns:
            A dictionary keyed by column name whose values are mappings
            from original dimension values to aggregated dimension values.

        Raises:
            TypeError: If a column specification is not ``int``, ``list``,
                or ``dict``.
            ValueError: If a spec-dict uses ``"keep"`` without ``"level"``.
            KeyError: If level-based or id-based aggregation is requested
                for a column that is not a registered dimension.
        """
        spec_keys = {"level", "keep"}

        mapping: dict[str, dict[Any, Any]] = {}
        for col, spec in aggregation_spec.items():
            if col not in self.dimensions:
                raise KeyError(
                    f"Aggregation specified for column '{col}', but it is not a "
                    "registered dimension foreign key. Available dimensions: "
                    f"{list(self.dimensions.keys())}"
                )
            # aggregation to single hierarchical level
            if isinstance(spec, int):
                dim_map = self._get_level_mapping(col, spec)

            # aggregation to specified set of IDs
            elif isinstance(spec, list):
                dim_map = self._get_ids_mapping(col, spec)

            # aggregation to level with exceptions for sub-categories
            elif isinstance(spec, dict) and spec.keys() & spec_keys:
                if "keep" in spec and "level" not in spec:
                    # fix: the two adjacent literals previously concatenated to
                    # "without using  levels" (doubled space) in this message.
                    raise ValueError(
                        f"Aggregation spec for '{col}' has 'keep' without "
                        "'level'. 'keep' is only valid together with 'level'. "
                        "If you want to specify IDs to aggregate to without using "
                        "levels, use a list of IDs as the specification instead."
                    )
                elif "level" in spec and "keep" not in spec:
                    # treat as regular level-based aggregation if no exceptions
                    dim_map = self._get_level_mapping(col, spec["level"])
                else:
                    # aggregation to level with exceptions for sub-categories
                    # combine the level-based mapping with the exceptions specified in
                    # "keep" and use the id based mapping logic to map to the nearest
                    # ancestor in the combined set of target IDs
                    # 1. get the level-based mapping by id
                    level_map = self._get_level_mapping(col, spec["level"])
                    # 2. combine the level-based targets with the "keep" exceptions
                    #    to get the full list of target IDs for id-based aggregation
                    target_ids = list(
                        set(level_map.values()) | set(spec.get("keep", []))
                    )
                    # 3. call the mapping logic for id-based aggregation with
                    #    the combined target set
                    dim_map = self._get_ids_mapping(col, target_ids)

            elif isinstance(spec, dict):
                dim_map = spec

            else:
                raise TypeError(
                    f"Invalid aggregation spec for '{col}': expected int, "
                    f"list, or dict, got {type(spec).__name__}."
                )

            mapping[col] = dim_map

        return mapping

    def get_data(
        self,
        filters: dict[str, list[Any]] | None = None,
        columns: list[str] | None = None,
        use_titles: bool = False,
        aggregation: dict[str, int | list[Any] | dict[Any, Any]] | None = None,
        value_col: str = "value",
        agg_func: str = "sum",
    ) -> pd.DataFrame:
        """Retrieve this variable's data with optional filtering and aggregation.

        Transformations are applied in a fixed order: row filtering, then
        per-dimension aggregation, then ID-to-title relabeling, then column
        selection. Subclasses may override this method to layer on further
        processing.

        Args:
            filters: Extra filters applied on top of any filters fixed at
                variable creation time. Keys are column names, values are
                lists of allowed values for those columns.
            columns: Columns to keep in the result. ``None`` keeps every
                column. Applied last, after all other transformations.
            use_titles: When ``True``, IDs in dimension foreign key columns
                are replaced by the human-readable labels from the
                dimension's ``label_map``.
            aggregation: Per-column aggregation specs keyed by dimension
                foreign key column name. Four value forms are accepted:

                ``int`` — aggregate to a hierarchy level::

                    # roll up to level 1 (categories)
                    var.get_data(aggregation={"region": 1})

                ``list[Any]`` — aggregate to a target set of IDs; every
                node maps to its nearest ancestor in the list::

                    # aggregate to specific nodes
                    var.get_data(aggregation={"region": ["cat_a", "cat_b"]})

                ``dict`` with ``"level"`` and optional ``"keep"`` —
                level-based aggregation with exceptions; nodes under a
                kept ID collapse to that ID instead of rolling up to the
                level target::

                    # roll up to total, but preserve cat_a breakdown
                    var.get_data(
                        aggregation={"region": {"level": 0, "keep": ["cat_a"]}}
                    )

                ``dict`` without spec keys — raw mapping passthrough;
                keys are original IDs, values are target IDs, and
                unmapped IDs pass through unchanged::

                    var.get_data(
                        aggregation={"region": {"leaf_1": "x", "leaf_2": "x"}}
                    )

                Specs are applied one after another, in dictionary order.
            value_col: Name of the numeric column to aggregate.
            agg_func: Aggregation function name (e.g., ``"sum"``,
                ``"mean"``).

        Returns:
            A DataFrame with the requested filters, aggregations, title
            relabeling, and column selection applied.
        """
        frame = self.data  # the data property already hands back a copy
        if filters:
            frame = frame[self._get_filter_mask(frame, **filters)]

        if aggregation is not None:
            # Resolve every spec to a plain id -> target-id mapping first,
            # then collapse the frame one dimension column at a time.
            for fk_col, dim_map in self._get_aggregation_mapping(aggregation).items():
                frame = self._aggregate(
                    frame,
                    dimension_col=fk_col,
                    dimension_map=dim_map,
                    value_col=value_col,
                    agg_func=agg_func,
                )
        if use_titles:
            # Only relabel columns that are registered dimension foreign keys.
            for fk_col in (c for c in frame.columns if self.dimensions.get(c) is not None):
                frame = self._relabel_column_with_title(frame, fk_col)
        return frame if columns is None else frame[columns]

dimensions property

Get the dimensions associated with this variable. The keys of the returned dictionary are the names of the columns in this variable that refer to dimensions (i.e. the foreign key column names), and the values are the corresponding CrossDimension variables.

Returns:

Type Description
dict[str, CrossDimension]

dict[str, CrossDimension]: A dictionary mapping foreign key column names to CrossDimension variables.

__init__(contract_resource, filters=None)

Initialize a data variable with the given contract resource and filters.

Parameters:

Name Type Description Default
contract_resource ContractResource

The contract resource associated with this variable.

required
filters dict[str, Any] | None

Additional filters to apply when fetching data (optional).

None
Source code in src/crosscontract/registry/data_variable.py
def __init__(
    self,
    contract_resource: ContractResource,
    filters: dict[str, Any] | None = None,
):
    """Initialize a data variable from a contract resource.

    Args:
        contract_resource: The contract resource associated with this variable.
        filters: Additional filters to apply when fetching data (optional).
    """
    super().__init__(contract_resource=contract_resource)
    # References to each dimension this variable links to, keyed by the
    # name of the referring column (i.e. the foreign key column name).
    self._dimensions: dict[str, CrossDimension] = {}
    self._filters = filters

add_dimension(item)

Add a dimension variable to the registry, keyed by the referring foreign key column name(s).

Parameters:

Name Type Description Default
item CrossDimension

The CrossDimension variable to add.

required
Source code in src/crosscontract/registry/data_variable.py
def add_dimension(self, item: CrossDimension):
    """Register a dimension variable under its referring foreign key column(s).

    Args:
        item (CrossDimension): The CrossDimension variable to add.

    Raises:
        ValueError: If no foreign key references the dimension, if a
            matching foreign key spans more than one column, or if a
            referring column is already mapped to a different dimension.
    """
    # Adding the same dimension twice is a silent no-op.
    if item in self._dimensions.values():
        return

    referring = [
        fk for fk in self.foreign_keys if fk.reference.resource == item.name
    ]
    if not referring:
        raise ValueError(
            f"No foreign key in '{self.name}' references dimension '{item.name}'. "
            f"Available foreign keys: {self.foreign_keys}"
        )
    for foreign_key in referring:
        if len(foreign_key.fields) != 1:
            raise ValueError(
                f"Foreign key referencing '{item.name}' has multiple fields: "
                f"{foreign_key.fields}. Only single-column foreign keys are supported."
            )
        # Register under the name of the referring column.
        column = foreign_key.fields[0]
        current = self._dimensions.get(column)
        if current is not None and current is not item:
            raise ValueError(
                "Ambiguous foreign key mapping for column "
                f"'{column}' in '{self.name}': already mapped to "
                f"dimension '{current.name}', cannot also map to "
                f"'{item.name}'."
            )
        self._dimensions[column] = item

from_client(client, contract_name, filters=None) classmethod

Build from a contract fetched via the client.

Value variables can have filters applied when they are created, which will be applied to all data fetched for that variable.

Parameters:

Name Type Description Default
client CrossClient

An instance of CrossClient to fetch contract details.

required
contract_name str

The name of the contract to fetch.

required
filters dict[str, Any] | None

Additional filters to apply when fetching data (optional).

None
Source code in src/crosscontract/registry/data_variable.py
@classmethod
def from_client(
    cls,
    client: CrossClient,
    contract_name: str,
    filters: dict[str, Any] | None = None,
) -> Self:
    """Construct an instance from a contract fetched through the client.

    Any filters given here are stored on the variable and applied to
    all data fetched for it.

    Args:
        client: An instance of CrossClient to fetch contract details.
        contract_name: The name of the contract to fetch.
        filters: Additional filters to apply when fetching data (optional).

    Returns:
        A new instance wrapping the fetched contract resource.
    """
    contract = client.contracts.get(contract_name)
    return cls(contract_resource=contract, filters=filters)

get_data(filters=None, columns=None, use_titles=False, aggregation=None, value_col='value', agg_func='sum')

Get the data for this variable, optionally filtered and aggregated.

This is the main public interface for retrieving data. It applies filtering, aggregation, title relabeling, and column selection in that order. Can be overridden in subclasses to add additional processing.

Parameters:

Name Type Description Default
filters dict[str, list[Any]] | None

Additional filters applied on top of any filters specified at variable creation time. Keys are column names, values are lists of allowed values for those columns.

None
columns list[str] | None

Columns to include in the returned DataFrame. If None, all columns are included. Applied last, after all other transformations.

None
use_titles bool

If True, replace IDs in dimension foreign key columns with the corresponding human-readable labels from the dimension's label_map.

False
aggregation dict[str, int | list[Any] | dict[Any, Any]] | None

Per-column aggregation specifications. Keys are dimension foreign key column names. Values control how that dimension is aggregated and accept four forms:

int — Aggregate to a hierarchy level::

# roll up to level 1 (categories)
var.get_data(aggregation={"region": 1})

list[Any] — Aggregate to a target set of IDs. Each node maps to its nearest ancestor in the list::

# aggregate to specific nodes
var.get_data(aggregation={"region": ["cat_a", "cat_b"]})

dict with "level" and optional "keep" — Level-based aggregation with exceptions. Nodes under a kept ID aggregate to that ID instead of rolling up to the level target::

# roll up to total, but preserve cat_a breakdown
var.get_data(
    aggregation={"region": {"level": 0, "keep": ["cat_a"]}}
)

dict without spec keys — Raw mapping passthrough. Keys are original IDs, values are target IDs. Unmapped IDs are kept as-is::

var.get_data(
    aggregation={"region": {"leaf_1": "x", "leaf_2": "x"}}
)

Aggregations are applied sequentially in the order they appear in the dictionary.

None
value_col str

The column containing numeric values to aggregate.

'value'
agg_func str

The aggregation function (e.g., "sum", "mean").

'sum'

Returns:

Type Description
DataFrame

A DataFrame with the requested filters, aggregations, title relabeling, and column selection applied.

Source code in src/crosscontract/registry/data_variable.py
def get_data(
    self,
    filters: dict[str, list[Any]] | None = None,
    columns: list[str] | None = None,
    use_titles: bool = False,
    aggregation: dict[str, int | list[Any] | dict[Any, Any]] | None = None,
    value_col: str = "value",
    agg_func: str = "sum",
) -> pd.DataFrame:
    """Retrieve this variable's data with optional filtering and aggregation.

    Transformations are applied in a fixed order: row filtering, then
    per-dimension aggregation, then ID-to-title relabeling, then column
    selection. Subclasses may override this method to layer on further
    processing.

    Args:
        filters: Extra filters applied on top of any filters fixed at
            variable creation time. Keys are column names, values are
            lists of allowed values for those columns.
        columns: Columns to keep in the result. ``None`` keeps every
            column. Applied last, after all other transformations.
        use_titles: When ``True``, IDs in dimension foreign key columns
            are replaced by the human-readable labels from the
            dimension's ``label_map``.
        aggregation: Per-column aggregation specs keyed by dimension
            foreign key column name. Four value forms are accepted:

            ``int`` — aggregate to a hierarchy level::

                # roll up to level 1 (categories)
                var.get_data(aggregation={"region": 1})

            ``list[Any]`` — aggregate to a target set of IDs; every
            node maps to its nearest ancestor in the list::

                # aggregate to specific nodes
                var.get_data(aggregation={"region": ["cat_a", "cat_b"]})

            ``dict`` with ``"level"`` and optional ``"keep"`` —
            level-based aggregation with exceptions; nodes under a
            kept ID collapse to that ID instead of rolling up to the
            level target::

                # roll up to total, but preserve cat_a breakdown
                var.get_data(
                    aggregation={"region": {"level": 0, "keep": ["cat_a"]}}
                )

            ``dict`` without spec keys — raw mapping passthrough;
            keys are original IDs, values are target IDs, and
            unmapped IDs pass through unchanged::

                var.get_data(
                    aggregation={"region": {"leaf_1": "x", "leaf_2": "x"}}
                )

            Specs are applied one after another, in dictionary order.
        value_col: Name of the numeric column to aggregate.
        agg_func: Aggregation function name (e.g., ``"sum"``,
            ``"mean"``).

    Returns:
        A DataFrame with the requested filters, aggregations, title
        relabeling, and column selection applied.
    """
    frame = self.data  # the data property already hands back a copy
    if filters:
        frame = frame[self._get_filter_mask(frame, **filters)]

    if aggregation is not None:
        # Resolve every spec to a plain id -> target-id mapping first,
        # then collapse the frame one dimension column at a time.
        for fk_col, dim_map in self._get_aggregation_mapping(aggregation).items():
            frame = self._aggregate(
                frame,
                dimension_col=fk_col,
                dimension_map=dim_map,
                value_col=value_col,
                agg_func=agg_func,
            )
    if use_titles:
        # Only relabel columns that are registered dimension foreign keys.
        for fk_col in (c for c in frame.columns if self.dimensions.get(c) is not None):
            frame = self._relabel_column_with_title(frame, fk_col)
    return frame if columns is None else frame[columns]

Bases: CrossBaseVariable

Dimension variable obtained from the CROSS data platform.

Dimensions have additional methods for handling hierarchical relationships and aggregations.

Source code in src/crosscontract/registry/dimension.py
class CrossDimension(CrossBaseVariable):
    """Dimension variable obtained from the CROSS data platform.

    Dimensions have additional methods for handling hierarchical relationships
    and aggregations.
    """

    def __init__(
        self,
        contract_resource: ContractResource,
    ):
        """Initialize the dimension with lazily-built hierarchy caches.

        Args:
            contract_resource (ContractResource): The contract resource
                backing this dimension.
        """
        super().__init__(contract_resource)
        # Caches built on first access; all three are reset by
        # clear_data_cache().
        self._ancestor_maps: dict[int, dict[str, str]] | None = None
        self._label_map: dict[str, str] | None = None
        self._ancestry_chains: dict[Any, list[Any]] | None = None

    def __str__(self):
        """Return a short human-readable description of the dimension."""
        return f"Dimension(name={self.name})"

    @property
    def ancestor_maps(self) -> dict[int, dict[str, str]]:
        """Precompute and return ancestor maps for all aggregation levels.

        Returns:
            dict[int, dict[str, str]]: A dictionary where keys are aggregation levels
                and values are dictionaries mapping dimension IDs to their ancestor
                IDs at that level.

        """
        if self._ancestor_maps is None:
            self._ancestor_maps = self._build_ancestor_maps()
        # return copy so callers cannot mutate the cached maps
        return {level: mapping.copy() for level, mapping in self._ancestor_maps.items()}

    @property
    def label_map(self) -> dict[str, str]:
        """Return a mapping from id to label for the dimension.

        Returns:
            dict[str, str]: A dictionary mapping dimension IDs to their labels.
        """
        if self._label_map is None:
            # strict=True raises if the "id" and "label" columns somehow
            # differ in length rather than silently truncating.
            self._label_map = dict(
                zip(self.data["id"], self.data["label"], strict=True)
            )
        return self._label_map.copy()

    def _build_ancestor_maps(self) -> dict[int, dict[str, str]]:
        """Precompute ancestor mappings for all aggregation levels.

        Returns:
            dict[int, dict[str, str]]: A dictionary where keys are aggregation levels
                and values are dictionaries mapping dimension IDs to their ancestor
                IDs at that level.
        """
        # Requires "id", "level", and "id_parent" columns in the dimension data.
        dim = self.data.set_index("id")
        max_level = int(dim["level"].max())

        ids = dim.index.values
        levels = dim["level"].values
        parents = dim["id_parent"].values

        # Map each id to a positional index for fast numpy lookups
        id_to_pos = {id_val: pos for pos, id_val in enumerate(ids)}
        # Roots (or parents missing from the index) point back at themselves.
        parent_pos = np.array([id_to_pos.get(p, i) for i, p in enumerate(parents)])

        # NOTE: range(max_level) omits max_level itself — aggregating to the
        # deepest level would presumably be the identity mapping.
        ancestor_maps = {}
        for agg_level in range(max_level):
            # Start with each node as its own ancestor (by position)
            anc = np.arange(len(ids))

            # Walk one level at a time, low → high.
            # Nodes at level <= agg_level keep themselves.
            # At each higher level, inherit the (already-resolved) parent's ancestor.
            # NOTE(review): this walk relies on every parent having a strictly
            # lower level than its children — confirm for this platform's data.
            for lvl in range(agg_level + 1, max_level + 1):
                mask = levels == lvl
                anc[mask] = anc[parent_pos[mask]]

            ancestor_maps[agg_level] = dict(zip(ids, ids[anc], strict=True))

        return ancestor_maps

    def _build_ancestry_chains(self) -> dict[Any, list[Any]]:
        """Precompute the full ancestry chain for every node in the dimension.

        Each chain is an ordered list starting from the node itself and
        walking up through parents to the root.  For example, in a
        three-level hierarchy, ``leaf_1``'s chain would be
        ``["leaf_1", "cat_a", "total"]``.

        The result is cached and reused by ``get_ancestor_map_by_ids``
        to avoid repeated tree walks.

        Returns:
            A dictionary mapping each dimension ID to its ancestry chain
            (a list from self to root, inclusive).
        """
        df_dim = self.data.set_index("id")
        chains: dict[Any, list[Any]] = {}
        for node_id in df_dim.index:
            chain = []
            current = node_id
            seen = set()
            # Walk upward until the parent is None/NaN; the `seen` set
            # guards against infinite loops if the hierarchy has a cycle.
            while current is not None and not pd.isna(current):
                if current in seen:
                    break
                chain.append(current)
                seen.add(current)
                # An id absent from the index terminates the walk.
                current = (
                    df_dim.at[current, "id_parent"] if current in df_dim.index else None
                )
            chains[node_id] = chain
        return chains

    def get_ancestor_map_by_ids(self, target_ids: list[Any]) -> dict[Any, Any]:
        """Build a mapping from every node to its nearest ancestor in the
        target set.

        For each node, walks its precomputed ancestry chain until it finds
        an ID present in *target_ids*.  Nodes already in the set map to
        themselves.  Nodes whose ancestry chain does not intersect the
        target set are omitted from the returned mapping (and will be
        kept as-is by ``_aggregate`` via ``fillna``).

        Args:
            target_ids: The dimension IDs defining the aggregation targets.
                Must be a subset of the IDs in the dimension data.

        Returns:
            A dictionary mapping dimension IDs to their nearest ancestor
            in the target set.
        """
        if self._ancestry_chains is None:
            self._ancestry_chains = self._build_ancestry_chains()

        target_set = set(target_ids)
        dim_map: dict[Any, Any] = {}
        for node_id, chain in self._ancestry_chains.items():
            # Chains start at the node itself, so members of the target
            # set map to themselves.
            for ancestor in chain:
                if ancestor in target_set:
                    dim_map[node_id] = ancestor
                    break
        return dim_map

    def clear_data_cache(self):
        """Clear cached data and all derived hierarchy caches."""
        super().clear_data_cache()
        self._ancestor_maps = None
        self._label_map = None
        self._ancestry_chains = None

ancestor_maps property

Precompute and return ancestor maps for all aggregation levels.

Returns:

Type Description
dict[int, dict[str, str]]

dict[int, dict[str, str]]: A dictionary where keys are aggregation levels and values are dictionaries mapping dimension IDs to their ancestor IDs at that level.

label_map property

Return a mapping from id to label for the dimension.

Returns:

Type Description
dict[str, str]

dict[str, str]: A dictionary mapping dimension IDs to their labels.

get_ancestor_map_by_ids(target_ids)

Build a mapping from every node to its nearest ancestor in the target set.

For each node, walks its precomputed ancestry chain until it finds an ID present in target_ids. Nodes already in the set map to themselves. Nodes whose ancestry chain does not intersect the target set are omitted from the returned mapping (and will be kept as-is by _aggregate via fillna).

Parameters:

Name Type Description Default
target_ids list[Any]

The dimension IDs defining the aggregation targets. Must be a subset of the IDs in the dimension data.

required

Returns:

Type Description
dict[Any, Any]

A dictionary mapping dimension IDs to their nearest ancestor in the target set.

Source code in src/crosscontract/registry/dimension.py
def get_ancestor_map_by_ids(self, target_ids: list[Any]) -> dict[Any, Any]:
    """Map every node to its nearest ancestor within the target set.

    Each node's precomputed ancestry chain (self first, root last) is
    scanned for the first ID contained in *target_ids*; nodes already
    in the set therefore map to themselves.  Nodes whose chain never
    meets the target set are left out of the result (and will be kept
    as-is by ``_aggregate`` via ``fillna``).

    Args:
        target_ids: The dimension IDs defining the aggregation targets.
            Must be a subset of the IDs in the dimension data.

    Returns:
        A dictionary mapping dimension IDs to their nearest ancestor
        in the target set.
    """
    if self._ancestry_chains is None:
        self._ancestry_chains = self._build_ancestry_chains()

    wanted = set(target_ids)
    mapping: dict[Any, Any] = {}
    for node, ancestry in self._ancestry_chains.items():
        # The chain is ordered self -> root, so the first hit is the
        # nearest ancestor in the target set.
        hits = [candidate for candidate in ancestry if candidate in wanted]
        if hits:
            mapping[node] = hits[0]
    return mapping