
Metadata Stores

Metaxy abstracts interactions with metadata behind an interface called MetadataStore.

Users can extend this class to implement support for arbitrary metadata storage such as databases, lakehouse formats, or really any kind of external system. Metaxy has built-in support for the following metadata store types:

Databases

Storage Only


Metadata Store Interface

metaxy.MetadataStore

MetadataStore(*, versioning_engine_cls: type[VersioningEngineT], hash_algorithm: HashAlgorithm | None = None, versioning_engine: VersioningEngineOptions = 'auto', fallback_stores: list[MetadataStore] | None = None, auto_create_tables: bool | None = None, materialization_id: str | None = None)

Bases: ABC

Abstract base class for metadata storage backends.

Parameters:

  • hash_algorithm (HashAlgorithm | None, default: None ) –

    Hash algorithm to use for the versioning engine.

  • versioning_engine (VersioningEngineOptions, default: 'auto' ) –

    Which versioning engine to use.

    • "auto": Prefer the store's native engine, fall back to Polars if needed

    • "native": Always use the store's native engine, raise VersioningEngineMismatchError if provided dataframes are incompatible

    • "polars": Always use the Polars engine

  • fallback_stores (list[MetadataStore] | None, default: None ) –

    Ordered list of read-only fallback stores. Used when upstream features are not in this store. VersioningEngineMismatchError is not raised when reading from fallback stores.

  • auto_create_tables (bool | None, default: None ) –

    If True, automatically create tables when opening the store. If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var). If False, never auto-create tables.

    Warning

    Auto-create is intended for development/testing only. Use proper database migration tools like Alembic for production deployments.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID. If provided, all metadata writes will include this ID in the metaxy_materialization_id column. Can be overridden per MetadataStore.write_metadata call.

Raises:

  • ValueError

    If fallback stores use different hash algorithms or truncation lengths

  • VersioningEngineMismatchError

    If a user-provided dataframe has a wrong implementation and versioning_engine is set to native
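
The resolution order for auto_create_tables (explicit argument first, global configuration otherwise) can be sketched in plain Python. This is only an illustration under stated assumptions: `GlobalConfig` and `resolve_auto_create_tables` are hypothetical stand-ins for MetaxyConfig and the constructor logic, and how the environment variable is parsed is not modeled here.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class GlobalConfig:
    """Hypothetical stand-in for the global MetaxyConfig."""

    auto_create_tables: bool = False


def resolve_auto_create_tables(explicit: bool | None, config: GlobalConfig) -> bool:
    """Use the explicit value when given; otherwise defer to the global config."""
    if explicit is None:
        return config.auto_create_tables
    return explicit


# An explicit False wins even when the global config enables auto-creation.
effective = resolve_auto_create_tables(False, GlobalConfig(auto_create_tables=True))
```

Only `None` defers to the global setting, so a caller can always force the behavior on or off for a single store.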

Source code in src/metaxy/metadata_store/base.py
def __init__(
    self,
    *,
    versioning_engine_cls: type[VersioningEngineT],
    hash_algorithm: HashAlgorithm | None = None,
    versioning_engine: VersioningEngineOptions = "auto",
    fallback_stores: list[MetadataStore] | None = None,
    auto_create_tables: bool | None = None,
    materialization_id: str | None = None,
):
    """
    Initialize the metadata store.

    Args:
        hash_algorithm: Hash algorithm to use for the versioning engine.

        versioning_engine: Which versioning engine to use.

            - "auto": Prefer the store's native engine, fall back to Polars if needed

            - "native": Always use the store's native engine, raise `VersioningEngineMismatchError`
                if provided dataframes are incompatible

            - "polars": Always use the Polars engine

        fallback_stores: Ordered list of read-only fallback stores.
            Used when upstream features are not in this store.
            `VersioningEngineMismatchError` is not raised when reading from fallback stores.
        auto_create_tables: If True, automatically create tables when opening the store.
            If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var).
            If False, never auto-create tables.

            !!! warning
                Auto-create is intended for development/testing only.
                Use proper database migration tools like Alembic for production deployments.

        materialization_id: Optional external orchestration ID.
            If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column.
            Can be overridden per [`MetadataStore.write_metadata`][metaxy.MetadataStore.write_metadata] call.

    Raises:
        ValueError: If fallback stores use different hash algorithms or truncation lengths
        VersioningEngineMismatchError: If a user-provided dataframe has a wrong implementation
            and versioning_engine is set to `native`
    """
    # Initialize state early so properties can check it
    self._is_open = False
    self._context_depth = 0
    self._versioning_engine = versioning_engine
    self._allow_cross_project_writes = False
    self._materialization_id = materialization_id
    self._open_cm: AbstractContextManager[Self] | None = (
        None  # Track the open() context manager
    )
    self.versioning_engine_cls = versioning_engine_cls

    # Resolve auto_create_tables from global config if not explicitly provided
    if auto_create_tables is None:
        from metaxy.config import MetaxyConfig

        self.auto_create_tables = MetaxyConfig.get().auto_create_tables
    else:
        self.auto_create_tables = auto_create_tables

    # Use store's default algorithm if not specified
    if hash_algorithm is None:
        hash_algorithm = self._get_default_hash_algorithm()

    self.hash_algorithm = hash_algorithm

    self.fallback_stores = fallback_stores or []

Attributes

metaxy.MetadataStore.materialization_id property

materialization_id: str | None

The external orchestration ID for this store instance.

If set, all metadata writes include this ID in the metaxy_materialization_id column, allowing filtering of rows written during a specific materialization run.
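
As a rough illustration of how a store-level ID and a per-write override might interact, consider the toy store below. `ToyStore` and its `write` method are hypothetical and keep rows in a Python list; only the column name metaxy_materialization_id comes from the documentation above.

```python
from __future__ import annotations


class ToyStore:
    """Hypothetical in-memory store; not Metaxy's MetadataStore."""

    def __init__(self, materialization_id: str | None = None) -> None:
        self.materialization_id = materialization_id
        self.rows: list[dict] = []

    def write(self, row: dict, materialization_id: str | None = None) -> None:
        # A per-call ID takes precedence over the store-level default.
        effective_id = (
            materialization_id
            if materialization_id is not None
            else self.materialization_id
        )
        self.rows.append({**row, "metaxy_materialization_id": effective_id})


store = ToyStore(materialization_id="run-1")
store.write({"sample_uid": 1})
store.write({"sample_uid": 2}, materialization_id="run-2")

# Select only the rows written during "run-2".
run_2_rows = [r for r in store.rows if r["metaxy_materialization_id"] == "run-2"]
```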

Functions

metaxy.MetadataStore.config_model abstractmethod classmethod

config_model() -> type[MetadataStoreConfig]

Return the configuration model class for this store type.

Subclasses must override this to return their specific config class.

Returns:

  • type[MetadataStoreConfig]

    The config class type (e.g., DuckDBMetadataStoreConfig)

Note

Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.
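
The override pattern described in the note might look like the following sketch. All class names here (`BaseConfig`, `FileConfig`, `BaseStore`, `FileStore`) are hypothetical and exist only to show an abstract classmethod being overridden with a narrower return type.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class BaseConfig:
    """Hypothetical stand-in for MetadataStoreConfig."""


class FileConfig(BaseConfig):
    """Hypothetical config for an imaginary file-backed store."""


class BaseStore(ABC):
    @classmethod
    @abstractmethod
    def config_model(cls) -> type[BaseConfig]:
        """Return the configuration class for this store type."""
        ...


class FileStore(BaseStore):
    @classmethod
    def config_model(cls) -> type[FileConfig]:
        # The subclass declares its own, more specific config type.
        return FileConfig


config_cls = FileStore.config_model()
```

Because the method is abstract, `BaseStore` itself cannot be instantiated; every concrete store has to say which config class it accepts.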

Source code in src/metaxy/metadata_store/base.py
@classmethod
@abstractmethod
def config_model(cls) -> type[MetadataStoreConfig]:
    """Return the configuration model class for this store type.

    Subclasses must override this to return their specific config class.

    Returns:
        The config class type (e.g., DuckDBMetadataStoreConfig)

    Note:
        Subclasses override this with a more specific return type.
        Type checkers may show a warning about incompatible override,
        but this is intentional - each store returns its own config type.
    """
    ...

metaxy.MetadataStore.from_config classmethod

from_config(config: MetadataStoreConfig, **kwargs: Any) -> Self

Create a store instance from a configuration object.

This method creates a store by:

  1. Converting the config to a dict
  2. Resolving fallback store names to actual store instances
  3. Calling the store's __init__ with the config parameters

Parameters:

  • config (MetadataStoreConfig) –

    Configuration object (should be the type returned by config_model())

  • **kwargs (Any, default: {} ) –

    Additional arguments passed directly to the store constructor (e.g., materialization_id for runtime parameters not in config)

Returns:

  • Self

    A new store instance configured according to the config object
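
The three steps can be mimicked with standard-library tools. In this sketch `SketchConfig`, `SketchStore`, and the explicit `registry` dict are hypothetical: `dataclasses.asdict` stands in for the config's dict conversion, and the registry stands in for looking up named stores in the global configuration.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class SketchConfig:
    """Hypothetical config; fallback stores are referenced by name."""

    database: str
    fallback_stores: list[str] = field(default_factory=list)


class SketchStore:
    """Hypothetical store; not Metaxy's MetadataStore."""

    def __init__(
        self,
        *,
        database: str,
        fallback_stores: list[SketchStore] | None = None,
        materialization_id: str | None = None,
    ) -> None:
        self.database = database
        self.fallback_stores = fallback_stores or []
        self.materialization_id = materialization_id

    @classmethod
    def from_config(
        cls,
        config: SketchConfig,
        registry: dict[str, SketchStore],
        **kwargs: Any,
    ) -> SketchStore:
        # 1. Convert the config to a dict.
        config_dict = asdict(config)
        # 2. Resolve fallback store names to store instances.
        names = config_dict.pop("fallback_stores", [])
        fallbacks = [registry[name] for name in names]
        # 3. Call the constructor with config values plus runtime kwargs.
        return cls(fallback_stores=fallbacks, **config_dict, **kwargs)


prod = SketchStore(database="prod.db")
dev = SketchStore.from_config(
    SketchConfig(database="metadata.db", fallback_stores=["prod"]),
    registry={"prod": prod},
    materialization_id="run-1",
)
```

Keeping names rather than instances in the config lets the config stay serializable, while runtime-only values such as materialization_id travel through `**kwargs`.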

Example
from metaxy.metadata_store.duckdb import (
    DuckDBMetadataStore,
    DuckDBMetadataStoreConfig,
)

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    fallback_stores=["prod"],
)

store = DuckDBMetadataStore.from_config(config)
Source code in src/metaxy/metadata_store/base.py
@classmethod
def from_config(cls, config: MetadataStoreConfig, **kwargs: Any) -> Self:
    """Create a store instance from a configuration object.

    This method creates a store by:
    1. Converting the config to a dict
    2. Resolving fallback store names to actual store instances
    3. Calling the store's __init__ with the config parameters

    Args:
        config: Configuration object (should be the type returned by config_model())
        **kwargs: Additional arguments passed directly to the store constructor
            (e.g., materialization_id for runtime parameters not in config)

    Returns:
        A new store instance configured according to the config object

    Example:
        ```python
        from metaxy.metadata_store.duckdb import (
            DuckDBMetadataStore,
            DuckDBMetadataStoreConfig,
        )

        config = DuckDBMetadataStoreConfig(
            database="metadata.db",
            fallback_stores=["prod"],
        )

        store = DuckDBMetadataStore.from_config(config)
        ```
    """
    # Convert config to dict, excluding unset values
    config_dict = config.model_dump(exclude_unset=True)

    # Pop and resolve fallback store names to actual store instances
    fallback_store_names = config_dict.pop("fallback_stores", [])
    fallback_stores = [
        MetaxyConfig.get().get_store(name) for name in fallback_store_names
    ]

    # Create store with resolved fallback stores, config, and extra kwargs
    return cls(fallback_stores=fallback_stores, **config_dict, **kwargs)

metaxy.MetadataStore.resolve_update

resolve_update(feature: type[BaseFeature], *, samples: IntoFrame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[False] = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment
resolve_update(feature: type[BaseFeature], *, samples: IntoFrame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[True], versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> LazyIncrement
resolve_update(feature: type[BaseFeature], *, samples: IntoFrame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: bool = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment | LazyIncrement

Calculate an incremental update for a feature.

This is the main workhorse in Metaxy.
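
To make the idea of an increment concrete, here is a toy sketch that compares provenance hashes keyed by sample ID. It assumes that "added" means samples present upstream but missing from the store, "changed" means samples whose provenance differs, and "removed" means samples no longer present upstream. These definitions and the function name `toy_increment` are illustrative assumptions; Metaxy's versioning engines work on dataframes, not dictionaries.

```python
from __future__ import annotations


def toy_increment(
    current: dict[int, str], upstream: dict[int, str]
) -> tuple[set[int], set[int], set[int]]:
    """Split sample IDs into (added, changed, removed) by comparing hashes."""
    added = set(upstream) - set(current)
    removed = set(current) - set(upstream)
    changed = {
        uid
        for uid in set(upstream) & set(current)
        if upstream[uid] != current[uid]
    }
    return added, changed, removed


# Provenance already recorded in the store vs. freshly computed upstream state.
current = {1: "h1", 2: "h2", 3: "h3"}
upstream = {2: "h2", 3: "h3-new", 4: "h4"}
added, changed, removed = toy_increment(current, upstream)
```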

Parameters:

  • feature (type[BaseFeature]) –

    Feature class to resolve updates for

  • samples (IntoFrame | None, default: None ) –

    A dataframe with joined upstream metadata and "metaxy_provenance_by_field" column set. When provided, MetadataStore skips loading upstream feature metadata and provenance calculations.

    Required for root features

    Metaxy doesn't know how to populate input metadata for root features, so the samples argument must be provided for them.

    Tip

    For non-root features, use samples to customize the automatic upstream loading and field provenance calculation. For example, it can be used to require processing for specific sample IDs.

    Setting this parameter during normal operations is not required.

  • filters (Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None, default: None ) –

    A mapping from feature keys to lists of Narwhals filter expressions. Keys can be feature classes, FeatureKey objects, or string paths. Applied at read-time. May filter the current feature; in that case the filter is also applied to samples (if provided). Example: {UpstreamFeature: [nw.col("x") > 10], ...}

  • global_filters (Sequence[Expr] | None, default: None ) –

    A list of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Useful for filtering by common columns like sample_uid across all features. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]

  • lazy (bool, default: False ) –

    Whether to return a LazyIncrement (True) or an Increment (False).

  • versioning_engine (Literal['auto', 'native', 'polars'] | None, default: None ) –

    Override the store's versioning engine for this operation.

  • skip_comparison (bool, default: False ) –

    If True, skip the increment comparison logic and return all upstream samples in Increment.added. The changed and removed frames will be empty.
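
How feature-specific filters and global filters combine can be sketched with plain predicates. Everything here is hypothetical (`filters_for`, `apply_all`, string feature keys, and lambdas in place of Narwhals expressions), and the sketch assumes that multiple filters are combined with a logical AND.

```python
from __future__ import annotations

from typing import Callable

Row = dict
Predicate = Callable[[Row], bool]


def filters_for(
    feature_key: str,
    filters: dict[str, list[Predicate]] | None,
    global_filters: list[Predicate] | None,
) -> list[Predicate]:
    """Feature-specific filters first, followed by the global ones."""
    return [*(filters or {}).get(feature_key, []), *(global_filters or [])]


def apply_all(rows: list[Row], predicates: list[Predicate]) -> list[Row]:
    """Keep only the rows that satisfy every predicate."""
    return [row for row in rows if all(p(row) for p in predicates)]


rows = [
    {"sample_uid": "s1", "x": 5},
    {"sample_uid": "s2", "x": 20},
    {"sample_uid": "s3", "x": 30},
]
predicates = filters_for(
    "upstream/feature",
    filters={"upstream/feature": [lambda r: r["x"] > 10]},
    global_filters=[lambda r: r["sample_uid"] in {"s1", "s2"}],
)
kept = apply_all(rows, predicates)
```

A feature with no entry in `filters` still receives the global filters, which is what makes them convenient for restricting a whole run to a handful of sample IDs.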

Raises:

  • ValueError

    If no samples dataframe has been provided when resolving an update for a root feature.

  • VersioningEngineMismatchError

    If versioning_engine has been set to "native" and a dataframe of a different implementation has been encountered during resolve_update.
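
The engine-selection rules can be condensed into a small decision function. This is a simplified sketch: implementations are plain strings, `EngineMismatch` is a hypothetical stand-in for VersioningEngineMismatchError, warnings are omitted, and `choose_engine` is not part of Metaxy's API.

```python
from __future__ import annotations


class EngineMismatch(Exception):
    """Hypothetical stand-in for VersioningEngineMismatchError."""


def choose_engine(
    mode: str,
    native: str,
    upstream_impls: list[str],
    samples_impl: str | None = None,
    has_fallback_stores: bool = False,
) -> str:
    """Return the implementation name used to resolve the increment."""
    if mode == "polars":
        return "polars"

    chosen = native
    switched = False
    for impl in upstream_impls:
        if impl != native:
            # In "native" mode a mismatch is tolerated only when fallback
            # stores could explain where the foreign data came from.
            if mode == "native" and not has_fallback_stores:
                raise EngineMismatch(f"upstream has {impl}, expected {native}")
            chosen, switched = "polars", True
            break

    if samples_impl is not None and samples_impl != native:
        # Samples come from the caller, so fallback stores are no excuse.
        if not switched and mode == "native":
            raise EngineMismatch(f"samples have {samples_impl}, expected {native}")
        chosen = "polars"

    return chosen


engine = choose_engine("auto", "duckdb", ["duckdb", "pandas"])
```

In "auto" mode any mismatch quietly downgrades to Polars, whereas "native" treats a mismatch as an error unless fallback stores account for it.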

With a root feature

samples = pl.DataFrame({
    "sample_uid": [1, 2, 3],
    "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
    self,
    feature: type[BaseFeature],
    *,
    samples: IntoFrame | None = None,
    filters: Mapping[CoercibleToFeatureKey, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    lazy: bool = False,
    versioning_engine: Literal["auto", "native", "polars"] | None = None,
    skip_comparison: bool = False,
    **kwargs: Any,
) -> Increment | LazyIncrement:
    """Calculate an incremental update for a feature.

    This is the main workhorse in Metaxy.

    Args:
        feature: Feature class to resolve updates for
        samples: A dataframe with joined upstream metadata and `"metaxy_provenance_by_field"` column set.
            When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.

            !!! info "Required for root features"
                Metaxy doesn't know how to populate input metadata for root features,
                so the `samples` argument **must** be provided for them.

            !!! tip
                For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation.
                For example, it can be used to require processing for specific sample IDs.

            Setting this parameter during normal operations is not required.

        filters: A mapping from feature keys to lists of Narwhals filter expressions.
            Keys can be feature classes, FeatureKey objects, or string paths.
            Applied at read-time. May filter the current feature,
            in this case it will also be applied to `samples` (if provided).
            Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
        global_filters: A list of Narwhals filter expressions applied to all features.
            These filters are combined with any feature-specific filters from `filters`.
            Useful for filtering by common columns like `sample_uid` across all features.
            Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
        lazy: Whether to return a [metaxy.versioning.types.LazyIncrement][] or a [metaxy.versioning.types.Increment][].
        versioning_engine: Override the store's versioning engine for this operation.
        skip_comparison: If True, skip the increment comparison logic and return all
            upstream samples in `Increment.added`. The `changed` and `removed` frames will
            be empty.

    Raises:
        ValueError: If no `samples` dataframe has been provided when resolving an update for a root feature.
        VersioningEngineMismatchError: If `versioning_engine` has been set to `"native"`
            and a dataframe of a different implementation has been encountered during `resolve_update`.

    !!! example "With a root feature"

        ```py
        samples = pl.DataFrame({
            "sample_uid": [1, 2, 3],
            "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
        })
        result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
        ```
    """
    import narwhals as nw

    # Convert samples to Narwhals frame if not already
    samples_nw: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None
    if samples is not None:
        if isinstance(samples, (nw.DataFrame, nw.LazyFrame)):
            samples_nw = samples
        else:
            samples_nw = nw.from_native(samples)

    # Normalize filter keys to FeatureKey
    normalized_filters: dict[FeatureKey, list[nw.Expr]] = {}
    if filters:
        for key, exprs in filters.items():
            feature_key = self._resolve_feature_key(key)
            normalized_filters[feature_key] = list(exprs)

    # Convert global_filters to a list for easy concatenation
    global_filter_list = list(global_filters) if global_filters else []

    graph = current_graph()
    plan = graph.get_feature_plan(feature.spec().key)

    # Root features without samples: error (samples required)
    if not plan.deps and samples_nw is None:
        raise ValueError(
            f"Feature {feature.spec().key} has no upstream dependencies (root feature). "
            f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. "
            f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation."
        )

    # Combine feature-specific filters with global filters
    current_feature_filters = [
        *normalized_filters.get(feature.spec().key, []),
        *global_filter_list,
    ]

    current_metadata = self.read_metadata_in_store(
        feature,
        filters=[
            nw.col(METAXY_FEATURE_VERSION)
            == graph.get_feature_version(feature.spec().key),
            *current_feature_filters,
        ],
    )

    upstream_by_key: dict[FeatureKey, nw.LazyFrame[Any]] = {}
    filters_by_key: dict[FeatureKey, list[nw.Expr]] = {}

    # if samples are provided, use them as source of truth for upstream data
    if samples_nw is not None:
        # Apply filters to samples if any
        filtered_samples = samples_nw
        if current_feature_filters:
            filtered_samples = samples_nw.filter(current_feature_filters)

        # fill in METAXY_PROVENANCE column if it's missing (e.g. for root features)
        samples_nw = self.hash_struct_version_column(
            plan,
            df=filtered_samples,
            struct_column=METAXY_PROVENANCE_BY_FIELD,
            hash_column=METAXY_PROVENANCE,
        )

        # For root features, add data_version columns if they don't exist
        # (root features have no computation, so data_version equals provenance)
        if METAXY_DATA_VERSION_BY_FIELD not in samples_nw.columns:
            samples_nw = samples_nw.with_columns(
                nw.col(METAXY_PROVENANCE_BY_FIELD).alias(
                    METAXY_DATA_VERSION_BY_FIELD
                ),
                nw.col(METAXY_PROVENANCE).alias(METAXY_DATA_VERSION),
            )
    else:
        for upstream_spec in plan.deps or []:
            # Combine feature-specific filters with global filters for upstream
            upstream_filters = [
                *normalized_filters.get(upstream_spec.key, []),
                *global_filter_list,
            ]
            upstream_feature_metadata = self.read_metadata(
                upstream_spec.key,
                filters=upstream_filters,
            )
            if upstream_feature_metadata is not None:
                upstream_by_key[upstream_spec.key] = upstream_feature_metadata

    # determine which implementation to use for resolving the increment
    # consider (1) whether all upstream metadata has been loaded with the native implementation
    # (2) if samples have native implementation

    # Use parameter if provided, otherwise use store default
    engine_mode = (
        versioning_engine
        if versioning_engine is not None
        else self._versioning_engine
    )

    # If "polars" mode, force Polars immediately
    if engine_mode == "polars":
        implementation = nw.Implementation.POLARS
        switched_to_polars = True
    else:
        implementation = self.native_implementation()
        switched_to_polars = False

        for upstream_key, df in upstream_by_key.items():
            if df.implementation != implementation:
                switched_to_polars = True
                # Only raise error in "native" mode if no fallback stores configured.
                # If fallback stores exist, the implementation mismatch indicates data came
                # from fallback (different implementation), which is legitimate fallback access.
                # If data were local, it would have the native implementation.
                if engine_mode == "native" and not self.fallback_stores:
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but upstream feature `{upstream_key.to_string()}` "
                        f"has implementation {df.implementation}, expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto" or (
                    engine_mode == "native" and self.fallback_stores
                ):
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=df.implementation,
                        message=f"Using Polars for resolving the increment instead. This was caused by upstream feature `{upstream_key.to_string()}`.",
                    )
                implementation = nw.Implementation.POLARS
                break

        if (
            samples_nw is not None
            and samples_nw.implementation != self.native_implementation()
        ):
            if not switched_to_polars:
                if engine_mode == "native":
                    # Always raise error for samples with wrong implementation, regardless
                    # of fallback stores, because samples come from user argument, not from fallback
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but provided `samples` have implementation {samples_nw.implementation}, "
                        f"expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto":
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=samples_nw.implementation,
                        message=f"Provided `samples` have implementation {samples_nw.implementation}. Using Polars for resolving the increment instead.",
                    )
            implementation = nw.Implementation.POLARS
            switched_to_polars = True

    if switched_to_polars:
        if current_metadata:
            current_metadata = switch_implementation_to_polars(current_metadata)
        if samples_nw:
            samples_nw = switch_implementation_to_polars(samples_nw)
        for upstream_key, df in upstream_by_key.items():
            upstream_by_key[upstream_key] = switch_implementation_to_polars(df)

    with self.create_versioning_engine(
        plan=plan, implementation=implementation
    ) as engine:
        if skip_comparison:
            # Skip comparison: return all upstream samples as added
            if samples_nw is not None:
                # Root features or user-provided samples: use samples directly
                # Note: samples already has metaxy_provenance computed
                added = samples_nw.lazy()
            else:
                # Non-root features: load all upstream with provenance
                added = engine.load_upstream_with_provenance(
                    upstream=upstream_by_key,
                    hash_algo=self.hash_algorithm,
                    filters=filters_by_key,
                )
            changed = None
            removed = None
        else:
            added, changed, removed = engine.resolve_increment_with_provenance(
                current=current_metadata,
                upstream=upstream_by_key,
                hash_algorithm=self.hash_algorithm,
                filters=filters_by_key,
                sample=samples_nw.lazy() if samples_nw is not None else None,
            )

    # Convert None to empty DataFrames
    if changed is None:
        changed = empty_frame_like(added)
    if removed is None:
        removed = empty_frame_like(added)

    if lazy:
        return LazyIncrement(
            added=added
            if isinstance(added, nw.LazyFrame)
            else nw.from_native(added),
            changed=changed
            if isinstance(changed, nw.LazyFrame)
            else nw.from_native(changed),
            removed=removed
            if isinstance(removed, nw.LazyFrame)
            else nw.from_native(removed),
        )
    else:
        return Increment(
            added=added.collect() if isinstance(added, nw.LazyFrame) else added,
            changed=changed.collect()
            if isinstance(changed, nw.LazyFrame)
            else changed,
            removed=removed.collect()
            if isinstance(removed, nw.LazyFrame)
            else removed,
        )

metaxy.MetadataStore.read_metadata

read_metadata(feature: CoercibleToFeatureKey, *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, allow_fallback: bool = True, current_only: bool = True, latest_only: bool = True) -> LazyFrame[Any]

Read metadata with optional fallback to upstream stores.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for

  • feature_version (str | None, default: None ) –

    Explicit feature_version to filter by (mutually exclusive with current_only=True)

  • filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to include. Metaxy's system columns are always included.

  • allow_fallback (bool, default: True ) –

    If True, check fallback stores on local miss

  • current_only (bool, default: True ) –

    If True, only return rows with current feature_version

  • latest_only (bool, default: True ) –

    Whether to deduplicate samples within id_columns groups ordered by metaxy_created_at.
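
The deduplication that latest_only refers to can be pictured with a small pure-Python helper. `toy_keep_latest` is a hypothetical function operating on lists of dicts; it assumes "latest" means the row with the greatest metaxy_created_at value within each group of ID columns.

```python
from __future__ import annotations


def toy_keep_latest(
    rows: list[dict], group_columns: list[str], timestamp_column: str
) -> list[dict]:
    """Keep one row per group: the one with the greatest timestamp."""
    latest: dict[tuple, dict] = {}
    for row in rows:
        key = tuple(row[c] for c in group_columns)
        best = latest.get(key)
        if best is None or row[timestamp_column] > best[timestamp_column]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"sample_uid": 1, "metaxy_created_at": 1, "value": "old"},
    {"sample_uid": 1, "metaxy_created_at": 3, "value": "new"},
    {"sample_uid": 2, "metaxy_created_at": 2, "value": "only"},
]
deduplicated = toy_keep_latest(rows, ["sample_uid"], "metaxy_created_at")
```

With latest_only=False the equivalent of `rows` would be returned unchanged, including the superseded "old" row.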

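Returns:

  • LazyFrame[Any]

    Narwhals LazyFrame with metadata

Raises:

  • FeatureNotFoundError

    If feature not found in any store

  • SystemDataNotFoundError

    When attempting to read non-existent Metaxy system data

  • ValueError

    If both feature_version and current_only=True are provided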

Info

When this method is called with default arguments, it will return the latest (by metaxy_created_at) metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.

Warning

The order of rows is not guaranteed.
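
The fallback behavior (try the local store, then each fallback store in order, recursing through nested chains) can be sketched as follows. `ToyStore` and `FeatureNotFound` are hypothetical stand-ins that keep data in dictionaries; they are not Metaxy classes.

```python
from __future__ import annotations


class FeatureNotFound(Exception):
    """Hypothetical stand-in for FeatureNotFoundError."""


class ToyStore:
    """Hypothetical dict-backed store with ordered, read-only fallbacks."""

    def __init__(
        self,
        data: dict[str, list[dict]],
        fallback_stores: list[ToyStore] | None = None,
    ) -> None:
        self.data = data
        self.fallback_stores = fallback_stores or []

    def read(self, feature: str, allow_fallback: bool = True) -> list[dict]:
        # Local hit: never consult the fallbacks.
        if feature in self.data:
            return self.data[feature]
        if allow_fallback:
            for store in self.fallback_stores:
                try:
                    # Recurse so that nested fallback chains are honored.
                    return store.read(feature, allow_fallback=True)
                except FeatureNotFound:
                    continue
        raise FeatureNotFound(feature)


prod = ToyStore({"video/raw": [{"sample_uid": 1}]})
staging = ToyStore({}, fallback_stores=[prod])
dev = ToyStore({"video/crop": [{"sample_uid": 1}]}, fallback_stores=[staging])

raw = dev.read("video/raw")  # found two levels down, in prod
```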

Source code in src/metaxy/metadata_store/base.py
def read_metadata(
    self,
    feature: CoercibleToFeatureKey,
    *,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
) -> nw.LazyFrame[Any]:
    """
    Read metadata with optional fallback to upstream stores.

    Args:
        feature: Feature to read metadata for
        feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True)
        filters: Sequence of Narwhals filter expressions to apply to this feature.
            Example: `[nw.col("x") > 10, nw.col("y") < 5]`
        columns: Subset of columns to include. Metaxy's system columns are always included.
        allow_fallback: If `True`, check fallback stores on local miss
        current_only: If `True`, only return rows with current feature_version
        latest_only: Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`.

    Returns:
        Narwhals LazyFrame with metadata

    Raises:
        FeatureNotFoundError: If feature not found in any store
        SystemDataNotFoundError: When attempting to read non-existent Metaxy system data
        ValueError: If both feature_version and current_only=True are provided

    !!! info
        When this method is called with default arguments, it will return the latest (by `metaxy_created_at`)
        metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.

    !!! warning
        The order of rows is not guaranteed.
    """
    filters = filters or []
    columns = columns or []

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate mutually exclusive parameters
    if feature_version is not None and current_only:
        raise ValueError(
            "Cannot specify both feature_version and current_only=True. "
            "Use current_only=False with feature_version parameter."
        )

    # Add feature_version filter only when needed
    if current_only or feature_version is not None and not is_system_table:
        version_filter = nw.col(METAXY_FEATURE_VERSION) == (
            current_graph().get_feature_version(feature_key)
            if current_only
            else feature_version
        )
        filters = [version_filter, *filters]

    if columns and not is_system_table:
        # Add only system columns that aren't already in the user's columns list
        columns_set = set(columns)
        missing_system_cols = [
            c for c in ALL_SYSTEM_COLUMNS if c not in columns_set
        ]
        read_columns = [*columns, *missing_system_cols]
    else:
        read_columns = None

    lazy_frame = None
    try:
        lazy_frame = self.read_metadata_in_store(
            feature, filters=filters, columns=read_columns
        )
    except FeatureNotFoundError as e:
        # do not read system features from fallback stores
        if is_system_table:
            raise SystemDataNotFoundError(
                f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
            ) from e

    # Handle case where read_metadata_in_store returns None (no exception raised)
    if lazy_frame is None and is_system_table:
        raise SystemDataNotFoundError(
            f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
        )

    if lazy_frame is not None and not is_system_table and latest_only:
        from metaxy.models.constants import METAXY_CREATED_AT

        # Apply deduplication
        lazy_frame = self.versioning_engine_cls.keep_latest_by_group(
            df=lazy_frame,
            group_columns=list(
                self._resolve_feature_plan(feature_key).feature.id_columns
            ),
            timestamp_column=METAXY_CREATED_AT,
        )

    if lazy_frame is not None:
        # After dedup, filter to requested columns if specified
        if columns:
            lazy_frame = lazy_frame.select(columns)

        return lazy_frame

    # Try fallback stores
    if allow_fallback:
        for store in self.fallback_stores:
            try:
                # Use full read_metadata to handle nested fallback chains
                return store.read_metadata(
                    feature,
                    feature_version=feature_version,
                    filters=filters,
                    columns=columns,
                    allow_fallback=True,
                    current_only=current_only,
                    latest_only=latest_only,
                )
            except FeatureNotFoundError:
                # Try next fallback store
                continue

    # Not found anywhere
    raise FeatureNotFoundError(
        f"Feature {feature_key.to_string()} not found in store"
        + (" or fallback stores" if allow_fallback else "")
    )
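
The lookup order in the listing above (local store first, deduplication of local rows, then each fallback store in turn) can be illustrated with a toy in-memory model. This is only a sketch, not Metaxy code: `ToyStore`, the list-of-dicts row layout, and the `created_at` field are hypothetical stand-ins for a real store, Narwhals frames, and the `METAXY_CREATED_AT` column.

```python
class FeatureNotFoundError(Exception):
    """Stand-in for Metaxy's exception of the same name."""


def keep_latest_by_group(rows, group_columns, timestamp_column):
    """Keep only the newest row for each combination of group columns."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in group_columns)
        if key not in latest or row[timestamp_column] > latest[key][timestamp_column]:
            latest[key] = row
    return list(latest.values())


class ToyStore:
    """Hypothetical in-memory store: feature name -> list of row dicts."""

    def __init__(self, tables, fallback_stores=None):
        self.tables = tables
        self.fallback_stores = fallback_stores or []

    def read_metadata(self, feature, *, allow_fallback=True, latest_only=True):
        rows = self.tables.get(feature)
        if rows is not None:
            if latest_only:
                rows = keep_latest_by_group(rows, ["sample_uid"], "created_at")
            return rows
        if allow_fallback:
            for store in self.fallback_stores:
                try:
                    # Recursing through read_metadata handles nested fallback chains.
                    return store.read_metadata(
                        feature, allow_fallback=True, latest_only=latest_only
                    )
                except FeatureNotFoundError:
                    continue  # try the next fallback store
        raise FeatureNotFoundError(feature)


prod = ToyStore({"video/frames": [
    {"sample_uid": "s1", "created_at": 1, "value": "old"},
    {"sample_uid": "s1", "created_at": 2, "value": "new"},
]})
staging = ToyStore({}, fallback_stores=[prod])
dev = ToyStore({}, fallback_stores=[staging])

rows = dev.read_metadata("video/frames")  # resolved two fallback levels down
```

With `allow_fallback=False`, the same call on `dev` raises `FeatureNotFoundError`, because the feature only exists further down the chain.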

metaxy.MetadataStore.write_metadata

write_metadata(feature: CoercibleToFeatureKey, df: IntoFrame, materialization_id: str | None = None) -> None

Write metadata for a feature (append-only by design).

Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to write metadata for

  • df (IntoFrame) –

    Metadata DataFrame of any type supported by Narwhals. Must have metaxy_provenance_by_field column of type Struct with fields matching feature's fields. Optionally, may also contain metaxy_data_version_by_field.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID for this write. Overrides the store's default materialization_id if provided. Useful for tracking which orchestration run produced this metadata.

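Raises:

  • MetadataSchemaError

    If DataFrame schema is invalid

  • StoreNotOpenError

    If store is not open

  • ValueError

    If writing to a feature from a different project than expected
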
Note
  • Must be called within a MetadataStore.open(mode="write") context manager.

  • Metaxy always performs an "append" operation. Metadata is never deleted or mutated.

  • Fallback stores are never used for writes.

  • Features from other Metaxy projects cannot be written to, unless project validation has been disabled with MetadataStore.allow_cross_project_writes.
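
The append-only behaviour and the "add system columns unless they already exist" rule described above can be sketched with a small in-memory table. Everything here is hypothetical and only illustrates the idea: `AppendOnlyTable`, the `provenance_by_field` and `created_at` field names, and the dict-based rows are not part of Metaxy.

```python
import time

PROVENANCE = "provenance_by_field"  # hypothetical required column for this sketch
CREATED_AT = "created_at"           # hypothetical system column


class MetadataSchemaError(Exception):
    """Stand-in for Metaxy's schema error."""


class AppendOnlyTable:
    """Rows can be added but are never updated or deleted."""

    def __init__(self):
        self._rows = []

    def write(self, rows):
        # Validate everything first so a bad batch appends nothing.
        for row in rows:
            if PROVENANCE not in row:
                raise MetadataSchemaError(f"rows must have a '{PROVENANCE}' field")
        now = time.time()
        # Values supplied by the caller win; missing system columns are filled in.
        self._rows.extend({CREATED_AT: now, **row} for row in rows)

    def rows(self):
        return list(self._rows)


table = AppendOnlyTable()
table.write([{"sample_uid": "s1", PROVENANCE: {"frames": "abc"}}])
# A second write for the same sample adds a new row instead of replacing the old one.
table.write([{"sample_uid": "s1", PROVENANCE: {"frames": "def"}, CREATED_AT: 42.0}])
```

Because nothing is overwritten, readers that want one row per sample have to pick the newest one, which is what the `latest_only` deduplication in `read_metadata` is for.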

Source code in src/metaxy/metadata_store/base.py
def write_metadata(
    self,
    feature: CoercibleToFeatureKey,
    df: IntoFrame,
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for a feature (append-only by design).

    Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

    Args:
        feature: Feature to write metadata for
        df: Metadata DataFrame of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).
            Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
            Optionally, may also contain `metaxy_data_version_by_field`.
        materialization_id: Optional external orchestration ID for this write.
            Overrides the store's default `materialization_id` if provided.
            Useful for tracking which orchestration run produced this metadata.

    Raises:
        MetadataSchemaError: If DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Must be called within a `MetadataStore.open(mode="write")` context manager.

        - Metaxy always performs an "append" operation. Metadata is never deleted or mutated.

        - Fallback stores are never used for writes.

        - Features from other Metaxy projects cannot be written to, unless project validation has been disabled with [MetadataStore.allow_cross_project_writes][].

    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate project for non-system tables
    if not is_system_table:
        self._validate_project_write(feature)

    # Convert Polars to Narwhals to Polars if needed
    # if isinstance(df_nw, (pl.DataFrame, pl.LazyFrame)):
    df_nw = nw.from_native(df)

    assert isinstance(df_nw, nw.DataFrame), "df must be a Narwhal DataFrame"

    # For system tables, write directly without feature_version tracking
    if is_system_table:
        self._validate_schema_system_table(df_nw)
        self.write_metadata_to_store(feature_key, df_nw)
        return

    if METAXY_PROVENANCE_BY_FIELD not in df_nw.columns:
        from metaxy.metadata_store.exceptions import MetadataSchemaError

        raise MetadataSchemaError(
            f"DataFrame must have '{METAXY_PROVENANCE_BY_FIELD}' column"
        )

    # Add all required system columns
    # warning: for dataframes that do not match the native MetadataStore implementation
    # and are missing the METAXY_DATA_VERSION column, this call will lead to materializing the equivalent Polars DataFrame
    # while calculating the missing METAXY_DATA_VERSION column
    df_nw = self._add_system_columns(
        df_nw, feature, materialization_id=materialization_id
    )

    self._validate_schema(df_nw)
    self.write_metadata_to_store(feature_key, df_nw)

metaxy.MetadataStore.write_metadata_multi

write_metadata_multi(metadata: Mapping[Any, IntoFrame], materialization_id: str | None = None) -> None

Write metadata for multiple features in reverse topological order.

Features are sorted so that dependents are written before their dependencies: downstream features first, upstream features last. This fixed ordering can be useful when a data consistency requirement depends on the order of writes.

Parameters:

  • metadata (Mapping[Any, IntoFrame]) –

    Mapping from feature keys to metadata DataFrames. Keys can be any type coercible to FeatureKey (string, sequence, FeatureKey, or BaseFeature class). Values must be DataFrames compatible with Narwhals, containing required system columns.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID for all writes. Overrides the store's default materialization_id if provided. Applied to all feature writes in this batch.

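Raises:

  • MetadataSchemaError

    If any DataFrame schema is invalid

  • StoreNotOpenError

    If store is not open

  • ValueError

    If writing to a feature from a different project than expected
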
Note
  • Must be called within a MetadataStore.open(mode="write") context manager.
  • Empty mappings are handled gracefully (no-op).
  • Each feature's metadata is written via write_metadata, so all validation and system column handling from that method applies.
Example
with store.open(mode="write"):
    store.write_metadata_multi({
        ChildFeature: child_df,
        ParentFeature: parent_df,
    })
# Features are written in reverse topological order:
# ChildFeature first, then ParentFeature
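
To make "reverse topological order" concrete, here is a minimal sketch that uses the standard library's `graphlib` rather than Metaxy's own graph. The `dependencies` mapping and the feature names are hypothetical, and this is not how `topological_sort_features` is implemented.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: feature -> set of features it depends on.
dependencies = {
    "raw_video": set(),
    "frames": {"raw_video"},
    "embeddings": {"frames"},
}


def reverse_topological_order(requested, dependencies):
    """Order the requested features so that dependents come before dependencies."""
    # static_order() yields dependencies before their dependents.
    order = list(TopologicalSorter(dependencies).static_order())
    return [key for key in reversed(order) if key in requested]


order = reverse_topological_order({"raw_video", "embeddings"}, dependencies)
```

Only the requested features appear in the result; `frames` still influences the ordering even though nothing is written for it.
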
Source code in src/metaxy/metadata_store/base.py
def write_metadata_multi(
    self,
    metadata: Mapping[Any, IntoFrame],
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for multiple features in reverse topological order.

    Processes features so that dependents are written before their dependencies.
    This ordering ensures that downstream features are written first, which can
    be useful for certain data consistency requirements or when features need
    to be processed in a specific order.

    Args:
        metadata: Mapping from feature keys to metadata DataFrames.
            Keys can be any type coercible to FeatureKey (string, sequence,
            FeatureKey, or BaseFeature class). Values must be DataFrames
            compatible with Narwhals, containing required system columns.
        materialization_id: Optional external orchestration ID for all writes.
            Overrides the store's default `materialization_id` if provided.
            Applied to all feature writes in this batch.

    Raises:
        MetadataSchemaError: If any DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Must be called within a `MetadataStore.open(mode="write")` context manager.
        - Empty mappings are handled gracefully (no-op).
        - Each feature's metadata is written via `write_metadata`, so all
          validation and system column handling from that method applies.

    Example:
        ```py
        with store.open(mode="write"):
            store.write_metadata_multi({
                ChildFeature: child_df,
                ParentFeature: parent_df,
            })
        # Features are written in reverse topological order:
        # ChildFeature first, then ParentFeature
        ```
    """
    if not metadata:
        return

    # Build mapping from resolved keys to dataframes in one pass
    resolved_metadata = {
        self._resolve_feature_key(key): df for key, df in metadata.items()
    }

    # Get reverse topological order (dependents first)
    graph = current_graph()
    sorted_keys = graph.topological_sort_features(
        list(resolved_metadata.keys()), descending=True
    )

    # Write metadata in reverse topological order
    for feature_key in sorted_keys:
        self.write_metadata(
            feature_key,
            resolved_metadata[feature_key],
            materialization_id=materialization_id,
        )

metaxy.MetadataStore.native_implementation

native_implementation() -> Implementation

Get the native Narwhals implementation for this store's backend.

Source code in src/metaxy/metadata_store/base.py
def native_implementation(self) -> nw.Implementation:
    """Get the native Narwhals implementation for this store's backend."""
    return self.versioning_engine_cls.implementation()

metaxy.MetadataStore.create_versioning_engine

create_versioning_engine(plan: FeaturePlan, implementation: Implementation) -> Iterator[VersioningEngine | PolarsVersioningEngine]

Creates an appropriate provenance engine.

Falls back to Polars implementation if the required implementation differs from the store's native implementation.

Parameters:

  • plan (FeaturePlan) –

    The feature plan.

  • implementation (Implementation) –

    The desired engine implementation.

Returns:

  • Iterator[VersioningEngine | PolarsVersioningEngine]

    An appropriate provenance engine.

Source code in src/metaxy/metadata_store/base.py
@contextmanager
def create_versioning_engine(
    self, plan: FeaturePlan, implementation: nw.Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]:
    """
    Creates an appropriate provenance engine.

    Falls back to Polars implementation if the required implementation differs from the store's native implementation.

    Args:
        plan: The feature plan.
        implementation: The desired engine implementation.

    Returns:
        An appropriate provenance engine.
    """

    if implementation == nw.Implementation.POLARS:
        cm = self._create_polars_versioning_engine(plan)
    elif implementation == self.native_implementation():
        cm = self._create_versioning_engine(plan)
    else:
        cm = self._create_polars_versioning_engine(plan)

    with cm as engine:
        yield engine
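
The branch logic above reduces to a small decision function. The sketch below mirrors it with a hypothetical `Implementation` enum standing in for `narwhals.Implementation`, and returns a label instead of creating an engine.

```python
from enum import Enum


class Implementation(Enum):
    """Hypothetical stand-in for narwhals.Implementation."""

    POLARS = "polars"
    DUCKDB = "duckdb"
    PANDAS = "pandas"


def select_engine(requested: Implementation, native: Implementation) -> str:
    """Return which kind of engine would be created: "native" or "polars"."""
    if requested == Implementation.POLARS:
        return "polars"
    if requested == native:
        return "native"
    # Any other implementation falls back to Polars.
    return "polars"


native_choice = select_engine(Implementation.DUCKDB, native=Implementation.DUCKDB)
fallback_choice = select_engine(Implementation.PANDAS, native=Implementation.DUCKDB)
```

The native engine is only selected when the requested implementation matches the store's own; every other case ends up on Polars.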

metaxy.MetadataStore.open abstractmethod

open(mode: AccessMode = 'read') -> Iterator[Self]

Open/initialize the store for operations.

Context manager that opens the store with specified access mode. Called internally by __enter__. Child classes should implement backend-specific connection setup/teardown here.

Parameters:

  • mode (AccessMode, default: 'read' ) –

    Access mode for this connection session.

Yields:

  • Self ( Self ) –

    The store instance with connection open

Note

Users should prefer using with store: pattern except when write access mode is needed.

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
@contextmanager
def open(self, mode: AccessMode = "read") -> Iterator[Self]:
    """Open/initialize the store for operations.

    Context manager that opens the store with specified access mode.
    Called internally by `__enter__`.
    Child classes should implement backend-specific connection setup/teardown here.

    Args:
        mode: Access mode for this connection session.

    Yields:
        Self: The store instance with connection open

    Note:
        Users should prefer using `with store:` pattern except when write access mode is needed.
    """
    ...

metaxy.MetadataStore.__enter__

__enter__() -> Self

Enter the context manager. The store is opened in read mode, or in write mode when auto_create_tables is enabled.

Use MetadataStore.open for write access mode instead.

Returns:

  • Self ( Self ) –

    The opened store instance

Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
    """Enter context manager - opens store in READ mode by default.

    Use [`MetadataStore.open`][metaxy.metadata_store.base.MetadataStore.open] for write access mode instead.

    Returns:
        Self: The opened store instance
    """
    # Determine mode based on auto_create_tables
    mode = "write" if self.auto_create_tables else "read"

    # Open the store (open() manages _context_depth internally)
    self._open_cm = self.open(mode)
    self._open_cm.__enter__()

    return self
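
The way `__enter__` delegates to the `open()` context manager can be shown with a self-contained toy class. This is a sketch under assumptions: `ToyStore` and its `mode` attribute are hypothetical, and the `__exit__` method is an assumed counterpart that is not shown in the listing above.

```python
from contextlib import contextmanager


class ToyStore:
    def __init__(self, auto_create_tables=False):
        self.auto_create_tables = auto_create_tables
        self.mode = None  # None while the store is closed
        self._open_cm = None

    @contextmanager
    def open(self, mode="read"):
        self.mode = mode  # backend-specific connection setup would go here
        try:
            yield self
        finally:
            self.mode = None  # backend-specific teardown

    def __enter__(self):
        mode = "write" if self.auto_create_tables else "read"
        self._open_cm = self.open(mode)
        self._open_cm.__enter__()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Assumed counterpart of __enter__: hand the exit over to open().
        cm, self._open_cm = self._open_cm, None
        return cm.__exit__(exc_type, exc, tb)


store = ToyStore()
with store:
    mode_in_plain_with = store.mode  # read access

with store.open("write"):
    mode_in_open = store.mode  # explicit write access
```

Keeping all setup and teardown inside `open()` means both entry points share one code path, so a subclass only has to implement `open()`.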

metaxy.MetadataStore.validate_hash_algorithm

validate_hash_algorithm(check_fallback_stores: bool = True) -> None

Validate that hash algorithm is supported by this store's components.

Public method - can be called to verify hash compatibility.

Parameters:

  • check_fallback_stores (bool, default: True ) –

    If True, also validate hash is supported by fallback stores (ensures compatibility for future cross-store operations)

Raises:

  • ValueError

    If hash algorithm not supported by components or fallback stores

Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
    self,
    check_fallback_stores: bool = True,
) -> None:
    """Validate that hash algorithm is supported by this store's components.

    Public method - can be called to verify hash compatibility.

    Args:
        check_fallback_stores: If True, also validate hash is supported by
            fallback stores (ensures compatibility for future cross-store operations)

    Raises:
        ValueError: If hash algorithm not supported by components or fallback stores
    """
    # Validate hash algorithm support without creating a full engine
    # (engine creation requires a graph which isn't available during store init)
    self._validate_hash_algorithm_support()

    # Check fallback stores
    if check_fallback_stores:
        for fallback in self.fallback_stores:
            fallback.validate_hash_algorithm(check_fallback_stores=False)
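
One consequence of passing `check_fallback_stores=False` in the loop above is that only direct fallbacks are validated. The toy model below illustrates this; `ToyStore`, the `supported` set, and the algorithm names are hypothetical.

```python
class ToyStore:
    """Hypothetical store that knows which hash algorithms its backend supports."""

    def __init__(self, name, hash_algorithm, supported, fallback_stores=None):
        self.name = name
        self.hash_algorithm = hash_algorithm
        self.supported = set(supported)
        self.fallback_stores = fallback_stores or []

    def validate_hash_algorithm(self, check_fallback_stores=True):
        if self.hash_algorithm not in self.supported:
            raise ValueError(f"{self.name} does not support {self.hash_algorithm}")
        if check_fallback_stores:
            for fallback in self.fallback_stores:
                # Passing False stops the walk after the direct fallbacks.
                fallback.validate_hash_algorithm(check_fallback_stores=False)


archive = ToyStore("archive", "xxhash64", supported={"md5"})
staging = ToyStore("staging", "md5", supported={"md5"}, fallback_stores=[archive])
dev = ToyStore("dev", "md5", supported={"md5"}, fallback_stores=[staging])

# Passes: archive is two levels away from dev and is therefore not checked.
dev.validate_hash_algorithm()
```

Calling `staging.validate_hash_algorithm()` in this sketch does raise, since `archive` is a direct fallback of `staging`.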

metaxy.MetadataStore.allow_cross_project_writes

allow_cross_project_writes() -> Iterator[None]

Context manager to temporarily allow cross-project writes.

This is an escape hatch for legitimate cross-project operations like migrations, where metadata needs to be written to features from different projects.

Example
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
    store.write_metadata(feature_from_project_a, metadata_a)
    store.write_metadata(feature_from_project_b, metadata_b)

Yields:

  • None ( None ) –

    The context manager temporarily disables project validation

Source code in src/metaxy/metadata_store/base.py
@contextmanager
def allow_cross_project_writes(self) -> Iterator[None]:
    """Context manager to temporarily allow cross-project writes.

    This is an escape hatch for legitimate cross-project operations like migrations,
    where metadata needs to be written to features from different projects.

    Example:
        ```py
        # During migration, allow writing to features from different projects
        with store.allow_cross_project_writes():
            store.write_metadata(feature_from_project_a, metadata_a)
            store.write_metadata(feature_from_project_b, metadata_b)
        ```

    Yields:
        None: The context manager temporarily disables project validation
    """
    previous_value = self._allow_cross_project_writes
    try:
        self._allow_cross_project_writes = True
        yield
    finally:
        self._allow_cross_project_writes = previous_value
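
The save-and-restore pattern above keeps the flag correct under nesting and when the body raises. A minimal sketch, with a hypothetical `ToyStore` whose `write_metadata` takes a project name directly instead of resolving it from a feature:

```python
from contextlib import contextmanager


class ToyStore:
    def __init__(self, project):
        self.project = project
        self._allow_cross_project_writes = False
        self.written = []

    @contextmanager
    def allow_cross_project_writes(self):
        previous_value = self._allow_cross_project_writes
        try:
            self._allow_cross_project_writes = True
            yield
        finally:
            # Restore the previous value, not False, so nested use stays correct.
            self._allow_cross_project_writes = previous_value

    def write_metadata(self, feature_project, feature_name):
        if feature_project != self.project and not self._allow_cross_project_writes:
            raise ValueError(
                f"cannot write {feature_name}: it belongs to project {feature_project}"
            )
        self.written.append((feature_project, feature_name))


store = ToyStore(project="project_a")

with store.allow_cross_project_writes():
    with store.allow_cross_project_writes():
        store.write_metadata("project_b", "feature_x")
    # Still allowed here: the inner context restored True, not False.
    store.write_metadata("project_b", "feature_y")

flag_after = store._allow_cross_project_writes
```

Outside the context the flag is back to its original value, so a later cross-project write is rejected again.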

metaxy.MetadataStore.write_metadata_to_store abstractmethod

write_metadata_to_store(feature_key: FeatureKey, df: Frame, **kwargs: Any) -> None

Internal write implementation (backend-specific).

Backends may convert to their specific type if needed (e.g., Polars, Ibis).

Parameters:

  • feature_key (FeatureKey) –

    Feature key to write to

  • df (Frame) –

    Narwhals-compatible DataFrame with metadata to write

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters

Note: Subclasses implement this for their storage backend.

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def write_metadata_to_store(
    self,
    feature_key: FeatureKey,
    df: Frame,
    **kwargs: Any,
) -> None:
    """
    Internal write implementation (backend-specific).

    Backends may convert to their specific type if needed (e.g., Polars, Ibis).

    Args:
        feature_key: Feature key to write to
        df: [Narwhals](https://narwhals-dev.github.io/narwhals/)-compatible DataFrame with metadata to write
        **kwargs: Backend-specific parameters

    Note: Subclasses implement this for their storage backend.
    """
    pass

metaxy.MetadataStore.drop_feature_metadata

drop_feature_metadata(feature: CoercibleToFeatureKey) -> None

Drop all metadata for a feature.

This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.

Warning

This operation is irreversible and will permanently delete all metadata for the specified feature.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature class or key to drop metadata for

Example
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: CoercibleToFeatureKey) -> None:
    """Drop all metadata for a feature.

    This removes all stored metadata for the specified feature from the store.
    Useful for cleanup in tests or when re-computing feature metadata from scratch.

    Warning:
        This operation is irreversible and will **permanently delete all metadata** for the specified feature.

    Args:
        feature: Feature class or key to drop metadata for

    Example:
        ```py
        store.drop_feature_metadata(MyFeature)
        assert not store.has_feature(MyFeature)
        ```
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)
    self._drop_feature_metadata_impl(feature_key)

metaxy.MetadataStore.read_metadata_in_store abstractmethod

read_metadata_in_store(feature: CoercibleToFeatureKey, *, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, **kwargs: Any) -> LazyFrame[Any] | None

Read metadata from THIS store only, without using any fallback stores.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for

  • filters (Sequence[Expr] | None, default: None ) –

    List of Narwhals filter expressions for this specific feature.

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to return

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters

Returns:

  • LazyFrame[Any] | None

    Narwhals LazyFrame with metadata, or None if feature not found in the store

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def read_metadata_in_store(
    self,
    feature: CoercibleToFeatureKey,
    *,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    **kwargs: Any,
) -> nw.LazyFrame[Any] | None:
    """
    Read metadata from THIS store only, without using any fallback stores.

    Args:
        feature: Feature to read metadata for
        filters: List of Narwhals filter expressions for this specific feature.
        columns: Subset of columns to return
        **kwargs: Backend-specific parameters

    Returns:
        Narwhals LazyFrame with metadata, or None if feature not found in the store
    """
    pass

metaxy.MetadataStore.has_feature

has_feature(feature: CoercibleToFeatureKey, *, check_fallback: bool = False) -> bool

Check if feature exists in store.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to check

  • check_fallback (bool, default: False ) –

    If True, also check fallback stores

Returns:

  • bool

    True if feature exists, False otherwise

Source code in src/metaxy/metadata_store/base.py
def has_feature(
    self,
    feature: CoercibleToFeatureKey,
    *,
    check_fallback: bool = False,
) -> bool:
    """
    Check if feature exists in store.

    Args:
        feature: Feature to check
        check_fallback: If True, also check fallback stores

    Returns:
        True if feature exists, False otherwise
    """
    self._check_open()

    if self.read_metadata_in_store(feature) is not None:
        return True

    # Check fallback stores
    if not check_fallback:
        return self._has_feature_impl(feature)
    else:
        for store in self.fallback_stores:
            if store.has_feature(feature, check_fallback=True):
                return True

    return False

metaxy.MetadataStore.display abstractmethod

display() -> str

Return a human-readable display string for this store.

Used in warnings, logs, and CLI output to identify the store.

Returns:

  • str

    Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def display(self) -> str:
    """Return a human-readable display string for this store.

    Used in warnings, logs, and CLI output to identify the store.

    Returns:
        Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
    """
    pass

metaxy.MetadataStore.get_store_metadata

get_store_metadata(feature_key: CoercibleToFeatureKey) -> dict[str, Any]

Arbitrary key-value pairs with useful metadata like path in storage.

Useful for logging purposes. This method should not expose sensitive information.

Source code in src/metaxy/metadata_store/base.py
def get_store_metadata(self, feature_key: CoercibleToFeatureKey) -> dict[str, Any]:
    """Arbitrary key-value pairs with useful metadata like path in storage.

    Useful for logging purposes. This method should not expose sensitive information.
    """
    return {}

metaxy.MetadataStore.copy_metadata

copy_metadata(from_store: MetadataStore, features: list[CoercibleToFeatureKey] | None = None, *, from_snapshot: str | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, incremental: bool = True) -> dict[str, int]

Copy metadata from another store with fine-grained filtering.

This is a reusable method that can be called programmatically or from CLI/migrations. Copies metadata for specified features, preserving the original snapshot_version.

Parameters:

  • from_store (MetadataStore) –

    Source metadata store to copy from. If it is not already open, it is opened in read mode for the duration of the copy.

  • features (list[CoercibleToFeatureKey] | None, default: None ) –

    List of features to copy. Can be:

      • None: copies all features from source store

      • List of FeatureKey or Feature classes: copies specified features

  • from_snapshot (str | None, default: None ) –

    Snapshot version to filter source data by. If None, uses latest snapshot from source store. Only rows with this snapshot_version will be copied. The snapshot_version is preserved in the destination store.

  • filters (Mapping[str, Sequence[Expr]] | None, default: None ) –

    Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}

  • incremental (bool, default: True ) –

    If True (default), filter out rows that already exist in the destination store by performing an anti-join on sample_uid for the same snapshot_version.

    The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid filtered by snapshot_version.

    Disabling incremental (incremental=False) may improve performance when:

      • You know the destination is empty or has no overlap with source

      • The destination store uses deduplication

    When incremental=False, it's the user's responsibility to avoid duplicates or configure deduplication at the storage layer.
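
The incremental behaviour described above is an anti-join: keep the source rows whose sample_uid is not yet present in the destination for the same snapshot_version. A minimal sketch over plain lists of dicts; the `incremental_rows` helper is hypothetical, and the real implementation works on Narwhals frames.

```python
def incremental_rows(source_rows, destination_rows, snapshot_version):
    """Return source rows of the snapshot whose sample_uid is not in the destination."""
    existing = {
        row["sample_uid"]
        for row in destination_rows
        if row["snapshot_version"] == snapshot_version
    }
    return [
        row
        for row in source_rows
        if row["snapshot_version"] == snapshot_version
        and row["sample_uid"] not in existing
    ]


source = [
    {"sample_uid": "s1", "snapshot_version": "abc123"},
    {"sample_uid": "s2", "snapshot_version": "abc123"},
    {"sample_uid": "s3", "snapshot_version": "older"},
]
destination = [
    {"sample_uid": "s1", "snapshot_version": "abc123"},
    # Same sample under a different snapshot does not count as a duplicate.
    {"sample_uid": "s2", "snapshot_version": "older"},
]

to_copy = incremental_rows(source, destination, snapshot_version="abc123")
```

With `incremental=False` this filtering step is skipped entirely, which is why duplicates then become the caller's responsibility.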

Returns:

  • dict[str, int]

    Dict with statistics: {"features_copied": int, "rows_copied": int}

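Raises:

  • ValueError

    If the destination store (self) is not open

  • FeatureNotFoundError

    If a specified feature doesn't exist in source store
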
Examples:

# Simple: copy all features from latest snapshot
stats = dest_store.copy_metadata(from_store=source_store)

# Copy specific features from a specific snapshot
stats = dest_store.copy_metadata(
    from_store=source_store,
    features=[FeatureKey(["my_feature"])],
    from_snapshot="abc123",
)

# Copy with filters
stats = dest_store.copy_metadata(
    from_store=source_store,
    filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
)

# Copy specific features with filters
stats = dest_store.copy_metadata(
    from_store=source_store,
    features=[
        FeatureKey(["feature_a"]),
        FeatureKey(["feature_b"]),
    ],
    filters={
        "feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
        "feature_b": [nw.col("field_b") < 30],
    },
)

Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
    self,
    from_store: MetadataStore,
    features: list[CoercibleToFeatureKey] | None = None,
    *,
    from_snapshot: str | None = None,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    incremental: bool = True,
) -> dict[str, int]:
    """Copy metadata from another store with fine-grained filtering.

    This is a reusable method that can be called programmatically or from CLI/migrations.
    Copies metadata for specified features, preserving the original snapshot_version.

    Args:
        from_store: Source metadata store to copy from (must be opened)
        features: List of features to copy. Can be:
            - None: copies all features from source store
            - List of FeatureKey or Feature classes: copies specified features
        from_snapshot: Snapshot version to filter source data by. If None, uses latest snapshot
            from source store. Only rows with this snapshot_version will be copied.
            The snapshot_version is preserved in the destination store.
        filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
            These filters are applied when reading from the source store.
            Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
        incremental: If True (default), filter out rows that already exist in the destination
            store by performing an anti-join on sample_uid for the same snapshot_version.

            The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid
            filtered by snapshot_version.

            Disabling incremental (incremental=False) may improve performance when:
            - You know the destination is empty or has no overlap with source
            - The destination store uses deduplication

            When incremental=False, it's the user's responsibility to avoid duplicates or
            configure deduplication at the storage layer.

    Returns:
        Dict with statistics: {"features_copied": int, "rows_copied": int}

    Raises:
        ValueError: If from_store or self (destination) is not open
        FeatureNotFoundError: If a specified feature doesn't exist in source store

    Examples:
        ```py
        # Simple: copy all features from latest snapshot
        stats = dest_store.copy_metadata(from_store=source_store)
        ```

        ```py
        # Copy specific features from a specific snapshot
        stats = dest_store.copy_metadata(
            from_store=source_store,
            features=[FeatureKey(["my_feature"])],
            from_snapshot="abc123",
        )
        ```

        ```py
        # Copy with filters
        stats = dest_store.copy_metadata(
            from_store=source_store,
            filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
        )
        ```

        ```py
        # Copy specific features with filters
        stats = dest_store.copy_metadata(
            from_store=source_store,
            features=[
                FeatureKey(["feature_a"]),
                FeatureKey(["feature_b"]),
            ],
            filters={
                "feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
                "feature_b": [nw.col("field_b") < 30],
            },
        )
        ```
    """
    import logging

    logger = logging.getLogger(__name__)

    # Validate destination store is open
    if not self._is_open:
        raise ValueError(
            'Destination store must be opened with store.open("write") before use'
        )

    # Auto-open source store if not already open
    if not from_store._is_open:
        with from_store.open("read"):
            return self._copy_metadata_impl(
                from_store=from_store,
                features=features,
                from_snapshot=from_snapshot,
                filters=filters,
                incremental=incremental,
                logger=logger,
            )
    else:
        return self._copy_metadata_impl(
            from_store=from_store,
            features=features,
            from_snapshot=from_snapshot,
            filters=filters,
            incremental=incremental,
            logger=logger,
        )

metaxy.metadata_store.types.AccessMode module-attribute

AccessMode = Literal['read', 'write']

metaxy.metadata_store.base.VersioningEngineOptions module-attribute

VersioningEngineOptions = Literal['auto', 'native', 'polars']

Base Configuration Class

The following base configuration class is typically used by child metadata stores:

metaxy.metadata_store.base.MetadataStoreConfig

Bases: BaseSettings

Base configuration class for metadata stores.

This class defines common configuration fields shared by all metadata store types. Store-specific config classes should inherit from this and add their own fields.

Example
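# DuckDBMetadataStore is assumed to live in the same module as its config class
from metaxy.metadata_store.duckdb import DuckDBMetadataStore, DuckDBMetadataStoreConfig
from metaxy.versioning.types import HashAlgorithm

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    hash_algorithm=HashAlgorithm.MD5,
)

# Build the store from the validated configuration object
store = DuckDBMetadataStore.from_config(config)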

Configuration

The base MetadataStoreConfig class injects the following configuration options:

fallback_stores

List of fallback store names to search when features are not found in the current store.

Type: list[str]

[stores.dev.config]
# Optional
# fallback_stores = []

[tool.metaxy.stores.dev.config]
# Optional
# fallback_stores = []

export METAXY_STORES__DEV__CONFIG__FALLBACK_STORES=...

hash_algorithm

Hash algorithm for versioning. If None, uses store's default.

Type: metaxy.versioning.types.HashAlgorithm | None

[stores.dev.config]
# Optional
# hash_algorithm = null

[tool.metaxy.stores.dev.config]
# Optional
# hash_algorithm = null

export METAXY_STORES__DEV__CONFIG__HASH_ALGORITHM=...

versioning_engine

Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.

Type: Literal['auto', 'native', 'polars'] | Default: "auto"

[stores.dev.config]
versioning_engine = "auto"

[tool.metaxy.stores.dev.config]
versioning_engine = "auto"

export METAXY_STORES__DEV__CONFIG__VERSIONING_ENGINE=auto

Project Write Validation

By default, MetadataStore raises a ValueError when attempting to write to a feature whose project doesn't match the expected project from MetaxyConfig.get().project.

For legitimate cross-project operations (such as migrations that need to update features across multiple projects), use MetadataStore.allow_cross_project_writes:

with store.open("write"), store.allow_cross_project_writes():
    store.write_metadata(ExternallyDefinedFeature, df)