
Metadata Stores

Metaxy abstracts interactions with metadata behind an interface called MetadataStore.

Users can extend this class to implement support for arbitrary metadata storage backends, such as databases, lakehouse formats, or any other external system.

Here are some of the built-in metadata store types:

  • Databases

  • Storage Only

The full list can be found here.
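
A minimal usage sketch with the built-in DuckDB store (the database path and MyFeature are illustrative):

from metaxy.metadata_store.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(database="metadata.db")

with store:  # opens the store in read mode
    df = store.read_metadata(MyFeature).collect()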


Metadata Store Interface

metaxy.MetadataStore

MetadataStore(*, versioning_engine_cls: type[VersioningEngineT], hash_algorithm: HashAlgorithm | None = None, versioning_engine: VersioningEngineOptions = 'auto', fallback_stores: list[MetadataStore] | None = None, auto_create_tables: bool | None = None, materialization_id: str | None = None)

Bases: ABC

Abstract base class for metadata storage backends.

Parameters:

  • hash_algorithm (HashAlgorithm | None, default: None ) –

    Hash algorithm to use for the versioning engine.

  • versioning_engine (VersioningEngineOptions, default: 'auto' ) –

    Which versioning engine to use.

    • "auto": Prefer the store's native engine, fall back to Polars if needed

    • "native": Always use the store's native engine, raise VersioningEngineMismatchError if provided dataframes are incompatible

    • "polars": Always use the Polars engine

  • fallback_stores (list[MetadataStore] | None, default: None ) –

    Ordered list of read-only fallback stores. Used when upstream features are not in this store. VersioningEngineMismatchError is not raised when reading from fallback stores.

  • auto_create_tables (bool | None, default: None ) –

    If True, automatically create tables when opening the store. If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var). If False, never auto-create tables.

    Warning

    Auto-create is intended for development/testing only. Use proper database migration tools like Alembic for production deployments.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID. If provided, all metadata writes will include this ID in the metaxy_materialization_id column. Can be overridden per MetadataStore.write_metadata call.

Raises:

  • ValueError

    If fallback stores use different hash algorithms or truncation lengths

  • VersioningEngineMismatchError

    If a user-provided dataframe has a mismatched implementation and versioning_engine is set to native
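
Example

A sketch of composing stores with fallbacks (the store type, paths, and run ID are illustrative):

from metaxy.metadata_store.duckdb import DuckDBMetadataStore

prod = DuckDBMetadataStore(database="prod.db")
dev = DuckDBMetadataStore(
    database="dev.db",
    fallback_stores=[prod],  # read-only fallbacks, checked in order
    materialization_id="run-2024-01-01",  # tag all writes with this run ID
)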

Source code in src/metaxy/metadata_store/base.py
def __init__(
    self,
    *,
    versioning_engine_cls: type[VersioningEngineT],
    hash_algorithm: HashAlgorithm | None = None,
    versioning_engine: VersioningEngineOptions = "auto",
    fallback_stores: list[MetadataStore] | None = None,
    auto_create_tables: bool | None = None,
    materialization_id: str | None = None,
):
    """
    Initialize the metadata store.

    Args:
        hash_algorithm: Hash algorithm to use for the versioning engine.

        versioning_engine: Which versioning engine to use.

            - "auto": Prefer the store's native engine, fall back to Polars if needed

            - "native": Always use the store's native engine, raise `VersioningEngineMismatchError`
                if provided dataframes are incompatible

            - "polars": Always use the Polars engine

        fallback_stores: Ordered list of read-only fallback stores.
            Used when upstream features are not in this store.
            `VersioningEngineMismatchError` is not raised when reading from fallback stores.
        auto_create_tables: If True, automatically create tables when opening the store.
            If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var).
            If False, never auto-create tables.

            !!! warning
                Auto-create is intended for development/testing only.
                Use proper database migration tools like Alembic for production deployments.

        materialization_id: Optional external orchestration ID.
            If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column.
            Can be overridden per [`MetadataStore.write_metadata`][metaxy.MetadataStore.write_metadata] call.

    Raises:
        ValueError: If fallback stores use different hash algorithms or truncation lengths
        VersioningEngineMismatchError: If a user-provided dataframe has a mismatched implementation
            and versioning_engine is set to `native`
    """
    # Initialize state early so properties can check it
    self._is_open = False
    self._context_depth = 0
    self._versioning_engine = versioning_engine
    self._allow_cross_project_writes = False
    self._materialization_id = materialization_id
    self._open_cm: AbstractContextManager[Self] | None = (
        None  # Track the open() context manager
    )
    self.versioning_engine_cls = versioning_engine_cls

    # Resolve auto_create_tables from global config if not explicitly provided
    if auto_create_tables is None:
        from metaxy.config import MetaxyConfig

        self.auto_create_tables = MetaxyConfig.get().auto_create_tables
    else:
        self.auto_create_tables = auto_create_tables

    # Use store's default algorithm if not specified
    if hash_algorithm is None:
        hash_algorithm = self._get_default_hash_algorithm()

    self.hash_algorithm = hash_algorithm

    self.fallback_stores = fallback_stores or []

Attributes

metaxy.MetadataStore.materialization_id property

materialization_id: str | None

The external orchestration ID for this store instance.

If set, all metadata writes include this ID in the metaxy_materialization_id column, allowing filtering of rows written during a specific materialization run.
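
Example

A sketch of selecting rows written during a specific run (MyFeature and "run-42" are illustrative):

import narwhals as nw

with store:
    df = store.read_metadata(
        MyFeature,
        filters=[nw.col("metaxy_materialization_id") == "run-42"],
    ).collect()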

Functions

metaxy.MetadataStore.resolve_update

resolve_update(feature: CoercibleToFeatureKey, *, samples: IntoFrame | Frame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[False] = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment
resolve_update(feature: CoercibleToFeatureKey, *, samples: IntoFrame | Frame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[True], versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> LazyIncrement
resolve_update(feature: CoercibleToFeatureKey, *, samples: IntoFrame | Frame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: bool = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment | LazyIncrement

Calculate an incremental update for a feature.

This is the main workhorse in Metaxy.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature class to resolve updates for

  • samples (IntoFrame | Frame | None, default: None ) –

    A dataframe with joined upstream metadata and "metaxy_provenance_by_field" column set. When provided, MetadataStore skips loading upstream feature metadata and provenance calculations.

    Required for root features

    Metaxy doesn't know how to populate input metadata for root features, so the samples argument must be provided for them.

    Tip

    For non-root features, use samples to customize the automatic upstream loading and field provenance calculation. For example, it can be used to request processing for specific sample IDs.

    Setting this parameter during normal operations is not required.

  • filters (Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None, default: None ) –

    A mapping from feature keys to lists of Narwhals filter expressions. Keys can be feature classes, FeatureKey objects, or string paths. Applied at read time. May also filter the current feature; in that case the filter is applied to samples as well (if provided). Example: {UpstreamFeature: [nw.col("x") > 10], ...}

  • global_filters (Sequence[Expr] | None, default: None ) –

    A list of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Useful for filtering by common columns like sample_uid across all features. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]

  • lazy (bool, default: False ) –

    Whether to return a LazyIncrement or an Increment.
  • versioning_engine (Literal['auto', 'native', 'polars'] | None, default: None ) –

    Override the store's versioning engine for this operation.

  • skip_comparison (bool, default: False ) –

    If True, skip the increment comparison logic and return all upstream samples in Increment.added. The changed and removed frames will be empty.

Raises:

  • ValueError

    If no samples dataframe has been provided when resolving an update for a root feature.

  • VersioningEngineMismatchError

    If versioning_engine has been set to "native" and a dataframe of a different implementation has been encountered during resolve_update.

With a root feature

import narwhals as nw
import polars as pl

samples = pl.DataFrame({
    "sample_uid": [1, 2, 3],
    "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
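
With a non-root feature (a sketch; MyFeature and UpstreamFeature are illustrative):

import narwhals as nw

increment = store.resolve_update(
    MyFeature,
    filters={UpstreamFeature: [nw.col("x") > 10]},
    global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
    lazy=True,  # return a LazyIncrement instead of an eager Increment
)
new_rows = increment.added.collect()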
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
    self,
    feature: CoercibleToFeatureKey,
    *,
    samples: IntoFrame | Frame | None = None,
    filters: Mapping[CoercibleToFeatureKey, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    lazy: bool = False,
    versioning_engine: Literal["auto", "native", "polars"] | None = None,
    skip_comparison: bool = False,
    **kwargs: Any,
) -> Increment | LazyIncrement:
    """Calculate an incremental update for a feature.

    This is the main workhorse in Metaxy.

    Args:
        feature: Feature class to resolve updates for
        samples: A dataframe with joined upstream metadata and `"metaxy_provenance_by_field"` column set.
            When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.

            !!! info "Required for root features"
                Metaxy doesn't know how to populate input metadata for root features,
            so the `samples` argument **must** be provided for them.

            !!! tip
                For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation.
                For example, it can be used to request processing for specific sample IDs.

            Setting this parameter during normal operations is not required.

        filters: A mapping from feature keys to lists of Narwhals filter expressions.
            Keys can be feature classes, FeatureKey objects, or string paths.
            Applied at read time. May also filter the current feature;
            in that case the filter is applied to `samples` as well (if provided).
            Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
        global_filters: A list of Narwhals filter expressions applied to all features.
            These filters are combined with any feature-specific filters from `filters`.
            Useful for filtering by common columns like `sample_uid` across all features.
            Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
        lazy: Whether to return a [metaxy.versioning.types.LazyIncrement][] or a [metaxy.versioning.types.Increment][].
        versioning_engine: Override the store's versioning engine for this operation.
        skip_comparison: If True, skip the increment comparison logic and return all
            upstream samples in `Increment.added`. The `changed` and `removed` frames will
            be empty.

    Raises:
        ValueError: If no `samples` dataframe has been provided when resolving an update for a root feature.
        VersioningEngineMismatchError: If `versioning_engine` has been set to `"native"`
            and a dataframe of a different implementation has been encountered during `resolve_update`.

    !!! example "With a root feature"

        ```py
        samples = pl.DataFrame({
            "sample_uid": [1, 2, 3],
            "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
        })
        result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
        ```
    """
    import narwhals as nw

    # Convert samples to Narwhals frame if not already
    samples_nw: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None
    if samples is not None:
        if isinstance(samples, (nw.DataFrame, nw.LazyFrame)):
            samples_nw = samples
        else:
            samples_nw = nw.from_native(samples)  # ty: ignore[invalid-assignment]

    # Normalize filter keys to FeatureKey
    normalized_filters: dict[FeatureKey, list[nw.Expr]] = {}
    if filters:
        for key, exprs in filters.items():
            feature_key = self._resolve_feature_key(key)
            normalized_filters[feature_key] = list(exprs)

    # Convert global_filters to a list for easy concatenation
    global_filter_list = list(global_filters) if global_filters else []

    feature_key = self._resolve_feature_key(feature)
    graph = current_graph()
    plan = graph.get_feature_plan(feature_key)

    # Root features without samples: error (samples required)
    if not plan.deps and samples_nw is None:
        raise ValueError(
            f"Feature {feature_key} has no upstream dependencies (root feature). "
            f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. "
            f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation."
        )

    # Combine feature-specific filters with global filters
    current_feature_filters = [
        *normalized_filters.get(feature_key, []),
        *global_filter_list,
    ]

    # Read current metadata with deduplication (latest_only=True by default)
    # Use allow_fallback=False since we only want metadata from THIS store
    # to determine what needs to be updated locally
    try:
        current_metadata: nw.LazyFrame[Any] | None = self.read_metadata(
            feature_key,
            filters=current_feature_filters if current_feature_filters else None,
            allow_fallback=False,
            current_only=True,  # filters by current feature_version
            latest_only=True,  # deduplicates by id_columns, keeping latest
        )
    except FeatureNotFoundError:
        current_metadata = None

    upstream_by_key: dict[FeatureKey, nw.LazyFrame[Any]] = {}
    filters_by_key: dict[FeatureKey, list[nw.Expr]] = {}

    # if samples are provided, use them as source of truth for upstream data
    if samples_nw is not None:
        # Apply filters to samples if any
        filtered_samples = samples_nw
        if current_feature_filters:
            filtered_samples = samples_nw.filter(current_feature_filters)

        # fill in METAXY_PROVENANCE column if it's missing (e.g. for root features)
        samples_nw = self.hash_struct_version_column(
            plan,
            df=filtered_samples,
            struct_column=METAXY_PROVENANCE_BY_FIELD,
            hash_column=METAXY_PROVENANCE,
        )

        # For root features, add data_version columns if they don't exist
        # (root features have no computation, so data_version equals provenance)
        # Use collect_schema().names() to avoid PerformanceWarning on lazy frames
        if METAXY_DATA_VERSION_BY_FIELD not in samples_nw.collect_schema().names():
            samples_nw = samples_nw.with_columns(
                nw.col(METAXY_PROVENANCE_BY_FIELD).alias(
                    METAXY_DATA_VERSION_BY_FIELD
                ),
                nw.col(METAXY_PROVENANCE).alias(METAXY_DATA_VERSION),
            )
    else:
        for upstream_spec in plan.deps or []:
            # Combine feature-specific filters with global filters for upstream
            upstream_filters = [
                *normalized_filters.get(upstream_spec.key, []),
                *global_filter_list,
            ]
            upstream_feature_metadata = self.read_metadata(
                upstream_spec.key,
                filters=upstream_filters,
            )
            if upstream_feature_metadata is not None:
                upstream_by_key[upstream_spec.key] = upstream_feature_metadata

    # determine which implementation to use for resolving the increment
    # consider (1) whether all upstream metadata has been loaded with the native implementation
    # (2) if samples have native implementation

    # Use parameter if provided, otherwise use store default
    engine_mode = (
        versioning_engine
        if versioning_engine is not None
        else self._versioning_engine
    )

    # If "polars" mode, force Polars immediately
    if engine_mode == "polars":
        implementation = nw.Implementation.POLARS
        switched_to_polars = True
    else:
        implementation = self.native_implementation()
        switched_to_polars = False

        for upstream_key, df in upstream_by_key.items():
            if df.implementation != implementation:
                switched_to_polars = True
                # Only raise error in "native" mode if no fallback stores configured.
                # If fallback stores exist, the implementation mismatch indicates data came
                # from fallback (different implementation), which is legitimate fallback access.
                # If data were local, it would have the native implementation.
                if engine_mode == "native" and not self.fallback_stores:
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but upstream feature `{upstream_key.to_string()}` "
                        f"has implementation {df.implementation}, expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto" or (
                    engine_mode == "native" and self.fallback_stores
                ):
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=df.implementation,
                        message=f"Using Polars for resolving the increment instead. This was caused by upstream feature `{upstream_key.to_string()}`.",
                    )
                implementation = nw.Implementation.POLARS
                break

        if (
            samples_nw is not None
            and samples_nw.implementation != self.native_implementation()
        ):
            if not switched_to_polars:
                if engine_mode == "native":
                    # Always raise error for samples with wrong implementation, regardless
                    # of fallback stores, because samples come from user argument, not from fallback
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but provided `samples` have implementation {samples_nw.implementation}, "
                        f"expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto":
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=samples_nw.implementation,
                        message=f"Provided `samples` have implementation {samples_nw.implementation}. Using Polars for resolving the increment instead.",
                    )
            implementation = nw.Implementation.POLARS
            switched_to_polars = True

    if switched_to_polars:
        if current_metadata:
            current_metadata = switch_implementation_to_polars(current_metadata)
        if samples_nw:
            samples_nw = switch_implementation_to_polars(samples_nw)
        for upstream_key, df in upstream_by_key.items():
            upstream_by_key[upstream_key] = switch_implementation_to_polars(df)

    with self.create_versioning_engine(
        plan=plan, implementation=implementation
    ) as engine:
        if skip_comparison:
            # Skip comparison: return all upstream samples as added
            if samples_nw is not None:
                # Root features or user-provided samples: use samples directly
                # Note: samples already has metaxy_provenance computed
                added = samples_nw.lazy()
                input_df = None  # Root features have no upstream input
            else:
                # Non-root features: load all upstream with provenance
                added = engine.load_upstream_with_provenance(
                    upstream=upstream_by_key,
                    hash_algo=self.hash_algorithm,
                    filters=filters_by_key,
                )
                input_df = (
                    added  # Input is the same as added when skipping comparison
                )
            changed = None
            removed = None
        else:
            added, changed, removed, input_df = (
                engine.resolve_increment_with_provenance(
                    current=current_metadata,
                    upstream=upstream_by_key,
                    hash_algorithm=self.hash_algorithm,
                    filters=filters_by_key,
                    sample=samples_nw.lazy() if samples_nw is not None else None,
                )
            )

    # Convert None to empty DataFrames
    if changed is None:
        changed = empty_frame_like(added)
    if removed is None:
        removed = empty_frame_like(added)

    if lazy:
        return LazyIncrement(
            added=added
            if isinstance(added, nw.LazyFrame)
            else nw.from_native(added),
            changed=changed
            if isinstance(changed, nw.LazyFrame)
            else nw.from_native(changed),
            removed=removed
            if isinstance(removed, nw.LazyFrame)
            else nw.from_native(removed),
            input=input_df
            if input_df is None or isinstance(input_df, nw.LazyFrame)
            else nw.from_native(input_df),
        )
    else:
        return Increment(
            added=added.collect() if isinstance(added, nw.LazyFrame) else added,
            changed=changed.collect()
            if isinstance(changed, nw.LazyFrame)
            else changed,
            removed=removed.collect()
            if isinstance(removed, nw.LazyFrame)
            else removed,
        )

metaxy.MetadataStore.compute_provenance

compute_provenance(feature: CoercibleToFeatureKey, df: FrameT) -> FrameT

Compute provenance columns for a DataFrame with pre-joined upstream data.

Note

This method may be useful in very rare cases. Rely on MetadataStore.resolve_update instead.

Use this method when you perform custom joins outside of Metaxy's auto-join system but still want Metaxy to compute provenance. The method computes metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field, and metaxy_data_version columns based on the upstream metadata.

Info

The input DataFrame must contain the renamed metaxy_data_version_by_field columns from each upstream feature. The naming convention follows the pattern metaxy_data_version_by_field__<feature_key.to_column_suffix()>. For example, for an upstream feature with key ["video", "raw"], the column should be named metaxy_data_version_by_field__video_raw.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The feature to compute provenance for.

  • df (FrameT) –

    A DataFrame containing pre-joined upstream data with renamed metaxy_data_version_by_field columns from each upstream feature.

Returns:

  • FrameT

    The input DataFrame with provenance columns added. Returns the same frame type as the input, either an eager DataFrame or a LazyFrame.

Raises:

  • StoreNotOpenError

    If the store is not open.

  • ValueError

    If required upstream metaxy_data_version_by_field columns are missing from the DataFrame.

Example
    # Read upstream metadata
    video_df = store.read_metadata(VideoFeature).collect()
    audio_df = store.read_metadata(AudioFeature).collect()

    # Rename data_version_by_field columns to the expected convention
    video_df = video_df.rename({
        "metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
    })
    audio_df = audio_df.rename({
        "metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
    })

    # Perform custom join
    joined = video_df.join(audio_df, on="sample_uid", how="inner")

    # Compute provenance
    with_provenance = store.compute_provenance(MyFeature, joined)

    # Pass to resolve_update
    increment = store.resolve_update(MyFeature, samples=with_provenance)
Source code in src/metaxy/metadata_store/base.py
def compute_provenance(
    self,
    feature: CoercibleToFeatureKey,
    df: FrameT,
) -> FrameT:
    """Compute provenance columns for a DataFrame with pre-joined upstream data.

    !!! note
        This method may be useful in very rare cases.
        Rely on [`MetadataStore.resolve_update`][metaxy.metadata_store.base.MetadataStore.resolve_update] instead.

    Use this method when you perform custom joins outside of Metaxy's auto-join
    system but still want Metaxy to compute provenance. The method computes
    metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field,
    and metaxy_data_version columns based on the upstream metadata.

    !!! info
        The input DataFrame must contain the renamed metaxy_data_version_by_field
        columns from each upstream feature. The naming convention follows the pattern
        `metaxy_data_version_by_field__<feature_key.to_column_suffix()>`. For example, for an
        upstream feature with key `["video", "raw"]`, the column should be named
        `metaxy_data_version_by_field__video_raw`.

    Args:
        feature: The feature to compute provenance for.
        df: A DataFrame containing pre-joined upstream data with renamed
            metaxy_data_version_by_field columns from each upstream feature.

    Returns:
        The input DataFrame with provenance columns added. Returns the same
        frame type as the input, either an eager DataFrame or a LazyFrame.

    Raises:
        StoreNotOpenError: If the store is not open.
        ValueError: If required upstream `metaxy_data_version_by_field` columns
            are missing from the DataFrame.

    Example:
        ```py

            # Read upstream metadata
            video_df = store.read_metadata(VideoFeature).collect()
            audio_df = store.read_metadata(AudioFeature).collect()

            # Rename data_version_by_field columns to the expected convention
            video_df = video_df.rename({
                "metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
            })
            audio_df = audio_df.rename({
                "metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
            })

            # Perform custom join
            joined = video_df.join(audio_df, on="sample_uid", how="inner")

            # Compute provenance
            with_provenance = store.compute_provenance(MyFeature, joined)

            # Pass to resolve_update
            increment = store.resolve_update(MyFeature, samples=with_provenance)
        ```
    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    graph = current_graph()
    plan = graph.get_feature_plan(feature_key)

    # Use native implementation if DataFrame matches, otherwise fall back to Polars
    implementation = self.native_implementation()
    if df.implementation != implementation:
        implementation = nw.Implementation.POLARS
        df = switch_implementation_to_polars(df)  # ty: ignore[no-matching-overload]

    with self.create_versioning_engine(
        plan=plan, implementation=implementation
    ) as engine:
        # Validate required upstream columns exist
        expected_columns = {
            dep.feature: engine.get_renamed_data_version_by_field_col(dep.feature)
            for dep in (plan.feature_deps or [])
        }

        df_columns = set(df.collect_schema().names())  # ty: ignore[invalid-argument-type]
        missing_columns = [
            f"{col} (from upstream feature {key.to_string()})"
            for key, col in expected_columns.items()
            if col not in df_columns
        ]

        if missing_columns:
            raise ValueError(
                f"DataFrame is missing required upstream columns for computing "
                f"provenance of feature {feature_key.to_string()}. "
                f"Missing columns: {missing_columns}. "
                f"Make sure to rename metaxy_data_version_by_field columns from "
                f"each upstream feature using the pattern "
                f"metaxy_data_version_by_field__<feature_key.table_name>."
            )

        return engine.compute_provenance_columns(df, hash_algo=self.hash_algorithm)  # ty: ignore[invalid-argument-type]

metaxy.MetadataStore.read_metadata

read_metadata(feature: CoercibleToFeatureKey, *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, allow_fallback: bool = True, current_only: bool = True, latest_only: bool = True) -> LazyFrame[Any]

Read metadata with optional fallback to upstream stores.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for

  • feature_version (str | None, default: None ) –

    Explicit feature_version to filter by (mutually exclusive with current_only=True)

  • filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to include. Metaxy's system columns are always included.

  • allow_fallback (bool, default: True ) –

    If True, check fallback stores on local miss

  • current_only (bool, default: True ) –

    If True, only return rows with current feature_version

  • latest_only (bool, default: True ) –

    Whether to deduplicate samples within id_columns groups ordered by metaxy_created_at.

Returns:

  • LazyFrame[Any]

    Narwhals LazyFrame with metadata

Raises:

  • FeatureNotFoundError

    If the feature is not found in any store

  • SystemDataNotFoundError

    When attempting to read non-existent Metaxy system data

  • ValueError

    If both feature_version and current_only=True are provided

Info

When this method is called with default arguments, it will return the latest (by metaxy_created_at) metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.

Warning

The order of rows is not guaranteed.
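
Example

A sketch of typical reads (MyFeature and the version hash are illustrative):

with store:
    # Latest rows for the current feature version (default behavior)
    current = store.read_metadata(MyFeature).collect()

    # Rows for an explicit historical version
    old = store.read_metadata(
        MyFeature,
        feature_version="abc123",
        current_only=False,
    ).collect()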

Source code in src/metaxy/metadata_store/base.py
def read_metadata(
    self,
    feature: CoercibleToFeatureKey,
    *,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
) -> nw.LazyFrame[Any]:
    """
    Read metadata with optional fallback to upstream stores.

    Args:
        feature: Feature to read metadata for
        feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True)
        filters: Sequence of Narwhals filter expressions to apply to this feature.
            Example: `[nw.col("x") > 10, nw.col("y") < 5]`
        columns: Subset of columns to include. Metaxy's system columns are always included.
        allow_fallback: If `True`, check fallback stores on local miss
        current_only: If `True`, only return rows with current feature_version
        latest_only: Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`.

    Returns:
        Narwhals LazyFrame with metadata

    Raises:
        FeatureNotFoundError: If feature not found in any store
        SystemDataNotFoundError: When attempting to read non-existent Metaxy system data
        ValueError: If both feature_version and current_only=True are provided

    !!! info
        When this method is called with default arguments, it will return the latest (by `metaxy_created_at`)
        metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.

    !!! warning
        The order of rows is not guaranteed.
    """
    self._check_open()

    filters = filters or []
    columns = columns or []

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate mutually exclusive parameters
    if feature_version is not None and current_only:
        raise ValueError(
            "Cannot specify both feature_version and current_only=True. "
            "Use current_only=False with feature_version parameter."
        )

    # Add feature_version filter only when needed
    if (current_only or feature_version is not None) and not is_system_table:
        version_filter = nw.col(METAXY_FEATURE_VERSION) == (
            current_graph().get_feature_version(feature_key)
            if current_only
            else feature_version
        )
        filters = [version_filter, *filters]

    if columns and not is_system_table:
        # Add only system columns that aren't already in the user's columns list
        columns_set = set(columns)
        missing_system_cols = [
            c for c in ALL_SYSTEM_COLUMNS if c not in columns_set
        ]
        read_columns = [*columns, *missing_system_cols]
    else:
        read_columns = None

    lazy_frame = None
    try:
        lazy_frame = self.read_metadata_in_store(
            feature, filters=filters, columns=read_columns
        )
    except FeatureNotFoundError as e:
        # do not read system features from fallback stores
        if is_system_table:
            raise SystemDataNotFoundError(
                f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
            ) from e

    # Handle case where read_metadata_in_store returns None (no exception raised)
    if lazy_frame is None and is_system_table:
        raise SystemDataNotFoundError(
            f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
        )

    if lazy_frame is not None and not is_system_table and latest_only:
        from metaxy.models.constants import METAXY_CREATED_AT

        # Apply deduplication
        lazy_frame = self.versioning_engine_cls.keep_latest_by_group(
            df=lazy_frame,
            group_columns=list(
                self._resolve_feature_plan(feature_key).feature.id_columns
            ),
            timestamp_column=METAXY_CREATED_AT,
        )

    if lazy_frame is not None:
        # After dedup, filter to requested columns if specified
        if columns:
            lazy_frame = lazy_frame.select(columns)

        return lazy_frame

    # Try fallback stores (opened on demand)
    if allow_fallback:
        for store in self.fallback_stores:
            try:
                # Open fallback store on demand for reading
                with store:
                    # Use full read_metadata to handle nested fallback chains
                    return store.read_metadata(
                        feature,
                        feature_version=feature_version,
                        filters=filters,
                        columns=columns,
                        allow_fallback=True,
                        current_only=current_only,
                        latest_only=latest_only,
                    )
            except FeatureNotFoundError:
                # Try next fallback store
                continue

    # Not found anywhere
    raise FeatureNotFoundError(
        f"Feature {feature_key.to_string()} not found in store"
        + (" or fallback stores" if allow_fallback else "")
    )

metaxy.MetadataStore.write_metadata

write_metadata(feature: CoercibleToFeatureKey, df: IntoFrame, materialization_id: str | None = None) -> None

Write metadata for a feature (append-only by design).

Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to write metadata for

  • df (IntoFrame) –

    Metadata DataFrame of any type supported by Narwhals. Must have metaxy_provenance_by_field column of type Struct with fields matching feature's fields. Optionally, may also contain metaxy_data_version_by_field.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID for this write. Overrides the store's default materialization_id if provided. Useful for tracking which orchestration run produced this metadata.

Raises:

  • MetadataSchemaError

    If the DataFrame schema is invalid

  • StoreNotOpenError

    If the store is not open

  • ValueError

    If writing to a feature from a different project than expected

Note
  • Must be called within a MetadataStore.open(mode="write") context manager.

  • Metaxy always performs an "append" operation. Metadata is never deleted or mutated.

  • Fallback stores are never used for writes.

  • Features from other Metaxy projects cannot be written to, unless project validation has been disabled with MetadataStore.allow_cross_project_writes.
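
Example

A minimal sketch (MyFeature and the run ID are illustrative):

import polars as pl

df = pl.DataFrame({
    "sample_uid": [1, 2],
    "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}],
})

with store.open(mode="write"):
    store.write_metadata(MyFeature, df, materialization_id="run-42")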

Source code in src/metaxy/metadata_store/base.py
def write_metadata(
    self,
    feature: CoercibleToFeatureKey,
    df: IntoFrame,
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for a feature (append-only by design).

    Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

    Args:
        feature: Feature to write metadata for
        df: Metadata DataFrame of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).
            Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
            Optionally, may also contain `metaxy_data_version_by_field`.
        materialization_id: Optional external orchestration ID for this write.
            Overrides the store's default `materialization_id` if provided.
            Useful for tracking which orchestration run produced this metadata.

    Raises:
        MetadataSchemaError: If DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Must be called within a `MetadataStore.open(mode="write")` context manager.

        - Metaxy always performs an "append" operation. Metadata is never deleted or mutated.

        - Fallback stores are never used for writes.

        - Features from other Metaxy projects cannot be written to, unless project validation has been disabled with [MetadataStore.allow_cross_project_writes][].

    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate project for non-system tables
    if not is_system_table:
        self._validate_project_write(feature)

    # Wrap the input in a Narwhals frame
    df_nw = nw.from_native(df)

    assert isinstance(df_nw, (nw.DataFrame, nw.LazyFrame)), (
        f"df must be a Narwhals DataFrame, got {type(df_nw)}"
    )

    # For system tables, write directly without feature_version tracking
    if is_system_table:
        self._validate_schema_system_table(df_nw)
        self.write_metadata_to_store(feature_key, df_nw)
        return

    # Use collect_schema().names() to avoid PerformanceWarning on lazy frames
    if METAXY_PROVENANCE_BY_FIELD not in df_nw.collect_schema().names():
        from metaxy.metadata_store.exceptions import MetadataSchemaError

        raise MetadataSchemaError(
            f"DataFrame must have '{METAXY_PROVENANCE_BY_FIELD}' column"
        )

    # Add all required system columns
    # warning: for dataframes that do not match the native MetadataStore implementation
    # and are missing the METAXY_DATA_VERSION column, this call will lead to materializing the equivalent Polars DataFrame
    # while calculating the missing METAXY_DATA_VERSION column
    df_nw = self._add_system_columns(
        df_nw, feature, materialization_id=materialization_id
    )

    self._validate_schema(df_nw)
    self.write_metadata_to_store(feature_key, df_nw)

metaxy.MetadataStore.write_metadata_multi

write_metadata_multi(metadata: Mapping[Any, IntoFrame], materialization_id: str | None = None) -> None

Write metadata for multiple features in reverse topological order.

Processes features so that dependents are written before their dependencies. This ordering ensures that downstream features are written first, which can be useful for certain data consistency requirements or when features need to be processed in a specific order.

Parameters:

  • metadata (Mapping[Any, IntoFrame]) –

    Mapping from feature keys to metadata DataFrames. Keys can be any type coercible to FeatureKey (string, sequence, FeatureKey, or BaseFeature class). Values must be DataFrames compatible with Narwhals, containing required system columns.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID for all writes. Overrides the store's default materialization_id if provided. Applied to all feature writes in this batch.

Raises:

  • MetadataSchemaError

    If any DataFrame schema is invalid

  • StoreNotOpenError

    If the store is not open

  • ValueError

    If writing to a feature from a different project than expected

Note
  • Must be called within a MetadataStore.open(mode="write") context manager.
  • Empty mappings are handled gracefully (no-op).
  • Each feature's metadata is written via write_metadata, so all validation and system column handling from that method applies.
Example
with store.open(mode="write"):
    store.write_metadata_multi({
        ChildFeature: child_df,
        ParentFeature: parent_df,
    })
# Features are written in reverse topological order:
# ChildFeature first, then ParentFeature
Source code in src/metaxy/metadata_store/base.py
def write_metadata_multi(
    self,
    metadata: Mapping[Any, IntoFrame],
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for multiple features in reverse topological order.

    Processes features so that dependents are written before their dependencies.
    This ordering ensures that downstream features are written first, which can
    be useful for certain data consistency requirements or when features need
    to be processed in a specific order.

    Args:
        metadata: Mapping from feature keys to metadata DataFrames.
            Keys can be any type coercible to FeatureKey (string, sequence,
            FeatureKey, or BaseFeature class). Values must be DataFrames
            compatible with Narwhals, containing required system columns.
        materialization_id: Optional external orchestration ID for all writes.
            Overrides the store's default `materialization_id` if provided.
            Applied to all feature writes in this batch.

    Raises:
        MetadataSchemaError: If any DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Must be called within a `MetadataStore.open(mode="write")` context manager.
        - Empty mappings are handled gracefully (no-op).
        - Each feature's metadata is written via `write_metadata`, so all
          validation and system column handling from that method applies.

    Example:
        ```py
        with store.open(mode="write"):
            store.write_metadata_multi({
                ChildFeature: child_df,
                ParentFeature: parent_df,
            })
        # Features are written in reverse topological order:
        # ChildFeature first, then ParentFeature
        ```
    """
    if not metadata:
        return

    # Build mapping from resolved keys to dataframes in one pass
    resolved_metadata = {
        self._resolve_feature_key(key): df for key, df in metadata.items()
    }

    # Get reverse topological order (dependents first)
    graph = current_graph()
    sorted_keys = graph.topological_sort_features(
        list(resolved_metadata.keys()), descending=True
    )

    # Write metadata in reverse topological order
    for feature_key in sorted_keys:
        self.write_metadata(
            feature_key,
            resolved_metadata[feature_key],
            materialization_id=materialization_id,
        )

metaxy.MetadataStore.config_model abstractmethod classmethod

config_model() -> type[MetadataStoreConfig]

Return the configuration model class for this store type.

Subclasses must override this to return their specific config class.

Returns:

  • type[MetadataStoreConfig]

    The config class type (e.g., DuckDBMetadataStoreConfig)

Note

Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.

Source code in src/metaxy/metadata_store/base.py
@classmethod
@abstractmethod
def config_model(cls) -> type[MetadataStoreConfig]:
    """Return the configuration model class for this store type.

    Subclasses must override this to return their specific config class.

    Returns:
        The config class type (e.g., DuckDBMetadataStoreConfig)

    Note:
        Subclasses override this with a more specific return type.
        Type checkers may show a warning about incompatible override,
        but this is intentional - each store returns its own config type.
    """
    ...

metaxy.MetadataStore.from_config classmethod

from_config(config: MetadataStoreConfig, **kwargs: Any) -> Self

Create a store instance from a configuration object.

This method creates a store by:

  1. Converting the config to a dict
  2. Resolving fallback store names to actual store instances
  3. Calling the store's __init__ with the config parameters

Parameters:

  • config (MetadataStoreConfig) –

    Configuration object (should be the type returned by config_model())

  • **kwargs (Any, default: {} ) –

    Additional arguments passed directly to the store constructor (e.g., materialization_id for runtime parameters not in config)

Returns:

  • Self

    A new store instance configured according to the config object

Example
from metaxy.metadata_store.duckdb import (
    DuckDBMetadataStore,
    DuckDBMetadataStoreConfig,
)

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    fallback_stores=["prod"],
)

store = DuckDBMetadataStore.from_config(config)
Source code in src/metaxy/metadata_store/base.py
@classmethod
def from_config(cls, config: MetadataStoreConfig, **kwargs: Any) -> Self:
    """Create a store instance from a configuration object.

    This method creates a store by:
    1. Converting the config to a dict
    2. Resolving fallback store names to actual store instances
    3. Calling the store's __init__ with the config parameters

    Args:
        config: Configuration object (should be the type returned by config_model())
        **kwargs: Additional arguments passed directly to the store constructor
            (e.g., materialization_id for runtime parameters not in config)

    Returns:
        A new store instance configured according to the config object

    Example:
        ```python
        from metaxy.metadata_store.duckdb import (
            DuckDBMetadataStore,
            DuckDBMetadataStoreConfig,
        )

        config = DuckDBMetadataStoreConfig(
            database="metadata.db",
            fallback_stores=["prod"],
        )

        store = DuckDBMetadataStore.from_config(config)
        ```
    """
    # Convert config to dict, excluding unset values
    config_dict = config.model_dump(exclude_unset=True)

    # Pop and resolve fallback store names to actual store instances
    fallback_store_names = config_dict.pop("fallback_stores", [])
    fallback_stores = [
        MetaxyConfig.get().get_store(name) for name in fallback_store_names
    ]

    # Create store with resolved fallback stores, config, and extra kwargs
    return cls(fallback_stores=fallback_stores, **config_dict, **kwargs)

metaxy.MetadataStore.native_implementation

native_implementation() -> Implementation

Get the native Narwhals implementation for this store's backend.

Source code in src/metaxy/metadata_store/base.py
def native_implementation(self) -> nw.Implementation:
    """Get the native Narwhals implementation for this store's backend."""
    return self.versioning_engine_cls.implementation()

metaxy.MetadataStore.create_versioning_engine

create_versioning_engine(plan: FeaturePlan, implementation: Implementation) -> Iterator[VersioningEngine | PolarsVersioningEngine]

Creates an appropriate provenance engine.

Falls back to Polars implementation if the required implementation differs from the store's native implementation.

Parameters:

  • plan (FeaturePlan) –

    The feature plan.

  • implementation (Implementation) –

    The desired engine implementation.

Returns:

  • Iterator[VersioningEngine | PolarsVersioningEngine]

    An appropriate provenance engine.

Source code in src/metaxy/metadata_store/base.py
@contextmanager
def create_versioning_engine(
    self, plan: FeaturePlan, implementation: nw.Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]:
    """
    Creates an appropriate provenance engine.

    Falls back to Polars implementation if the required implementation differs from the store's native implementation.

    Args:
        plan: The feature plan.
        implementation: The desired engine implementation.

    Returns:
        An appropriate provenance engine.
    """

    if implementation == nw.Implementation.POLARS:
        cm = self._create_polars_versioning_engine(plan)
    elif implementation == self.native_implementation():
        cm = self._create_versioning_engine(plan)
    else:
        cm = self._create_polars_versioning_engine(plan)

    with cm as engine:
        yield engine

metaxy.MetadataStore.open abstractmethod

open(mode: AccessMode = 'read') -> Iterator[Self]

Open/initialize the store for operations.

Context manager that opens the store with specified access mode. Called internally by __enter__. Child classes should implement backend-specific connection setup/teardown here.

Parameters:

  • mode (AccessMode, default: 'read' ) –

    Access mode for this connection session.

Yields:

  • Self ( Self ) –

    The store instance with connection open

Note

Users should prefer the with store: pattern except when write access mode is needed.
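
Example

A sketch of both access patterns (MyFeature is illustrative):

# Read-only access (default)
with store:
    df = store.read_metadata(MyFeature).collect()

# Write access
with store.open(mode="write"):
    store.write_metadata(MyFeature, df)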

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
@contextmanager
def open(self, mode: AccessMode = "read") -> Iterator[Self]:
    """Open/initialize the store for operations.

    Context manager that opens the store with specified access mode.
    Called internally by `__enter__`.
    Child classes should implement backend-specific connection setup/teardown here.

    Args:
        mode: Access mode for this connection session.

    Yields:
        Self: The store instance with connection open

    Note:
        Users should prefer the `with store:` pattern except when write access mode is needed.
    """
    ...

metaxy.MetadataStore.__enter__

__enter__() -> Self

Enter context manager - opens store in READ mode by default.

Use MetadataStore.open for write access mode instead.

Returns:

  • Self ( Self ) –

    The opened store instance

Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
    """Enter context manager - opens store in READ mode by default.

    Use [`MetadataStore.open`][metaxy.metadata_store.base.MetadataStore.open] for write access mode instead.

    Returns:
        Self: The opened store instance
    """
    # Determine mode based on auto_create_tables
    mode = "write" if self.auto_create_tables else "read"

    # Open the store (open() manages _context_depth internally)
    self._open_cm = self.open(mode)  # ty: ignore[invalid-assignment]
    self._open_cm.__enter__()  # ty: ignore[possibly-missing-attribute]

    return self

metaxy.MetadataStore.validate_hash_algorithm

validate_hash_algorithm(check_fallback_stores: bool = True) -> None

Validate that hash algorithm is supported by this store's components.

Public method - can be called to verify hash compatibility.

Parameters:

  • check_fallback_stores (bool, default: True ) –

    If True, also validate hash is supported by fallback stores (ensures compatibility for future cross-store operations)

Raises:

  • ValueError

    If hash algorithm not supported by components or fallback stores

Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
    self,
    check_fallback_stores: bool = True,
) -> None:
    """Validate that hash algorithm is supported by this store's components.

    Public method - can be called to verify hash compatibility.

    Args:
        check_fallback_stores: If True, also validate hash is supported by
            fallback stores (ensures compatibility for future cross-store operations)

    Raises:
        ValueError: If hash algorithm not supported by components or fallback stores
    """
    # Validate hash algorithm support without creating a full engine
    # (engine creation requires a graph which isn't available during store init)
    self._validate_hash_algorithm_support()

    # Check fallback stores
    if check_fallback_stores:
        for fallback in self.fallback_stores:
            fallback.validate_hash_algorithm(check_fallback_stores=False)

metaxy.MetadataStore.allow_cross_project_writes

allow_cross_project_writes() -> Iterator[None]

Context manager to temporarily allow cross-project writes.

This is an escape hatch for legitimate cross-project operations like migrations, where metadata needs to be written to features from different projects.

Example
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
    store.write_metadata(feature_from_project_a, metadata_a)
    store.write_metadata(feature_from_project_b, metadata_b)

Yields:

  • None ( None ) –

    The context manager temporarily disables project validation

Source code in src/metaxy/metadata_store/base.py
@contextmanager
def allow_cross_project_writes(self) -> Iterator[None]:
    """Context manager to temporarily allow cross-project writes.

    This is an escape hatch for legitimate cross-project operations like migrations,
    where metadata needs to be written to features from different projects.

    Example:
        ```py
        # During migration, allow writing to features from different projects
        with store.allow_cross_project_writes():
            store.write_metadata(feature_from_project_a, metadata_a)
            store.write_metadata(feature_from_project_b, metadata_b)
        ```

    Yields:
        None: The context manager temporarily disables project validation
    """
    previous_value = self._allow_cross_project_writes
    try:
        self._allow_cross_project_writes = True
        yield
    finally:
        self._allow_cross_project_writes = previous_value

metaxy.MetadataStore.write_metadata_to_store abstractmethod

write_metadata_to_store(feature_key: FeatureKey, df: Frame, **kwargs: Any) -> None

Internal write implementation (backend-specific).

Backends may convert to their specific type if needed (e.g., Polars, Ibis).

Parameters:

  • feature_key (FeatureKey) –

    Feature key to write to

  • df (Frame) –

    Narwhals-compatible DataFrame with metadata to write

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters

Note: Subclasses implement this for their storage backend.

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def write_metadata_to_store(
    self,
    feature_key: FeatureKey,
    df: Frame,
    **kwargs: Any,
) -> None:
    """
    Internal write implementation (backend-specific).

    Backends may convert to their specific type if needed (e.g., Polars, Ibis).

    Args:
        feature_key: Feature key to write to
        df: [Narwhals](https://narwhals-dev.github.io/narwhals/)-compatible DataFrame with metadata to write
        **kwargs: Backend-specific parameters

    Note: Subclasses implement this for their storage backend.
    """
    pass

metaxy.MetadataStore.drop_feature_metadata

drop_feature_metadata(feature: CoercibleToFeatureKey) -> None

Drop all metadata for a feature.

This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.

Warning

This operation is irreversible and will permanently delete all metadata for the specified feature.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature class or key to drop metadata for

Example
```py
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
```
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: CoercibleToFeatureKey) -> None:
    """Drop all metadata for a feature.

    This removes all stored metadata for the specified feature from the store.
    Useful for cleanup in tests or when re-computing feature metadata from scratch.

    Warning:
        This operation is irreversible and will **permanently delete all metadata** for the specified feature.

    Args:
        feature: Feature class or key to drop metadata for

    Example:
        ```py
        store.drop_feature_metadata(MyFeature)
        assert not store.has_feature(MyFeature)
        ```
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)
    self._drop_feature_metadata_impl(feature_key)

metaxy.MetadataStore.read_metadata_in_store abstractmethod

read_metadata_in_store(feature: CoercibleToFeatureKey, *, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, **kwargs: Any) -> LazyFrame[Any] | None

Read metadata from THIS store only, without using any fallback stores.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for

  • filters (Sequence[Expr] | None, default: None ) –

    List of Narwhals filter expressions for this specific feature.

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to return

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters

Returns:

  • LazyFrame[Any] | None

    Narwhals LazyFrame with metadata, or None if feature not found in the store

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def read_metadata_in_store(
    self,
    feature: CoercibleToFeatureKey,
    *,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    **kwargs: Any,
) -> nw.LazyFrame[Any] | None:
    """
    Read metadata from THIS store only, without using any fallback stores.

    Args:
        feature: Feature to read metadata for
        filters: List of Narwhals filter expressions for this specific feature.
        columns: Subset of columns to return
        **kwargs: Backend-specific parameters

    Returns:
        Narwhals LazyFrame with metadata, or None if feature not found in the store
    """
    pass
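
Continuing the same hypothetical `InMemoryMetadataStore` sketch from above, a matching read override could apply filters and column projection lazily via Narwhals:

```py
from collections.abc import Sequence
from typing import Any

import narwhals as nw


class InMemoryMetadataStore(MetadataStore):  # continuing the sketch above
    def read_metadata_in_store(
        self,
        feature,
        *,
        filters: Sequence[nw.Expr] | None = None,
        columns: Sequence[str] | None = None,
        **kwargs: Any,
    ) -> nw.LazyFrame[Any] | None:
        feature_key = self._resolve_feature_key(feature)  # base-class helper
        table = self._tables.get(feature_key)  # hypothetical backing dict
        if table is None:
            return None  # feature not present in this store
        lf = nw.from_native(table.lazy())
        if filters:
            lf = lf.filter(*filters)
        if columns is not None:
            lf = lf.select(columns)
        return lf
```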

metaxy.MetadataStore.has_feature

has_feature(feature: CoercibleToFeatureKey, *, check_fallback: bool = False) -> bool

Check if feature exists in store.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to check

  • check_fallback (bool, default: False ) –

    If True, also check fallback stores

Returns:

  • bool

    True if feature exists, False otherwise

Source code in src/metaxy/metadata_store/base.py
def has_feature(
    self,
    feature: CoercibleToFeatureKey,
    *,
    check_fallback: bool = False,
) -> bool:
    """
    Check if feature exists in store.

    Args:
        feature: Feature to check
        check_fallback: If True, also check fallback stores

    Returns:
        True if feature exists, False otherwise
    """
    self._check_open()

    if self.read_metadata_in_store(feature) is not None:
        return True

    # Check fallback stores
    if not check_fallback:
        return self._has_feature_impl(feature)
    else:
        for store in self.fallback_stores:
            if store.has_feature(feature, check_fallback=True):
                return True

    return False
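
A short usage sketch (`MyFeature` stands in for any Feature class or key):

```py
# Check this store alone, then this store plus its fallback chain
with store.open("read"):
    in_this_store = store.has_feature(MyFeature)
    anywhere = store.has_feature(MyFeature, check_fallback=True)
```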

metaxy.MetadataStore.display abstractmethod

display() -> str

Return a human-readable display string for this store.

Used in warnings, logs, and CLI output to identify the store.

Returns:

  • str

    Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def display(self) -> str:
    """Return a human-readable display string for this store.

    Used in warnings, logs, and CLI output to identify the store.

    Returns:
        Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
    """
    pass
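
A minimal sketch of a subclass override (`self._database` is a hypothetical attribute of the subclass):

```py
def display(self) -> str:
    # Mirrors the documented style, e.g. "DuckDBMetadataStore(database=/path/to/db.duckdb)"
    return f"{type(self).__name__}(database={self._database})"
```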

metaxy.MetadataStore.find_store_for_feature

find_store_for_feature(feature_key: CoercibleToFeatureKey, *, check_fallback: bool = True) -> MetadataStore | None

Find the store that contains the given feature.

Parameters:

  • feature_key (CoercibleToFeatureKey) –

    Feature to find

  • check_fallback (bool, default: True ) –

    Whether to check fallback stores when the feature is not found in the current store

Returns:

  • MetadataStore | None

    The store containing the feature, or None if not found

Source code in src/metaxy/metadata_store/base.py
def find_store_for_feature(
    self,
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> MetadataStore | None:
    """Find the store that contains the given feature.

    Args:
        feature_key: Feature to find
        check_fallback: Whether to check fallback stores when the feature
            is not found in the current store

    Returns:
        The store containing the feature, or None if not found
    """
    self._check_open()

    # Check if feature exists in this store
    if self.has_feature(feature_key):
        return self

    # Try fallback stores if enabled (opened on demand)
    if check_fallback:
        for store in self.fallback_stores:
            with store:
                found = store.find_store_for_feature(
                    feature_key, check_fallback=True
                )
                if found is not None:
                    return found

    return None
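
For example, to log where a feature's metadata would be served from (`MyFeature` is a placeholder):

```py
with store.open("read"):
    owner = store.find_store_for_feature(MyFeature)
    if owner is None:
        print("Feature not found in this store or any fallback")
    else:
        print(f"Feature metadata lives in {owner.display()}")
```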

metaxy.MetadataStore.get_store_metadata

get_store_metadata(feature_key: CoercibleToFeatureKey, *, check_fallback: bool = True) -> dict[str, Any]

Return arbitrary key-value pairs of metadata useful for logging purposes (such as a path in storage).

This method should not expose sensitive information.

Parameters:

  • feature_key (CoercibleToFeatureKey) –

    Feature to get metadata for

  • check_fallback (bool, default: True ) –

    Whether to check fallback stores when the feature is not found in the current store

Returns:

  • dict[str, Any]

    Dictionary with store-specific metadata (e.g., "display", "table_name", "uri")

Source code in src/metaxy/metadata_store/base.py
def get_store_metadata(
    self,
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> dict[str, Any]:
    """Arbitrary key-value pairs with useful metadata for logging purposes (like a path in storage).

    This method should not expose sensitive information.

    Args:
        feature_key: Feature to get metadata for
        check_fallback: Whether to check fallback stores when the feature
            is not found in the current store

    Returns:
        Dictionary with store-specific metadata (e.g., `"display"`, `"table_name"`, `"uri"`)
    """
    store = self.find_store_for_feature(feature_key, check_fallback=check_fallback)
    if store is None:
        return {}
    return {
        "display": store.display(),
        **store._get_store_metadata_impl(feature_key),
    }
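
A usage sketch; only the "display" key is contributed by this method itself, everything else is backend-specific (`MyFeature` is a placeholder):

```py
with store.open("read"):
    info = store.get_store_metadata(MyFeature)
    if info:
        print(f"Reading metadata from {info['display']}")
    else:
        print("Feature not found in this store or its fallbacks")
```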

metaxy.MetadataStore.calculate_input_progress

calculate_input_progress(lazy_increment: LazyIncrement, feature_key: CoercibleToFeatureKey) -> float | None

Calculate progress percentage from lazy increment.

Uses the input field from LazyIncrement to count total input units and compares it with the added dataframe to determine how many are still missing.

Progress represents the percentage of input units that have been processed at least once. Stale samples (in changed) are counted as processed since they have existing metadata, even though they may need re-processing due to upstream changes.

Parameters:

  • lazy_increment (LazyIncrement) –

    The lazy increment containing input and added dataframes.

  • feature_key (CoercibleToFeatureKey) –

    The feature key to look up lineage information.

Returns:

  • float | None

    Progress percentage (0-100), or None if input is not available.

Source code in src/metaxy/metadata_store/base.py
def calculate_input_progress(
    self,
    lazy_increment: LazyIncrement,
    feature_key: CoercibleToFeatureKey,
) -> float | None:
    """Calculate progress percentage from lazy increment.

    Uses the `input` field from LazyIncrement to count total input units
    and compares with `added` to determine how many are missing.

    Progress represents the percentage of input units that have been processed
    at least once. Stale samples (in `changed`) are counted as processed since
    they have existing metadata, even though they may need re-processing due to
    upstream changes.

    Args:
        lazy_increment: The lazy increment containing input and added dataframes.
        feature_key: The feature key to look up lineage information.

    Returns:
        Progress percentage (0-100), or None if input is not available.
    """
    if lazy_increment.input is None:
        return None

    key = self._resolve_feature_key(feature_key)
    graph = current_graph()
    plan = graph.get_feature_plan(key)

    # Get the columns that define input units from the feature plan
    input_id_columns = plan.input_id_columns

    # Count distinct input units using two separate queries
    # We can't use concat because input and added may have different schemas
    # (e.g., nullable vs non-nullable columns)
    total_units: int = (
        lazy_increment.input.select(input_id_columns)
        .unique()
        .select(nw.len())
        .collect()
        .item()
    )

    if total_units == 0:
        return None  # No input available from upstream

    missing_units: int = (
        lazy_increment.added.select(input_id_columns)
        .unique()
        .select(nw.len())
        .collect()
        .item()
    )

    processed_units = total_units - missing_units
    return (processed_units / total_units) * 100
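
A usage sketch, assuming `increment` is a LazyIncrement obtained elsewhere (the API that produces it is not shown in this section) and `MyFeature` is a placeholder:

```py
progress = store.calculate_input_progress(increment, MyFeature)
if progress is None:
    print("No input available to measure progress against")
else:
    print(f"{progress:.1f}% of input units processed at least once")
```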

metaxy.MetadataStore.copy_metadata

copy_metadata(from_store: MetadataStore, features: list[CoercibleToFeatureKey] | None = None, *, filters: Mapping[str, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, current_only: bool = False, latest_only: bool = True) -> dict[str, int]

Copy metadata from another store with fine-grained filtering.

This is a reusable method that can be called programmatically or from CLI/migrations. Copies metadata for specified features, preserving the original snapshot_version.

Parameters:

  • from_store (MetadataStore) –

    Source metadata store to copy from (must be opened for reading)

  • features (list[CoercibleToFeatureKey] | None, default: None ) –

    List of features to copy. Can be:

    - None: copies all features from the active graph

    - A list of FeatureKey or Feature classes: copies the specified features

  • filters (Mapping[str, Sequence[Expr]] | None, default: None ) –

    Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}

  • global_filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]

  • current_only (bool, default: False ) –

    If True, only copy rows with the current feature_version (as defined in the loaded feature graph). Defaults to False to copy all versions.

  • latest_only (bool, default: True ) –

    If True (default), deduplicate samples within id_columns groups by keeping only the latest row per group (ordered by metaxy_created_at).

Returns:

  • dict[str, int]

    Dict with statistics: {"features_copied": int, "rows_copied": int}

Raises:

  • ValueError –

    If source or destination store is not open

  • FeatureNotFoundError –

    If a specified feature doesn't exist in the source store

Examples:

```py
# Copy all features
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(from_store=source_store)
```

```py
# Copy specific features
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[FeatureKey(["my_feature"])],
    )
```

```py
# Copy with global filters applied to all features
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
    )
```

```py
# Copy specific features with per-feature filters
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[
            FeatureKey(["feature_a"]),
            FeatureKey(["feature_b"]),
        ],
        filters={
            "feature_a": [nw.col("field_a") > 10],
            "feature_b": [nw.col("field_b") < 30],
        },
    )
```
Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
    self,
    from_store: MetadataStore,
    features: list[CoercibleToFeatureKey] | None = None,
    *,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    current_only: bool = False,
    latest_only: bool = True,
) -> dict[str, int]:
    """Copy metadata from another store with fine-grained filtering.

    This is a reusable method that can be called programmatically or from CLI/migrations.
    Copies metadata for specified features, preserving the original snapshot_version.

    Args:
        from_store: Source metadata store to copy from (must be opened for reading)
        features: List of features to copy. Can be:
            - None: copies all features from active graph
            - List of FeatureKey or Feature classes: copies specified features
        filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
            These filters are applied when reading from the source store.
            Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
        global_filters: Sequence of Narwhals filter expressions applied to all features.
            These filters are combined with any feature-specific filters from `filters`.
            Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
        current_only: If True, only copy rows with the current feature_version (as defined
            in the loaded feature graph). Defaults to False to copy all versions.
        latest_only: If True (default), deduplicate samples within `id_columns` groups
            by keeping only the latest row per group (ordered by `metaxy_created_at`).

    Returns:
        Dict with statistics: {"features_copied": int, "rows_copied": int}

    Raises:
        ValueError: If source or destination store is not open
        FeatureNotFoundError: If a specified feature doesn't exist in source store

    Examples:
        ```py
        # Copy all features
        with source_store.open("read"), dest_store.open("write"):
            stats = dest_store.copy_metadata(from_store=source_store)
        ```

        ```py
        # Copy specific features
        with source_store.open("read"), dest_store.open("write"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                features=[FeatureKey(["my_feature"])],
            )
        ```

        ```py
        # Copy with global filters applied to all features
        with source_store.open("read"), dest_store.open("write"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
            )
        ```

        ```py
        # Copy specific features with per-feature filters
        with source_store.open("read"), dest_store.open("write"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                features=[
                    FeatureKey(["feature_a"]),
                    FeatureKey(["feature_b"]),
                ],
                filters={
                    "feature_a": [nw.col("field_a") > 10],
                    "feature_b": [nw.col("field_b") < 30],
                },
            )
        ```
    """
    import logging

    logger = logging.getLogger(__name__)

    # Validate both stores are open
    if not self._is_open:
        raise ValueError(
            'Destination store must be opened with store.open("write") before use'
        )
    if not from_store._is_open:
        raise ValueError(
            'Source store must be opened with store.open("read") before use'
        )

    return self._copy_metadata_impl(
        from_store=from_store,
        features=features,
        filters=filters,
        global_filters=global_filters,
        current_only=current_only,
        latest_only=latest_only,
        logger=logger,
    )

metaxy.metadata_store.types.AccessMode module-attribute

AccessMode = Literal['read', 'write']

metaxy.metadata_store.base.VersioningEngineOptions module-attribute

VersioningEngineOptions = Literal['auto', 'native', 'polars']
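
A sketch of how these aliases surface in practice; the DuckDBMetadataStore constructor arguments and import path are assumptions here, inferred from its config class shown below:

```py
from metaxy.metadata_store.duckdb import DuckDBMetadataStore

# versioning_engine takes one of the VersioningEngineOptions literals,
# and open() takes an AccessMode ("read" or "write")
store = DuckDBMetadataStore(database="metadata.db", versioning_engine="polars")
with store.open("write"):
    ...
```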

Base Configuration Class

Store-specific metadata store configuration classes typically inherit from the following base class:

metaxy.metadata_store.base.MetadataStoreConfig

Bases: BaseSettings

Base configuration class for metadata stores.

This class defines common configuration fields shared by all metadata store types. Store-specific config classes should inherit from this and add their own fields.

Example
```py
from metaxy.metadata_store.duckdb import DuckDBMetadataStoreConfig

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    hash_algorithm=HashAlgorithm.MD5,
)

store = DuckDBMetadataStore.from_config(config)
```

Configuration

The base MetadataStoreConfig class defines the following configuration options:

fallback_stores

List of fallback store names to search when features are not found in the current store.

Type: list[str]

```toml
[stores.dev.config]
# Optional
# fallback_stores = []
```

```toml
[tool.metaxy.stores.dev.config]
# Optional
# fallback_stores = []
```

```sh
export METAXY_STORES__DEV__CONFIG__FALLBACK_STORES=...
```

hash_algorithm

Hash algorithm for versioning. If None, uses the store's default.

Type: metaxy.versioning.types.HashAlgorithm | None

```toml
[stores.dev.config]
# Optional
# hash_algorithm = null
```

```toml
[tool.metaxy.stores.dev.config]
# Optional
# hash_algorithm = null
```

```sh
export METAXY_STORES__DEV__CONFIG__HASH_ALGORITHM=...
```

versioning_engine

Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.

Type: Literal['auto', 'native', 'polars']

Default: "auto"

```toml
[stores.dev.config]
versioning_engine = "auto"
```

```toml
[tool.metaxy.stores.dev.config]
versioning_engine = "auto"
```

```sh
export METAXY_STORES__DEV__CONFIG__VERSIONING_ENGINE=auto
```

Project Write Validation

By default, MetadataStore raises a ValueError when attempting to write metadata for a feature whose project doesn't match the expected project from MetaxyConfig.get().project.

For legitimate cross-project operations (such as migrations that need to update features across multiple projects), use MetadataStore.allow_cross_project_writes:

```py
with store.open("write"), store.allow_cross_project_writes():
    store.write_metadata(ExternallyDefinedFeature, df)
```