Metadata Stores¶
Metaxy abstracts interactions with metadata behind an interface called MetadataStore.
Users can extend this class to support arbitrary metadata storage, such as databases, lakehouse formats, or any other external system. Metaxy has built-in support for the following metadata store types:
Databases¶
-
IbisMetadataStore (a base class) - see Ibis integration
Storage Only¶
Metadata Store Interface¶
metaxy.MetadataStore
¶
MetadataStore(*, versioning_engine_cls: type[VersioningEngineT], hash_algorithm: HashAlgorithm | None = None, versioning_engine: VersioningEngineOptions = 'auto', fallback_stores: list[MetadataStore] | None = None, auto_create_tables: bool | None = None, materialization_id: str | None = None)
Bases: ABC
Abstract base class for metadata storage backends.
Parameters:
-
hash_algorithm(HashAlgorithm | None, default:None) –Hash algorithm to use for the versioning engine.
-
versioning_engine(VersioningEngineOptions, default:'auto') –Which versioning engine to use.
-
"auto": Prefer the store's native engine, fall back to Polars if needed
-
"native": Always use the store's native engine, raise VersioningEngineMismatchError if provided dataframes are incompatible
-
"polars": Always use the Polars engine
-
fallback_stores(list[MetadataStore] | None, default:None) –Ordered list of read-only fallback stores. Used when upstream features are not in this store. VersioningEngineMismatchError is not raised when reading from fallback stores.
-
auto_create_tables(bool | None, default:None) –If True, automatically create tables when opening the store. If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var). If False, never auto-create tables.
Warning
Auto-create is intended for development/testing only. Use proper database migration tools like Alembic for production deployments.
-
materialization_id(str | None, default:None) –Optional external orchestration ID. If provided, all metadata writes will include this ID in the metaxy_materialization_id column. Can be overridden per MetadataStore.write_metadata call.
Raises:
-
ValueError–If fallback stores use different hash algorithms or truncation lengths
-
VersioningEngineMismatchError–If a user-provided dataframe has a wrong implementation and versioning_engine is set to native
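The three-state `auto_create_tables` argument described above can be read as "an explicit value wins, otherwise defer to global configuration". The following is a minimal sketch of that resolution only. The helper name `resolve_auto_create_tables`, the truthy-string parsing, and the `False` default are assumptions made for illustration; the real lookup goes through MetaxyConfig and may behave differently.

```python
import os
from typing import Mapping, Optional


def resolve_auto_create_tables(
    explicit: Optional[bool],
    env: Optional[Mapping[str, str]] = None,
) -> bool:
    """Hypothetical helper: an explicit True/False wins; None defers to configuration."""
    if explicit is not None:
        return explicit
    source = os.environ if env is None else env
    # Assumption: simple truthy-string parsing with a False default.
    # MetaxyConfig may parse the variable differently.
    raw = source.get("METAXY_AUTO_CREATE_TABLES", "false")
    return raw.strip().lower() in {"1", "true", "yes"}


# An explicit argument ignores the environment entirely.
explicit_off = resolve_auto_create_tables(False, {"METAXY_AUTO_CREATE_TABLES": "true"})
# None falls back to the (stand-in) global configuration.
from_env = resolve_auto_create_tables(None, {"METAXY_AUTO_CREATE_TABLES": "true"})
```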
Source code in src/metaxy/metadata_store/base.py
def __init__(
    self,
    *,
    versioning_engine_cls: type[VersioningEngineT],
    hash_algorithm: HashAlgorithm | None = None,
    versioning_engine: VersioningEngineOptions = "auto",
    fallback_stores: list[MetadataStore] | None = None,
    auto_create_tables: bool | None = None,
    materialization_id: str | None = None,
):
    """
    Initialize the metadata store.

    Args:
        hash_algorithm: Hash algorithm to use for the versioning engine.
        versioning_engine: Which versioning engine to use.
            - "auto": Prefer the store's native engine, fall back to Polars if needed
            - "native": Always use the store's native engine, raise `VersioningEngineMismatchError`
                if provided dataframes are incompatible
            - "polars": Always use the Polars engine
        fallback_stores: Ordered list of read-only fallback stores.
            Used when upstream features are not in this store.
            `VersioningEngineMismatchError` is not raised when reading from fallback stores.
        auto_create_tables: If True, automatically create tables when opening the store.
            If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var).
            If False, never auto-create tables.

            !!! warning
                Auto-create is intended for development/testing only.
                Use proper database migration tools like Alembic for production deployments.
        materialization_id: Optional external orchestration ID.
            If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column.
            Can be overridden per [`MetadataStore.write_metadata`][metaxy.MetadataStore.write_metadata] call.

    Raises:
        ValueError: If fallback stores use different hash algorithms or truncation lengths
        VersioningEngineMismatchError: If a user-provided dataframe has a wrong implementation
            and versioning_engine is set to `native`
    """
    # Initialize state early so properties can check it
    self._is_open = False
    self._context_depth = 0
    self._versioning_engine = versioning_engine
    self._allow_cross_project_writes = False
    self._materialization_id = materialization_id
    self._open_cm: AbstractContextManager[Self] | None = (
        None  # Track the open() context manager
    )
    self.versioning_engine_cls = versioning_engine_cls

    # Resolve auto_create_tables from global config if not explicitly provided
    if auto_create_tables is None:
        from metaxy.config import MetaxyConfig

        self.auto_create_tables = MetaxyConfig.get().auto_create_tables
    else:
        self.auto_create_tables = auto_create_tables

    # Use store's default algorithm if not specified
    if hash_algorithm is None:
        hash_algorithm = self._get_default_hash_algorithm()
    self.hash_algorithm = hash_algorithm

    self.fallback_stores = fallback_stores or []
Attributes¶
metaxy.MetadataStore.materialization_id
property
¶
materialization_id: str | None
The external orchestration ID for this store instance.
If set, all metadata writes include this ID in the metaxy_materialization_id column,
allowing filtering of rows written during a specific materialization run.
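To make the filtering use case concrete, here is a minimal sketch on plain Python dictionaries rather than a real store. The row data and the `rows_for_run` helper are hypothetical; only the `metaxy_materialization_id` column name comes from the description above.

```python
# Hypothetical rows, as they might look after two orchestration runs.
rows = [
    {"sample_uid": 1, "metaxy_materialization_id": "run-a"},
    {"sample_uid": 2, "metaxy_materialization_id": "run-b"},
    {"sample_uid": 3, "metaxy_materialization_id": "run-a"},
]


def rows_for_run(all_rows, materialization_id):
    """Keep only rows written during the given materialization run."""
    return [
        row
        for row in all_rows
        if row["metaxy_materialization_id"] == materialization_id
    ]


run_a = rows_for_run(rows, "run-a")
```

Against a real store, the same selection would presumably be expressed as a Narwhals filter expression on that column and passed through the `filters` argument of `read_metadata`.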
Functions¶
metaxy.MetadataStore.config_model
abstractmethod
classmethod
¶
config_model() -> type[MetadataStoreConfig]
Return the configuration model class for this store type.
Subclasses must override this to return their specific config class.
Returns:
-
type[MetadataStoreConfig]–The config class type (e.g., DuckDBMetadataStoreConfig)
Note
Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.
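The override pattern in the note can be sketched with stand-in classes. Every class name below is hypothetical and exists only to illustrate an abstract classmethod whose subclass override declares a narrower return type; none of them are Metaxy classes.

```python
from abc import ABC, abstractmethod


class StoreConfig:
    """Stand-in for MetadataStoreConfig (illustration only)."""


class InMemoryStoreConfig(StoreConfig):
    """Stand-in for a store-specific config such as DuckDBMetadataStoreConfig."""


class Store(ABC):
    @classmethod
    @abstractmethod
    def config_model(cls) -> type[StoreConfig]:
        """Return the config class for this store type."""
        ...


class InMemoryStore(Store):
    @classmethod
    def config_model(cls) -> type[InMemoryStoreConfig]:
        # The subclass declares a narrower return type than the base class.
        return InMemoryStoreConfig


config_cls = InMemoryStore.config_model()
```

Because `config_model` is abstract, the base class cannot be instantiated until a subclass provides the override.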
Source code in src/metaxy/metadata_store/base.py
@classmethod
@abstractmethod
def config_model(cls) -> type[MetadataStoreConfig]:
    """Return the configuration model class for this store type.

    Subclasses must override this to return their specific config class.

    Returns:
        The config class type (e.g., DuckDBMetadataStoreConfig)

    Note:
        Subclasses override this with a more specific return type.
        Type checkers may show a warning about incompatible override,
        but this is intentional - each store returns its own config type.
    """
    ...
metaxy.MetadataStore.from_config
classmethod
¶
from_config(config: MetadataStoreConfig, **kwargs: Any) -> Self
Create a store instance from a configuration object.
This method creates a store by:
1. Converting the config to a dict
2. Resolving fallback store names to actual store instances
3. Calling the store's __init__ with the config parameters
Parameters:
-
config(MetadataStoreConfig) –Configuration object (should be the type returned by config_model())
-
**kwargs(Any, default:{}) –Additional arguments passed directly to the store constructor (e.g., materialization_id for runtime parameters not in config)
Returns:
-
Self–A new store instance configured according to the config object
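The three steps can be sketched with stand-in classes, without depending on Metaxy itself. `SketchConfig`, `SketchStore`, and the class-level `registry` are hypothetical; the registry stands in for the global configuration lookup of named stores, and `dataclasses.asdict` stands in for the config's own dict conversion, so details such as skipping unset fields are not reproduced here.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class SketchConfig:
    """Stand-in for a store config; fallback stores are referenced by name."""

    database: str
    fallback_stores: list = field(default_factory=list)


class SketchStore:
    # Hypothetical registry standing in for the global lookup of named stores.
    registry: dict = {}

    def __init__(self, *, database, fallback_stores=None, materialization_id=None):
        self.database = database
        self.fallback_stores = fallback_stores or []
        self.materialization_id = materialization_id

    @classmethod
    def from_config(cls, config: SketchConfig, **kwargs: Any) -> "SketchStore":
        # 1. Convert the config to a dict
        config_dict = asdict(config)
        # 2. Resolve fallback store names to actual store instances
        names = config_dict.pop("fallback_stores", [])
        fallbacks = [cls.registry[name] for name in names]
        # 3. Call the constructor with the config parameters and runtime kwargs
        return cls(fallback_stores=fallbacks, **config_dict, **kwargs)


SketchStore.registry["prod"] = SketchStore(database="prod.db")
store = SketchStore.from_config(
    SketchConfig(database="metadata.db", fallback_stores=["prod"]),
    materialization_id="run-1",
)
```

Runtime-only values such as `materialization_id` travel through `**kwargs`, which keeps them out of the serializable config object.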
Example
Source code in src/metaxy/metadata_store/base.py
@classmethod
def from_config(cls, config: MetadataStoreConfig, **kwargs: Any) -> Self:
    """Create a store instance from a configuration object.

    This method creates a store by:
    1. Converting the config to a dict
    2. Resolving fallback store names to actual store instances
    3. Calling the store's __init__ with the config parameters

    Args:
        config: Configuration object (should be the type returned by config_model())
        **kwargs: Additional arguments passed directly to the store constructor
            (e.g., materialization_id for runtime parameters not in config)

    Returns:
        A new store instance configured according to the config object

    Example:
        ```python
        from metaxy.metadata_store.duckdb import (
            DuckDBMetadataStore,
            DuckDBMetadataStoreConfig,
        )

        config = DuckDBMetadataStoreConfig(
            database="metadata.db",
            fallback_stores=["prod"],
        )

        store = DuckDBMetadataStore.from_config(config)
        ```
    """
    # Convert config to dict, excluding unset values
    config_dict = config.model_dump(exclude_unset=True)

    # Pop and resolve fallback store names to actual store instances
    fallback_store_names = config_dict.pop("fallback_stores", [])
    fallback_stores = [
        MetaxyConfig.get().get_store(name) for name in fallback_store_names
    ]

    # Create store with resolved fallback stores, config, and extra kwargs
    return cls(fallback_stores=fallback_stores, **config_dict, **kwargs)
metaxy.MetadataStore.resolve_update
¶
resolve_update(feature: type[BaseFeature], *, samples: IntoFrame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[False] = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment
resolve_update(feature: type[BaseFeature], *, samples: IntoFrame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[True], versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> LazyIncrement
resolve_update(feature: type[BaseFeature], *, samples: IntoFrame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: bool = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment | LazyIncrement
Calculate an incremental update for a feature.
This is the main workhorse in Metaxy.
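As a rough mental model, an increment splits samples into three groups: added, changed, and removed. The sketch below assumes this split comes from comparing a per-sample provenance value between the expected upstream state and the metadata already stored. That is a simplification for illustration: `compute_increment` is a hypothetical helper, and the real engine works on dataframes with field-level provenance.

```python
def compute_increment(upstream, current):
    """Compare provenance values keyed by sample ID (hypothetical simplification)."""
    added = sorted(uid for uid in upstream if uid not in current)
    changed = sorted(
        uid for uid in upstream if uid in current and upstream[uid] != current[uid]
    )
    removed = sorted(uid for uid in current if uid not in upstream)
    return added, changed, removed


# Provenance expected from upstream vs. provenance already stored.
upstream = {1: "h1", 2: "h2-new", 3: "h3"}
current = {1: "h1", 2: "h2-old", 4: "h4"}

added, changed, removed = compute_increment(upstream, current)
# added == [3], changed == [2], removed == [4]
```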
Parameters:
-
feature(type[BaseFeature]) –Feature class to resolve updates for
-
samples(IntoFrame | None, default:None) –A dataframe with joined upstream metadata and the "metaxy_provenance_by_field" column set. When provided, MetadataStore skips loading upstream feature metadata and provenance calculations.
Required for root features
Metaxy doesn't know how to populate input metadata for root features, so the samples argument must be provided for them.
Tip
For non-root features, use samples to customize the automatic upstream loading and field provenance calculation. For example, it can be used to require processing for specific sample IDs.
Setting this parameter during normal operations is not required.
-
filters(Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None, default:None) –A mapping from feature keys to lists of Narwhals filter expressions. Keys can be feature classes, FeatureKey objects, or string paths. Applied at read-time. May filter the current feature, in which case it will also be applied to samples (if provided). Example: {UpstreamFeature: [nw.col("x") > 10], ...}
-
global_filters(Sequence[Expr] | None, default:None) –A list of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Useful for filtering by common columns like sample_uid across all features. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
-
lazy(bool, default:False) –Whether to return a metaxy.versioning.types.LazyIncrement or a metaxy.versioning.types.Increment.
-
versioning_engine(Literal['auto', 'native', 'polars'] | None, default:None) –Override the store's versioning engine for this operation.
-
skip_comparison(bool, default:False) –If True, skip the increment comparison logic and return all upstream samples in Increment.added. The changed and removed frames will be empty.
Raises:
-
ValueError–If no samples dataframe has been provided when resolving an update for a root feature.
-
VersioningEngineMismatchError–If versioning_engine has been set to "native" and a dataframe of a different implementation has been encountered during resolve_update.
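The interaction between the `versioning_engine` mode, fallback stores, and user-provided `samples` can be condensed into a small decision function. The sketch below mirrors the branching in the source listing for this method using plain strings in place of Narwhals implementations. `choose_implementation` and `EngineMismatch` are hypothetical names, and warnings are omitted.

```python
class EngineMismatch(Exception):
    """Stand-in for VersioningEngineMismatchError."""


def choose_implementation(
    mode, native, upstream_impls, samples_impl=None, has_fallback_stores=False
):
    """Return the implementation name used to resolve the increment."""
    if mode == "polars":
        return "polars"

    switched = False
    for impl in upstream_impls:
        if impl != native:
            # In "native" mode a mismatch is tolerated only when fallback stores
            # exist, since the data then plausibly came from one of them.
            if mode == "native" and not has_fallback_stores:
                raise EngineMismatch(f"upstream has {impl}, expected {native}")
            switched = True
            break

    if samples_impl is not None and samples_impl != native and not switched:
        # Samples come from the caller, so fallback stores grant no exemption.
        if mode == "native":
            raise EngineMismatch(f"samples have {samples_impl}, expected {native}")
        switched = True

    return "polars" if switched else native


engine = choose_implementation("auto", "duckdb", ["duckdb", "polars"])
```

With the call above, one upstream frame is not native, so `"auto"` mode falls back to Polars instead of raising.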
With a root feature
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
    self,
    feature: type[BaseFeature],
    *,
    samples: IntoFrame | None = None,
    filters: Mapping[CoercibleToFeatureKey, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    lazy: bool = False,
    versioning_engine: Literal["auto", "native", "polars"] | None = None,
    skip_comparison: bool = False,
    **kwargs: Any,
) -> Increment | LazyIncrement:
    """Calculate an incremental update for a feature.

    This is the main workhorse in Metaxy.

    Args:
        feature: Feature class to resolve updates for
        samples: A dataframe with joined upstream metadata and the `"metaxy_provenance_by_field"` column set.
            When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.

            !!! info "Required for root features"
                Metaxy doesn't know how to populate input metadata for root features,
                so the `samples` argument **must** be provided for them.

            !!! tip
                For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation.
                For example, it can be used to require processing for specific sample IDs.

            Setting this parameter during normal operations is not required.

        filters: A mapping from feature keys to lists of Narwhals filter expressions.
            Keys can be feature classes, FeatureKey objects, or string paths.
            Applied at read-time. May filter the current feature,
            in which case it will also be applied to `samples` (if provided).
            Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
        global_filters: A list of Narwhals filter expressions applied to all features.
            These filters are combined with any feature-specific filters from `filters`.
            Useful for filtering by common columns like `sample_uid` across all features.
            Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
        lazy: Whether to return a [metaxy.versioning.types.LazyIncrement][] or a [metaxy.versioning.types.Increment][].
        versioning_engine: Override the store's versioning engine for this operation.
        skip_comparison: If True, skip the increment comparison logic and return all
            upstream samples in `Increment.added`. The `changed` and `removed` frames will
            be empty.

    Raises:
        ValueError: If no `samples` dataframe has been provided when resolving an update for a root feature.
        VersioningEngineMismatchError: If `versioning_engine` has been set to `"native"`
            and a dataframe of a different implementation has been encountered during `resolve_update`.

    !!! example "With a root feature"

        ```py
        samples = pl.DataFrame({
            "sample_uid": [1, 2, 3],
            "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
        })
        result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
        ```
    """
    import narwhals as nw

    # Convert samples to Narwhals frame if not already
    samples_nw: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None
    if samples is not None:
        if isinstance(samples, (nw.DataFrame, nw.LazyFrame)):
            samples_nw = samples
        else:
            samples_nw = nw.from_native(samples)

    # Normalize filter keys to FeatureKey
    normalized_filters: dict[FeatureKey, list[nw.Expr]] = {}
    if filters:
        for key, exprs in filters.items():
            feature_key = self._resolve_feature_key(key)
            normalized_filters[feature_key] = list(exprs)

    # Convert global_filters to a list for easy concatenation
    global_filter_list = list(global_filters) if global_filters else []

    graph = current_graph()
    plan = graph.get_feature_plan(feature.spec().key)

    # Root features without samples: error (samples required)
    if not plan.deps and samples_nw is None:
        raise ValueError(
            f"Feature {feature.spec().key} has no upstream dependencies (root feature). "
            f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. "
            f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation."
        )

    # Combine feature-specific filters with global filters
    current_feature_filters = [
        *normalized_filters.get(feature.spec().key, []),
        *global_filter_list,
    ]

    current_metadata = self.read_metadata_in_store(
        feature,
        filters=[
            nw.col(METAXY_FEATURE_VERSION)
            == graph.get_feature_version(feature.spec().key),
            *current_feature_filters,
        ],
    )

    upstream_by_key: dict[FeatureKey, nw.LazyFrame[Any]] = {}
    filters_by_key: dict[FeatureKey, list[nw.Expr]] = {}

    # if samples are provided, use them as source of truth for upstream data
    if samples_nw is not None:
        # Apply filters to samples if any
        filtered_samples = samples_nw
        if current_feature_filters:
            filtered_samples = samples_nw.filter(current_feature_filters)

        # fill in METAXY_PROVENANCE column if it's missing (e.g. for root features)
        samples_nw = self.hash_struct_version_column(
            plan,
            df=filtered_samples,
            struct_column=METAXY_PROVENANCE_BY_FIELD,
            hash_column=METAXY_PROVENANCE,
        )

        # For root features, add data_version columns if they don't exist
        # (root features have no computation, so data_version equals provenance)
        if METAXY_DATA_VERSION_BY_FIELD not in samples_nw.columns:
            samples_nw = samples_nw.with_columns(
                nw.col(METAXY_PROVENANCE_BY_FIELD).alias(
                    METAXY_DATA_VERSION_BY_FIELD
                ),
                nw.col(METAXY_PROVENANCE).alias(METAXY_DATA_VERSION),
            )
    else:
        for upstream_spec in plan.deps or []:
            # Combine feature-specific filters with global filters for upstream
            upstream_filters = [
                *normalized_filters.get(upstream_spec.key, []),
                *global_filter_list,
            ]
            upstream_feature_metadata = self.read_metadata(
                upstream_spec.key,
                filters=upstream_filters,
            )
            if upstream_feature_metadata is not None:
                upstream_by_key[upstream_spec.key] = upstream_feature_metadata

    # determine which implementation to use for resolving the increment
    # consider (1) whether all upstream metadata has been loaded with the native implementation
    # (2) if samples have native implementation

    # Use parameter if provided, otherwise use store default
    engine_mode = (
        versioning_engine
        if versioning_engine is not None
        else self._versioning_engine
    )

    # If "polars" mode, force Polars immediately
    if engine_mode == "polars":
        implementation = nw.Implementation.POLARS
        switched_to_polars = True
    else:
        implementation = self.native_implementation()
        switched_to_polars = False

        for upstream_key, df in upstream_by_key.items():
            if df.implementation != implementation:
                switched_to_polars = True
                # Only raise error in "native" mode if no fallback stores configured.
                # If fallback stores exist, the implementation mismatch indicates data came
                # from fallback (different implementation), which is legitimate fallback access.
                # If data were local, it would have the native implementation.
                if engine_mode == "native" and not self.fallback_stores:
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but upstream feature `{upstream_key.to_string()}` "
                        f"has implementation {df.implementation}, expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto" or (
                    engine_mode == "native" and self.fallback_stores
                ):
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=df.implementation,
                        message=f"Using Polars for resolving the increment instead. This was caused by upstream feature `{upstream_key.to_string()}`.",
                    )
                implementation = nw.Implementation.POLARS
                break

        if (
            samples_nw is not None
            and samples_nw.implementation != self.native_implementation()
        ):
            if not switched_to_polars:
                if engine_mode == "native":
                    # Always raise error for samples with wrong implementation, regardless
                    # of fallback stores, because samples come from user argument, not from fallback
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but provided `samples` have implementation {samples_nw.implementation}, "
                        f"expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto":
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=samples_nw.implementation,
                        message=f"Provided `samples` have implementation {samples_nw.implementation}. Using Polars for resolving the increment instead.",
                    )
                implementation = nw.Implementation.POLARS
                switched_to_polars = True

    if switched_to_polars:
        if current_metadata:
            current_metadata = switch_implementation_to_polars(current_metadata)
        if samples_nw:
            samples_nw = switch_implementation_to_polars(samples_nw)
        for upstream_key, df in upstream_by_key.items():
            upstream_by_key[upstream_key] = switch_implementation_to_polars(df)

    with self.create_versioning_engine(
        plan=plan, implementation=implementation
    ) as engine:
        if skip_comparison:
            # Skip comparison: return all upstream samples as added
            if samples_nw is not None:
                # Root features or user-provided samples: use samples directly
                # Note: samples already has metaxy_provenance computed
                added = samples_nw.lazy()
            else:
                # Non-root features: load all upstream with provenance
                added = engine.load_upstream_with_provenance(
                    upstream=upstream_by_key,
                    hash_algo=self.hash_algorithm,
                    filters=filters_by_key,
                )
            changed = None
            removed = None
        else:
            added, changed, removed = engine.resolve_increment_with_provenance(
                current=current_metadata,
                upstream=upstream_by_key,
                hash_algorithm=self.hash_algorithm,
                filters=filters_by_key,
                sample=samples_nw.lazy() if samples_nw is not None else None,
            )

    # Convert None to empty DataFrames
    if changed is None:
        changed = empty_frame_like(added)
    if removed is None:
        removed = empty_frame_like(added)

    if lazy:
        return LazyIncrement(
            added=added
            if isinstance(added, nw.LazyFrame)
            else nw.from_native(added),
            changed=changed
            if isinstance(changed, nw.LazyFrame)
            else nw.from_native(changed),
            removed=removed
            if isinstance(removed, nw.LazyFrame)
            else nw.from_native(removed),
        )
    else:
        return Increment(
            added=added.collect() if isinstance(added, nw.LazyFrame) else added,
            changed=changed.collect()
            if isinstance(changed, nw.LazyFrame)
            else changed,
            removed=removed.collect()
            if isinstance(removed, nw.LazyFrame)
            else removed,
        )
metaxy.MetadataStore.read_metadata
¶
read_metadata(feature: CoercibleToFeatureKey, *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, allow_fallback: bool = True, current_only: bool = True, latest_only: bool = True) -> LazyFrame[Any]
Read metadata with optional fallback to upstream stores.
Parameters:
-
feature(CoercibleToFeatureKey) –Feature to read metadata for
-
feature_version(str | None, default:None) –Explicit feature_version to filter by (mutually exclusive with current_only=True)
-
filters(Sequence[Expr] | None, default:None) –Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]
-
columns(Sequence[str] | None, default:None) –Subset of columns to include. Metaxy's system columns are always included.
-
allow_fallback(bool, default:True) –If True, check fallback stores on local miss
-
current_only(bool, default:True) –If True, only return rows with current feature_version
-
latest_only(bool, default:True) –Whether to deduplicate samples within id_columns groups ordered by metaxy_created_at.
Returns:
-
LazyFrame[Any]–Narwhals LazyFrame with metadata
Raises:
-
FeatureNotFoundError–If feature not found in any store
-
SystemDataNotFoundError–When attempting to read non-existent Metaxy system data
-
ValueError–If both feature_version and current_only=True are provided
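The fallback behaviour can be pictured as a chain lookup: a local hit wins, otherwise fallback stores are tried in order, and each fallback may have fallbacks of its own. The sketch below uses in-memory stand-ins; `ChainStore` and `FeatureNotFound` are hypothetical names, not Metaxy classes.

```python
class FeatureNotFound(Exception):
    """Stand-in for FeatureNotFoundError."""


class ChainStore:
    def __init__(self, name, tables, fallback_stores=()):
        self.name = name
        self.tables = tables  # feature key -> rows
        self.fallback_stores = list(fallback_stores)

    def read(self, feature, allow_fallback=True):
        if feature in self.tables:  # a local hit always wins
            return self.name, self.tables[feature]
        if allow_fallback:
            for store in self.fallback_stores:  # ordered: first hit wins
                try:
                    # Recursing into read() covers nested fallback chains.
                    return store.read(feature)
                except FeatureNotFound:
                    continue
        raise FeatureNotFound(feature)


prod = ChainStore("prod", {"video/raw": [{"sample_uid": 1}]})
staging = ChainStore("staging", {}, fallback_stores=[prod])
dev = ChainStore("dev", {"video/crop": []}, fallback_stores=[staging])

source, rows = dev.read("video/raw")  # served by "prod" via "staging"
```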
Info
When this method is called with default arguments, it will return the latest (by metaxy_created_at)
metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.
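The `latest_only` deduplication amounts to keeping, for each group of ID columns, the row with the greatest creation timestamp. Here is a pure-Python sketch of that idea. The function is a stand-in that works on lists of dictionaries, with integer timestamps as a simplification; the versioning engine does the equivalent on dataframes.

```python
def keep_latest_by_group(rows, group_columns, timestamp_column):
    """Keep, per group, the row with the greatest timestamp (pure-Python sketch)."""
    latest = {}
    for row in rows:
        key = tuple(row[column] for column in group_columns)
        if key not in latest or row[timestamp_column] > latest[key][timestamp_column]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"sample_uid": 1, "metaxy_created_at": 10, "value": "old"},
    {"sample_uid": 1, "metaxy_created_at": 20, "value": "new"},
    {"sample_uid": 2, "metaxy_created_at": 15, "value": "only"},
]

deduped = keep_latest_by_group(rows, ["sample_uid"], "metaxy_created_at")
```

Since the store is append-only, this read-time step is what makes repeated writes for the same sample resolve to a single current row.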
Warning
The order of rows is not guaranteed.
Source code in src/metaxy/metadata_store/base.py
def read_metadata(
    self,
    feature: CoercibleToFeatureKey,
    *,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
) -> nw.LazyFrame[Any]:
    """
    Read metadata with optional fallback to upstream stores.

    Args:
        feature: Feature to read metadata for
        feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True)
        filters: Sequence of Narwhals filter expressions to apply to this feature.
            Example: `[nw.col("x") > 10, nw.col("y") < 5]`
        columns: Subset of columns to include. Metaxy's system columns are always included.
        allow_fallback: If `True`, check fallback stores on local miss
        current_only: If `True`, only return rows with current feature_version
        latest_only: Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`.

    Returns:
        Narwhals LazyFrame with metadata

    Raises:
        FeatureNotFoundError: If feature not found in any store
        SystemDataNotFoundError: When attempting to read non-existent Metaxy system data
        ValueError: If both feature_version and current_only=True are provided

    !!! info
        When this method is called with default arguments, it will return the latest (by `metaxy_created_at`)
        metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.

    !!! warning
        The order of rows is not guaranteed.
    """
    filters = filters or []
    columns = columns or []

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate mutually exclusive parameters
    if feature_version is not None and current_only:
        raise ValueError(
            "Cannot specify both feature_version and current_only=True. "
            "Use current_only=False with feature_version parameter."
        )

    # Add feature_version filter only when needed (never for system tables)
    if (current_only or feature_version is not None) and not is_system_table:
        version_filter = nw.col(METAXY_FEATURE_VERSION) == (
            current_graph().get_feature_version(feature_key)
            if current_only
            else feature_version
        )
        filters = [version_filter, *filters]

    if columns and not is_system_table:
        # Add only system columns that aren't already in the user's columns list
        columns_set = set(columns)
        missing_system_cols = [
            c for c in ALL_SYSTEM_COLUMNS if c not in columns_set
        ]
        read_columns = [*columns, *missing_system_cols]
    else:
        read_columns = None

    lazy_frame = None
    try:
        lazy_frame = self.read_metadata_in_store(
            feature, filters=filters, columns=read_columns
        )
    except FeatureNotFoundError as e:
        # do not read system features from fallback stores
        if is_system_table:
            raise SystemDataNotFoundError(
                f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
            ) from e

    # Handle case where read_metadata_in_store returns None (no exception raised)
    if lazy_frame is None and is_system_table:
        raise SystemDataNotFoundError(
            f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
        )

    if lazy_frame is not None and not is_system_table and latest_only:
        from metaxy.models.constants import METAXY_CREATED_AT

        # Apply deduplication
        lazy_frame = self.versioning_engine_cls.keep_latest_by_group(
            df=lazy_frame,
            group_columns=list(
                self._resolve_feature_plan(feature_key).feature.id_columns
            ),
            timestamp_column=METAXY_CREATED_AT,
        )

    if lazy_frame is not None:
        # After dedup, filter to requested columns if specified
        if columns:
            lazy_frame = lazy_frame.select(columns)

        return lazy_frame

    # Try fallback stores
    if allow_fallback:
        for store in self.fallback_stores:
            try:
                # Use full read_metadata to handle nested fallback chains
                return store.read_metadata(
                    feature,
                    feature_version=feature_version,
                    filters=filters,
                    columns=columns,
                    allow_fallback=True,
                    current_only=current_only,
                    latest_only=latest_only,
                )
            except FeatureNotFoundError:
                # Try next fallback store
                continue

    # Not found anywhere
    raise FeatureNotFoundError(
        f"Feature {feature_key.to_string()} not found in store"
        + (" or fallback stores" if allow_fallback else "")
    )
metaxy.MetadataStore.write_metadata
¶
write_metadata(feature: CoercibleToFeatureKey, df: IntoFrame, materialization_id: str | None = None) -> None
Write metadata for a feature (append-only by design).
Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.
Parameters:
-
feature(CoercibleToFeatureKey) –Feature to write metadata for
-
df(IntoFrame) –Metadata DataFrame of any type supported by Narwhals. Must have metaxy_provenance_by_field column of type Struct with fields matching feature's fields. Optionally, may also contain metaxy_data_version_by_field.
-
materialization_id(str | None, default:None) –Optional external orchestration ID for this write. Overrides the store's default materialization_id if provided. Useful for tracking which orchestration run produced this metadata.
Raises:
-
MetadataSchemaError–If DataFrame schema is invalid
-
StoreNotOpenError–If store is not open
-
ValueError–If writing to a feature from a different project than expected
Note
-
Must be called within a MetadataStore.open(mode="write") context manager.
-
Metaxy always performs an "append" operation. Metadata is never deleted or mutated.
-
Fallback stores are never used for writes.
-
Features from other Metaxy projects cannot be written to, unless project validation has been disabled with MetadataStore.allow_cross_project_writes.
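Two of the rules above, append-only writes and the per-call `materialization_id` override, can be illustrated with a small in-memory stand-in. `AppendOnlyStore` is hypothetical: it models neither schema validation nor project checks, and `RuntimeError` stands in for `StoreNotOpenError`.

```python
class AppendOnlyStore:
    """Hypothetical in-memory stand-in; not the Metaxy implementation."""

    def __init__(self, materialization_id=None):
        self.materialization_id = materialization_id
        self.rows = []
        self.is_open = False

    def write(self, new_rows, materialization_id=None):
        if not self.is_open:
            raise RuntimeError("store is not open")  # stand-in for StoreNotOpenError
        # A per-call ID overrides the store-level default.
        run_id = (
            materialization_id
            if materialization_id is not None
            else self.materialization_id
        )
        # Append only: existing rows are never mutated or deleted.
        self.rows.extend(
            {**row, "metaxy_materialization_id": run_id} for row in new_rows
        )


store = AppendOnlyStore(materialization_id="nightly")
store.is_open = True
store.write([{"sample_uid": 1}])
store.write([{"sample_uid": 1}], materialization_id="backfill")
```

After both calls the store holds two rows for the same sample, one per run.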
Source code in src/metaxy/metadata_store/base.py
def write_metadata(
    self,
    feature: CoercibleToFeatureKey,
    df: IntoFrame,
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for a feature (append-only by design).

    Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

    Args:
        feature: Feature to write metadata for
        df: Metadata DataFrame of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).
            Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
            Optionally, may also contain `metaxy_data_version_by_field`.
        materialization_id: Optional external orchestration ID for this write.
            Overrides the store's default `materialization_id` if provided.
            Useful for tracking which orchestration run produced this metadata.

    Raises:
        MetadataSchemaError: If DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Must be called within a `MetadataStore.open(mode="write")` context manager.
        - Metaxy always performs an "append" operation. Metadata is never deleted or mutated.
        - Fallback stores are never used for writes.
        - Features from other Metaxy projects cannot be written to, unless project validation has been disabled with [MetadataStore.allow_cross_project_writes][].
    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Validate project for non-system tables
    if not is_system_table:
        self._validate_project_write(feature)

    # Convert the input frame to a Narwhals DataFrame
    df_nw = nw.from_native(df)

    assert isinstance(df_nw, nw.DataFrame), "df must be a Narwhals DataFrame"

    # For system tables, write directly without feature_version tracking
    if is_system_table:
        self._validate_schema_system_table(df_nw)
        self.write_metadata_to_store(feature_key, df_nw)
        return

    if METAXY_PROVENANCE_BY_FIELD not in df_nw.columns:
        from metaxy.metadata_store.exceptions import MetadataSchemaError

        raise MetadataSchemaError(
            f"DataFrame must have '{METAXY_PROVENANCE_BY_FIELD}' column"
        )

    # Add all required system columns
    # warning: for dataframes that do not match the native MetadataStore implementation
    # and are missing the METAXY_DATA_VERSION column, this call will lead to materializing the equivalent Polars DataFrame
    # while calculating the missing METAXY_DATA_VERSION column
    df_nw = self._add_system_columns(
        df_nw, feature, materialization_id=materialization_id
    )

    self._validate_schema(df_nw)
    self.write_metadata_to_store(feature_key, df_nw)
metaxy.MetadataStore.write_metadata_multi
¶
write_metadata_multi(metadata: Mapping[Any, IntoFrame], materialization_id: str | None = None) -> None
Write metadata for multiple features in reverse topological order.
Processes features so that dependents are written before their dependencies. This ordering ensures that downstream features are written first, which can be useful for certain data consistency requirements or when features need to be processed in a specific order.
Parameters:
-
metadata(Mapping[Any, IntoFrame]) –Mapping from feature keys to metadata DataFrames. Keys can be any type coercible to FeatureKey (string, sequence, FeatureKey, or BaseFeature class). Values must be DataFrames compatible with Narwhals, containing required system columns.
-
materialization_id(str | None, default:None) –Optional external orchestration ID for all writes. Overrides the store's default materialization_id if provided. Applied to all feature writes in this batch.
Raises:
-
MetadataSchemaError–If any DataFrame schema is invalid
-
StoreNotOpenError–If store is not open
-
ValueError–If writing to a feature from a different project than expected
Note
- Must be called within a MetadataStore.open(mode="write") context manager.
- Empty mappings are handled gracefully (no-op).
- Each feature's metadata is written via write_metadata, so all validation and system column handling from that method applies.
Example
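The following is a minimal sketch of the write ordering described above, not Metaxy's implementation. It uses the standard library's graphlib instead of the graph's own topological_sort_features, and the feature names, the dependencies mapping, and the reverse_topological_order helper are all hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each feature maps to the features it depends on.
dependencies = {
    "parent": set(),
    "child": {"parent"},
    "grandchild": {"child"},
}


def reverse_topological_order(keys, deps):
    """Return the requested keys ordered so dependents precede their dependencies."""
    # static_order() yields dependencies first; reversing puts dependents first.
    forward = list(TopologicalSorter(deps).static_order())
    wanted = set(keys)
    return [key for key in reversed(forward) if key in wanted]


# Only the features present in the mapping are written, child before parent.
order = reverse_topological_order(["parent", "child"], dependencies)
print(order)  # ['child', 'parent']
```

The order of keys in the input mapping does not matter; only the dependency graph decides which feature is written first.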
Source code in src/metaxy/metadata_store/base.py
def write_metadata_multi(
self,
metadata: Mapping[Any, IntoFrame],
materialization_id: str | None = None,
) -> None:
"""
Write metadata for multiple features in reverse topological order.
Processes features so that dependents are written before their dependencies.
This ordering ensures that downstream features are written first, which can
be useful for certain data consistency requirements or when features need
to be processed in a specific order.
Args:
metadata: Mapping from feature keys to metadata DataFrames.
Keys can be any type coercible to FeatureKey (string, sequence,
FeatureKey, or BaseFeature class). Values must be DataFrames
compatible with Narwhals, containing required system columns.
materialization_id: Optional external orchestration ID for all writes.
Overrides the store's default `materialization_id` if provided.
Applied to all feature writes in this batch.
Raises:
MetadataSchemaError: If any DataFrame schema is invalid
StoreNotOpenError: If store is not open
ValueError: If writing to a feature from a different project than expected
Note:
- Must be called within a `MetadataStore.open(mode="write")` context manager.
- Empty mappings are handled gracefully (no-op).
- Each feature's metadata is written via `write_metadata`, so all
validation and system column handling from that method applies.
Example:
```py
with store.open(mode="write"):
store.write_metadata_multi({
ChildFeature: child_df,
ParentFeature: parent_df,
})
# Features are written in reverse topological order:
# ChildFeature first, then ParentFeature
```
"""
if not metadata:
return
# Build mapping from resolved keys to dataframes in one pass
resolved_metadata = {
self._resolve_feature_key(key): df for key, df in metadata.items()
}
# Get reverse topological order (dependents first)
graph = current_graph()
sorted_keys = graph.topological_sort_features(
list(resolved_metadata.keys()), descending=True
)
# Write metadata in reverse topological order
for feature_key in sorted_keys:
self.write_metadata(
feature_key,
resolved_metadata[feature_key],
materialization_id=materialization_id,
)
metaxy.MetadataStore.native_implementation
¶
native_implementation() -> Implementation
metaxy.MetadataStore.create_versioning_engine
¶
create_versioning_engine(plan: FeaturePlan, implementation: Implementation) -> Iterator[VersioningEngine | PolarsVersioningEngine]
Creates an appropriate provenance engine.
Falls back to Polars implementation if the required implementation differs from the store's native implementation.
Parameters:
-
plan(FeaturePlan) –The feature plan.
-
implementation(Implementation) –The desired engine implementation.
Returns:
-
Iterator[VersioningEngine | PolarsVersioningEngine]–An appropriate provenance engine.
Source code in src/metaxy/metadata_store/base.py
@contextmanager
def create_versioning_engine(
self, plan: FeaturePlan, implementation: nw.Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]:
"""
Creates an appropriate provenance engine.
Falls back to Polars implementation if the required implementation differs from the store's native implementation.
Args:
plan: The feature plan.
implementation: The desired engine implementation.
Returns:
An appropriate provenance engine.
"""
if implementation == nw.Implementation.POLARS:
cm = self._create_polars_versioning_engine(plan)
elif implementation == self.native_implementation():
cm = self._create_versioning_engine(plan)
else:
cm = self._create_polars_versioning_engine(plan)
with cm as engine:
yield engine
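The branch logic above reduces to a small decision rule. The sketch below restates it with a hypothetical Impl enum and a select_engine helper that returns a label instead of an engine; neither name exists in Metaxy.

```python
from enum import Enum


class Impl(Enum):
    """Hypothetical stand-in for nw.Implementation."""

    POLARS = "polars"
    DUCKDB = "duckdb"
    PANDAS = "pandas"


def select_engine(requested: Impl, native: Impl) -> str:
    """Return which kind of engine would serve the request."""
    if requested == Impl.POLARS:
        return "polars"
    if requested == native:
        return "native"
    # Any other implementation falls back to the Polars engine.
    return "polars"


print(select_engine(Impl.DUCKDB, native=Impl.DUCKDB))
print(select_engine(Impl.PANDAS, native=Impl.DUCKDB))
```

Only an exact match with the store's native implementation selects the native engine; every other request is served by Polars.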
metaxy.MetadataStore.open
abstractmethod
¶
open(mode: AccessMode = 'read') -> Iterator[Self]
Open/initialize the store for operations.
Context manager that opens the store with specified access mode.
Called internally by __enter__.
Child classes should implement backend-specific connection setup/teardown here.
Parameters:
-
mode(AccessMode, default:'read') –Access mode for this connection session.
Yields:
-
Self(Self) –The store instance with connection open
Note
Users should prefer using with store: pattern except when write access mode is needed.
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
@contextmanager
def open(self, mode: AccessMode = "read") -> Iterator[Self]:
"""Open/initialize the store for operations.
Context manager that opens the store with specified access mode.
Called internally by `__enter__`.
Child classes should implement backend-specific connection setup/teardown here.
Args:
mode: Access mode for this connection session.
Yields:
Self: The store instance with connection open
Note:
Users should prefer using `with store:` pattern except when write access mode is needed.
"""
...
metaxy.MetadataStore.__enter__
¶
Enter context manager - opens store in READ mode by default, or in WRITE mode when auto_create_tables is enabled.
Use MetadataStore.open for write access mode instead.
Returns:
-
Self(Self) –The opened store instance
Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
"""Enter context manager - opens store in READ mode by default.
Use [`MetadataStore.open`][metaxy.metadata_store.base.MetadataStore.open] for write access mode instead.
Returns:
Self: The opened store instance
"""
# Determine mode based on auto_create_tables
mode = "write" if self.auto_create_tables else "read"
# Open the store (open() manages _context_depth internally)
self._open_cm = self.open(mode)
self._open_cm.__enter__()
return self
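To illustrate how open and __enter__ relate, here is a toy store that follows the same delegation pattern. ToyStore, its mode attribute, and its __exit__ method are assumptions made for this sketch; a real backend would set up and tear down a connection inside open.

```python
from contextlib import contextmanager


class ToyStore:
    """Toy stand-in for a metadata store; not a real MetadataStore subclass."""

    def __init__(self, auto_create_tables=False):
        self.auto_create_tables = auto_create_tables
        self.mode = None  # None means the store is closed

    @contextmanager
    def open(self, mode="read"):
        # A real backend would open its connection here.
        self.mode = mode
        try:
            yield self
        finally:
            # ...and close it here, even if the body raised.
            self.mode = None

    def __enter__(self):
        # Mirrors the source above: write mode only when auto-creating tables.
        mode = "write" if self.auto_create_tables else "read"
        self._open_cm = self.open(mode)
        self._open_cm.__enter__()
        return self

    def __exit__(self, exc_type, exc, tb):
        return self._open_cm.__exit__(exc_type, exc, tb)


with ToyStore() as store:
    print(store.mode)  # read

with ToyStore().open("write") as store:
    print(store.mode)  # write
```

The plain with form is enough for reads, while writes go through open("write") explicitly.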
metaxy.MetadataStore.validate_hash_algorithm
¶
validate_hash_algorithm(check_fallback_stores: bool = True) -> None
Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Parameters:
-
check_fallback_stores(bool, default:True) –If True, also validate that the hash algorithm is supported by fallback stores (ensures compatibility for future cross-store operations)
Raises:
-
ValueError–If the hash algorithm is not supported by this store's components or by its fallback stores
Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
self,
check_fallback_stores: bool = True,
) -> None:
"""Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Args:
check_fallback_stores: If True, also validate hash is supported by
fallback stores (ensures compatibility for future cross-store operations)
Raises:
ValueError: If hash algorithm not supported by components or fallback stores
"""
# Validate hash algorithm support without creating a full engine
# (engine creation requires a graph which isn't available during store init)
self._validate_hash_algorithm_support()
# Check fallback stores
if check_fallback_stores:
for fallback in self.fallback_stores:
fallback.validate_hash_algorithm(check_fallback_stores=False)
metaxy.MetadataStore.allow_cross_project_writes
¶
allow_cross_project_writes() -> Iterator[None]
Context manager to temporarily allow cross-project writes.
This is an escape hatch for legitimate cross-project operations like migrations, where metadata needs to be written to features from different projects.
Example
Yields:
-
None(None) –The context manager temporarily disables project validation
Source code in src/metaxy/metadata_store/base.py
@contextmanager
def allow_cross_project_writes(self) -> Iterator[None]:
"""Context manager to temporarily allow cross-project writes.
This is an escape hatch for legitimate cross-project operations like migrations,
where metadata needs to be written to features from different projects.
Example:
```py
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
store.write_metadata(feature_from_project_a, metadata_a)
store.write_metadata(feature_from_project_b, metadata_b)
```
Yields:
None: The context manager temporarily disables project validation
"""
previous_value = self._allow_cross_project_writes
try:
self._allow_cross_project_writes = True
yield
finally:
self._allow_cross_project_writes = previous_value
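Because the previous flag value is saved and restored in a finally block, the context manager nests safely and resets the flag even when the body raises. The sketch below demonstrates this with a hypothetical FlagStore class that contains only the flag handling.

```python
from contextlib import contextmanager


class FlagStore:
    """Hypothetical store holding only the cross-project flag."""

    def __init__(self):
        self.allow_cross_project = False

    @contextmanager
    def allow_cross_project_writes(self):
        previous = self.allow_cross_project
        try:
            self.allow_cross_project = True
            yield
        finally:
            # Restore whatever value was active before entering.
            self.allow_cross_project = previous


store = FlagStore()
observed = []
with store.allow_cross_project_writes():
    with store.allow_cross_project_writes():
        observed.append(store.allow_cross_project)
    # Leaving the inner block restores the outer block's value, which is True.
    observed.append(store.allow_cross_project)
observed.append(store.allow_cross_project)
print(observed)  # [True, True, False]
```

Restoring the previous value rather than hard-coding False is what keeps an inner block from disabling the escape hatch for an enclosing one.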
metaxy.MetadataStore.write_metadata_to_store
abstractmethod
¶
write_metadata_to_store(feature_key: FeatureKey, df: Frame, **kwargs: Any) -> None
Internal write implementation (backend-specific).
Backends may convert to their specific type if needed (e.g., Polars, Ibis).
Parameters:
-
feature_key(FeatureKey) –Feature key to write to
-
df(Frame) –Narwhals-compatible DataFrame with metadata to write
-
**kwargs(Any, default:{}) –Backend-specific parameters
Note: Subclasses implement this for their storage backend.
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def write_metadata_to_store(
self,
feature_key: FeatureKey,
df: Frame,
**kwargs: Any,
) -> None:
"""
Internal write implementation (backend-specific).
Backends may convert to their specific type if needed (e.g., Polars, Ibis).
Args:
feature_key: Feature key to write to
df: [Narwhals](https://narwhals-dev.github.io/narwhals/)-compatible DataFrame with metadata to write
**kwargs: Backend-specific parameters
Note: Subclasses implement this for their storage backend.
"""
pass
metaxy.MetadataStore.drop_feature_metadata
¶
drop_feature_metadata(feature: CoercibleToFeatureKey) -> None
Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning
This operation is irreversible and will permanently delete all metadata for the specified feature.
Parameters:
-
feature(CoercibleToFeatureKey) –Feature class or key to drop metadata for
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: CoercibleToFeatureKey) -> None:
"""Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store.
Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning:
This operation is irreversible and will **permanently delete all metadata** for the specified feature.
Args:
feature: Feature class or key to drop metadata for
Example:
```py
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
```
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
self._drop_feature_metadata_impl(feature_key)
metaxy.MetadataStore.read_metadata_in_store
abstractmethod
¶
read_metadata_in_store(feature: CoercibleToFeatureKey, *, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, **kwargs: Any) -> LazyFrame[Any] | None
Read metadata from THIS store only, without using any fallback stores.
Parameters:
-
feature(CoercibleToFeatureKey) –Feature to read metadata for
-
filters(Sequence[Expr] | None, default:None) –List of Narwhals filter expressions for this specific feature.
-
columns(Sequence[str] | None, default:None) –Subset of columns to return
-
**kwargs(Any, default:{}) –Backend-specific parameters
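Returns:
-
LazyFrame[Any] | None–Narwhals LazyFrame with metadata, or None if feature not found in the store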
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def read_metadata_in_store(
self,
feature: CoercibleToFeatureKey,
*,
filters: Sequence[nw.Expr] | None = None,
columns: Sequence[str] | None = None,
**kwargs: Any,
) -> nw.LazyFrame[Any] | None:
"""
Read metadata from THIS store only, without using any fallback stores.
Args:
feature: Feature to read metadata for
filters: List of Narwhals filter expressions for this specific feature.
columns: Subset of columns to return
**kwargs: Backend-specific parameters
Returns:
Narwhals LazyFrame with metadata, or None if feature not found in the store
"""
pass
metaxy.MetadataStore.has_feature
¶
has_feature(feature: CoercibleToFeatureKey, *, check_fallback: bool = False) -> bool
Check if feature exists in store.
Parameters:
-
feature(CoercibleToFeatureKey) –Feature to check
-
check_fallback(bool, default:False) –If True, also check fallback stores
Returns:
-
bool–True if feature exists, False otherwise
Source code in src/metaxy/metadata_store/base.py
def has_feature(
self,
feature: CoercibleToFeatureKey,
*,
check_fallback: bool = False,
) -> bool:
"""
Check if feature exists in store.
Args:
feature: Feature to check
check_fallback: If True, also check fallback stores
Returns:
True if feature exists, False otherwise
"""
self._check_open()
if self.read_metadata_in_store(feature) is not None:
return True
# Check fallback stores
if not check_fallback:
return self._has_feature_impl(feature)
else:
for store in self.fallback_stores:
if store.has_feature(feature, check_fallback=True):
return True
return False
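The effect of check_fallback can be seen in a simplified model of a store chain. ChainStore is a hypothetical class that keeps feature keys in a set; it omits the open check and the backend-specific lookup used by the real method.

```python
class ChainStore:
    """Hypothetical in-memory store with an ordered list of fallback stores."""

    def __init__(self, features, fallback_stores=None):
        self.features = set(features)
        self.fallback_stores = list(fallback_stores or [])

    def has_feature(self, key, *, check_fallback=False):
        if key in self.features:
            return True
        if not check_fallback:
            return False
        # Walk the fallback chain recursively, in order.
        return any(
            store.has_feature(key, check_fallback=True)
            for store in self.fallback_stores
        )


prod = ChainStore({"video/raw"})
dev = ChainStore({"video/embeddings"}, fallback_stores=[prod])

print(dev.has_feature("video/raw"))
print(dev.has_feature("video/raw", check_fallback=True))
```

With the default check_fallback=False, a feature that lives only in a fallback store is reported as absent.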
metaxy.MetadataStore.display
abstractmethod
¶
display() -> str
Return a human-readable display string for this store.
Used in warnings, logs, and CLI output to identify the store.
Returns:
-
str–Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
Source code in src/metaxy/metadata_store/base.py
metaxy.MetadataStore.get_store_metadata
¶
get_store_metadata(feature_key: CoercibleToFeatureKey) -> dict[str, Any]
Returns arbitrary key-value pairs with useful metadata, such as the path in storage.
Useful for logging purposes. This method should not expose sensitive information.
Source code in src/metaxy/metadata_store/base.py
metaxy.MetadataStore.copy_metadata
¶
copy_metadata(from_store: MetadataStore, features: list[CoercibleToFeatureKey] | None = None, *, from_snapshot: str | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, incremental: bool = True) -> dict[str, int]
Copy metadata from another store with fine-grained filtering.
This is a reusable method that can be called programmatically or from CLI/migrations. Copies metadata for specified features, preserving the original snapshot_version.
Parameters:
-
from_store(MetadataStore) –Source metadata store to copy from. If it is not already open, it is opened in read mode for the duration of the copy.
-
features(list[CoercibleToFeatureKey] | None, default:None) –List of features to copy. Can be:
    - None: copies all features from source store
    - List of FeatureKey or Feature classes: copies specified features
-
from_snapshot(str | None, default:None) –Snapshot version to filter source data by. If None, uses latest snapshot from source store. Only rows with this snapshot_version will be copied. The snapshot_version is preserved in the destination store.
-
filters(Mapping[str, Sequence[Expr]] | None, default:None) –Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
-
incremental(bool, default:True) –If True (default), filter out rows that already exist in the destination store by performing an anti-join on sample_uid for the same snapshot_version.
The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid filtered by snapshot_version.
Disabling incremental (incremental=False) may improve performance when:
    - You know the destination is empty or has no overlap with source
    - The destination store uses deduplication
When incremental=False, it's the user's responsibility to avoid duplicates or configure deduplication at the storage layer.
Returns:
-
dict[str, int]–Dict with statistics: {"features_copied": int, "rows_copied": int}
Raises:
-
ValueError–If the destination store (self) is not open. A from_store that is not open is opened in read mode automatically.
-
FeatureNotFoundError–If a specified feature doesn't exist in source store
Examples:
# Simple: copy all features from latest snapshot
stats = dest_store.copy_metadata(from_store=source_store)
# Copy specific features from a specific snapshot
stats = dest_store.copy_metadata(
from_store=source_store,
features=[FeatureKey(["my_feature"])],
from_snapshot="abc123",
)
# Copy with filters
stats = dest_store.copy_metadata(
from_store=source_store,
filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
)
# Copy specific features with filters
stats = dest_store.copy_metadata(
from_store=source_store,
features=[
FeatureKey(["feature_a"]),
FeatureKey(["feature_b"]),
],
filters={
"feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
"feature_b": [nw.col("field_b") < 30],
},
)
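The incremental anti-join described in the parameters can be sketched in plain Python. This is an illustration of the technique only: the real method operates on Narwhals frames, and the anti_join_new_rows helper and the sample rows below are hypothetical.

```python
def anti_join_new_rows(source_rows, dest_rows, snapshot_version):
    """Keep source rows for the snapshot whose sample_uid is absent from the destination."""
    # Keys already present in the destination for this snapshot.
    existing = {
        row["sample_uid"]
        for row in dest_rows
        if row["snapshot_version"] == snapshot_version
    }
    return [
        row
        for row in source_rows
        if row["snapshot_version"] == snapshot_version
        and row["sample_uid"] not in existing
    ]


source = [
    {"sample_uid": "s1", "snapshot_version": "abc123"},
    {"sample_uid": "s2", "snapshot_version": "abc123"},
    {"sample_uid": "s3", "snapshot_version": "old"},
]
dest = [{"sample_uid": "s1", "snapshot_version": "abc123"}]

to_copy = anti_join_new_rows(source, dest, "abc123")
print(to_copy)  # [{'sample_uid': 's2', 'snapshot_version': 'abc123'}]
```

Here s1 is skipped because the destination already holds it for the same snapshot, and s3 is skipped because it belongs to a different snapshot. With incremental=False the existence check is omitted, which is why duplicates become the caller's responsibility.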
Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
self,
from_store: MetadataStore,
features: list[CoercibleToFeatureKey] | None = None,
*,
from_snapshot: str | None = None,
filters: Mapping[str, Sequence[nw.Expr]] | None = None,
incremental: bool = True,
) -> dict[str, int]:
"""Copy metadata from another store with fine-grained filtering.
This is a reusable method that can be called programmatically or from CLI/migrations.
Copies metadata for specified features, preserving the original snapshot_version.
Args:
from_store: Source metadata store to copy from (must be opened)
features: List of features to copy. Can be:
- None: copies all features from source store
- List of FeatureKey or Feature classes: copies specified features
from_snapshot: Snapshot version to filter source data by. If None, uses latest snapshot
from source store. Only rows with this snapshot_version will be copied.
The snapshot_version is preserved in the destination store.
filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
These filters are applied when reading from the source store.
Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
incremental: If True (default), filter out rows that already exist in the destination
store by performing an anti-join on sample_uid for the same snapshot_version.
The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid
filtered by snapshot_version.
Disabling incremental (incremental=False) may improve performance when:
- You know the destination is empty or has no overlap with source
- The destination store uses deduplication
When incremental=False, it's the user's responsibility to avoid duplicates or
configure deduplication at the storage layer.
Returns:
Dict with statistics: {"features_copied": int, "rows_copied": int}
Raises:
ValueError: If from_store or self (destination) is not open
FeatureNotFoundError: If a specified feature doesn't exist in source store
Examples:
```py
# Simple: copy all features from latest snapshot
stats = dest_store.copy_metadata(from_store=source_store)
```
```py
# Copy specific features from a specific snapshot
stats = dest_store.copy_metadata(
from_store=source_store,
features=[FeatureKey(["my_feature"])],
from_snapshot="abc123",
)
```
```py
# Copy with filters
stats = dest_store.copy_metadata(
from_store=source_store,
filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
)
```
```py
# Copy specific features with filters
stats = dest_store.copy_metadata(
from_store=source_store,
features=[
FeatureKey(["feature_a"]),
FeatureKey(["feature_b"]),
],
filters={
"feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
"feature_b": [nw.col("field_b") < 30],
},
)
```
"""
import logging
logger = logging.getLogger(__name__)
# Validate destination store is open
if not self._is_open:
raise ValueError(
'Destination store must be opened with store.open("write") before use'
)
# Auto-open source store if not already open
if not from_store._is_open:
with from_store.open("read"):
return self._copy_metadata_impl(
from_store=from_store,
features=features,
from_snapshot=from_snapshot,
filters=filters,
incremental=incremental,
logger=logger,
)
else:
return self._copy_metadata_impl(
from_store=from_store,
features=features,
from_snapshot=from_snapshot,
filters=filters,
incremental=incremental,
logger=logger,
)
metaxy.metadata_store.base.VersioningEngineOptions
module-attribute
¶
VersioningEngineOptions = Literal['auto', 'native', 'polars']
Base Configuration Class¶
The following base configuration class is typically used by child metadata stores:
metaxy.metadata_store.base.MetadataStoreConfig
¶
Bases: BaseSettings
Base configuration class for metadata stores.
This class defines common configuration fields shared by all metadata store types. Store-specific config classes should inherit from this and add their own fields.
Example
Configuration¶
The base MetadataStoreConfig class injects the following configuration options:
fallback_stores¶
List of fallback store names to search when features are not found in the current store.
Type: list[str]
hash_algorithm¶
Hash algorithm for versioning. If None, uses store's default.
Type: metaxy.versioning.types.HashAlgorithm | None
versioning_engine¶
Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.
Type: Literal['auto', 'native', 'polars'] | Default: "auto"
Project Write Validation¶
By default, MetadataStore raises a ValueError when attempting to write to a project that doesn't match the expected project from MetaxyConfig.get().project.
For legitimate cross-project operations (such as migrations that need to update features across multiple projects), use MetadataStore.allow_cross_project_writes: