Skip to content

API Reference: CrossContract

BaseContract

Bases: BaseMetaData

The BaseContract class is the most basic representation of a data contract. It combines the minimum required metadata with the contract structure given by Schema.

It serves as the foundational blueprint for defining data contracts. Any custom contract implementation MUST inherit from this class to ensure structural consistency and compatibility with the system.

Attributes:

Name Type Description
name str

A unique identifier for the data contract. Must contain only alphanumeric characters, underscores, or hyphens. Maximum length is 100 characters.

tableschema TableSchema

The schema defining the structure of the contract (fields, primary keys, foreign keys, field descriptors).

Example

To implement a custom contract with additional metadata:

from pydantic import Field
from crosscontract.contracts import BaseContract

class MyCustomContract(BaseContract):
    # Add custom metadata fields
    owner: str = Field(description="The owner of this dataset")
    version: str = Field(description="Semantic version of the contract")

    # The 'schema' field is already inherited from BaseContract!
Source code in src/crosscontract/contracts/contracts/base_contract.py
class BaseContract(BaseMetaData):
    """
    The BaseContract class is the most basic representation of a data contract.
    It combines the minimum required metadata with the contract structure given by
    Schema.

    It serves as the foundational blueprint for defining data contracts.
    Any custom contract implementation MUST inherit from this class to ensure
    structural consistency and compatibility with the system.

    Attributes:
        name (str): A unique identifier for the data contract.
            Must contain only alphanumeric characters, underscores, or hyphens.
            Maximum length is 100 characters.
        tableschema (TableSchema): The schema defining the structure of the contract
            (fields, primary keys, foreign keys, field descriptors).

    Example:
        To implement a custom contract with additional metadata:

        ```python
        from pydantic import Field
        from crosscontract.contracts import BaseContract

        class MyCustomContract(BaseContract):
            # Add custom metadata fields
            owner: str = Field(description="The owner of this dataset")
            version: str = Field(description="Semantic version of the contract")

            # The 'tableschema' field is already inherited from BaseContract!
        ```
    """

    # Accept both field names and aliases on input; forbid unknown keys so
    # typos in contract files fail loudly instead of being silently dropped.
    model_config = ConfigDict(populate_by_name=True, extra="forbid")

    tableschema: TableSchema = Field(
        description="The Frictionless Table Schema definition.",
    )

    @classmethod
    def from_file(cls, file_path: str | Path) -> Self:
        """
        Load a BaseContract from a YAML or JSON file.

        Args:
            file_path (str | Path): The path to the YAML or JSON file.

        Returns:
            Self: An instance of BaseContract loaded from the file.

        Raises:
            FileNotFoundError: If the specified file does not exist.
            ValueError: If the file format is not supported (not .json, .yaml, or .yml).
        """
        data = read_yaml_or_json_file(file_path)
        return cls.model_validate(data)

    @model_validator(mode="after")
    def validate_self_reference(self) -> Self:
        """Validate that self-referencing foreign keys are given as None on the
        resource field. Raise if a reference has the same name as the contract itself.

        Raises:
            ValueError: If any foreign key references this contract by its own
                name instead of using None for the resource field.
        """
        # mode="after" means this runs once all fields are populated, so both
        # `tableschema` and the inherited `name` are available here.
        for fk in self.tableschema.foreignKeys:
            if fk.reference.resource == self.name:
                raise ValueError(
                    f"Foreign key reference resource '{fk.reference.resource}' "
                    "cannot be the same as the contract name. Self-references must "
                    "use None for the resource field."
                )
        return self

from_file(file_path) classmethod

Load a BaseContract from a YAML or JSON file.

Parameters:

Name Type Description Default
file_path str | Path

The path to the YAML or JSON file.

required

Returns:

Name Type Description
Self Self

An instance of BaseContract loaded from the file.

Raises:

Type Description
FileNotFoundError

If the specified file does not exist.

ValueError

If the file format is not supported (not .json, .yaml, or .yml).

Source code in src/crosscontract/contracts/contracts/base_contract.py
@classmethod
def from_file(cls, file_path: str | Path) -> Self:
    """Construct a contract instance from a YAML or JSON file.

    Args:
        file_path (str | Path): Location of the YAML or JSON file.

    Returns:
        Self: The contract instance validated from the file contents.

    Raises:
        FileNotFoundError: If no file exists at the given path.
        ValueError: If the file extension is not .json, .yaml, or .yml.
    """
    raw_document = read_yaml_or_json_file(file_path)
    return cls.model_validate(raw_document)

validate_self_reference()

Validate that self-referencing foreign keys are given as None on the resource field. Raise if a reference has the same name as the contract itself.

Source code in src/crosscontract/contracts/contracts/base_contract.py
@model_validator(mode="after")
def validate_self_reference(self) -> Self:
    """Ensure no foreign key names this contract as its reference resource.

    Self-references must set the resource field to None; a reference whose
    resource equals the contract's own name is rejected.
    """
    # Find the first offending reference, if any; None means all are valid.
    offending = next(
        (
            fk.reference.resource
            for fk in self.tableschema.foreignKeys
            if fk.reference.resource == self.name
        ),
        None,
    )
    if offending is not None:
        raise ValueError(
            f"Foreign key reference resource '{offending}' "
            "cannot be the same as the contract name. Self-references must "
            "use None for the resource field."
        )
    return self

CrossContract

Bases: BaseContract, CrossMetaData

A concrete implementation of a data contract for the CrossContract system.

This class extends BaseContract by adding tagging capabilities. It serves as the standard contract definition for resources within the CrossContract ecosystem.

Attributes:

Name Type Description
name str

A unique identifier for the data contract. Must contain only alphanumeric characters, underscores, or hyphens. Inherited from BaseContract.

title str

A human-readable title for the data.

description str

A human-readable description of the data.

tags list[str] | None

A list of tags used for categorization and filtering.

schema Schema

The Frictionless Table Schema definition. Accessible via the schema property as well.

Source code in src/crosscontract/contracts/contracts/cross_contract.py
class CrossContract(BaseContract, CrossMetaData):
    """
    A concrete implementation of a data contract for the CrossContract system.

    This class extends `BaseContract` by adding tagging capabilities.
    It serves as the standard contract definition for resources within the
    CrossContract ecosystem.

    Attributes:
        name (str): A unique identifier for the data contract.
            Must contain only alphanumeric characters, underscores, or hyphens.
            Inherited from BaseContract.
        title (str): A human-readable title for the data.
        description (str): A human-readable description of the data.
        tags (list[str] | None): A list of tags used for categorization and filtering.
        tableschema (TableSchema): The Frictionless Table Schema definition.
            Inherited from BaseContract.
    """

    # Mirrors BaseContract's config (alias-tolerant input, unknown keys
    # rejected) but additionally serializes using field aliases so dumped
    # contracts keep the external naming convention.
    model_config = ConfigDict(
        populate_by_name=True,
        extra="forbid",
        serialize_by_alias=True,
    )

SchemaValidationError

Bases: Exception

Source code in src/crosscontract/contracts/schema/exceptions/validation_error.py
class SchemaValidationError(Exception):
    """Raised when a DataFrame fails validation against a contract schema.

    Optionally wraps a pandera ``SchemaErrors`` instance and lazily converts
    it into a cleaned, JSON-serializable list of error records (see the
    ``errors`` property).
    """

    def __init__(
        self,
        message: str,
        schema_errors: pa.errors.SchemaErrors | None = None,
    ) -> None:
        """Initialize SchemaValidationError with optional pandera schema errors.

        Args:
            message (str): The error message.
            schema_errors (pa.errors.SchemaErrors, optional): Pandera SchemaErrors
                exception to parse for detailed error information.
        """
        super().__init__(message)
        # Keep the message accessible as an attribute for programmatic use.
        self.message = message
        # Raw pandera errors; parsed lazily on first access to `errors`.
        self._schema_errors = schema_errors
        self._parsed_errors: list[dict[Hashable, Any]] | None = None

    @property
    def errors(self) -> list[dict[Hashable, Any]]:
        """Lazy-loads and parses the error details."""
        if self._parsed_errors is None:  # pragma: no cover
            self._parsed_errors = self._parse_pandera_errors()
        return self._parsed_errors

    def to_list(self) -> list[dict[Hashable, Any]]:
        """Return the errors as a list of dictionaries.

        Useful for API responses (JSON serialization).
        Alias for .errors.
        """
        return self.errors

    def to_pandas(self) -> pd.DataFrame:  # pragma: no cover
        """Return the errors as a pandas DataFrame.

        Useful for client-side debugging in Jupyter Notebooks.
        """
        return pd.DataFrame(self.errors)

    def _parse_pandera_errors(self) -> list[dict[Hashable, Any]]:
        """Parse pandera SchemaErrors into a list of error details.

        Returns:
            list[dict[Hashable, Any]]: A list of error details dictionaries,
                sorted by check name and row index. Empty when no pandera
                errors were attached.
        """
        if self._schema_errors is None:
            return []
        e = self._schema_errors
        df_failures: pd.DataFrame = e.failure_cases

        # 1. CLEAN TYPE COERCION ERROR: We only keep the rows that failed coercion,
        # but delete the redundant dtype errors (for the whole column)
        coercion_mask = df_failures["check"].str.startswith("coerce_dtype")
        coercion_failed_cols = df_failures[coercion_mask]["column"].unique()
        is_redundant_dtype = (df_failures["check"].str.startswith("dtype")) & (
            df_failures["column"].isin(coercion_failed_cols)
        )
        df_failures = df_failures[~is_redundant_dtype].copy()

        # 2. CLEAN REFERENCE ERRORS
        df_failures = self._parse_reference_errors(df_failures, data=e.data)

        # 3. Format for Output (JSON safe): NaN becomes None so the records
        # serialize cleanly to JSON.
        df_errors = (
            df_failures.replace({float("nan"): None})
            .sort_values(by=["check", "index"])
            .to_dict(orient="records")
        )
        return df_errors

    def _parse_reference_errors(
        self, df_failures: pd.DataFrame, data: pd.DataFrame
    ) -> pd.DataFrame:
        """Parse pandera SchemaErrors related to foreign key violations by combining
        the error messages for multiple rows into a single message per reference
        violation.

        Note: The function relies on the column names provided in the name of the
        check. They have to be given as:
        "ForeignKeyError: ['col1', 'col2']" or
        "PrimaryKeyError: ['col1', 'col2']"

        Args:
            df_failures (pd.DataFrame): The DataFrame containing the pandera failure
                cases.
            data (pd.DataFrame): The original DataFrame that was validated.

        Returns:
            pd.DataFrame: DataFrame with combined reference error messages.
        """
        reference_errors = ["ForeignKeyError", "PrimaryKeyError"]

        # 1. Identify reference errors
        is_ref_error = df_failures["check"].str.contains(
            "|".join(reference_errors), regex=True
        )
        df_refs = df_failures[is_ref_error].copy()
        df_others = df_failures[~is_ref_error]

        if df_refs.empty:
            return df_failures

        # 2. Remove duplicate rows per check type. We only need one row per check
        # and index to report the failure cases
        df_refs = df_refs.drop_duplicates(subset=["check", "index"]).copy()

        # 3. Lookup values of the failure cases from the original data
        for check_name in df_refs["check"].unique():
            target_cols = self._extract_cols(check_name)
            if not target_cols:  # pragma: no cover
                continue

            # Identify rows belonging to the current check
            mask = df_refs["check"] == check_name
            error_indices = df_refs.loc[mask, "index"]

            # Fetch the failure cases from the original data
            try:
                # Different handling for pandas and other backends could be
                # implemented here
                actual_values = self._lookup_values_pandas(
                    data, error_indices, target_cols
                )

                # Assign back to the failure report
                df_refs.loc[mask, "failure_case"] = pd.Series(
                    actual_values, index=df_refs.loc[mask].index
                )
                df_refs.loc[mask, "column"] = ", ".join(target_cols)

            except KeyError:  # pragma: no cover
                # Fallback if indices/columns are missing (edge cases)
                continue

        # 4. Recombine with non-reference errors and return
        df_out = pd.concat([df_others, df_refs], ignore_index=True)
        return df_out

    def _lookup_values_pandas(
        self, data: pd.DataFrame, indices: pd.Series, cols: list[str]
    ) -> list[Any]:
        """Fetch values from the original dataframe. Here it is assumed that the
        original dataframe is a pandas DataFrame.

        Note: Implementations for other backends (e.g., Polars) would need to
            provide their own version of this method.

        Args:
            data (pd.DataFrame): The original DataFrame.
            indices (pd.Series): The indices of the rows to fetch.
            cols (list[str]): The columns to fetch.

        Returns:
            list[Any]: One tuple of column values per requested row.
        """
        subset = data.loc[indices, cols]

        # Handle potential index duplication in source data
        if len(subset) != len(indices):
            # is tested but coverage does not verify this branch
            subset = subset[~subset.index.duplicated(keep="first")]  # pragma: no cover

        # Return as list of strings/tuples
        out_list = list(subset.itertuples(index=False, name=None))
        return out_list

    @staticmethod
    def _extract_cols(check_name: str) -> list[str]:
        """Helper to parse list string from check name.

        Args:
            check_name (str): The name of the check containing the list string.

        Returns:
            list[str]: The parsed list of column names; empty when the check
                name contains no parsable ``[...]`` list literal.
        """
        match = re.search(r"(\[.*?\])", str(check_name))
        # note: code is tested but coverage does not verify this branch
        if match:
            try:
                return ast.literal_eval(match.group(1))
            except (ValueError, SyntaxError):  # pragma: no cover
                pass
        return []  # pragma: no cover

errors property

Lazy-loads and parses the error details.

__init__(message, schema_errors=None)

Initialize SchemaValidationError with optional pandera schema errors.

Parameters:

Name Type Description Default
message str

The error message.

required
schema_errors SchemaErrors

Pandera SchemaErrors exception to parse for detailed error information.

None
Source code in src/crosscontract/contracts/schema/exceptions/validation_error.py
def __init__(
    self,
    message: str,
    schema_errors: pa.errors.SchemaErrors | None = None,
) -> None:
    """Initialize SchemaValidationError with optional pandera schema errors.

    Args:
        message (str): The error message.
        schema_errors (pa.errors.SchemaErrors, optional): Pandera SchemaErrors
            exception to parse for detailed error information.
    """
    super().__init__(message)
    # Keep the message accessible as an attribute for programmatic use.
    self.message = message
    # Raw pandera errors; parsed lazily on first access to `errors`.
    self._schema_errors = schema_errors
    self._parsed_errors: list[dict[Hashable, Any]] | None = None

to_list()

Return the errors as a list of dictionaries.

Useful for API responses (JSON serialization). Alias for .errors.

Source code in src/crosscontract/contracts/schema/exceptions/validation_error.py
def to_list(self) -> list[dict[Hashable, Any]]:
    """Expose the parsed error details as a plain list of dictionaries.

    Handy for JSON serialization in API responses; equivalent to reading
    the ``errors`` property directly.
    """
    parsed = self.errors
    return parsed

to_pandas()

Return the errors as a pandas DataFrame.

Useful for client-side debugging in Jupyter Notebooks.

Source code in src/crosscontract/contracts/schema/exceptions/validation_error.py
def to_pandas(self) -> pd.DataFrame:  # pragma: no cover
    """Materialize the parsed error details as a pandas DataFrame.

    Convenient for interactive, client-side debugging in Jupyter Notebooks.
    """
    records = self.errors
    return pd.DataFrame(records)

TableSchema

Bases: BaseModel

A Frictionless Table Schema compatible schema definition. Includes fields, primary keys, foreign keys, and field descriptors.

Source code in src/crosscontract/contracts/schema/schema.py
class TableSchema(BaseModel):
    """
    A Frictionless Table Schema compatible schema definition.
    Includes fields, primary keys, foreign keys, and field descriptors.

    Supports positional and name-based field access (``schema[0]``,
    ``schema["field_name"]``), ``len()``, and conversion to SQLAlchemy,
    pandera, and pydantic representations.
    """

    # cached_property is listed in ignored_types so pydantic does not try to
    # treat the memoized `_name_index` as a model field.
    model_config = ConfigDict(
        title="TableSchema", ignored_types=(cached_property,), str_strip_whitespace=True
    )

    # Ordered list of field definitions; at least one field is required.
    fields: list[FieldUnion] = Field(
        default_factory=list,
        description="An `array` of Table Schema Field objects.",
        min_length=1,
    )
    primaryKey: PrimaryKey = Field(
        default_factory=PrimaryKey,
        description=(
            "The primary key definition. Primary keys are used to uniquely "
            "identify records in the data."
        ),
    )
    foreignKeys: ForeignKeys = Field(
        default_factory=ForeignKeys,
        description=(
            "The foreign key definitions. Foreign keys are used to establish "
            "relationships between tables."
        ),
    )
    # Optional per-field descriptor metadata; checked against the declared
    # field names in `validate_structural_integrity`.
    fieldDescriptors: FieldDescriptors | None = None

    # def __iter__(self) -> Iterator[FieldUnion]:
    #     return iter(self.fields)

    def field_iterator(self) -> Iterator[FieldUnion]:
        """Returns an iterator over the fields in the schema."""
        return iter(self.fields)

    def __getitem__(self, key: int | str) -> FieldUnion:
        """Return a field by position (int key) or by name (str key).

        Raises:
            KeyError: If a string key matches no field name.
            IndexError: If an integer key is out of range.
        """
        if isinstance(key, int):
            return self.fields[key]
        try:
            return self._name_index[key]
        except KeyError as e:
            raise KeyError(f"Field '{key}' not found in Schema.") from e

    def __len__(self) -> int:
        """Return the number of fields defined in the schema."""
        return len(self.fields)

    @cached_property
    def _name_index(self) -> dict[str, FieldUnion]:
        """
        Creates a dictionary mapping field names to field objects.
        This runs only once when accessed, providing O(1) lookups thereafter.

        NOTE(review): the cache is never invalidated, so mutating `fields`
        after the first access leaves this index stale.
        """
        return {field.name: field for field in self.fields}

    @property
    def field_names(self) -> list[str]:
        """Returns a list of all field names."""
        return list(self._name_index)

    def get(self, name: str) -> FieldUnion | None:
        """Returns the field by name, or None if it doesn't exist."""
        return self._name_index.get(name)

    def has_fields(self, field_names: str | list[str]) -> bool:
        """Check if a field with the given name exists in the data contract."""
        if isinstance(field_names, str):
            return field_names in self.field_names
        else:
            return all(name in self.field_names for name in field_names)

    @model_validator(mode="after")
    def validate_structural_integrity(self) -> "TableSchema":
        """
        Validates that all key definitions refer to fields that actually
        exist in the schema.
        """
        valid_fields = self.field_names

        if self.primaryKey:
            self.primaryKey.validate_fields(valid_fields)

        if self.foreignKeys:
            for fk in self.foreignKeys:
                fk.validate_fields(valid_fields)
                # A resource of None marks a self-reference, so the referenced
                # fields must also exist in this schema.
                if fk.reference.resource is None:
                    fk.validate_referenced_fields(valid_fields)

        if self.fieldDescriptors is not None:
            self.fieldDescriptors.validate_all_exist(valid_fields)
        return self

    @classmethod
    def from_file(cls, file_path: str | Path) -> Self:
        """Load a TableSchema from a YAML or JSON file.

        Args:
            file_path (str | Path): Path to the YAML or JSON file.

        Returns:
            Self: The validated TableSchema instance.
        """
        data = read_yaml_or_json_file(file_path)
        return cls.model_validate(data)

    def to_sa_table(
        self, metadata: MetaData | None = None, table_name: str | None = None
    ) -> Table:
        """Convert the schema into a SQLAlchemy Table definition.

        Args:
            metadata (MetaData | None): Metadata to attach the table to;
                a fresh MetaData is created when None.
            table_name (str | None): Explicit table name. When None, the name
                is derived from a `name` attribute — NOTE(review): TableSchema
                declares no `name` field, so the fallback 'dct_contract_table'
                is used unless a subclass adds one; confirm intent.

        Returns:
            Table: The SQLAlchemy table.
        """
        # Imported lazily to avoid pulling in the converter at module import.
        from .converter import convert_schema_to_sqlalchemy

        if metadata is None:
            metadata = MetaData()
        if table_name is None:
            table_name = f"dct_{getattr(self, 'name', 'contract_table')}"
        return convert_schema_to_sqlalchemy(
            self, metadata=metadata, table_name=table_name
        )

    def to_pandera_schema(
        self,
        name: str | None = None,
    ) -> pa.DataFrameSchema:
        """Convert the schema into a pandera DataFrameSchema.

        Args:
            name (str | None): Name for the pandera schema; falls back to a
                `name` attribute when present, else 'contract_schema'.

        Returns:
            pa.DataFrameSchema: The pandera schema.
        """
        # Imported lazily to avoid pulling in the converter at module import.
        from .converter import convert_schema_to_pandera

        if name is None:
            name = getattr(self, "name", "contract_schema")

        pandera_schema: pa.DataFrameSchema = convert_schema_to_pandera(self, name=name)

        return pandera_schema

    def to_pydantic_model(
        self, model_name: str | None = None, base_class: type[BaseModel] | None = None
    ) -> type[BaseModel]:
        """Generate a pydantic model class from the schema.

        Args:
            model_name (str | None): Name of the generated model; falls back
                to a `name` attribute when present, else 'ContractModel'.
            base_class (type[BaseModel] | None): Optional base class for the
                generated model.

        Returns:
            type[BaseModel]: The generated pydantic model class.
        """
        # Imported lazily to avoid pulling in the converter at module import.
        from .converter import convert_schema_to_pydantic

        if model_name is None:
            model_name = getattr(self, "name", "ContractModel")
        return convert_schema_to_pydantic(self, name=model_name, base_class=base_class)

    def validate_dataframe(
        self,
        df: Any,
        primary_key_values: list[tuple[Any, ...]] | None = None,
        foreign_key_values: dict[tuple[str, ...], list[tuple[Any, ...]]] | None = None,
        skip_primary_key_validation: bool = False,
        skip_foreign_key_validation: bool = False,
        lazy: bool = True,
        backend: Literal["pandas"] = "pandas",
    ) -> None:
        """Validate a DataFrame against the schema.
        It allows to provide existing primary key and foreign key values for validation.
        If provided, the primary key uniqueness is checked against the union of the
        existing and the DataFrame values. Similarly, foreign key integrity is checked
        against the union of existing and DataFrame values in case of self-referencing
        foreign keys.

        Args:
            df (Any): The DataFrame to validate.
            primary_key_values (list[tuple[Any, ...]] | None): Existing primary key
                values to check for uniqueness.
                Note: The uniqueness of the primary key is checked against
                    the union of the provided values and the values in the DataFrame.
            foreign_key_values (dict[tuple[str, ...], list[tuple[Any, ...]]] | None):
                Existing foreign key values to check against. This is provided as a
                dictionary where the keys are the tuples of fields that refer to the
                referenced values, and the values are lists of tuples representing the
                existing referenced values.
                Note: In the case of self-referencing foreign keys, the values in the
                    DataFrame are considered automatically, i.e., the referring fields
                    are validated against the union of the provided values and the
                    values in the DataFrame.
            skip_primary_key_validation (bool): Whether to skip primary key validation.
            skip_foreign_key_validation (bool): Whether to skip foreign key validation.
            lazy (bool): Whether to perform lazy validation, collecting all errors.
                Defaults to True.
            backend (Literal["pandas"]): The backend to use for validation.
                Currently, only "pandas" is supported.
        Raises:
            pandera.errors.SchemaErrors: If the DataFrame does not conform to the
                schema.
            ValueError: If an unsupported backend is requested.
        """
        if backend == "pandas":
            # Imported lazily so the pandas validation stack is only loaded
            # when this backend is actually used.
            from .validation.validate_pandas_dataframe import (
                validate_pandas_dataframe,
            )

            validate_pandas_dataframe(
                schema=self,
                df=df,
                primary_key_values=primary_key_values,
                foreign_key_values=foreign_key_values,
                skip_primary_key_validation=skip_primary_key_validation,
                skip_foreign_key_validation=skip_foreign_key_validation,
                lazy=lazy,
            )
        else:
            raise ValueError(
                f"Unsupported backend '{backend}' for DataFrame validation."
            )

field_names property

Returns a list of all field names.

field_iterator()

Returns an iterator over the fields in the schema.

Source code in src/crosscontract/contracts/schema/schema.py
def field_iterator(self) -> Iterator[FieldUnion]:
    """Return an iterator walking the schema's fields in declared order."""
    declared_fields = self.fields
    return iter(declared_fields)

get(name)

Returns the field by name, or None if it doesn't exist.

Source code in src/crosscontract/contracts/schema/schema.py
def get(self, name: str) -> FieldUnion | None:
    """Fetch a field by its name; yields None when no such field exists."""
    lookup = self._name_index
    return lookup.get(name)

has_fields(field_names)

Check if a field with the given name exists in the data contract.

Source code in src/crosscontract/contracts/schema/schema.py
def has_fields(self, field_names: str | list[str]) -> bool:
    """Check if a field with the given name exists in the data contract."""
    if isinstance(field_names, str):
        return field_names in self.field_names
    else:
        return all(name in self.field_names for name in field_names)

validate_dataframe(df, primary_key_values=None, foreign_key_values=None, skip_primary_key_validation=False, skip_foreign_key_validation=False, lazy=True, backend='pandas')

Validate a DataFrame against the schema. It allows providing existing primary key and foreign key values for validation. If provided, the primary key uniqueness is checked against the union of the existing and the DataFrame values. Similarly, foreign key integrity is checked against the union of existing and DataFrame values in case of self-referencing foreign keys.

Parameters:

Name Type Description Default
df Any

The DataFrame to validate.

required
primary_key_values list[tuple[Any, ...]] | None

Existing primary key values to check for uniqueness. Note: The uniqueness of the primary key is checked against the union of the provided values and the values in the DataFrame.

None
foreign_key_values dict[tuple[str, ...], list[tuple[Any, ...]]] | None

Existing foreign key values to check against. This is provided as a dictionary where the keys are the tuples of fields that refer to the referenced values, and the values are lists of tuples representing the existing referenced values. Note: In the case of self-referencing foreign keys, the values in the DataFrame are considered automatically, i.e., the referring fields are validated against the union of the provided values and the values in the DataFrame.

None
skip_primary_key_validation bool

Whether to skip primary key validation.

False
skip_foreign_key_validation bool

Whether to skip foreign key validation.

False
lazy bool

Whether to perform lazy validation, collecting all errors. Defaults to True.

True
backend Literal['pandas']

The backend to use for validation. Currently, only "pandas" is supported.

'pandas'

Raises: pandera.errors.SchemaErrors: If the DataFrame does not conform to the schema.

Source code in src/crosscontract/contracts/schema/schema.py
def validate_dataframe(
    self,
    df: Any,
    primary_key_values: list[tuple[Any, ...]] | None = None,
    foreign_key_values: dict[tuple[str, ...], list[tuple[Any, ...]]] | None = None,
    skip_primary_key_validation: bool = False,
    skip_foreign_key_validation: bool = False,
    lazy: bool = True,
    backend: Literal["pandas"] = "pandas",
) -> None:
    """Validate a DataFrame against the schema.
    It allows to provide existing primary key and foreign key values for validation.
    If provided, the primary key uniqueness is checked against the union of the
    existing and the DataFrame values. Similarly, foreign key integrity is checked
    against the union of existing and DataFrame values in case of self-referencing
    foreign keys.

    Args:
        df (Any): The DataFrame to validate.
        primary_key_values (list[tuple[Any, ...]] | None): Existing primary key
            values to check for uniqueness.
            Note: The uniqueness of the primary key is checked against
                the union of the provided values and the values in the DataFrame.
        foreign_key_values (dict[tuple[str, ...], list[tuple[Any, ...]]] | None):
            Existing foreign key values to check against. This is provided as a
            dictionary where the keys are the tuples of fields that refer to the
            referenced values, and the values are lists of tuples representing the
            existing referenced values.
            Note: In the case of self-referencing foreign keys, the values in the
                DataFrame are considered automatically, i.e., the referring fields
                are validated against the union of the provided values and the
                values in the DataFrame.
        skip_primary_key_validation (bool): Whether to skip primary key validation.
        skip_foreign_key_validation (bool): Whether to skip foreign key validation.
        lazy (bool): Whether to perform lazy validation, collecting all errors.
            Defaults to True.
        backend (Literal["pandas"]): The backend to use for validation.
            Currently, only "pandas" is supported.
    Raises:
        pandera.errors.SchemaErrors: If the DataFrame does not conform to the
            schema.
        ValueError: If an unsupported backend is requested.
    """
    if backend == "pandas":
        # Imported lazily so the pandas validation stack is only loaded
        # when this backend is actually used.
        from .validation.validate_pandas_dataframe import (
            validate_pandas_dataframe,
        )

        validate_pandas_dataframe(
            schema=self,
            df=df,
            primary_key_values=primary_key_values,
            foreign_key_values=foreign_key_values,
            skip_primary_key_validation=skip_primary_key_validation,
            skip_foreign_key_validation=skip_foreign_key_validation,
            lazy=lazy,
        )
    else:
        raise ValueError(
            f"Unsupported backend '{backend}' for DataFrame validation."
        )

validate_structural_integrity()

Validates that all key definitions refer to fields that actually exist in the schema.

Source code in src/crosscontract/contracts/schema/schema.py
@model_validator(mode="after")
def validate_structural_integrity(self) -> "TableSchema":
    """Check that primary keys, foreign keys, and field descriptors only
    refer to fields that are actually declared in the schema.
    """
    known_fields = self.field_names

    if self.primaryKey:
        self.primaryKey.validate_fields(known_fields)

    for fk in self.foreignKeys or []:
        fk.validate_fields(known_fields)
        # A resource of None marks a self-reference, so the referenced
        # fields must also exist locally in this schema.
        if fk.reference.resource is None:
            fk.validate_referenced_fields(known_fields)

    if self.fieldDescriptors is not None:
        self.fieldDescriptors.validate_all_exist(known_fields)
    return self