Metadata Stores¶
Metaxy abstracts interactions with metadata behind an interface called MetadataStore.
Users can extend this class to implement support for arbitrary metadata storage backends such as databases, lakehouse formats, or any other external system.
Here are some of the built-in metadata store types:
Databases¶
- IbisMetadataStore (a base class) - see Ibis integration
Storage Only¶
The full list can be found here.
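To extend MetadataStore with a custom backend, a subclass fills in the abstract methods documented on this page. The skeleton below is a minimal sketch, not a definitive implementation: the import path and the omitted engine wiring are assumptions, and the `...` bodies mark backend-specific work.

```py
from contextlib import contextmanager

from metaxy import MetadataStore  # assumed import path


class MyLakehouseStore(MetadataStore):
    @classmethod
    def config_model(cls):
        ...  # return your MetadataStoreConfig subclass

    @contextmanager
    def open(self, mode="read"):
        ...  # backend-specific connection setup
        yield self
        ...  # backend-specific teardown

    def read_metadata_in_store(self, feature, *, filters=None, columns=None, **kwargs):
        ...  # return a Narwhals LazyFrame, or None if the feature is absent

    def write_metadata_to_store(self, feature_key, df, **kwargs):
        ...  # append df to backend storage (writes are append-only)

    def display(self) -> str:
        return "MyLakehouseStore()"
```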
Metadata Store Interface¶
metaxy.MetadataStore
¶
MetadataStore(*, versioning_engine_cls: type[VersioningEngineT], hash_algorithm: HashAlgorithm | None = None, versioning_engine: VersioningEngineOptions = 'auto', fallback_stores: list[MetadataStore] | None = None, auto_create_tables: bool | None = None, materialization_id: str | None = None)
Bases: ABC
Abstract base class for metadata storage backends.
Parameters:
- hash_algorithm (HashAlgorithm | None, default: None) – Hash algorithm to use for the versioning engine.
- versioning_engine (VersioningEngineOptions, default: 'auto') – Which versioning engine to use.
    - "auto": Prefer the store's native engine, fall back to Polars if needed
    - "native": Always use the store's native engine, raise VersioningEngineMismatchError if provided dataframes are incompatible
    - "polars": Always use the Polars engine
- fallback_stores (list[MetadataStore] | None, default: None) – Ordered list of read-only fallback stores. Used when upstream features are not in this store. VersioningEngineMismatchError is not raised when reading from fallback stores.
- auto_create_tables (bool | None, default: None) – If True, automatically create tables when opening the store. If None (default), reads from the global MetaxyConfig (which reads from the METAXY_AUTO_CREATE_TABLES env var). If False, never auto-create tables.

  Warning

  Auto-create is intended for development/testing only. Use proper database migration tools like Alembic for production deployments.

- materialization_id (str | None, default: None) – Optional external orchestration ID. If provided, all metadata writes will include this ID in the metaxy_materialization_id column. Can be overridden per MetadataStore.write_metadata call.
Raises:
- ValueError – If fallback stores use different hash algorithms or truncation lengths
- VersioningEngineMismatchError – If a user-provided dataframe has a wrong implementation and versioning_engine is set to "native"
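For illustration, a hedged construction example: `DuckDBMetadataStore` and its `database` argument follow the `from_config` example later on this page, while the file paths and run ID are placeholders.

```py
from metaxy.metadata_store.duckdb import DuckDBMetadataStore

prod_store = DuckDBMetadataStore(database="prod_metadata.db")  # read-only fallback

store = DuckDBMetadataStore(
    database="metadata.db",
    versioning_engine="auto",             # prefer native, fall back to Polars
    fallback_stores=[prod_store],         # checked in order on local misses
    materialization_id="run-2024-01-15",  # stamped into metaxy_materialization_id
)
```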
Source code in src/metaxy/metadata_store/base.py
def __init__(
self,
*,
versioning_engine_cls: type[VersioningEngineT],
hash_algorithm: HashAlgorithm | None = None,
versioning_engine: VersioningEngineOptions = "auto",
fallback_stores: list[MetadataStore] | None = None,
auto_create_tables: bool | None = None,
materialization_id: str | None = None,
):
"""
Initialize the metadata store.
Args:
hash_algorithm: Hash algorithm to use for the versioning engine.
versioning_engine: Which versioning engine to use.
- "auto": Prefer the store's native engine, fall back to Polars if needed
- "native": Always use the store's native engine, raise `VersioningEngineMismatchError`
if provided dataframes are incompatible
- "polars": Always use the Polars engine
fallback_stores: Ordered list of read-only fallback stores.
Used when upstream features are not in this store.
`VersioningEngineMismatchError` is not raised when reading from fallback stores.
auto_create_tables: If True, automatically create tables when opening the store.
If None (default), reads from global MetaxyConfig (which reads from METAXY_AUTO_CREATE_TABLES env var).
If False, never auto-create tables.
!!! warning
Auto-create is intended for development/testing only.
Use proper database migration tools like Alembic for production deployments.
materialization_id: Optional external orchestration ID.
If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column.
Can be overridden per [`MetadataStore.write_metadata`][metaxy.MetadataStore.write_metadata] call.
Raises:
ValueError: If fallback stores use different hash algorithms or truncation lengths
VersioningEngineMismatchError: If a user-provided dataframe has a wrong implementation
and versioning_engine is set to `native`
"""
# Initialize state early so properties can check it
self._is_open = False
self._context_depth = 0
self._versioning_engine = versioning_engine
self._allow_cross_project_writes = False
self._materialization_id = materialization_id
self._open_cm: AbstractContextManager[Self] | None = (
None # Track the open() context manager
)
self.versioning_engine_cls = versioning_engine_cls
# Resolve auto_create_tables from global config if not explicitly provided
if auto_create_tables is None:
from metaxy.config import MetaxyConfig
self.auto_create_tables = MetaxyConfig.get().auto_create_tables
else:
self.auto_create_tables = auto_create_tables
# Use store's default algorithm if not specified
if hash_algorithm is None:
hash_algorithm = self._get_default_hash_algorithm()
self.hash_algorithm = hash_algorithm
self.fallback_stores = fallback_stores or []
Attributes¶
metaxy.MetadataStore.materialization_id
property
¶
materialization_id: str | None
The external orchestration ID for this store instance.
If set, all metadata writes include this ID in the metaxy_materialization_id column,
allowing filtering of rows written during a specific materialization run.
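As a hedged sketch of how this can be used: the filter below selects rows written during a given run; `MyFeature` and the run ID are placeholders.

```py
import narwhals as nw

with store:
    run_rows = store.read_metadata(MyFeature).filter(
        nw.col("metaxy_materialization_id") == "run-2024-01-15"
    )
```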
Functions¶
metaxy.MetadataStore.resolve_update
¶
resolve_update(feature: CoercibleToFeatureKey, *, samples: IntoFrame | Frame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[False] = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment
resolve_update(feature: CoercibleToFeatureKey, *, samples: IntoFrame | Frame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: Literal[True], versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> LazyIncrement
resolve_update(feature: CoercibleToFeatureKey, *, samples: IntoFrame | Frame | None = None, filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, lazy: bool = False, versioning_engine: Literal['auto', 'native', 'polars'] | None = None, skip_comparison: bool = False, **kwargs: Any) -> Increment | LazyIncrement
Calculate an incremental update for a feature.
This is the main workhorse in Metaxy.
Parameters:
- feature (CoercibleToFeatureKey) – Feature class to resolve updates for
- samples (IntoFrame | Frame | None, default: None) – A dataframe with joined upstream metadata and the "metaxy_provenance_by_field" column set. When provided, MetadataStore skips loading upstream feature metadata and provenance calculations.

  Required for root features

  Metaxy doesn't know how to populate input metadata for root features, so the samples argument must be provided for them.

  Tip

  For non-root features, use samples to customize the automatic upstream loading and field provenance calculation. For example, it can be used to request processing for specific sample IDs. Setting this parameter during normal operations is not required.

- filters (Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None, default: None) – A mapping from feature keys to lists of Narwhals filter expressions. Keys can be feature classes, FeatureKey objects, or string paths. Applied at read-time. May filter the current feature, in which case it will also be applied to samples (if provided). Example: {UpstreamFeature: [nw.col("x") > 10], ...}
- global_filters (Sequence[Expr] | None, default: None) – A list of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Useful for filtering by common columns like sample_uid across all features. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
- lazy (bool, default: False) – Whether to return a metaxy.versioning.types.LazyIncrement or a metaxy.versioning.types.Increment.
- versioning_engine (Literal['auto', 'native', 'polars'] | None, default: None) – Override the store's versioning engine for this operation.
- skip_comparison (bool, default: False) – If True, skip the increment comparison logic and return all upstream samples in Increment.added. The changed and removed frames will be empty.
Raises:
- ValueError – If no samples dataframe has been provided when resolving an update for a root feature.
- VersioningEngineMismatchError – If versioning_engine has been set to "native" and a dataframe of a different implementation has been encountered during resolve_update.
With a root feature
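(reproduced from the source docstring below)

```py
samples = pl.DataFrame({
    "sample_uid": [1, 2, 3],
    "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
```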
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
self,
feature: CoercibleToFeatureKey,
*,
samples: IntoFrame | Frame | None = None,
filters: Mapping[CoercibleToFeatureKey, Sequence[nw.Expr]] | None = None,
global_filters: Sequence[nw.Expr] | None = None,
lazy: bool = False,
versioning_engine: Literal["auto", "native", "polars"] | None = None,
skip_comparison: bool = False,
**kwargs: Any,
) -> Increment | LazyIncrement:
"""Calculate an incremental update for a feature.
This is the main workhorse in Metaxy.
Args:
feature: Feature class to resolve updates for
samples: A dataframe with joined upstream metadata and `"metaxy_provenance_by_field"` column set.
When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.
!!! info "Required for root features"
Metaxy doesn't know how to populate input metadata for root features,
so the `samples` argument **must** be provided for them.
!!! tip
For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation.
For example, it can be used to request processing for specific sample IDs.
Setting this parameter during normal operations is not required.
filters: A mapping from feature keys to lists of Narwhals filter expressions.
Keys can be feature classes, FeatureKey objects, or string paths.
Applied at read-time. May filter the current feature,
in this case it will also be applied to `samples` (if provided).
Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
global_filters: A list of Narwhals filter expressions applied to all features.
These filters are combined with any feature-specific filters from `filters`.
Useful for filtering by common columns like `sample_uid` across all features.
Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
lazy: Whether to return a [metaxy.versioning.types.LazyIncrement][] or a [metaxy.versioning.types.Increment][].
versioning_engine: Override the store's versioning engine for this operation.
skip_comparison: If True, skip the increment comparison logic and return all
upstream samples in `Increment.added`. The `changed` and `removed` frames will
be empty.
Raises:
ValueError: If no `samples` dataframe has been provided when resolving an update for a root feature.
VersioningEngineMismatchError: If `versioning_engine` has been set to `"native"`
and a dataframe of a different implementation has been encountered during `resolve_update`.
!!! example "With a root feature"
```py
samples = pl.DataFrame({
"sample_uid": [1, 2, 3],
"metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
```
"""
import narwhals as nw
# Convert samples to Narwhals frame if not already
samples_nw: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None
if samples is not None:
if isinstance(samples, (nw.DataFrame, nw.LazyFrame)):
samples_nw = samples
else:
samples_nw = nw.from_native(samples) # ty: ignore[invalid-assignment]
# Normalize filter keys to FeatureKey
normalized_filters: dict[FeatureKey, list[nw.Expr]] = {}
if filters:
for key, exprs in filters.items():
feature_key = self._resolve_feature_key(key)
normalized_filters[feature_key] = list(exprs)
# Convert global_filters to a list for easy concatenation
global_filter_list = list(global_filters) if global_filters else []
feature_key = self._resolve_feature_key(feature)
graph = current_graph()
plan = graph.get_feature_plan(feature_key)
# Root features without samples: error (samples required)
if not plan.deps and samples_nw is None:
raise ValueError(
f"Feature {feature_key} has no upstream dependencies (root feature). "
f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. "
f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation."
)
# Combine feature-specific filters with global filters
current_feature_filters = [
*normalized_filters.get(feature_key, []),
*global_filter_list,
]
# Read current metadata with deduplication (latest_only=True by default)
# Use allow_fallback=False since we only want metadata from THIS store
# to determine what needs to be updated locally
try:
current_metadata: nw.LazyFrame[Any] | None = self.read_metadata(
feature_key,
filters=current_feature_filters if current_feature_filters else None,
allow_fallback=False,
current_only=True, # filters by current feature_version
latest_only=True, # deduplicates by id_columns, keeping latest
)
except FeatureNotFoundError:
current_metadata = None
upstream_by_key: dict[FeatureKey, nw.LazyFrame[Any]] = {}
filters_by_key: dict[FeatureKey, list[nw.Expr]] = {}
# if samples are provided, use them as source of truth for upstream data
if samples_nw is not None:
# Apply filters to samples if any
filtered_samples = samples_nw
if current_feature_filters:
filtered_samples = samples_nw.filter(current_feature_filters)
# fill in METAXY_PROVENANCE column if it's missing (e.g. for root features)
samples_nw = self.hash_struct_version_column(
plan,
df=filtered_samples,
struct_column=METAXY_PROVENANCE_BY_FIELD,
hash_column=METAXY_PROVENANCE,
)
# For root features, add data_version columns if they don't exist
# (root features have no computation, so data_version equals provenance)
# Use collect_schema().names() to avoid PerformanceWarning on lazy frames
if METAXY_DATA_VERSION_BY_FIELD not in samples_nw.collect_schema().names():
samples_nw = samples_nw.with_columns(
nw.col(METAXY_PROVENANCE_BY_FIELD).alias(
METAXY_DATA_VERSION_BY_FIELD
),
nw.col(METAXY_PROVENANCE).alias(METAXY_DATA_VERSION),
)
else:
for upstream_spec in plan.deps or []:
# Combine feature-specific filters with global filters for upstream
upstream_filters = [
*normalized_filters.get(upstream_spec.key, []),
*global_filter_list,
]
upstream_feature_metadata = self.read_metadata(
upstream_spec.key,
filters=upstream_filters,
)
if upstream_feature_metadata is not None:
upstream_by_key[upstream_spec.key] = upstream_feature_metadata
# determine which implementation to use for resolving the increment
# consider (1) whether all upstream metadata has been loaded with the native implementation
# (2) if samples have native implementation
# Use parameter if provided, otherwise use store default
engine_mode = (
versioning_engine
if versioning_engine is not None
else self._versioning_engine
)
# If "polars" mode, force Polars immediately
if engine_mode == "polars":
implementation = nw.Implementation.POLARS
switched_to_polars = True
else:
implementation = self.native_implementation()
switched_to_polars = False
for upstream_key, df in upstream_by_key.items():
if df.implementation != implementation:
switched_to_polars = True
# Only raise error in "native" mode if no fallback stores configured.
# If fallback stores exist, the implementation mismatch indicates data came
# from fallback (different implementation), which is legitimate fallback access.
# If data were local, it would have the native implementation.
if engine_mode == "native" and not self.fallback_stores:
raise VersioningEngineMismatchError(
f"versioning_engine='native' but upstream feature `{upstream_key.to_string()}` "
f"has implementation {df.implementation}, expected {self.native_implementation()}"
)
elif engine_mode == "auto" or (
engine_mode == "native" and self.fallback_stores
):
PolarsMaterializationWarning.warn_on_implementation_mismatch(
expected=self.native_implementation(),
actual=df.implementation,
message=f"Using Polars for resolving the increment instead. This was caused by upstream feature `{upstream_key.to_string()}`.",
)
implementation = nw.Implementation.POLARS
break
if (
samples_nw is not None
and samples_nw.implementation != self.native_implementation()
):
if not switched_to_polars:
if engine_mode == "native":
# Always raise error for samples with wrong implementation, regardless
# of fallback stores, because samples come from user argument, not from fallback
raise VersioningEngineMismatchError(
f"versioning_engine='native' but provided `samples` have implementation {samples_nw.implementation}, "
f"expected {self.native_implementation()}"
)
elif engine_mode == "auto":
PolarsMaterializationWarning.warn_on_implementation_mismatch(
expected=self.native_implementation(),
actual=samples_nw.implementation,
message=f"Provided `samples` have implementation {samples_nw.implementation}. Using Polars for resolving the increment instead.",
)
implementation = nw.Implementation.POLARS
switched_to_polars = True
if switched_to_polars:
if current_metadata:
current_metadata = switch_implementation_to_polars(current_metadata)
if samples_nw:
samples_nw = switch_implementation_to_polars(samples_nw)
for upstream_key, df in upstream_by_key.items():
upstream_by_key[upstream_key] = switch_implementation_to_polars(df)
with self.create_versioning_engine(
plan=plan, implementation=implementation
) as engine:
if skip_comparison:
# Skip comparison: return all upstream samples as added
if samples_nw is not None:
# Root features or user-provided samples: use samples directly
# Note: samples already has metaxy_provenance computed
added = samples_nw.lazy()
input_df = None # Root features have no upstream input
else:
# Non-root features: load all upstream with provenance
added = engine.load_upstream_with_provenance(
upstream=upstream_by_key,
hash_algo=self.hash_algorithm,
filters=filters_by_key,
)
input_df = (
added # Input is the same as added when skipping comparison
)
changed = None
removed = None
else:
added, changed, removed, input_df = (
engine.resolve_increment_with_provenance(
current=current_metadata,
upstream=upstream_by_key,
hash_algorithm=self.hash_algorithm,
filters=filters_by_key,
sample=samples_nw.lazy() if samples_nw is not None else None,
)
)
# Convert None to empty DataFrames
if changed is None:
changed = empty_frame_like(added)
if removed is None:
removed = empty_frame_like(added)
if lazy:
return LazyIncrement(
added=added
if isinstance(added, nw.LazyFrame)
else nw.from_native(added),
changed=changed
if isinstance(changed, nw.LazyFrame)
else nw.from_native(changed),
removed=removed
if isinstance(removed, nw.LazyFrame)
else nw.from_native(removed),
input=input_df
if input_df is None or isinstance(input_df, nw.LazyFrame)
else nw.from_native(input_df),
)
else:
return Increment(
added=added.collect() if isinstance(added, nw.LazyFrame) else added,
changed=changed.collect()
if isinstance(changed, nw.LazyFrame)
else changed,
removed=removed.collect()
if isinstance(removed, nw.LazyFrame)
else removed,
)
metaxy.MetadataStore.compute_provenance
¶
compute_provenance(feature: CoercibleToFeatureKey, df: FrameT) -> FrameT
Compute provenance columns for a DataFrame with pre-joined upstream data.
Note
This method may be useful in very rare cases.
Rely on MetadataStore.resolve_update instead.
Use this method when you perform custom joins outside of Metaxy's auto-join system but still want Metaxy to compute provenance. The method computes metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field, and metaxy_data_version columns based on the upstream metadata.
Info
The input DataFrame must contain the renamed metaxy_data_version_by_field
columns from each upstream feature. The naming convention follows the pattern
metaxy_data_version_by_field__<feature_key.to_column_suffix()>. For example, for an
upstream feature with key ["video", "raw"], the column should be named
metaxy_data_version_by_field__video_raw.
Parameters:
- feature (CoercibleToFeatureKey) – The feature to compute provenance for.
- df (FrameT) – A DataFrame containing pre-joined upstream data with renamed metaxy_data_version_by_field columns from each upstream feature.
Returns:
- FrameT – The input DataFrame with provenance columns added. Returns the same frame type as the input, either an eager DataFrame or a LazyFrame.
Raises:
- StoreNotOpenError – If the store is not open.
- ValueError – If required upstream metaxy_data_version_by_field columns are missing from the DataFrame.
Example
```py
# Read upstream metadata
video_df = store.read_metadata(VideoFeature).collect()
audio_df = store.read_metadata(AudioFeature).collect()

# Rename data_version_by_field columns to the expected convention
video_df = video_df.rename({
    "metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
})
audio_df = audio_df.rename({
    "metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
})

# Perform custom join
joined = video_df.join(audio_df, on="sample_uid", how="inner")

# Compute provenance
with_provenance = store.compute_provenance(MyFeature, joined)

# Pass to resolve_update
increment = store.resolve_update(MyFeature, samples=with_provenance)
```
Source code in src/metaxy/metadata_store/base.py
def compute_provenance(
self,
feature: CoercibleToFeatureKey,
df: FrameT,
) -> FrameT:
"""Compute provenance columns for a DataFrame with pre-joined upstream data.
!!! note
This method may be useful in very rare cases.
Rely on [`MetadataStore.resolve_update`][metaxy.metadata_store.base.MetadataStore.resolve_update] instead.
Use this method when you perform custom joins outside of Metaxy's auto-join
system but still want Metaxy to compute provenance. The method computes
metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field,
and metaxy_data_version columns based on the upstream metadata.
!!! info
The input DataFrame must contain the renamed metaxy_data_version_by_field
columns from each upstream feature. The naming convention follows the pattern
`metaxy_data_version_by_field__<feature_key.to_column_suffix()>`. For example, for an
upstream feature with key `["video", "raw"]`, the column should be named
`metaxy_data_version_by_field__video_raw`.
Args:
feature: The feature to compute provenance for.
df: A DataFrame containing pre-joined upstream data with renamed
metaxy_data_version_by_field columns from each upstream feature.
Returns:
The input DataFrame with provenance columns added. Returns the same
frame type as the input, either an eager DataFrame or a LazyFrame.
Raises:
StoreNotOpenError: If the store is not open.
ValueError: If required upstream `metaxy_data_version_by_field` columns
are missing from the DataFrame.
Example:
```py
# Read upstream metadata
video_df = store.read_metadata(VideoFeature).collect()
audio_df = store.read_metadata(AudioFeature).collect()
# Rename data_version_by_field columns to the expected convention
video_df = video_df.rename({
"metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
})
audio_df = audio_df.rename({
"metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
})
# Perform custom join
joined = video_df.join(audio_df, on="sample_uid", how="inner")
# Compute provenance
with_provenance = store.compute_provenance(MyFeature, joined)
# Pass to resolve_update
increment = store.resolve_update(MyFeature, samples=with_provenance)
```
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
graph = current_graph()
plan = graph.get_feature_plan(feature_key)
# Use native implementation if DataFrame matches, otherwise fall back to Polars
implementation = self.native_implementation()
if df.implementation != implementation:
implementation = nw.Implementation.POLARS
df = switch_implementation_to_polars(df) # ty: ignore[no-matching-overload]
with self.create_versioning_engine(
plan=plan, implementation=implementation
) as engine:
# Validate required upstream columns exist
expected_columns = {
dep.feature: engine.get_renamed_data_version_by_field_col(dep.feature)
for dep in (plan.feature_deps or [])
}
df_columns = set(df.collect_schema().names()) # ty: ignore[invalid-argument-type]
missing_columns = [
f"{col} (from upstream feature {key.to_string()})"
for key, col in expected_columns.items()
if col not in df_columns
]
if missing_columns:
raise ValueError(
f"DataFrame is missing required upstream columns for computing "
f"provenance of feature {feature_key.to_string()}. "
f"Missing columns: {missing_columns}. "
f"Make sure to rename metaxy_data_version_by_field columns from "
f"each upstream feature using the pattern "
f"metaxy_data_version_by_field__<feature_key.table_name>."
)
return engine.compute_provenance_columns(df, hash_algo=self.hash_algorithm) # ty: ignore[invalid-argument-type]
metaxy.MetadataStore.read_metadata
¶
read_metadata(feature: CoercibleToFeatureKey, *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, allow_fallback: bool = True, current_only: bool = True, latest_only: bool = True) -> LazyFrame[Any]
Read metadata with optional fallback to upstream stores.
Parameters:
- feature (CoercibleToFeatureKey) – Feature to read metadata for
- feature_version (str | None, default: None) – Explicit feature_version to filter by (mutually exclusive with current_only=True)
- filters (Sequence[Expr] | None, default: None) – Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]
- columns (Sequence[str] | None, default: None) – Subset of columns to include. Metaxy's system columns are always included.
- allow_fallback (bool, default: True) – If True, check fallback stores on local miss
- current_only (bool, default: True) – If True, only return rows with the current feature_version
- latest_only (bool, default: True) – Whether to deduplicate samples within id_columns groups ordered by metaxy_created_at.
Returns:
- LazyFrame[Any] – Narwhals LazyFrame with metadata
Raises:
- FeatureNotFoundError – If feature not found in any store
- SystemDataNotFoundError – When attempting to read non-existent Metaxy system data
- ValueError – If both feature_version and current_only=True are provided
Info
When this method is called with default arguments, it will return the latest (by metaxy_created_at)
metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.
Warning
The order of rows is not guaranteed.
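A hedged usage sketch; `MyFeature` is a stand-in for one of your feature classes:

```py
import narwhals as nw

with store:
    # Default arguments: latest rows for the current feature version.
    df = store.read_metadata(
        MyFeature,
        filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
    ).collect()
```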
Source code in src/metaxy/metadata_store/base.py
def read_metadata(
self,
feature: CoercibleToFeatureKey,
*,
feature_version: str | None = None,
filters: Sequence[nw.Expr] | None = None,
columns: Sequence[str] | None = None,
allow_fallback: bool = True,
current_only: bool = True,
latest_only: bool = True,
) -> nw.LazyFrame[Any]:
"""
Read metadata with optional fallback to upstream stores.
Args:
feature: Feature to read metadata for
feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True)
filters: Sequence of Narwhals filter expressions to apply to this feature.
Example: `[nw.col("x") > 10, nw.col("y") < 5]`
columns: Subset of columns to include. Metaxy's system columns are always included.
allow_fallback: If `True`, check fallback stores on local miss
current_only: If `True`, only return rows with current feature_version
latest_only: Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`.
Returns:
Narwhals LazyFrame with metadata
Raises:
FeatureNotFoundError: If feature not found in any store
SystemDataNotFoundError: When attempting to read non-existent Metaxy system data
ValueError: If both feature_version and current_only=True are provided
!!! info
When this method is called with default arguments, it will return the latest (by `metaxy_created_at`)
metadata for the current feature version. Therefore, it's perfectly suitable for most use cases.
!!! warning
The order of rows is not guaranteed.
"""
self._check_open()
filters = filters or []
columns = columns or []
feature_key = self._resolve_feature_key(feature)
is_system_table = self._is_system_table(feature_key)
# Validate mutually exclusive parameters
if feature_version is not None and current_only:
raise ValueError(
"Cannot specify both feature_version and current_only=True. "
"Use current_only=False with feature_version parameter."
)
# Add feature_version filter only when needed
if current_only or feature_version is not None and not is_system_table:
version_filter = nw.col(METAXY_FEATURE_VERSION) == (
current_graph().get_feature_version(feature_key)
if current_only
else feature_version
)
filters = [version_filter, *filters]
if columns and not is_system_table:
# Add only system columns that aren't already in the user's columns list
columns_set = set(columns)
missing_system_cols = [
c for c in ALL_SYSTEM_COLUMNS if c not in columns_set
]
read_columns = [*columns, *missing_system_cols]
else:
read_columns = None
lazy_frame = None
try:
lazy_frame = self.read_metadata_in_store(
feature, filters=filters, columns=read_columns
)
except FeatureNotFoundError as e:
# do not read system features from fallback stores
if is_system_table:
raise SystemDataNotFoundError(
f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
) from e
# Handle case where read_metadata_in_store returns None (no exception raised)
if lazy_frame is None and is_system_table:
raise SystemDataNotFoundError(
f"System Metaxy data with key {feature_key} is missing in {self.display()}. Invoke `metaxy graph push` before attempting to read system data."
)
if lazy_frame is not None and not is_system_table and latest_only:
from metaxy.models.constants import METAXY_CREATED_AT
# Apply deduplication
lazy_frame = self.versioning_engine_cls.keep_latest_by_group(
df=lazy_frame,
group_columns=list(
self._resolve_feature_plan(feature_key).feature.id_columns
),
timestamp_column=METAXY_CREATED_AT,
)
if lazy_frame is not None:
# After dedup, filter to requested columns if specified
if columns:
lazy_frame = lazy_frame.select(columns)
return lazy_frame
# Try fallback stores (opened on demand)
if allow_fallback:
for store in self.fallback_stores:
try:
# Open fallback store on demand for reading
with store:
# Use full read_metadata to handle nested fallback chains
return store.read_metadata(
feature,
feature_version=feature_version,
filters=filters,
columns=columns,
allow_fallback=True,
current_only=current_only,
latest_only=latest_only,
)
except FeatureNotFoundError:
# Try next fallback store
continue
# Not found anywhere
raise FeatureNotFoundError(
f"Feature {feature_key.to_string()} not found in store"
+ (" or fallback stores" if allow_fallback else "")
)
metaxy.MetadataStore.write_metadata
¶
write_metadata(feature: CoercibleToFeatureKey, df: IntoFrame, materialization_id: str | None = None) -> None
Write metadata for a feature (append-only by design).
Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.
Parameters:
- feature (CoercibleToFeatureKey) – Feature to write metadata for
- df (IntoFrame) – Metadata DataFrame of any type supported by Narwhals. Must have a metaxy_provenance_by_field column of type Struct with fields matching the feature's fields. Optionally, may also contain metaxy_data_version_by_field.
- materialization_id (str | None, default: None) – Optional external orchestration ID for this write. Overrides the store's default materialization_id if provided. Useful for tracking which orchestration run produced this metadata.
Raises:
- MetadataSchemaError – If DataFrame schema is invalid
- StoreNotOpenError – If store is not open
- ValueError – If writing to a feature from a different project than expected
Note
- Must be called within a MetadataStore.open(mode="write") context manager.
- Metaxy always performs an "append" operation. Metadata is never deleted or mutated.
- Fallback stores are never used for writes.
- Features from other Metaxy projects cannot be written to, unless project validation has been disabled with MetadataStore.allow_cross_project_writes.
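A minimal sketch, assuming a Polars frame and a single-field feature; `MyFeature` and the field name are placeholders:

```py
import polars as pl

df = pl.DataFrame({
    "sample_uid": ["s1", "s2"],
    # One hash per feature field; the struct fields must match the feature's fields.
    "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}],
})

with store.open(mode="write"):
    store.write_metadata(MyFeature, df, materialization_id="run-42")
```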
Source code in src/metaxy/metadata_store/base.py
def write_metadata(
self,
feature: CoercibleToFeatureKey,
df: IntoFrame,
materialization_id: str | None = None,
) -> None:
"""
Write metadata for a feature (append-only by design).
Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.
Args:
feature: Feature to write metadata for
df: Metadata DataFrame of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).
Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
Optionally, may also contain `metaxy_data_version_by_field`.
materialization_id: Optional external orchestration ID for this write.
Overrides the store's default `materialization_id` if provided.
Useful for tracking which orchestration run produced this metadata.
Raises:
MetadataSchemaError: If DataFrame schema is invalid
StoreNotOpenError: If store is not open
ValueError: If writing to a feature from a different project than expected
Note:
- Must be called within a `MetadataStore.open(mode="write")` context manager.
- Metaxy always performs an "append" operation. Metadata is never deleted or mutated.
- Fallback stores are never used for writes.
- Features from other Metaxy projects cannot be written to, unless project validation has been disabled with [MetadataStore.allow_cross_project_writes][].
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
is_system_table = self._is_system_table(feature_key)
# Validate project for non-system tables
if not is_system_table:
self._validate_project_write(feature)
# Convert Polars to Narwhals to Polars if needed
# if isinstance(df_nw, (pl.DataFrame, pl.LazyFrame)):
df_nw = nw.from_native(df)
assert isinstance(df_nw, (nw.DataFrame, nw.LazyFrame)), (
f"df must be a Narwhals DataFrame, got {type(df_nw)}"
)
# For system tables, write directly without feature_version tracking
if is_system_table:
self._validate_schema_system_table(df_nw)
self.write_metadata_to_store(feature_key, df_nw)
return
# Use collect_schema().names() to avoid PerformanceWarning on lazy frames
if METAXY_PROVENANCE_BY_FIELD not in df_nw.collect_schema().names():
from metaxy.metadata_store.exceptions import MetadataSchemaError
raise MetadataSchemaError(
f"DataFrame must have '{METAXY_PROVENANCE_BY_FIELD}' column"
)
# Add all required system columns
# warning: for dataframes that do not match the native MetadataStore implementation
# and are missing the METAXY_DATA_VERSION column, this call will lead to materializing the equivalent Polars DataFrame
# while calculating the missing METAXY_DATA_VERSION column
df_nw = self._add_system_columns(
df_nw, feature, materialization_id=materialization_id
)
self._validate_schema(df_nw)
self.write_metadata_to_store(feature_key, df_nw)
metaxy.MetadataStore.write_metadata_multi
¶
write_metadata_multi(metadata: Mapping[Any, IntoFrame], materialization_id: str | None = None) -> None
Write metadata for multiple features in reverse topological order.
Processes features so that dependents are written before their dependencies. This ordering ensures that downstream features are written first, which can be useful for certain data consistency requirements or when features need to be processed in a specific order.
Parameters:
- metadata (Mapping[Any, IntoFrame]) – Mapping from feature keys to metadata DataFrames. Keys can be any type coercible to FeatureKey (string, sequence, FeatureKey, or BaseFeature class). Values must be DataFrames compatible with Narwhals, containing required system columns.
- materialization_id (str | None, default: None) – Optional external orchestration ID for all writes. Overrides the store's default materialization_id if provided. Applied to all feature writes in this batch.
Raises:
- MetadataSchemaError – If any DataFrame schema is invalid
- StoreNotOpenError – If store is not open
- ValueError – If writing to a feature from a different project than expected
Note
- Must be called within a MetadataStore.open(mode="write") context manager.
- Empty mappings are handled gracefully (no-op).
- Each feature's metadata is written via write_metadata, so all validation and system column handling from that method applies.
Example
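(reproduced from the source docstring below)

```py
with store.open(mode="write"):
    store.write_metadata_multi({
        ChildFeature: child_df,
        ParentFeature: parent_df,
    })
    # Features are written in reverse topological order:
    # ChildFeature first, then ParentFeature
```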
Source code in src/metaxy/metadata_store/base.py
def write_metadata_multi(
self,
metadata: Mapping[Any, IntoFrame],
materialization_id: str | None = None,
) -> None:
"""
Write metadata for multiple features in reverse topological order.
Processes features so that dependents are written before their dependencies.
This ordering ensures that downstream features are written first, which can
be useful for certain data consistency requirements or when features need
to be processed in a specific order.
Args:
metadata: Mapping from feature keys to metadata DataFrames.
Keys can be any type coercible to FeatureKey (string, sequence,
FeatureKey, or BaseFeature class). Values must be DataFrames
compatible with Narwhals, containing required system columns.
materialization_id: Optional external orchestration ID for all writes.
Overrides the store's default `materialization_id` if provided.
Applied to all feature writes in this batch.
Raises:
MetadataSchemaError: If any DataFrame schema is invalid
StoreNotOpenError: If store is not open
ValueError: If writing to a feature from a different project than expected
Note:
- Must be called within a `MetadataStore.open(mode="write")` context manager.
- Empty mappings are handled gracefully (no-op).
- Each feature's metadata is written via `write_metadata`, so all
validation and system column handling from that method applies.
Example:
```py
with store.open(mode="write"):
store.write_metadata_multi({
ChildFeature: child_df,
ParentFeature: parent_df,
})
# Features are written in reverse topological order:
# ChildFeature first, then ParentFeature
```
"""
if not metadata:
return
# Build mapping from resolved keys to dataframes in one pass
resolved_metadata = {
self._resolve_feature_key(key): df for key, df in metadata.items()
}
# Get reverse topological order (dependents first)
graph = current_graph()
sorted_keys = graph.topological_sort_features(
list(resolved_metadata.keys()), descending=True
)
# Write metadata in reverse topological order
for feature_key in sorted_keys:
self.write_metadata(
feature_key,
resolved_metadata[feature_key],
materialization_id=materialization_id,
)
metaxy.MetadataStore.config_model
abstractmethod
classmethod
¶
config_model() -> type[MetadataStoreConfig]
Return the configuration model class for this store type.
Subclasses must override this to return their specific config class.
Returns:
- type[MetadataStoreConfig] – The config class type (e.g., DuckDBMetadataStoreConfig)
Note
Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.
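A hedged sketch of such an override, using the DuckDB config class named above; the import path follows the `from_config` example below:

```py
from metaxy.metadata_store.duckdb import DuckDBMetadataStoreConfig

class DuckDBMetadataStore(MetadataStore):
    @classmethod
    def config_model(cls) -> type[DuckDBMetadataStoreConfig]:
        # Narrower return type than the base class; intentional per the note above.
        return DuckDBMetadataStoreConfig
```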
Source code in src/metaxy/metadata_store/base.py
@classmethod
@abstractmethod
def config_model(cls) -> type[MetadataStoreConfig]:
"""Return the configuration model class for this store type.
Subclasses must override this to return their specific config class.
Returns:
The config class type (e.g., DuckDBMetadataStoreConfig)
Note:
Subclasses override this with a more specific return type.
Type checkers may show a warning about incompatible override,
but this is intentional - each store returns its own config type.
"""
...
metaxy.MetadataStore.from_config
classmethod
¶
from_config(config: MetadataStoreConfig, **kwargs: Any) -> Self
Create a store instance from a configuration object.
This method creates a store by:

1. Converting the config to a dict
2. Resolving fallback store names to actual store instances
3. Calling the store's __init__ with the config parameters
Parameters:
- config (MetadataStoreConfig) – Configuration object (should be the type returned by config_model())
- **kwargs (Any, default: {}) – Additional arguments passed directly to the store constructor (e.g., materialization_id for runtime parameters not in config)
Returns:
- Self – A new store instance configured according to the config object
Example
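(reproduced from the source docstring below)

```python
from metaxy.metadata_store.duckdb import (
    DuckDBMetadataStore,
    DuckDBMetadataStoreConfig,
)

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    fallback_stores=["prod"],
)
store = DuckDBMetadataStore.from_config(config)
```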
Source code in src/metaxy/metadata_store/base.py
@classmethod
def from_config(cls, config: MetadataStoreConfig, **kwargs: Any) -> Self:
"""Create a store instance from a configuration object.
This method creates a store by:
1. Converting the config to a dict
2. Resolving fallback store names to actual store instances
3. Calling the store's __init__ with the config parameters
Args:
config: Configuration object (should be the type returned by config_model())
**kwargs: Additional arguments passed directly to the store constructor
(e.g., materialization_id for runtime parameters not in config)
Returns:
A new store instance configured according to the config object
Example:
```python
from metaxy.metadata_store.duckdb import (
DuckDBMetadataStore,
DuckDBMetadataStoreConfig,
)
config = DuckDBMetadataStoreConfig(
database="metadata.db",
fallback_stores=["prod"],
)
store = DuckDBMetadataStore.from_config(config)
```
"""
# Convert config to dict, excluding unset values
config_dict = config.model_dump(exclude_unset=True)
# Pop and resolve fallback store names to actual store instances
fallback_store_names = config_dict.pop("fallback_stores", [])
fallback_stores = [
MetaxyConfig.get().get_store(name) for name in fallback_store_names
]
# Create store with resolved fallback stores, config, and extra kwargs
return cls(fallback_stores=fallback_stores, **config_dict, **kwargs)
metaxy.MetadataStore.native_implementation
¶
native_implementation() -> Implementation
Return the store's native Narwhals Implementation.
metaxy.MetadataStore.create_versioning_engine
¶
create_versioning_engine(plan: FeaturePlan, implementation: Implementation) -> Iterator[VersioningEngine | PolarsVersioningEngine]
Creates an appropriate provenance engine.
Falls back to the Polars implementation if the requested implementation differs from the store's native implementation.
Parameters:
- plan (FeaturePlan) – The feature plan.
- implementation (Implementation) – The desired engine implementation.
Returns:
- Iterator[VersioningEngine | PolarsVersioningEngine] – An appropriate provenance engine.
Source code in src/metaxy/metadata_store/base.py
@contextmanager
def create_versioning_engine(
self, plan: FeaturePlan, implementation: nw.Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]:
"""
Creates an appropriate provenance engine.
Falls back to Polars implementation if the required implementation differs from the store's native implementation.
Args:
plan: The feature plan.
implementation: The desired engine implementation.
Returns:
An appropriate provenance engine.
"""
if implementation == nw.Implementation.POLARS:
cm = self._create_polars_versioning_engine(plan)
elif implementation == self.native_implementation():
cm = self._create_versioning_engine(plan)
else:
cm = self._create_polars_versioning_engine(plan)
with cm as engine:
yield engine
metaxy.MetadataStore.open
abstractmethod
¶
open(mode: AccessMode = 'read') -> Iterator[Self]
Open/initialize the store for operations.
Context manager that opens the store with specified access mode.
Called internally by __enter__.
Child classes should implement backend-specific connection setup/teardown here.
Parameters:
- mode (AccessMode, default: 'read') – Access mode for this connection session.
Yields:
- Self (Self) – The store instance with connection open
Note
Users should prefer the `with store:` pattern except when write access mode is needed.
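A hedged sketch of both entry patterns; `MyFeature` and `df` are placeholders:

```py
# Read access: the plain context manager opens in "read" mode.
with store:
    meta = store.read_metadata(MyFeature).collect()

# Write access: use open() explicitly.
with store.open(mode="write"):
    store.write_metadata(MyFeature, df)
```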
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
@contextmanager
def open(self, mode: AccessMode = "read") -> Iterator[Self]:
"""Open/initialize the store for operations.
Context manager that opens the store with specified access mode.
Called internally by `__enter__`.
Child classes should implement backend-specific connection setup/teardown here.
Args:
mode: Access mode for this connection session.
Yields:
Self: The store instance with connection open
Note:
Users should prefer using `with store:` pattern except when write access mode is needed.
"""
...
metaxy.MetadataStore.__enter__
¶
__enter__() -> Self
Enter context manager - opens store in READ mode by default.
Use MetadataStore.open for write access mode instead.
Returns:
- Self (Self) – The opened store instance
Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
"""Enter context manager - opens store in READ mode by default.
Use [`MetadataStore.open`][metaxy.metadata_store.base.MetadataStore.open] for write access mode instead.
Returns:
Self: The opened store instance
"""
# Determine mode based on auto_create_tables
mode = "write" if self.auto_create_tables else "read"
# Open the store (open() manages _context_depth internally)
self._open_cm = self.open(mode) # ty: ignore[invalid-assignment]
self._open_cm.__enter__() # ty: ignore[possibly-missing-attribute]
return self
metaxy.MetadataStore.validate_hash_algorithm
¶
validate_hash_algorithm(check_fallback_stores: bool = True) -> None
Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Parameters:
- check_fallback_stores (bool, default: True) – If True, also validate that the hash is supported by fallback stores (ensures compatibility for future cross-store operations)
Raises:
- ValueError – If the hash algorithm is not supported by components or fallback stores
Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
self,
check_fallback_stores: bool = True,
) -> None:
"""Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Args:
check_fallback_stores: If True, also validate hash is supported by
fallback stores (ensures compatibility for future cross-store operations)
Raises:
ValueError: If hash algorithm not supported by components or fallback stores
"""
# Validate hash algorithm support without creating a full engine
# (engine creation requires a graph which isn't available during store init)
self._validate_hash_algorithm_support()
# Check fallback stores
if check_fallback_stores:
for fallback in self.fallback_stores:
fallback.validate_hash_algorithm(check_fallback_stores=False)
metaxy.MetadataStore.allow_cross_project_writes
¶
allow_cross_project_writes() -> Iterator[None]
Context manager to temporarily allow cross-project writes.
This is an escape hatch for legitimate cross-project operations like migrations, where metadata needs to be written to features from different projects.
Example
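(reproduced from the source docstring below)

```py
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
    store.write_metadata(feature_from_project_a, metadata_a)
    store.write_metadata(feature_from_project_b, metadata_b)
```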
Yields:
- None (None) – The context manager temporarily disables project validation
Source code in src/metaxy/metadata_store/base.py
@contextmanager
def allow_cross_project_writes(self) -> Iterator[None]:
"""Context manager to temporarily allow cross-project writes.
This is an escape hatch for legitimate cross-project operations like migrations,
where metadata needs to be written to features from different projects.
Example:
```py
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
store.write_metadata(feature_from_project_a, metadata_a)
store.write_metadata(feature_from_project_b, metadata_b)
```
Yields:
None: The context manager temporarily disables project validation
"""
previous_value = self._allow_cross_project_writes
try:
self._allow_cross_project_writes = True
yield
finally:
self._allow_cross_project_writes = previous_value
metaxy.MetadataStore.write_metadata_to_store
abstractmethod
¶
write_metadata_to_store(feature_key: FeatureKey, df: Frame, **kwargs: Any) -> None
Internal write implementation (backend-specific).
Backends may convert to their specific type if needed (e.g., Polars, Ibis).
Parameters:
- feature_key (FeatureKey) – Feature key to write to
- df (Frame) – Narwhals-compatible DataFrame with metadata to write
- **kwargs (Any, default: {}) – Backend-specific parameters
Note: Subclasses implement this for their storage backend.
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def write_metadata_to_store(
self,
feature_key: FeatureKey,
df: Frame,
**kwargs: Any,
) -> None:
"""
Internal write implementation (backend-specific).
Backends may convert to their specific type if needed (e.g., Polars, Ibis).
Args:
feature_key: Feature key to write to
df: [Narwhals](https://narwhals-dev.github.io/narwhals/)-compatible DataFrame with metadata to write
**kwargs: Backend-specific parameters
Note: Subclasses implement this for their storage backend.
"""
pass
metaxy.MetadataStore.drop_feature_metadata
¶
drop_feature_metadata(feature: CoercibleToFeatureKey) -> None
Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning
This operation is irreversible and will permanently delete all metadata for the specified feature.
Parameters:
- feature (CoercibleToFeatureKey) – Feature class or key to drop metadata for
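Example (reproduced from the source docstring below):

```py
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
```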
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: CoercibleToFeatureKey) -> None:
"""Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store.
Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning:
This operation is irreversible and will **permanently delete all metadata** for the specified feature.
Args:
feature: Feature class or key to drop metadata for
Example:
```py
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
```
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
self._drop_feature_metadata_impl(feature_key)
metaxy.MetadataStore.read_metadata_in_store
abstractmethod
¶
read_metadata_in_store(feature: CoercibleToFeatureKey, *, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, **kwargs: Any) -> LazyFrame[Any] | None
Read metadata from THIS store only without using any fallbacks stores.
Parameters:
- feature (CoercibleToFeatureKey) – Feature to read metadata for
- filters (Sequence[Expr] | None, default: None) – List of Narwhals filter expressions for this specific feature.
- columns (Sequence[str] | None, default: None) – Subset of columns to return
- **kwargs (Any, default: {}) – Backend-specific parameters
Returns:
- LazyFrame[Any] | None – Narwhals LazyFrame with metadata, or None if the feature is not found in the store
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def read_metadata_in_store(
self,
feature: CoercibleToFeatureKey,
*,
filters: Sequence[nw.Expr] | None = None,
columns: Sequence[str] | None = None,
**kwargs: Any,
) -> nw.LazyFrame[Any] | None:
"""
Read metadata from THIS store only without using any fallbacks stores.
Args:
feature: Feature to read metadata for
filters: List of Narwhals filter expressions for this specific feature.
columns: Subset of columns to return
**kwargs: Backend-specific parameters
Returns:
Narwhals LazyFrame with metadata, or None if feature not found in the store
"""
pass
metaxy.MetadataStore.has_feature
¶
has_feature(feature: CoercibleToFeatureKey, *, check_fallback: bool = False) -> bool
Check if feature exists in store.
Parameters:
- feature (CoercibleToFeatureKey) – Feature to check
- check_fallback (bool, default: False) – If True, also check fallback stores
Returns:
- bool – True if the feature exists, False otherwise
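A hedged usage sketch; `MyFeature` is a placeholder:

```py
with store:
    if store.has_feature(MyFeature, check_fallback=True):
        df = store.read_metadata(MyFeature).collect()
```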
Source code in src/metaxy/metadata_store/base.py
def has_feature(
self,
feature: CoercibleToFeatureKey,
*,
check_fallback: bool = False,
) -> bool:
"""
Check if feature exists in store.
Args:
feature: Feature to check
check_fallback: If True, also check fallback stores
Returns:
True if feature exists, False otherwise
"""
self._check_open()
if self.read_metadata_in_store(feature) is not None:
return True
# Check fallback stores
if not check_fallback:
return self._has_feature_impl(feature)
else:
for store in self.fallback_stores:
if store.has_feature(feature, check_fallback=True):
return True
return False
metaxy.MetadataStore.display
abstractmethod
¶
display() -> str
Return a human-readable display string for this store.
Used in warnings, logs, and CLI output to identify the store.
Returns:
- str – Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
Source code in src/metaxy/metadata_store/base.py
metaxy.MetadataStore.find_store_for_feature
¶
find_store_for_feature(feature_key: CoercibleToFeatureKey, *, check_fallback: bool = True) -> MetadataStore | None
Find the store that contains the given feature.
Parameters:
- feature_key (CoercibleToFeatureKey) – Feature to find
- check_fallback (bool, default: True) – Whether to check fallback stores when the feature is not found in the current store
Returns:
- MetadataStore | None – The store containing the feature, or None if not found
Source code in src/metaxy/metadata_store/base.py
def find_store_for_feature(
self,
feature_key: CoercibleToFeatureKey,
*,
check_fallback: bool = True,
) -> MetadataStore | None:
"""Find the store that contains the given feature.
Args:
feature_key: Feature to find
check_fallback: Whether to check fallback stores when the feature
is not found in the current store
Returns:
The store containing the feature, or None if not found
"""
self._check_open()
# Check if feature exists in this store
if self.has_feature(feature_key):
return self
# Try fallback stores if enabled (opened on demand)
if check_fallback:
for store in self.fallback_stores:
with store:
found = store.find_store_for_feature(
feature_key, check_fallback=True
)
if found is not None:
return found
return None
metaxy.MetadataStore.get_store_metadata
¶
get_store_metadata(feature_key: CoercibleToFeatureKey, *, check_fallback: bool = True) -> dict[str, Any]
Returns arbitrary key-value pairs with useful metadata for logging purposes (such as a path in storage).
This method should not expose sensitive information.
Parameters:
- feature_key (CoercibleToFeatureKey) – Feature to get metadata for
- check_fallback (bool, default: True) – Whether to check fallback stores when the feature is not found in the current store
Returns:
- dict[str, Any] – Dictionary with store-specific metadata (e.g., "display", "table_name", "uri")
Source code in src/metaxy/metadata_store/base.py
def get_store_metadata(
self,
feature_key: CoercibleToFeatureKey,
*,
check_fallback: bool = True,
) -> dict[str, Any]:
"""Arbitrary key-value pairs with useful metadata for logging purposes (like a path in storage).
This method should not expose sensitive information.
Args:
feature_key: Feature to get metadata for
check_fallback: Whether to check fallback stores when the feature
is not found in the current store
Returns:
Dictionary with store-specific metadata (e.g., `"display"`, `"table_name"`, `"uri"`)
"""
store = self.find_store_for_feature(feature_key, check_fallback=check_fallback)
if store is None:
return {}
return {
"display": store.display(),
**store._get_store_metadata_impl(feature_key),
}
metaxy.MetadataStore.calculate_input_progress
¶
calculate_input_progress(lazy_increment: LazyIncrement, feature_key: CoercibleToFeatureKey) -> float | None
Calculate progress percentage from lazy increment.
Uses the input field from LazyIncrement to count total input units
and compares with added to determine how many are missing.
Progress represents the percentage of input units that have been processed
at least once. Stale samples (in changed) are counted as processed since
they have existing metadata, even though they may need re-processing due to
upstream changes.
Parameters:
- lazy_increment (LazyIncrement) – The lazy increment containing input and added dataframes.
- feature_key (CoercibleToFeatureKey) – The feature key to look up lineage information.
Returns:
- float | None – Progress percentage (0-100), or None if input is not available.
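A hedged sketch combining this with `resolve_update(lazy=True)`; `MyFeature` is a placeholder:

```py
with store:
    inc = store.resolve_update(MyFeature, lazy=True)
    progress = store.calculate_input_progress(inc, MyFeature)
    if progress is not None:
        print(f"{progress:.1f}% of input units already processed")
```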
Source code in src/metaxy/metadata_store/base.py
def calculate_input_progress(
self,
lazy_increment: LazyIncrement,
feature_key: CoercibleToFeatureKey,
) -> float | None:
"""Calculate progress percentage from lazy increment.
Uses the `input` field from LazyIncrement to count total input units
and compares with `added` to determine how many are missing.
Progress represents the percentage of input units that have been processed
at least once. Stale samples (in `changed`) are counted as processed since
they have existing metadata, even though they may need re-processing due to
upstream changes.
Args:
lazy_increment: The lazy increment containing input and added dataframes.
feature_key: The feature key to look up lineage information.
Returns:
Progress percentage (0-100), or None if input is not available.
"""
if lazy_increment.input is None:
return None
key = self._resolve_feature_key(feature_key)
graph = current_graph()
plan = graph.get_feature_plan(key)
# Get the columns that define input units from the feature plan
input_id_columns = plan.input_id_columns
# Count distinct input units using two separate queries
# We can't use concat because input and added may have different schemas
# (e.g., nullable vs non-nullable columns)
total_units: int = (
lazy_increment.input.select(input_id_columns)
.unique()
.select(nw.len())
.collect()
.item()
)
if total_units == 0:
return None # No input available from upstream
missing_units: int = (
lazy_increment.added.select(input_id_columns)
.unique()
.select(nw.len())
.collect()
.item()
)
processed_units = total_units - missing_units
return (processed_units / total_units) * 100
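To make the arithmetic concrete, a worked example with illustrative values:
```py
# Worked example of the progress formula above (values illustrative)
total_units = 100                               # distinct upstream input units
missing_units = 25                              # units in `added`, never processed
processed_units = total_units - missing_units   # 75
progress = processed_units / total_units * 100  # 75.0 (%)
```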
metaxy.MetadataStore.copy_metadata
¶
copy_metadata(from_store: MetadataStore, features: list[CoercibleToFeatureKey] | None = None, *, filters: Mapping[str, Sequence[Expr]] | None = None, global_filters: Sequence[Expr] | None = None, current_only: bool = False, latest_only: bool = True) -> dict[str, int]
Copy metadata from another store with fine-grained filtering.
This is a reusable method that can be called programmatically or from CLI/migrations. It copies metadata for the specified features, preserving the original snapshot_version.
Parameters:
-
from_store(MetadataStore) –Source metadata store to copy from (must be opened for reading)
-
features(list[CoercibleToFeatureKey] | None, default:None) –Features to copy. If None, copies all features from the active graph; a list of FeatureKey or Feature classes copies only the specified features.
-
filters(Mapping[str, Sequence[Expr]] | None, default:None) –Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
-
global_filters(Sequence[Expr] | None, default:None) –Sequence of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
-
current_only(bool, default:False) –If True, only copy rows with the current feature_version (as defined in the loaded feature graph). Defaults to False to copy all versions.
-
latest_only(bool, default:True) –If True (default), deduplicate samples within id_columns groups by keeping only the latest row per group (ordered by metaxy_created_at).
Returns:
-
dict[str, int]–Dict with statistics: {"features_copied": int, "rows_copied": int}
Raises:
-
ValueError–If source or destination store is not open
-
FeatureNotFoundError–If a specified feature doesn't exist in source store
Examples:
```py
# Copy all features
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(from_store=source_store)
```
```py
# Copy specific features
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[FeatureKey(["my_feature"])],
    )
```
```py
# Copy with global filters applied to all features
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
    )
```
```py
# Copy specific features with per-feature filters
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[
            FeatureKey(["feature_a"]),
            FeatureKey(["feature_b"]),
        ],
        filters={
            "feature_a": [nw.col("field_a") > 10],
            "feature_b": [nw.col("field_b") < 30],
        },
    )
```
Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
self,
from_store: MetadataStore,
features: list[CoercibleToFeatureKey] | None = None,
*,
filters: Mapping[str, Sequence[nw.Expr]] | None = None,
global_filters: Sequence[nw.Expr] | None = None,
current_only: bool = False,
latest_only: bool = True,
) -> dict[str, int]:
"""Copy metadata from another store with fine-grained filtering.
This is a reusable method that can be called programmatically or from CLI/migrations.
Copies metadata for specified features, preserving the original snapshot_version.
Args:
from_store: Source metadata store to copy from (must be opened for reading)
features: List of features to copy. Can be:
- None: copies all features from active graph
- List of FeatureKey or Feature classes: copies specified features
filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
These filters are applied when reading from the source store.
Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
global_filters: Sequence of Narwhals filter expressions applied to all features.
These filters are combined with any feature-specific filters from `filters`.
Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
current_only: If True, only copy rows with the current feature_version (as defined
in the loaded feature graph). Defaults to False to copy all versions.
latest_only: If True (default), deduplicate samples within `id_columns` groups
by keeping only the latest row per group (ordered by `metaxy_created_at`).
Returns:
Dict with statistics: {"features_copied": int, "rows_copied": int}
Raises:
ValueError: If source or destination store is not open
FeatureNotFoundError: If a specified feature doesn't exist in source store
Examples:
```py
# Copy all features
with source_store.open("read"), dest_store.open("write"):
stats = dest_store.copy_metadata(from_store=source_store)
```
```py
# Copy specific features
with source_store.open("read"), dest_store.open("write"):
stats = dest_store.copy_metadata(
from_store=source_store,
features=[FeatureKey(["my_feature"])],
)
```
```py
# Copy with global filters applied to all features
with source_store.open("read"), dest_store.open("write"):
stats = dest_store.copy_metadata(
from_store=source_store,
global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
)
```
```py
# Copy specific features with per-feature filters
with source_store.open("read"), dest_store.open("write"):
stats = dest_store.copy_metadata(
from_store=source_store,
features=[
FeatureKey(["feature_a"]),
FeatureKey(["feature_b"]),
],
filters={
"feature_a": [nw.col("field_a") > 10],
"feature_b": [nw.col("field_b") < 30],
},
)
```
"""
import logging
logger = logging.getLogger(__name__)
# Validate both stores are open
if not self._is_open:
raise ValueError(
'Destination store must be opened with store.open("write") before use'
)
if not from_store._is_open:
raise ValueError(
'Source store must be opened with store.open("read") before use'
)
return self._copy_metadata_impl(
from_store=from_store,
features=features,
filters=filters,
global_filters=global_filters,
current_only=current_only,
latest_only=latest_only,
logger=logger,
)
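The version-related flags compose with the filtering options shown in the examples above; a hedged sketch (store names as before):
```py
# Copy only rows matching the current feature versions, but keep every
# historical row per sample (no deduplication by metaxy_created_at)
with source_store.open("read"), dest_store.open("write"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        current_only=True,
        latest_only=False,
    )
    print(stats["features_copied"], stats["rows_copied"])
```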
metaxy.metadata_store.base.VersioningEngineOptions
module-attribute
¶
VersioningEngineOptions = Literal['auto', 'native', 'polars']
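For illustration, the literal is passed as the versioning_engine argument when constructing a concrete store (the class name below is hypothetical):
```py
# Force the Polars engine regardless of the backend's native capabilities.
# MyMetadataStore stands in for any concrete MetadataStore subclass.
store = MyMetadataStore(versioning_engine="polars")
```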
Base Configuration Class¶
The following base configuration class is typically extended by store-specific configurations:
metaxy.metadata_store.base.MetadataStoreConfig
¶
Bases: BaseSettings
Base configuration class for metadata stores.
This class defines common configuration fields shared by all metadata store types. Store-specific config classes should inherit from this and add their own fields.
Example
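A minimal sketch (the subclass name and the extra field are illustrative, not part of the library):
```py
from metaxy.metadata_store.base import MetadataStoreConfig

class MyStoreConfig(MetadataStoreConfig):
    # Store-specific field layered on top of the shared base fields
    uri: str = "sqlite:///metadata.db"

# Shared fields like versioning_engine are inherited from the base class
config = MyStoreConfig(versioning_engine="polars")
```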
Configuration¶
The base MetadataStoreConfig class provides the following configuration options:
fallback_stores¶
List of fallback store names to search when features are not found in the current store.
Type: list[str]
hash_algorithm¶
Hash algorithm for versioning. If None, uses store's default.
Type: metaxy.versioning.types.HashAlgorithm | None
versioning_engine¶
Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.
Type: Literal['auto', 'native', 'polars'] | Default: "auto"
Project Write Validation¶
By default, MetadataStore raises a ValueError when attempting to write to a project that doesn't match the expected project from MetaxyConfig.get().project.
For legitimate cross-project operations (such as migrations that need to update features across multiple projects), use MetadataStore.allow_cross_project_writes: