ClickHouse Metadata Store

ClickHouseMetadataStore

ClickHouseMetadataStore(connection_string: str | None = None, *, connection_params: dict[str, Any] | None = None, fallback_stores: list[MetadataStore] | None = None, **kwargs: Any)

Bases: IbisMetadataStore

ClickHouse metadata store using the Ibis backend.

Connection Parameters
store = ClickHouseMetadataStore(
    connection_params={
    connection_params={
        "host": "localhost",
        "port": 9000,
        "database": "default",
        "user": "default",
        "password": ""
    },
    hash_algorithm=HashAlgorithm.XXHASH64
)

Parameters:

  • connection_string (str | None, default: None ) –

    ClickHouse connection string.

    Format: clickhouse://[user[:password]@]host[:port]/database[?param=value]

    Examples:

    - "clickhouse://localhost:9000/default"
    - "clickhouse://user:pass@host:9000/db"
    - "clickhouse://host:9000/db?secure=true"
    

  • connection_params (dict[str, Any] | None, default: None ) –

    Alternative to connection_string, specify params as dict:

    • host: Server host

    • port: Server port (default: 9000)

    • database: Database name

    • user: Username

    • password: Password

    • secure: Use secure connection (default: False)

  • fallback_stores (list[MetadataStore] | None, default: None ) –

    Ordered list of read-only fallback stores.

  • **kwargs (Any, default: {} ) –

    Passed to metaxy.metadata_store.ibis.IbisMetadataStore.
Raises:

  • ImportError

    If ibis-clickhouse not installed

  • ValueError

    If neither connection_string nor connection_params provided
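
Example

A minimal connection sketch, assuming a ClickHouse server reachable at localhost:9000:

from metaxy.metadata_store.clickhouse import ClickHouseMetadataStore

store = ClickHouseMetadataStore("clickhouse://localhost:9000/default")
with store:  # opens the connection; it is closed automatically on exit
    print(store.list_features())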

Source code in src/metaxy/metadata_store/clickhouse.py
def __init__(
    self,
    connection_string: str | None = None,
    *,
    connection_params: dict[str, Any] | None = None,
    fallback_stores: list["MetadataStore"] | None = None,
    **kwargs: Any,
):
    """
    Initialize [ClickHouse](https://clickhouse.com/) metadata store.

    Args:
        connection_string: ClickHouse connection string.

            Format: `clickhouse://[user[:password]@]host[:port]/database[?param=value]`

            Examples:
                ```
                - "clickhouse://localhost:9000/default"
                - "clickhouse://user:pass@host:9000/db"
                - "clickhouse://host:9000/db?secure=true"
                ```

        connection_params: Alternative to connection_string, specify params as dict:

            - host: Server host

            - port: Server port (default: `9000`)

            - database: Database name

            - user: Username

            - password: Password

            - secure: Use secure connection (default: `False`)

        fallback_stores: Ordered list of read-only fallback stores.

        **kwargs: Passed to [metaxy.metadata_store.ibis.IbisMetadataStore][]

    Raises:
        ImportError: If ibis-clickhouse not installed
        ValueError: If neither connection_string nor connection_params provided
    """
    if connection_string is None and connection_params is None:
        raise ValueError(
            "Must provide either connection_string or connection_params. "
            "Example: connection_string='clickhouse://localhost:9000/default'"
        )

    # Initialize Ibis store with ClickHouse backend
    super().__init__(
        connection_string=connection_string,
        backend="clickhouse" if connection_string is None else None,
        connection_params=connection_params,
        fallback_stores=fallback_stores,
        **kwargs,
    )

Attributes

ibis_conn property

ibis_conn: BaseBackend

Get Ibis backend connection.

Returns:

  • BaseBackend

    Active Ibis backend connection

Raises:

  • StoreNotOpenError

    If store is not open

conn property

conn: BaseBackend

Get connection (alias for ibis_conn for consistency).

Returns:

  • BaseBackend

    Active Ibis backend connection

Raises:

  • StoreNotOpenError

    If store is not open
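
Example

A sketch of dropping down to the raw Ibis backend for ad-hoc inspection; assumes the store has been opened:

with store:
    tables = store.conn.list_tables()  # underlying Ibis backend connection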

Functions

open

open() -> None

Open connection to database via Ibis.

Subclasses should override this to add backend-specific initialization (e.g., loading extensions) and should call super().open() first.

If auto_create_tables is enabled, creates system tables.

Source code in src/metaxy/metadata_store/ibis.py
def open(self) -> None:
    """Open connection to database via Ibis.

    Subclasses should override this to add backend-specific initialization
    (e.g., loading extensions) and should call super().open() first.

    If auto_create_tables is enabled, creates system tables.
    """
    if self.connection_string:
        # Use connection string
        self._conn = self._ibis.connect(self.connection_string)
    else:
        # Use backend + params
        # Get backend-specific connect function
        assert self.backend is not None, (
            "backend must be set if connection_string is None"
        )
        backend_module = getattr(self._ibis, self.backend)
        self._conn = backend_module.connect(**self.connection_params)

    # Auto-create system tables if enabled (warning is handled in base class)
    if self.auto_create_tables:
        self._create_system_tables()

close

close() -> None

Close the Ibis connection.

Source code in src/metaxy/metadata_store/ibis.py
def close(self) -> None:
    """Close the Ibis connection."""
    if self._conn is not None:
        # Ibis connections may not have explicit close method
        # but setting to None releases resources
        self._conn = None

__enter__

__enter__() -> Self

Enter context manager.

Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
    """Enter context manager."""
    # Track nesting depth
    self._context_depth += 1

    # Only open on first enter
    if self._context_depth == 1:
        # Warn if auto_create_tables is enabled (and store wants warnings)
        if self.auto_create_tables and self._should_warn_auto_create_tables:
            import warnings

            warnings.warn(
                f"AUTO_CREATE_TABLES is enabled for {self.display()} - "
                "do not use in production! "
                "Use proper database migration tools like Alembic for production deployments.",
                UserWarning,
                stacklevel=3,  # stacklevel=3 to point to user's 'with store:' line
            )

        self.open()
        self._is_open = True

        # Validate after opening (when all components are ready)
        self._validate_after_open()

    return self

__exit__

__exit__(exc_type, exc_val, exc_tb) -> None

Exit context manager.

Source code in src/metaxy/metadata_store/base.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit context manager."""
    # Decrement depth
    self._context_depth -= 1

    # Only close when fully exited
    if self._context_depth == 0:
        self._is_open = False
        self.close()
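
Nested with blocks are reference-counted: the connection opens once and closes only when the outermost block exits. A minimal sketch of that behavior, based on the _context_depth logic above:

with store:
    with store:  # no re-open; depth goes 1 -> 2
        ...      # still open
    ...          # still open; depth back to 1
# closed here, once depth reaches 0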

validate_hash_algorithm

validate_hash_algorithm(check_fallback_stores: bool = True) -> None

Validate that hash algorithm is supported by this store's components.

Public method - can be called to verify hash compatibility.

Parameters:

  • check_fallback_stores (bool, default: True ) –

    If True, also validate hash is supported by fallback stores (ensures compatibility for future cross-store operations)

Raises:

  • ValueError

    If hash algorithm not supported by components or fallback stores

Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
    self,
    check_fallback_stores: bool = True,
) -> None:
    """Validate that hash algorithm is supported by this store's components.

    Public method - can be called to verify hash compatibility.

    Args:
        check_fallback_stores: If True, also validate hash is supported by
            fallback stores (ensures compatibility for future cross-store operations)

    Raises:
        ValueError: If hash algorithm not supported by components or fallback stores
    """
    # Check if this store can support the algorithm
    # Try native field provenance calculations first (if supported), then Polars
    supported_algorithms = []

    if self._supports_native_components():
        try:
            _, calculator, _ = self._create_native_components()
            supported_algorithms = calculator.supported_algorithms
        except Exception:
            # If native field provenance calculations fail, fall back to Polars
            pass

    # If no native support or prefer_native=False, use Polars
    if not supported_algorithms:
        polars_calc = PolarsProvenanceByFieldCalculator()
        supported_algorithms = polars_calc.supported_algorithms

    if self.hash_algorithm not in supported_algorithms:
        from metaxy.metadata_store.exceptions import (
            HashAlgorithmNotSupportedError,
        )

        raise HashAlgorithmNotSupportedError(
            f"Hash algorithm {self.hash_algorithm} not supported by {self.__class__.__name__}. "
            f"Supported: {supported_algorithms}"
        )

    # Check fallback stores
    if check_fallback_stores:
        for fallback in self.fallback_stores:
            fallback.validate_hash_algorithm(check_fallback_stores=False)
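
Example

A sketch of checking hash compatibility up front; HashAlgorithmNotSupportedError is the exception raised in the source above:

from metaxy.metadata_store.exceptions import HashAlgorithmNotSupportedError

try:
    store.validate_hash_algorithm()
except HashAlgorithmNotSupportedError as e:
    print(f"Unsupported hash algorithm: {e}")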

allow_cross_project_writes

allow_cross_project_writes() -> Iterator[None]

Context manager to temporarily allow cross-project writes.

This is an escape hatch for legitimate cross-project operations like migrations, where metadata needs to be written to features from different projects.

Example
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
    store.write_metadata(feature_from_project_a, metadata_a)
    store.write_metadata(feature_from_project_b, metadata_b)

Yields:

  • None ( None ) –

    The context manager temporarily disables project validation

Source code in src/metaxy/metadata_store/base.py
@contextmanager
def allow_cross_project_writes(self) -> Iterator[None]:
    """Context manager to temporarily allow cross-project writes.

    This is an escape hatch for legitimate cross-project operations like migrations,
    where metadata needs to be written to features from different projects.

    Example:
        ```py
        # During migration, allow writing to features from different projects
        with store.allow_cross_project_writes():
            store.write_metadata(feature_from_project_a, metadata_a)
            store.write_metadata(feature_from_project_b, metadata_b)
        ```

    Yields:
        None: The context manager temporarily disables project validation
    """
    previous_value = self._allow_cross_project_writes
    try:
        self._allow_cross_project_writes = True
        yield
    finally:
        self._allow_cross_project_writes = previous_value

write_metadata

write_metadata(feature: FeatureKey | type[BaseFeature], df: DataFrame[Any] | DataFrame) -> None

Write metadata for a feature (immutable, append-only).

Automatically adds the canonical system columns (metaxy_feature_version, metaxy_snapshot_version) unless they already exist in the DataFrame (useful for migrations).

Parameters:

  • feature (FeatureKey | type[BaseFeature]) –

    Feature to write metadata for

  • df (DataFrame[Any] | DataFrame) –

    Narwhals DataFrame or Polars DataFrame containing metadata. Must have metaxy_provenance_by_field column of type Struct with fields matching feature's fields. May optionally contain metaxy_feature_version and metaxy_snapshot_version (for migrations).

Raises:

  • MetadataSchemaError

    If DataFrame schema is invalid

  • StoreNotOpenError

    If store is not open

  • ValueError

    If writing to a feature from a different project than expected

Note
  • Always writes to current store, never to fallback stores.
  • If df already contains the metaxy-managed columns, they will be used as-is (no replacement). This allows migrations to write historical versions. A warning is issued unless suppressed via context manager.
  • Project validation is performed unless disabled via allow_cross_project_writes()
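
Example

A minimal write sketch; MyFeature is a hypothetical feature class, and the metaxy_provenance_by_field struct fields must match its actual fields:

import polars as pl

df = pl.DataFrame({
    "sample_uid": ["s1", "s2"],
    "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}],
})
with store:
    store.write_metadata(MyFeature, df)  # version columns added automatically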
Source code in src/metaxy/metadata_store/base.py
def write_metadata(
    self,
    feature: FeatureKey | type[BaseFeature],
    df: nw.DataFrame[Any] | pl.DataFrame,
) -> None:
    """
    Write metadata for a feature (immutable, append-only).

    Automatically adds the canonical system columns (`metaxy_feature_version`,
    `metaxy_snapshot_version`) unless they already exist in the DataFrame
    (useful for migrations).

    Args:
        feature: Feature to write metadata for
        df: Narwhals DataFrame or Polars DataFrame containing metadata.
            Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
            May optionally contain `metaxy_feature_version` and `metaxy_snapshot_version` (for migrations).

    Raises:
        MetadataSchemaError: If DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Always writes to current store, never to fallback stores.
        - If df already contains the metaxy-managed columns, they will be used
          as-is (no replacement). This allows migrations to write historical
          versions. A warning is issued unless suppressed via context manager.
        - Project validation is performed unless disabled via allow_cross_project_writes()
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate project for non-system tables
    if not is_system_table:
        self._validate_project_write(feature)

    # Convert Narwhals to Polars if needed
    if isinstance(df, nw.DataFrame):
        df = df.to_polars()
    # nw.DataFrame also matches as DataFrame in some contexts, ensure it's Polars
    if not isinstance(df, pl.DataFrame):
        # Must be some other type - shouldn't happen but handle defensively
        if hasattr(df, "to_polars"):
            df = df.to_polars()
        elif hasattr(df, "to_pandas"):
            df = pl.from_pandas(df.to_pandas())
        else:
            raise TypeError(f"Cannot convert {type(df)} to Polars DataFrame")

    # For system tables, write directly without feature_version tracking
    if is_system_table:
        self._validate_schema_system_table(df)
        self._write_metadata_impl(feature_key, df)
        return

    # For regular features: add feature_version and snapshot_version, validate, and write
    # Check if feature_version and snapshot_version already exist in DataFrame
    if FEATURE_VERSION_COL in df.columns and SNAPSHOT_VERSION_COL in df.columns:
        # DataFrame already has feature_version and snapshot_version - use as-is
        # This is intended for migrations writing historical versions
        # Issue a warning unless we're in a suppression context
        if not _suppress_feature_version_warning.get():
            import warnings

            warnings.warn(
                f"Writing metadata for {feature_key.to_string()} with existing "
                f"{FEATURE_VERSION_COL} and {SNAPSHOT_VERSION_COL} columns. This is intended for migrations only. "
                "Normal code should let write_metadata() add the current versions automatically.",
                UserWarning,
                stacklevel=2,
            )
    else:
        # Get current feature version and snapshot_version from code and add them
        if isinstance(feature, type) and issubclass(feature, BaseFeature):
            current_feature_version = feature.feature_version()  # type: ignore[attr-defined]
        else:
            from metaxy.models.feature import FeatureGraph

            graph = FeatureGraph.get_active()
            feature_cls = graph.features_by_key[feature_key]
            current_feature_version = feature_cls.feature_version()  # type: ignore[attr-defined]

        # Get snapshot_version from active graph
        from metaxy.models.feature import FeatureGraph

        graph = FeatureGraph.get_active()
        current_snapshot_version = graph.snapshot_version

        df = df.with_columns(
            [
                pl.lit(current_feature_version).alias(FEATURE_VERSION_COL),
                pl.lit(current_snapshot_version).alias(SNAPSHOT_VERSION_COL),
            ]
        )

    # Validate schema
    self._validate_schema(df)

    # Write metadata
    self._write_metadata_impl(feature_key, df)

drop_feature_metadata

drop_feature_metadata(feature: FeatureKey | type[BaseFeature]) -> None

Drop all metadata for a feature.

This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.

Warning

This operation is irreversible and will permanently delete all metadata for the specified feature.

Parameters:

  • feature (FeatureKey | type[BaseFeature]) –

    Feature class or key to drop metadata for

Example
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: FeatureKey | type[BaseFeature]) -> None:
    """Drop all metadata for a feature.

    This removes all stored metadata for the specified feature from the store.
    Useful for cleanup in tests or when re-computing feature metadata from scratch.

    Warning:
        This operation is irreversible and will **permanently delete all metadata** for the specified feature.

    Args:
        feature: Feature class or key to drop metadata for

    Example:
        ```py
        store.drop_feature_metadata(MyFeature)
        assert not store.has_feature(MyFeature)
        ```
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)
    self._drop_feature_metadata_impl(feature_key)

record_feature_graph_snapshot

record_feature_graph_snapshot() -> SnapshotPushResult

Record all features in graph with a graph snapshot version.

This should be called during CD (Continuous Deployment) to record what feature versions are being deployed. Typically invoked via metaxy graph push.

Records all features in the graph with the same snapshot_version, representing a consistent state of the entire feature graph based on code definitions.

The snapshot_version is a deterministic hash of all feature_version hashes in the graph, making it idempotent - calling multiple times with the same feature definitions produces the same snapshot_version.

This method detects three scenarios:

  1. New snapshot (computational changes): No existing rows with this snapshot_version
  2. Metadata-only changes: Snapshot exists but some features have different feature_spec_version
  3. No changes: Snapshot exists with identical feature_spec_versions for all features

Returns:

  • SnapshotPushResult
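
Example

A sketch of a CD step (the programmatic equivalent of metaxy graph push), using the result fields shown in the source below:

with store:
    result = store.record_feature_graph_snapshot()
    if result.already_recorded and not result.metadata_changed:
        print(f"Snapshot {result.snapshot_version} already recorded; no changes")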

Source code in src/metaxy/metadata_store/base.py
def record_feature_graph_snapshot(self) -> SnapshotPushResult:
    """Record all features in graph with a graph snapshot version.

    This should be called during CD (Continuous Deployment) to record what
    feature versions are being deployed. Typically invoked via `metaxy graph push`.

    Records all features in the graph with the same snapshot_version, representing
    a consistent state of the entire feature graph based on code definitions.

    The snapshot_version is a deterministic hash of all feature_version hashes
    in the graph, making it idempotent - calling multiple times with the
    same feature definitions produces the same snapshot_version.

    This method detects three scenarios:
    1. New snapshot (computational changes): No existing rows with this snapshot_version
    2. Metadata-only changes: Snapshot exists but some features have different feature_spec_version
    3. No changes: Snapshot exists with identical feature_spec_versions for all features

    Returns: SnapshotPushResult
    """

    from metaxy.models.feature import FeatureGraph

    graph = FeatureGraph.get_active()

    # Use to_snapshot() to get the snapshot dict
    snapshot_dict = graph.to_snapshot()

    # Generate deterministic snapshot_version from graph
    snapshot_version = graph.snapshot_version

    # Read existing feature versions once
    try:
        existing_versions_lazy = self.read_metadata_in_store(FEATURE_VERSIONS_KEY)
        # Materialize to Polars for iteration
        existing_versions = (
            existing_versions_lazy.collect().to_polars()
            if existing_versions_lazy is not None
            else None
        )
    except Exception:
        # Table doesn't exist yet
        existing_versions = None

    # Get project from any feature in the graph (all should have the same project)
    # Default to empty string if no features in graph
    if graph.features_by_key:
        # Get first feature's project
        first_feature = next(iter(graph.features_by_key.values()))
        project_name = first_feature.project  # type: ignore[attr-defined]
    else:
        project_name = ""

    # Check if this exact snapshot already exists for this project
    snapshot_already_exists = False
    existing_spec_versions: dict[str, str] = {}

    if existing_versions is not None:
        # Check if project column exists (it may not in old tables)
        if "project" in existing_versions.columns:
            snapshot_rows = existing_versions.filter(
                (pl.col(SNAPSHOT_VERSION_COL) == snapshot_version)
                & (pl.col("project") == project_name)
            )
        else:
            # Old table without project column - just check snapshot_version
            snapshot_rows = existing_versions.filter(
                pl.col(SNAPSHOT_VERSION_COL) == snapshot_version
            )
        snapshot_already_exists = snapshot_rows.height > 0

        if snapshot_already_exists:
            # Check if feature_spec_version column exists (backward compatibility)
            # Old records (before issue #77) won't have this column
            has_spec_version = FEATURE_SPEC_VERSION_COL in snapshot_rows.columns

            if has_spec_version:
                # Build dict of existing feature_key -> feature_spec_version
                for row in snapshot_rows.iter_rows(named=True):
                    existing_spec_versions[row["feature_key"]] = row[
                        FEATURE_SPEC_VERSION_COL
                    ]
            # If no spec_version column, existing_spec_versions remains empty
            # This means we'll treat it as "no metadata changes" (conservative approach)

    # Scenario 1: New snapshot (no existing rows)
    if not snapshot_already_exists:
        # Build records from snapshot_dict
        records = []
        for feature_key_str in sorted(snapshot_dict.keys()):
            feature_data = snapshot_dict[feature_key_str]

            # Serialize complete FeatureSpec
            feature_spec_json = json.dumps(feature_data["feature_spec"])

            # Always record all features for this snapshot (don't skip based on feature_version alone)
            # Each snapshot must be complete to support migration detection
            records.append(
                {
                    "project": project_name,
                    "feature_key": feature_key_str,
                    FEATURE_VERSION_COL: feature_data[FEATURE_VERSION_COL],
                    FEATURE_SPEC_VERSION_COL: feature_data[
                        FEATURE_SPEC_VERSION_COL
                    ],
                    FEATURE_TRACKING_VERSION_COL: feature_data[
                        FEATURE_TRACKING_VERSION_COL
                    ],
                    "recorded_at": datetime.now(timezone.utc),
                    "feature_spec": feature_spec_json,
                    "feature_class_path": feature_data["feature_class_path"],
                    SNAPSHOT_VERSION_COL: snapshot_version,
                }
            )

        # Bulk write all new records at once
        if records:
            version_records = pl.DataFrame(
                records,
                schema=FEATURE_VERSIONS_SCHEMA,
            )
            self._write_metadata_impl(FEATURE_VERSIONS_KEY, version_records)

        return SnapshotPushResult(
            snapshot_version=snapshot_version,
            already_recorded=False,
            metadata_changed=False,
            features_with_spec_changes=[],
        )

    # Scenario 2 & 3: Snapshot exists - check for metadata changes
    features_with_spec_changes = []

    for feature_key_str, feature_data in snapshot_dict.items():
        current_spec_version = feature_data[FEATURE_SPEC_VERSION_COL]
        existing_spec_version = existing_spec_versions.get(feature_key_str)

        if existing_spec_version != current_spec_version:
            features_with_spec_changes.append(feature_key_str)

    # If metadata changed, append new rows for affected features
    if features_with_spec_changes:
        records = []
        for feature_key_str in features_with_spec_changes:
            feature_data = snapshot_dict[feature_key_str]

            # Serialize complete FeatureSpec
            feature_spec_json = json.dumps(feature_data["feature_spec"])

            records.append(
                {
                    "project": project_name,
                    "feature_key": feature_key_str,
                    FEATURE_VERSION_COL: feature_data[FEATURE_VERSION_COL],
                    FEATURE_SPEC_VERSION_COL: feature_data[
                        FEATURE_SPEC_VERSION_COL
                    ],
                    FEATURE_TRACKING_VERSION_COL: feature_data[
                        FEATURE_TRACKING_VERSION_COL
                    ],
                    "recorded_at": datetime.now(timezone.utc),
                    "feature_spec": feature_spec_json,
                    "feature_class_path": feature_data["feature_class_path"],
                    SNAPSHOT_VERSION_COL: snapshot_version,
                }
            )

        # Bulk write updated records (append-only)
        if records:
            version_records = pl.DataFrame(
                records,
                schema=FEATURE_VERSIONS_SCHEMA,
            )
            self._write_metadata_impl(FEATURE_VERSIONS_KEY, version_records)

        return SnapshotPushResult(
            snapshot_version=snapshot_version,
            already_recorded=True,
            metadata_changed=True,
            features_with_spec_changes=features_with_spec_changes,
        )

    # Scenario 3: No changes at all
    return SnapshotPushResult(
        snapshot_version=snapshot_version,
        already_recorded=True,
        metadata_changed=False,
        features_with_spec_changes=[],
    )

read_metadata_in_store

read_metadata_in_store(feature: FeatureKey | type[BaseFeature], *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None) -> LazyFrame[Any] | None

Read metadata from this store only (no fallback).

Parameters:

  • feature (FeatureKey | type[BaseFeature]) –

    Feature to read

  • feature_version (str | None, default: None ) –

    Filter by specific feature_version (applied as SQL WHERE clause)

  • filters (Sequence[Expr] | None, default: None ) –

    List of Narwhals filter expressions (converted to SQL WHERE clauses)

  • columns (Sequence[str] | None, default: None ) –

    Optional list of columns to select

Returns:

  • LazyFrame[Any] | None

    Narwhals LazyFrame with metadata, or None if not found
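
Example

A sketch of a lazy, SQL-side read; MyFeature and the sample_uid column are hypothetical:

import narwhals as nw

lazy = store.read_metadata_in_store(
    MyFeature,
    filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
    columns=["sample_uid", "metaxy_provenance_by_field"],
)
if lazy is not None:  # None means the feature's table does not exist yet
    df = lazy.collect()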

Source code in src/metaxy/metadata_store/ibis.py
def read_metadata_in_store(
    self,
    feature: FeatureKey | type[BaseFeature],
    *,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
) -> nw.LazyFrame[Any] | None:
    """
    Read metadata from this store only (no fallback).

    Args:
        feature: Feature to read
        feature_version: Filter by specific feature_version (applied as SQL WHERE clause)
        filters: List of Narwhals filter expressions (converted to SQL WHERE clauses)
        columns: Optional list of columns to select

    Returns:
        Narwhals LazyFrame with metadata, or None if not found
    """
    feature_key = self._resolve_feature_key(feature)
    table_name = feature_key.table_name

    # Check if table exists
    existing_tables = self.conn.list_tables()
    if table_name not in existing_tables:
        return None

    # Get Ibis table reference
    table = self.conn.table(table_name)

    # Wrap Ibis table with Narwhals (stays lazy in SQL)
    nw_lazy: nw.LazyFrame[Any] = nw.from_native(table, eager_only=False)

    # Apply feature_version filter (stays in SQL via Narwhals)
    if feature_version is not None:
        nw_lazy = nw_lazy.filter(
            nw.col("metaxy_feature_version") == feature_version
        )

    # Apply generic Narwhals filters (stays in SQL)
    if filters is not None:
        for filter_expr in filters:
            nw_lazy = nw_lazy.filter(filter_expr)

    # Select columns (stays in SQL)
    if columns is not None:
        nw_lazy = nw_lazy.select(columns)

    # Return Narwhals LazyFrame wrapping Ibis table (stays lazy in SQL)
    return nw_lazy

read_metadata

read_metadata(feature: FeatureKey | type[BaseFeature], *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, allow_fallback: bool = True, current_only: bool = True) -> LazyFrame[Any]

Read metadata with optional fallback to upstream stores.

Parameters:

  • feature (FeatureKey | type[BaseFeature]) –

    Feature to read metadata for

  • feature_version (str | None, default: None ) –

    Explicit feature_version to filter by (mutually exclusive with current_only=True)

  • filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to return

  • allow_fallback (bool, default: True ) –

    If True, check fallback stores on local miss

  • current_only (bool, default: True ) –

    If True, only return rows with current feature_version (default: True for safety)

Returns:

  • LazyFrame[Any] –

    Narwhals LazyFrame with metadata

Raises:

  • FeatureNotFoundError

    If feature not found in any store

  • ValueError

    If both feature_version and current_only=True are provided
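
Example

A sketch of the two common read patterns; MyFeature and the version hash are hypothetical:

# Default: only rows with the current feature_version
df = store.read_metadata(MyFeature).collect()

# Pin an explicit version; current_only must be disabled
df = store.read_metadata(
    MyFeature, feature_version="abc123", current_only=False
).collect()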

Source code in src/metaxy/metadata_store/base.py
def read_metadata(
    self,
    feature: FeatureKey | type[BaseFeature],
    *,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
) -> nw.LazyFrame[Any]:
    """
    Read metadata with optional fallback to upstream stores.

    Args:
        feature: Feature to read metadata for
        feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True)
        filters: Sequence of Narwhals filter expressions to apply to this feature.
            Example: [nw.col("x") > 10, nw.col("y") < 5]
        columns: Subset of columns to return
        allow_fallback: If True, check fallback stores on local miss
        current_only: If True, only return rows with current feature_version
            (default: True for safety)

    Returns:
        Narwhals LazyFrame with metadata

    Raises:
        FeatureNotFoundError: If feature not found in any store
        ValueError: If both feature_version and current_only=True are provided
    """
    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate mutually exclusive parameters
    if feature_version is not None and current_only:
        raise ValueError(
            "Cannot specify both feature_version and current_only=True. "
            "Use current_only=False with feature_version parameter."
        )

    # Determine which feature_version to use
    feature_version_filter = feature_version
    if current_only and not is_system_table:
        # Get current feature_version
        if isinstance(feature, type) and issubclass(feature, BaseFeature):
            feature_version_filter = feature.feature_version()  # type: ignore[attr-defined]
        else:
            from metaxy.models.feature import FeatureGraph

            graph = FeatureGraph.get_active()
            # Only try to get from graph if feature_key exists in graph
            # This allows reading system tables or external features not in current graph
            if feature_key in graph.features_by_key:
                feature_cls = graph.features_by_key[feature_key]
                feature_version_filter = feature_cls.feature_version()  # type: ignore[attr-defined]
            else:
                # Feature not in graph - skip feature_version filtering
                feature_version_filter = None

    # Try local first with filters
    lazy_frame = self.read_metadata_in_store(
        feature,
        feature_version=feature_version_filter,
        filters=filters,  # Pass filters directly
        columns=columns,
    )

    if lazy_frame is not None:
        return lazy_frame

    # Try fallback stores
    if allow_fallback:
        for store in self.fallback_stores:
            try:
                # Use full read_metadata to handle nested fallback chains
                return store.read_metadata(
                    feature,
                    feature_version=feature_version,
                    filters=filters,  # Pass through filters directly
                    columns=columns,
                    allow_fallback=True,
                    current_only=current_only,  # Pass through current_only
                )
            except FeatureNotFoundError:
                # Try next fallback store
                continue

    # Not found anywhere
    raise FeatureNotFoundError(
        f"Feature {feature_key.to_string()} not found in store"
        + (" or fallback stores" if allow_fallback else "")
    )

has_feature

has_feature(feature: FeatureKey | type[BaseFeature], *, check_fallback: bool = False) -> bool

Check if feature exists in store.

Parameters:

  • feature (FeatureKey | type[BaseFeature]) –

    Feature to check

  • check_fallback (bool, default: False ) –

    If True, also check fallback stores

Returns:

  • bool

    True if feature exists, False otherwise

Source code in src/metaxy/metadata_store/base.py
def has_feature(
    self,
    feature: FeatureKey | type[BaseFeature],
    *,
    check_fallback: bool = False,
) -> bool:
    """
    Check if feature exists in store.

    Args:
        feature: Feature to check
        check_fallback: If True, also check fallback stores

    Returns:
        True if feature exists, False otherwise
    """
    # Check local
    if self.read_metadata_in_store(feature) is not None:
        return True

    # Check fallback stores
    if check_fallback:
        for store in self.fallback_stores:
            if store.has_feature(feature, check_fallback=True):
                return True

    return False

list_features

list_features(*, include_fallback: bool = False) -> list[FeatureKey]

List all features in store.

Parameters:

  • include_fallback (bool, default: False ) –

    If True, include features from fallback stores

Returns:

  • list[FeatureKey] –

    List of FeatureKey objects

Raises:

  • StoreNotOpenError

    If store is not open
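
Example

A small sketch listing features, including any found in fallback stores:

with store:
    for key in store.list_features(include_fallback=True):
        print(key.to_string())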

Source code in src/metaxy/metadata_store/base.py
def list_features(self, *, include_fallback: bool = False) -> list[FeatureKey]:
    """
    List all features in store.

    Args:
        include_fallback: If True, include features from fallback stores

    Returns:
        List of FeatureKey objects

    Raises:
        StoreNotOpenError: If store is not open
    """
    self._check_open()

    features = self._list_features_local()

    if include_fallback:
        for store in self.fallback_stores:
            features.extend(store.list_features(include_fallback=True))

    # Deduplicate
    seen = set()
    unique_features = []
    for feature in features:
        key_str = feature.to_string()
        if key_str not in seen:
            seen.add(key_str)
            unique_features.append(feature)

    return unique_features

display

display() -> str

Display string for this store.

Source code in src/metaxy/metadata_store/ibis.py
def display(self) -> str:
    """Display string for this store."""
    backend_info = self.connection_string or f"{self.backend}"
    if self._is_open:
        num_features = len(self._list_features_local())
        return f"IbisMetadataStore(backend={backend_info}, features={num_features})"
    else:
        return f"IbisMetadataStore(backend={backend_info})"

read_graph_snapshots

read_graph_snapshots(project: str | None = None) -> DataFrame

Read recorded graph snapshots from the feature_versions system table.

Parameters:

  • project (str | None, default: None ) –

    Project name to filter by. If None, returns snapshots from all projects.

Returns a DataFrame with columns:

  • snapshot_version: Unique identifier for each graph snapshot

  • recorded_at: Timestamp when the snapshot was recorded

  • feature_count: Number of features in this snapshot

Returns:

  • DataFrame

    Polars DataFrame with snapshot information, sorted by recorded_at descending

Raises:

  • StoreNotOpenError

    If store is not open

Example
with store:
    # Get snapshots for a specific project
    snapshots = store.read_graph_snapshots(project="my_project")
    latest_snapshot = snapshots[SNAPSHOT_VERSION_COL][0]
    print(f"Latest snapshot: {latest_snapshot}")

    # Get snapshots across all projects
    all_snapshots = store.read_graph_snapshots()
Source code in src/metaxy/metadata_store/base.py
def read_graph_snapshots(self, project: str | None = None) -> pl.DataFrame:
    """Read recorded graph snapshots from the feature_versions system table.

    Args:
        project: Project name to filter by. If None, returns snapshots from all projects.

    Returns a DataFrame with columns:
    - snapshot_version: Unique identifier for each graph snapshot
    - recorded_at: Timestamp when the snapshot was recorded
    - feature_count: Number of features in this snapshot

    Returns:
        Polars DataFrame with snapshot information, sorted by recorded_at descending

    Raises:
        StoreNotOpenError: If store is not open

    Example:
        ```py
        with store:
            # Get snapshots for a specific project
            snapshots = store.read_graph_snapshots(project="my_project")
            latest_snapshot = snapshots[SNAPSHOT_VERSION_COL][0]
            print(f"Latest snapshot: {latest_snapshot}")

            # Get snapshots across all projects
            all_snapshots = store.read_graph_snapshots()
        ```
    """
    self._check_open()

    # Build filters based on project parameter
    filters = None
    if project is not None:
        import narwhals as nw

        filters = [nw.col("project") == project]

    versions_lazy = self.read_metadata_in_store(
        FEATURE_VERSIONS_KEY, filters=filters
    )
    if versions_lazy is None:
        # No snapshots recorded yet
        return pl.DataFrame(
            schema={
                SNAPSHOT_VERSION_COL: pl.String,
                "recorded_at": pl.Datetime("us"),
                "feature_count": pl.UInt32,
            }
        )

    versions_df = versions_lazy.collect().to_polars()

    # Group by snapshot_version and get earliest recorded_at and count
    snapshots = (
        versions_df.group_by(SNAPSHOT_VERSION_COL)
        .agg(
            [
                pl.col("recorded_at").min().alias("recorded_at"),
                pl.col("feature_key").count().alias("feature_count"),
            ]
        )
        .sort("recorded_at", descending=True)
    )

    return snapshots

read_features

read_features(*, current: bool = True, snapshot_version: str | None = None, project: str | None = None) -> DataFrame

Read feature version information from the feature_versions system table.

Parameters:

  • current (bool, default: True ) –

    If True, only return features from the current code snapshot. If False, must provide snapshot_version.

  • snapshot_version (str | None, default: None ) –

    Specific snapshot version to filter by. Required if current=False.

  • project (str | None, default: None ) –

    Project name to filter by. Defaults to None.

Returns:

  • DataFrame –

    Polars DataFrame with columns from FEATURE_VERSIONS_SCHEMA:

    • feature_key: Feature identifier

    • feature_version: Version hash of the feature

    • recorded_at: When this version was recorded

    • feature_spec: JSON serialized feature specification

    • feature_class_path: Python import path to the feature class

    • snapshot_version: Graph snapshot this feature belongs to

Raises:

  • StoreNotOpenError

    If store is not open

  • ValueError

    If current=False but no snapshot_version provided

Examples:

# Get features from current code
with store:
    features = store.read_features(current=True)
    print(f"Current graph has {len(features)} features")
# Get features from a specific snapshot
with store:
    features = store.read_features(current=False, snapshot_version="abc123")
    for row in features.iter_rows(named=True):
        print(f"{row['feature_key']}: {row['metaxy_feature_version']}")
Source code in src/metaxy/metadata_store/base.py
def read_features(
    self,
    *,
    current: bool = True,
    snapshot_version: str | None = None,
    project: str | None = None,
) -> pl.DataFrame:
    """Read feature version information from the feature_versions system table.

    Args:
        current: If True, only return features from the current code snapshot.
                 If False, must provide snapshot_version.
        snapshot_version: Specific snapshot version to filter by. Required if current=False.
        project: Project name to filter by. Defaults to None.

    Returns:
        Polars DataFrame with columns from FEATURE_VERSIONS_SCHEMA:
        - feature_key: Feature identifier
        - feature_version: Version hash of the feature
        - recorded_at: When this version was recorded
        - feature_spec: JSON serialized feature specification
        - feature_class_path: Python import path to the feature class
        - snapshot_version: Graph snapshot this feature belongs to

    Raises:
        StoreNotOpenError: If store is not open
        ValueError: If current=False but no snapshot_version provided

    Examples:
        ```py
        # Get features from current code
        with store:
            features = store.read_features(current=True)
            print(f"Current graph has {len(features)} features")
        ```

        ```py
        # Get features from a specific snapshot
        with store:
            features = store.read_features(current=False, snapshot_version="abc123")
            for row in features.iter_rows(named=True):
                print(f"{row['feature_key']}: {row['metaxy_feature_version']}")
        ```
    """
    self._check_open()

    if not current and snapshot_version is None:
        raise ValueError("Must provide snapshot_version when current=False")

    if current:
        # Get current snapshot from active graph
        graph = FeatureGraph.get_active()
        snapshot_version = graph.snapshot_version

    filters = [nw.col(SNAPSHOT_VERSION_COL) == snapshot_version]
    if project is not None:
        filters.append(nw.col("project") == project)

    versions_lazy = self.read_metadata_in_store(
        FEATURE_VERSIONS_KEY, filters=filters
    )
    if versions_lazy is None:
        # No features recorded yet
        return pl.DataFrame(schema=FEATURE_VERSIONS_SCHEMA)

    # Filter by snapshot_version
    versions_df = versions_lazy.collect().to_polars()

    return versions_df

copy_metadata

copy_metadata(from_store: MetadataStore, features: list[FeatureKey | type[BaseFeature]] | None = None, *, from_snapshot: str | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, incremental: bool = True) -> dict[str, int]

Copy metadata from another store with fine-grained filtering.

This is a reusable method that can be called programmatically or from CLI/migrations. Copies metadata for specified features, preserving the original snapshot_version.

Parameters:

  • from_store (MetadataStore) –

    Source metadata store to copy from (must be opened)

  • features (list[FeatureKey | type[BaseFeature]] | None, default: None ) –

    List of features to copy. Can be:

    • None: copies all features from source store

    • List of FeatureKey or Feature classes: copies specified features

  • from_snapshot (str | None, default: None ) –

    Snapshot version to filter source data by. If None, uses latest snapshot from source store. Only rows with this snapshot_version will be copied. The snapshot_version is preserved in the destination store.

  • filters (Mapping[str, Sequence[Expr]] | None, default: None ) –

    Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}

  • incremental (bool, default: True ) –

    If True (default), filter out rows that already exist in the destination store by performing an anti-join on sample_uid for the same snapshot_version.

    The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid filtered by snapshot_version.

    Disabling incremental (incremental=False) may improve performance when:

    • You know the destination is empty or has no overlap with source

    • The destination store uses deduplication

    When incremental=False, it's the user's responsibility to avoid duplicates or configure deduplication at the storage layer.

Returns:

  • dict[str, int]

    Dict with statistics: {"features_copied": int, "rows_copied": int}

Raises:

  • ValueError

    If from_store or self (destination) is not open

  • FeatureNotFoundError

    If a specified feature doesn't exist in source store

Examples:

# Simple: copy all features from latest snapshot
stats = dest_store.copy_metadata(from_store=source_store)
# Copy specific features from a specific snapshot
stats = dest_store.copy_metadata(
    from_store=source_store,
    features=[FeatureKey(["my_feature"])],
    from_snapshot="abc123",
)
# Copy with filters
stats = dest_store.copy_metadata(
    from_store=source_store,
    filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
)
# Copy specific features with filters
stats = dest_store.copy_metadata(
    from_store=source_store,
    features=[
        FeatureKey(["feature_a"]),
        FeatureKey(["feature_b"]),
    ],
    filters={
        "feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
        "feature_b": [nw.col("field_b") < 30],
    },
)
Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
    self,
    from_store: MetadataStore,
    features: list[FeatureKey | type[BaseFeature]] | None = None,
    *,
    from_snapshot: str | None = None,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    incremental: bool = True,
) -> dict[str, int]:
    """Copy metadata from another store with fine-grained filtering.

    This is a reusable method that can be called programmatically or from CLI/migrations.
    Copies metadata for specified features, preserving the original snapshot_version.

    Args:
        from_store: Source metadata store to copy from (must be opened)
        features: List of features to copy. Can be:
            - None: copies all features from source store
            - List of FeatureKey or Feature classes: copies specified features
        from_snapshot: Snapshot version to filter source data by. If None, uses latest snapshot
            from source store. Only rows with this snapshot_version will be copied.
            The snapshot_version is preserved in the destination store.
        filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
            These filters are applied when reading from the source store.
            Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
        incremental: If True (default), filter out rows that already exist in the destination
            store by performing an anti-join on sample_uid for the same snapshot_version.

            The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid
            filtered by snapshot_version.

            Disabling incremental (incremental=False) may improve performance when:
            - You know the destination is empty or has no overlap with source
            - The destination store uses deduplication

            When incremental=False, it's the user's responsibility to avoid duplicates or
            configure deduplication at the storage layer.

    Returns:
        Dict with statistics: {"features_copied": int, "rows_copied": int}

    Raises:
        ValueError: If from_store or self (destination) is not open
        FeatureNotFoundError: If a specified feature doesn't exist in source store

    Examples:
        ```py
        # Simple: copy all features from latest snapshot
        stats = dest_store.copy_metadata(from_store=source_store)
        ```

        ```py
        # Copy specific features from a specific snapshot
        stats = dest_store.copy_metadata(
            from_store=source_store,
            features=[FeatureKey(["my_feature"])],
            from_snapshot="abc123",
        )
        ```

        ```py
        # Copy with filters
        stats = dest_store.copy_metadata(
            from_store=source_store,
            filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
        )
        ```

        ```py
        # Copy specific features with filters
        stats = dest_store.copy_metadata(
            from_store=source_store,
            features=[
                FeatureKey(["feature_a"]),
                FeatureKey(["feature_b"]),
            ],
            filters={
                "feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
                "feature_b": [nw.col("field_b") < 30],
            },
        )
        ```
    """
    import logging

    logger = logging.getLogger(__name__)

    # Validate destination store is open
    if not self._is_open:
        raise ValueError("Destination store must be opened (use context manager)")

    # Automatically handle source store context manager
    should_close_source = not from_store._is_open
    if should_close_source:
        from_store.__enter__()

    try:
        return self._copy_metadata_impl(
            from_store=from_store,
            features=features,
            from_snapshot=from_snapshot,
            filters=filters,
            incremental=incremental,
            logger=logger,
        )
    finally:
        if should_close_source:
            from_store.__exit__(None, None, None)

read_upstream_metadata

read_upstream_metadata(feature: FeatureKey | type[BaseFeature], field: FieldKey | None = None, *, filters: Mapping[str, Sequence[Expr]] | None = None, allow_fallback: bool = True, current_only: bool = True) -> dict[str, LazyFrame[Any]]

Read all upstream dependencies for a feature/field.

Parameters:

  • feature (FeatureKey | type[BaseFeature]) –

    Feature whose dependencies to load

  • field (FieldKey | None, default: None ) –

    Specific field (if None, loads all deps for feature)

  • filters (Mapping[str, Sequence[Expr]] | None, default: None ) –

    Dict mapping feature keys (as strings) to lists of Narwhals filter expressions. Example: {"upstream/feature1": [nw.col("x") > 10], "upstream/feature2": [...]}

  • allow_fallback (bool, default: True ) –

    Whether to check fallback stores

  • current_only (bool, default: True ) –

    If True, only read current feature_version for upstream

Returns:

  • dict[str, LazyFrame[Any]]

    Dict mapping upstream feature keys (as strings) to Narwhals LazyFrames.

  • dict[str, LazyFrame[Any]]

    Each LazyFrame has a 'metaxy_provenance_by_field' column (Struct).

Raises:

  • DependencyError

    If required upstream feature is missing
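
Example

A sketch with per-upstream filters; the feature key and column name are hypothetical:

import narwhals as nw

upstream = store.read_upstream_metadata(
    MyFeature,
    filters={"upstream/feature1": [nw.col("x") > 10]},
)
for key, lazy in upstream.items():
    print(key, lazy.collect().shape)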

Source code in src/metaxy/metadata_store/base.py
def read_upstream_metadata(
    self,
    feature: FeatureKey | type[BaseFeature],
    field: FieldKey | None = None,
    *,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
) -> dict[str, nw.LazyFrame[Any]]:
    """
    Read all upstream dependencies for a feature/field.

    Args:
        feature: Feature whose dependencies to load
        field: Specific field (if None, loads all deps for feature)
        filters: Dict mapping feature keys (as strings) to lists of Narwhals filter expressions.
            Example: {"upstream/feature1": [nw.col("x") > 10], "upstream/feature2": [...]}
        allow_fallback: Whether to check fallback stores
        current_only: If True, only read current feature_version for upstream

    Returns:
        Dict mapping upstream feature keys (as strings) to Narwhals LazyFrames.
        Each LazyFrame has a 'metaxy_provenance_by_field' column (Struct).

    Raises:
        DependencyError: If required upstream feature is missing
    """
    plan = self._resolve_feature_plan(feature)

    # Get all upstream features we need
    upstream_features = set()

    if field is None:
        # All fields' dependencies
        for cont in plan.feature.fields:
            upstream_features.update(self._get_field_dependencies(plan, cont.key))
    else:
        # Specific field's dependencies
        upstream_features.update(self._get_field_dependencies(plan, field))

    # Load metadata for each upstream feature
    # Use the feature's graph to look up upstream feature classes
    if isinstance(feature, FeatureKey):
        from metaxy.models.feature import FeatureGraph

        graph = FeatureGraph.get_active()
    else:
        graph = feature.graph

    upstream_metadata = {}
    for upstream_fq_key in upstream_features:
        upstream_feature_key = upstream_fq_key.feature

        # Extract filters for this specific upstream feature
        upstream_filters = None
        if filters:
            upstream_key_str = upstream_feature_key.to_string()
            if upstream_key_str in filters:
                upstream_filters = filters[upstream_key_str]

        try:
            # Look up the Feature class from the graph and pass it to read_metadata
            # This way we use the bound graph instead of relying on active context
            upstream_feature_cls = graph.features_by_key[upstream_feature_key]
            lazy_frame = self.read_metadata(
                upstream_feature_cls,
                filters=upstream_filters,  # Pass extracted filters (Sequence or None)
                allow_fallback=allow_fallback,
                current_only=current_only,  # Pass through current_only
            )
            # Use string key for dict
            upstream_metadata[upstream_feature_key.to_string()] = lazy_frame
        except FeatureNotFoundError as e:
            raise DependencyError(
                f"Missing upstream feature {upstream_feature_key.to_string()} "
                f"required by {plan.feature.key.to_string()}"
            ) from e

    return upstream_metadata

resolve_update

resolve_update(feature: type[BaseFeature], *, samples: DataFrame[Any] | LazyFrame[Any] | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, lazy: Literal[False] = False, **kwargs: Any) -> Increment
resolve_update(feature: type[BaseFeature], *, samples: DataFrame[Any] | LazyFrame[Any] | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, lazy: Literal[True], **kwargs: Any) -> LazyIncrement
resolve_update(feature: type[BaseFeature], *, samples: DataFrame[Any] | LazyFrame[Any] | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, lazy: bool = False, **kwargs: Any) -> Increment | LazyIncrement

Calculate an incremental update for a feature.

Parameters:

  • feature (type[BaseFeature]) –

    Feature class to resolve updates for

  • samples (DataFrame[Any] | LazyFrame[Any] | None, default: None ) –

    Pre-computed DataFrame with ID columns and PROVENANCE_BY_FIELD_COL column. When provided, MetadataStore skips upstream loading, joining, and field provenance calculation.

    Required for root features (features with no upstream dependencies). Root features don't have upstream to calculate PROVENANCE_BY_FIELD_COL from, so users must provide samples with manually computed PROVENANCE_BY_FIELD_COL column.

    For non-root features, use this when you want to bypass the automatic upstream loading and field provenance calculation.

    Examples:

    • Loading upstream from custom sources

    • Pre-computing field provenances with custom logic

    • Testing specific scenarios

    This parameter is not needed during normal operation.

  • filters (Mapping[str, Sequence[Expr]] | None, default: None ) –

    Dict mapping feature keys (as strings) to lists of Narwhals filter expressions. Applied when reading upstream metadata to filter samples at the source, e.g. {"upstream/feature": [nw.col("x") > 10], ...}. See the filters sketch under Examples below.

  • lazy (bool, default: False ) –

    If True, return metaxy.data_versioning.diff.LazyIncrement with lazy Narwhals LazyFrames. If False, return metaxy.data_versioning.diff.Increment with eager Narwhals DataFrames.

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters

Raises:

  • ValueError

    If samples is not provided when resolving an update for a root feature.

Examples:

# Root feature - samples required
import narwhals as nw
import polars as pl

samples = pl.DataFrame({
    "sample_uid": [1, 2, 3],
    PROVENANCE_BY_FIELD_COL: [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
# Non-root feature - automatic (normal usage)
result = store.resolve_update(DownstreamFeature)
# Non-root feature - with escape hatch (advanced)
custom_samples = compute_custom_field_provenance(...)
result = store.resolve_update(DownstreamFeature, samples=custom_samples)
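
The filters and lazy parameters compose; a minimal sketch is shown below (the upstream key string "upstream/feature" and column "x" are illustrative, not real feature names):

# Filter upstream rows at the source and defer materialization
import narwhals as nw

filters = {"upstream/feature": [nw.col("x") > 10]}
lazy_increment = store.resolve_update(DownstreamFeature, filters=filters, lazy=True)
increment = lazy_increment.collect()  # materialize into an eager Increment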
Note

Users can then process only the added and changed samples and call write_metadata() with the results.

Source code in src/metaxy/metadata_store/base.py
def resolve_update(
    self,
    feature: type[BaseFeature],
    *,
    samples: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    lazy: bool = False,
    **kwargs: Any,
) -> Increment | LazyIncrement:
    """Calculate an incremental update for a feature.

    Args:
        feature: Feature class to resolve updates for
        samples: Pre-computed DataFrame with ID columns
            and `PROVENANCE_BY_FIELD_COL` column. When provided, `MetadataStore` skips upstream loading, joining,
            and field provenance calculation.

            **Required for root features** (features with no upstream dependencies).
            Root features have no upstream features from which to calculate `PROVENANCE_BY_FIELD_COL`, so users
            must provide samples with a manually computed `PROVENANCE_BY_FIELD_COL` column.

            For non-root features, use this when you
            want to bypass the automatic upstream loading and field provenance calculation.

            Examples:

            - Loading upstream from custom sources

            - Pre-computing field provenances with custom logic

            - Testing specific scenarios

            This parameter is not needed during normal operation.

        filters: Dict mapping feature keys (as strings) to lists of Narwhals filter expressions.
            Applied when reading upstream metadata to filter samples at the source.
            Example: {"upstream/feature": [nw.col("x") > 10], ...}
        lazy: If `True`, return [metaxy.data_versioning.diff.LazyIncrement][] with lazy Narwhals LazyFrames.
            If `False`, return [metaxy.data_versioning.diff.Increment][] with eager Narwhals DataFrames.
        **kwargs: Backend-specific parameters

    Raises:
        ValueError: If `samples` is not provided when resolving an update for a root feature.

    Examples:
        ```py
        # Root feature - samples required
        import narwhals as nw
        import polars as pl

        samples = pl.DataFrame({
            "sample_uid": [1, 2, 3],
            PROVENANCE_BY_FIELD_COL: [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
        })
        result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
        ```

        ```py
        # Non-root feature - automatic (normal usage)
        result = store.resolve_update(DownstreamFeature)
        ```

        ```py
        # Non-root feature - with escape hatch (advanced)
        custom_samples = compute_custom_field_provenance(...)
        result = store.resolve_update(DownstreamFeature, samples=custom_samples)
        ```

    Note:
        Users can then process only the added and changed samples and call `write_metadata()` with the results.
    """
    import narwhals as nw

    plan = feature.graph.get_feature_plan(feature.spec().key)

    # Escape hatch: if samples provided, use them directly (skip join/calculation)
    if samples is not None:
        import logging

        import polars as pl

        logger = logging.getLogger(__name__)

        # Convert samples to lazy if needed
        if isinstance(samples, nw.LazyFrame):
            samples_lazy = samples
        elif isinstance(samples, nw.DataFrame):
            samples_lazy = samples.lazy()
        else:
            samples_lazy = nw.from_native(samples).lazy()

        # Check if samples are Polars-backed (common case for escape hatch)
        samples_native = samples_lazy.to_native()
        is_polars_samples = isinstance(samples_native, (pl.DataFrame, pl.LazyFrame))

        if is_polars_samples and self._supports_native_components():
            # User provided Polars samples but store uses native (SQL) backend
            # Need to materialize current metadata to Polars for compatibility
            logger.warning(
                f"Feature {feature.spec().key}: samples parameter is Polars-backed but store uses native SQL backend. "
                f"Materializing current metadata to Polars for diff comparison. "
                f"For better performance, consider using samples with backend matching the store's backend."
            )
            # Get current metadata and materialize to Polars
            current_lazy_native = self.read_metadata_in_store(
                feature, feature_version=feature.feature_version()
            )
            if current_lazy_native is not None:
                # Convert to Polars using Narwhals' built-in method
                current_lazy = nw.from_native(
                    current_lazy_native.collect().to_polars().lazy()
                )
            else:
                current_lazy = None
        else:
            # Same backend or no conversion needed - direct read
            current_lazy = self.read_metadata_in_store(
                feature, feature_version=feature.feature_version()
            )

        # Use diff resolver to compare samples with current
        from metaxy.data_versioning.diff.narwhals import NarwhalsDiffResolver

        diff_resolver = NarwhalsDiffResolver()

        lazy_result = diff_resolver.find_changes(
            target_provenance=samples_lazy,
            current_metadata=current_lazy,
            id_columns=feature.spec().id_columns,  # Get ID columns from feature spec
        )

        return lazy_result if lazy else lazy_result.collect()

    # Root features without samples: error (samples required)
    if not plan.deps:
        raise ValueError(
            f"Feature {feature.spec().key} has no upstream dependencies (root feature). "
            f"Must provide 'samples' parameter with sample_uid and {PROVENANCE_BY_FIELD_COL} columns. "
            f"Root features require manual {PROVENANCE_BY_FIELD_COL} computation."
        )

    # Non-root features without samples: automatic upstream loading
    # Check where upstream data lives
    upstream_location = self._check_upstream_location(feature)

    if upstream_location == "all_local":
        # All upstream in this store - use native field provenance calculations
        return self._resolve_update_native(feature, filters=filters, lazy=lazy)
    else:
        # Some upstream in fallback stores - use Polars components
        return self._resolve_update_polars(feature, filters=filters, lazy=lazy)
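
As the note above says, a typical caller processes only the added and changed samples and then writes metadata back. A minimal end-to-end sketch, under the assumptions that Increment exposes added and changed frames and that write_metadata accepts a feature class plus a frame (both assumptions; compute_feature is a hypothetical user function):

# Sketch only: `increment.added` / `increment.changed` and the
# `write_metadata(feature, frame)` signature are assumptions based on the
# note above; `compute_feature` is a placeholder for user processing logic.
increment = store.resolve_update(DownstreamFeature)
for frame in (increment.added, increment.changed):
    processed = compute_feature(frame)  # user-defined processing
    store.write_metadata(DownstreamFeature, processed)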