DuckDB Metadata Store¶
DuckDBMetadataStore
¶
DuckDBMetadataStore(database: str | Path, *, config: dict[str, str] | None = None, extensions: Sequence[ExtensionInput] | None = None, fallback_stores: list[MetadataStore] | None = None, ducklake: DuckLakeConfigInput | None = None, **kwargs)
Bases: IbisMetadataStore
DuckDB metadata store using Ibis backend.
Parameters:

- database (str | Path) – Database connection string or path.
  - File path: "metadata.db" or Path("metadata.db")
  - In-memory: ":memory:"
  - MotherDuck: "md:my_database" or "md:my_database?motherduck_token=..."
  - S3: "s3://bucket/path/database.duckdb" (read-only via ATTACH)
  - HTTPS: "https://example.com/database.duckdb" (read-only via ATTACH)
  - Any valid DuckDB connection string
- config (dict[str, str] | None, default: None) – Optional DuckDB configuration settings (e.g., {'threads': '4', 'memory_limit': '4GB'})
- extensions (Sequence[ExtensionInput] | None, default: None) – List of DuckDB extensions to install and load on open. Supports strings (community repo), mapping-like objects with name/repository keys, or metaxy.metadata_store.duckdb.ExtensionSpec instances.
- ducklake (DuckLakeConfigInput | None, default: None) – Optional DuckLake attachment configuration. Provide either a mapping with 'metadata_backend' and 'storage_backend' entries or a DuckLakeAttachmentConfig instance. When supplied, the DuckDB connection is configured to ATTACH the DuckLake catalog after open().
- fallback_stores (list[MetadataStore] | None, default: None) – Ordered list of read-only fallback stores.
- **kwargs – Passed to metaxy.metadata_store.ibis.IbisMetadataStore
Warning
Parent directories are NOT created automatically. Ensure paths exist before initializing the store.
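For example, a file-backed store with extra configuration and extensions (a minimal sketch; "spatial" is an illustrative extension name, not a requirement of the store):
from metaxy.metadata_store.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(
    "metadata.db",                                   # parent directory must already exist
    config={"threads": "4", "memory_limit": "4GB"},  # passed to the Ibis DuckDB backend
    extensions=["spatial"],                          # illustrative; "hashfuncs" is added automatically
)
with store:
    print(store.list_features())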
Source code in src/metaxy/metadata_store/duckdb.py
def __init__(
self,
database: str | Path,
*,
config: dict[str, str] | None = None,
extensions: Sequence[ExtensionInput] | None = None,
fallback_stores: list["MetadataStore"] | None = None,
ducklake: DuckLakeConfigInput | None = None,
**kwargs,
):
"""
Initialize [DuckDB](https://duckdb.org/) metadata store.
Args:
database: Database connection string or path.
- File path: `"metadata.db"` or `Path("metadata.db")`
- In-memory: `":memory:"`
- MotherDuck: `"md:my_database"` or `"md:my_database?motherduck_token=..."`
- S3: `"s3://bucket/path/database.duckdb"` (read-only via ATTACH)
- HTTPS: `"https://example.com/database.duckdb"` (read-only via ATTACH)
- Any valid DuckDB connection string
config: Optional DuckDB configuration settings (e.g., {'threads': '4', 'memory_limit': '4GB'})
extensions: List of DuckDB extensions to install and load on open.
Supports strings (community repo), mapping-like objects with
``name``/``repository`` keys, or [metaxy.metadata_store.duckdb.ExtensionSpec][] instances.
ducklake: Optional DuckLake attachment configuration. Provide either a
mapping with 'metadata_backend' and 'storage_backend' entries or a
DuckLakeAttachmentConfig instance. When supplied, the DuckDB
connection is configured to ATTACH the DuckLake catalog after open().
fallback_stores: Ordered list of read-only fallback stores.
**kwargs: Passed to [metaxy.metadata_store.ibis.IbisMetadataStore][]`
Warning:
Parent directories are NOT created automatically. Ensure paths exist
before initializing the store.
"""
database_str = str(database)
# Build connection params for Ibis DuckDB backend
# Ibis DuckDB backend accepts config params directly (not nested under 'config')
connection_params = {"database": database_str}
if config:
connection_params.update(config)
self.database = database_str
base_extensions: list[NormalisedExtension] = _normalise_extensions(
extensions or []
)
self._ducklake_config: DuckLakeAttachmentConfig | None = None
self._ducklake_attachment: DuckLakeAttachmentManager | None = None
if ducklake is not None:
attachment_config, manager = build_ducklake_attachment(ducklake)
ensure_extensions_with_plugins(base_extensions, attachment_config.plugins)
self._ducklake_config = attachment_config
self._ducklake_attachment = manager
self.extensions = base_extensions
# Auto-add hashfuncs extension if not present (needed for default XXHASH64)
extension_names: list[str] = []
for ext in self.extensions:
if isinstance(ext, str):
extension_names.append(ext)
elif isinstance(ext, ExtensionSpec):
extension_names.append(ext.name)
else:
# After _normalise_extensions, this should not happen
# But keep defensive check for type safety
raise TypeError(
f"Extension must be str or ExtensionSpec after normalization; got {type(ext)}"
)
if "hashfuncs" not in extension_names:
self.extensions.append("hashfuncs")
# Initialize Ibis store with DuckDB backend
super().__init__(
backend="duckdb",
connection_params=connection_params,
fallback_stores=fallback_stores,
**kwargs,
)
Attributes¶
ducklake_attachment
property
¶
DuckLake attachment manager (raises if not configured).
ducklake_attachment_config
property
¶
ducklake_attachment_config: DuckLakeAttachmentConfig
DuckLake attachment configuration (raises if not configured).
ibis_conn
property
¶
Get Ibis backend connection.
Returns:

- BaseBackend – Active Ibis backend connection

Raises:

- StoreNotOpenError – If store is not open
conn
property
¶
Get connection (alias for ibis_conn for consistency).
Returns:

- BaseBackend – Active Ibis backend connection

Raises:

- StoreNotOpenError – If store is not open
Functions¶
open
¶
Open DuckDB connection and configure optional DuckLake attachment.
Source code in src/metaxy/metadata_store/duckdb.py
def open(self) -> None:
"""Open DuckDB connection and configure optional DuckLake attachment."""
super().open()
if self._ducklake_attachment is not None:
try:
duckdb_conn = self._duckdb_raw_connection()
self._ducklake_attachment.configure(duckdb_conn)
except Exception:
# Ensure connection is closed if DuckLake configuration fails
super().close()
raise
preview_ducklake_sql
¶
close
¶
__enter__
¶
Enter context manager.
Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
"""Enter context manager."""
# Track nesting depth
self._context_depth += 1
# Only open on first enter
if self._context_depth == 1:
# Warn if auto_create_tables is enabled (and store wants warnings)
if self.auto_create_tables and self._should_warn_auto_create_tables:
import warnings
warnings.warn(
f"AUTO_CREATE_TABLES is enabled for {self.display()} - "
"do not use in production! "
"Use proper database migration tools like Alembic for production deployments.",
UserWarning,
stacklevel=3, # stacklevel=3 to point to user's 'with store:' line
)
self.open()
self._is_open = True
# Validate after opening (when all components are ready)
self._validate_after_open()
return self
__exit__
¶
validate_hash_algorithm
¶
validate_hash_algorithm(check_fallback_stores: bool = True) -> None
Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Parameters:

- check_fallback_stores (bool, default: True) – If True, also validate hash is supported by fallback stores (ensures compatibility for future cross-store operations)

Raises:

- ValueError – If hash algorithm not supported by components or fallback stores
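For example, a quick compatibility check before writing metadata (minimal sketch):
with store:
    # Verifies the configured hash algorithm against this store's components
    # and, by default, all fallback stores; raises if unsupported.
    store.validate_hash_algorithm(check_fallback_stores=True)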
Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
self,
check_fallback_stores: bool = True,
) -> None:
"""Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Args:
check_fallback_stores: If True, also validate hash is supported by
fallback stores (ensures compatibility for future cross-store operations)
Raises:
ValueError: If hash algorithm not supported by components or fallback stores
"""
# Check if this store can support the algorithm
# Try native field provenance calculations first (if supported), then Polars
supported_algorithms = []
if self._supports_native_components():
try:
_, calculator, _ = self._create_native_components()
supported_algorithms = calculator.supported_algorithms
except Exception:
# If native field provenance calculations fail, fall back to Polars
pass
# If no native support or prefer_native=False, use Polars
if not supported_algorithms:
polars_calc = PolarsProvenanceByFieldCalculator()
supported_algorithms = polars_calc.supported_algorithms
if self.hash_algorithm not in supported_algorithms:
from metaxy.metadata_store.exceptions import (
HashAlgorithmNotSupportedError,
)
raise HashAlgorithmNotSupportedError(
f"Hash algorithm {self.hash_algorithm} not supported by {self.__class__.__name__}. "
f"Supported: {supported_algorithms}"
)
# Check fallback stores
if check_fallback_stores:
for fallback in self.fallback_stores:
fallback.validate_hash_algorithm(check_fallback_stores=False)
allow_cross_project_writes
¶
allow_cross_project_writes() -> Iterator[None]
Context manager to temporarily allow cross-project writes.
This is an escape hatch for legitimate cross-project operations like migrations, where metadata needs to be written to features from different projects.
Example
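During a migration, writes to features from multiple projects can be wrapped as follows:
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
    store.write_metadata(feature_from_project_a, metadata_a)
    store.write_metadata(feature_from_project_b, metadata_b)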
Yields:

- None – The context manager temporarily disables project validation
Source code in src/metaxy/metadata_store/base.py
@contextmanager
def allow_cross_project_writes(self) -> Iterator[None]:
"""Context manager to temporarily allow cross-project writes.
This is an escape hatch for legitimate cross-project operations like migrations,
where metadata needs to be written to features from different projects.
Example:
```py
# During migration, allow writing to features from different projects
with store.allow_cross_project_writes():
store.write_metadata(feature_from_project_a, metadata_a)
store.write_metadata(feature_from_project_b, metadata_b)
```
Yields:
None: The context manager temporarily disables project validation
"""
previous_value = self._allow_cross_project_writes
try:
self._allow_cross_project_writes = True
yield
finally:
self._allow_cross_project_writes = previous_value
write_metadata
¶
write_metadata(feature: FeatureKey | type[BaseFeature], df: DataFrame[Any] | DataFrame) -> None
Write metadata for a feature (immutable, append-only).
Automatically adds the canonical system columns (metaxy_feature_version,
metaxy_snapshot_version) unless they already exist in the DataFrame
(useful for migrations).
Parameters:

- feature (FeatureKey | type[BaseFeature]) – Feature to write metadata for
- df (DataFrame[Any] | DataFrame) – Narwhals DataFrame or Polars DataFrame containing metadata. Must have a metaxy_provenance_by_field column of type Struct with fields matching the feature's fields. May optionally contain metaxy_feature_version and metaxy_snapshot_version (for migrations).

Raises:

- MetadataSchemaError – If DataFrame schema is invalid
- StoreNotOpenError – If store is not open
- ValueError – If writing to a feature from a different project than expected
Note
- Always writes to current store, never to fallback stores.
- If df already contains the metaxy-managed columns, they will be used as-is (no replacement). This allows migrations to write historical versions. A warning is issued unless suppressed via context manager.
- Project validation is performed unless disabled via allow_cross_project_writes()
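A minimal sketch, assuming a feature class MyFeature whose only field is named "field" and whose ID column is sample_uid (both names are illustrative):
import polars as pl

df = pl.DataFrame(
    {
        "sample_uid": ["s1", "s2"],
        # Struct column keyed by the feature's field names (here just "field")
        "metaxy_provenance_by_field": [{"field": "h1"}, {"field": "h2"}],
    }
)
with store:
    store.write_metadata(MyFeature, df)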
Source code in src/metaxy/metadata_store/base.py
def write_metadata(
self,
feature: FeatureKey | type[BaseFeature],
df: nw.DataFrame[Any] | pl.DataFrame,
) -> None:
"""
Write metadata for a feature (immutable, append-only).
Automatically adds the canonical system columns (`metaxy_feature_version`,
`metaxy_snapshot_version`) unless they already exist in the DataFrame
(useful for migrations).
Args:
feature: Feature to write metadata for
df: Narwhals DataFrame or Polars DataFrame containing metadata.
Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
May optionally contain `metaxy_feature_version` and `metaxy_snapshot_version` (for migrations).
Raises:
MetadataSchemaError: If DataFrame schema is invalid
StoreNotOpenError: If store is not open
ValueError: If writing to a feature from a different project than expected
Note:
- Always writes to current store, never to fallback stores.
- If df already contains the metaxy-managed columns, they will be used
as-is (no replacement). This allows migrations to write historical
versions. A warning is issued unless suppressed via context manager.
- Project validation is performed unless disabled via allow_cross_project_writes()
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
is_system_table = self._is_system_table(feature_key)
# Validate project for non-system tables
if not is_system_table:
self._validate_project_write(feature)
# Convert Narwhals to Polars if needed
if isinstance(df, nw.DataFrame):
df = df.to_polars()
# nw.DataFrame also matches as DataFrame in some contexts, ensure it's Polars
if not isinstance(df, pl.DataFrame):
# Must be some other type - shouldn't happen but handle defensively
if hasattr(df, "to_polars"):
df = df.to_polars()
elif hasattr(df, "to_pandas"):
df = pl.from_pandas(df.to_pandas())
else:
raise TypeError(f"Cannot convert {type(df)} to Polars DataFrame")
# For system tables, write directly without feature_version tracking
if is_system_table:
self._validate_schema_system_table(df)
self._write_metadata_impl(feature_key, df)
return
# For regular features: add feature_version and snapshot_version, validate, and write
# Check if feature_version and snapshot_version already exist in DataFrame
if FEATURE_VERSION_COL in df.columns and SNAPSHOT_VERSION_COL in df.columns:
# DataFrame already has feature_version and snapshot_version - use as-is
# This is intended for migrations writing historical versions
# Issue a warning unless we're in a suppression context
if not _suppress_feature_version_warning.get():
import warnings
warnings.warn(
f"Writing metadata for {feature_key.to_string()} with existing "
f"{FEATURE_VERSION_COL} and {SNAPSHOT_VERSION_COL} columns. This is intended for migrations only. "
"Normal code should let write_metadata() add the current versions automatically.",
UserWarning,
stacklevel=2,
)
else:
# Get current feature version and snapshot_version from code and add them
if isinstance(feature, type) and issubclass(feature, BaseFeature):
current_feature_version = feature.feature_version() # type: ignore[attr-defined]
else:
from metaxy.models.feature import FeatureGraph
graph = FeatureGraph.get_active()
feature_cls = graph.features_by_key[feature_key]
current_feature_version = feature_cls.feature_version() # type: ignore[attr-defined]
# Get snapshot_version from active graph
from metaxy.models.feature import FeatureGraph
graph = FeatureGraph.get_active()
current_snapshot_version = graph.snapshot_version
df = df.with_columns(
[
pl.lit(current_feature_version).alias(FEATURE_VERSION_COL),
pl.lit(current_snapshot_version).alias(SNAPSHOT_VERSION_COL),
]
)
# Validate schema
self._validate_schema(df)
# Write metadata
self._write_metadata_impl(feature_key, df)
drop_feature_metadata
¶
drop_feature_metadata(feature: FeatureKey | type[BaseFeature]) -> None
Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning
This operation is irreversible and will permanently delete all metadata for the specified feature.
Parameters:

- feature (FeatureKey | type[BaseFeature]) – Feature class or key to drop metadata for
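For example:
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)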
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: FeatureKey | type[BaseFeature]) -> None:
"""Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store.
Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning:
This operation is irreversible and will **permanently delete all metadata** for the specified feature.
Args:
feature: Feature class or key to drop metadata for
Example:
```py
store.drop_feature_metadata(MyFeature)
assert not store.has_feature(MyFeature)
```
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
self._drop_feature_metadata_impl(feature_key)
record_feature_graph_snapshot
¶
record_feature_graph_snapshot() -> SnapshotPushResult
Record all features in graph with a graph snapshot version.
This should be called during CD (Continuous Deployment) to record what
feature versions are being deployed. Typically invoked via metaxy graph push.
Records all features in the graph with the same snapshot_version, representing a consistent state of the entire feature graph based on code definitions.
The snapshot_version is a deterministic hash of all feature_version hashes in the graph, making it idempotent - calling multiple times with the same feature definitions produces the same snapshot_version.
This method detects three scenarios:
1. New snapshot (computational changes): no existing rows with this snapshot_version
2. Metadata-only changes: snapshot exists but some features have a different feature_spec_version
3. No changes: snapshot exists with identical feature_spec_versions for all features

Returns:

- SnapshotPushResult
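For example, during a deployment step (minimal sketch; assumes SnapshotPushResult exposes its fields as attributes):
with store:
    result = store.record_feature_graph_snapshot()
    # Idempotent: re-running with unchanged feature definitions reports
    # already_recorded=True and metadata_changed=False.
    print(result.snapshot_version, result.already_recorded, result.metadata_changed)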
Source code in src/metaxy/metadata_store/base.py
def record_feature_graph_snapshot(self) -> SnapshotPushResult:
"""Record all features in graph with a graph snapshot version.
This should be called during CD (Continuous Deployment) to record what
feature versions are being deployed. Typically invoked via `metaxy graph push`.
Records all features in the graph with the same snapshot_version, representing
a consistent state of the entire feature graph based on code definitions.
The snapshot_version is a deterministic hash of all feature_version hashes
in the graph, making it idempotent - calling multiple times with the
same feature definitions produces the same snapshot_version.
This method detects three scenarios:
1. New snapshot (computational changes): No existing rows with this snapshot_version
2. Metadata-only changes: Snapshot exists but some features have different feature_spec_version
3. No changes: Snapshot exists with identical feature_spec_versions for all features
Returns: SnapshotPushResult
"""
from metaxy.models.feature import FeatureGraph
graph = FeatureGraph.get_active()
# Use to_snapshot() to get the snapshot dict
snapshot_dict = graph.to_snapshot()
# Generate deterministic snapshot_version from graph
snapshot_version = graph.snapshot_version
# Read existing feature versions once
try:
existing_versions_lazy = self.read_metadata_in_store(FEATURE_VERSIONS_KEY)
# Materialize to Polars for iteration
existing_versions = (
existing_versions_lazy.collect().to_polars()
if existing_versions_lazy is not None
else None
)
except Exception:
# Table doesn't exist yet
existing_versions = None
# Get project from any feature in the graph (all should have the same project)
# Default to empty string if no features in graph
if graph.features_by_key:
# Get first feature's project
first_feature = next(iter(graph.features_by_key.values()))
project_name = first_feature.project # type: ignore[attr-defined]
else:
project_name = ""
# Check if this exact snapshot already exists for this project
snapshot_already_exists = False
existing_spec_versions: dict[str, str] = {}
if existing_versions is not None:
# Check if project column exists (it may not in old tables)
if "project" in existing_versions.columns:
snapshot_rows = existing_versions.filter(
(pl.col(SNAPSHOT_VERSION_COL) == snapshot_version)
& (pl.col("project") == project_name)
)
else:
# Old table without project column - just check snapshot_version
snapshot_rows = existing_versions.filter(
pl.col(SNAPSHOT_VERSION_COL) == snapshot_version
)
snapshot_already_exists = snapshot_rows.height > 0
if snapshot_already_exists:
# Check if feature_spec_version column exists (backward compatibility)
# Old records (before issue #77) won't have this column
has_spec_version = FEATURE_SPEC_VERSION_COL in snapshot_rows.columns
if has_spec_version:
# Build dict of existing feature_key -> feature_spec_version
for row in snapshot_rows.iter_rows(named=True):
existing_spec_versions[row["feature_key"]] = row[
FEATURE_SPEC_VERSION_COL
]
# If no spec_version column, existing_spec_versions remains empty
# This means we'll treat it as "no metadata changes" (conservative approach)
# Scenario 1: New snapshot (no existing rows)
if not snapshot_already_exists:
# Build records from snapshot_dict
records = []
for feature_key_str in sorted(snapshot_dict.keys()):
feature_data = snapshot_dict[feature_key_str]
# Serialize complete FeatureSpec
feature_spec_json = json.dumps(feature_data["feature_spec"])
# Always record all features for this snapshot (don't skip based on feature_version alone)
# Each snapshot must be complete to support migration detection
records.append(
{
"project": project_name,
"feature_key": feature_key_str,
FEATURE_VERSION_COL: feature_data[FEATURE_VERSION_COL],
FEATURE_SPEC_VERSION_COL: feature_data[
FEATURE_SPEC_VERSION_COL
],
FEATURE_TRACKING_VERSION_COL: feature_data[
FEATURE_TRACKING_VERSION_COL
],
"recorded_at": datetime.now(timezone.utc),
"feature_spec": feature_spec_json,
"feature_class_path": feature_data["feature_class_path"],
SNAPSHOT_VERSION_COL: snapshot_version,
}
)
# Bulk write all new records at once
if records:
version_records = pl.DataFrame(
records,
schema=FEATURE_VERSIONS_SCHEMA,
)
self._write_metadata_impl(FEATURE_VERSIONS_KEY, version_records)
return SnapshotPushResult(
snapshot_version=snapshot_version,
already_recorded=False,
metadata_changed=False,
features_with_spec_changes=[],
)
# Scenario 2 & 3: Snapshot exists - check for metadata changes
features_with_spec_changes = []
for feature_key_str, feature_data in snapshot_dict.items():
current_spec_version = feature_data[FEATURE_SPEC_VERSION_COL]
existing_spec_version = existing_spec_versions.get(feature_key_str)
if existing_spec_version != current_spec_version:
features_with_spec_changes.append(feature_key_str)
# If metadata changed, append new rows for affected features
if features_with_spec_changes:
records = []
for feature_key_str in features_with_spec_changes:
feature_data = snapshot_dict[feature_key_str]
# Serialize complete FeatureSpec
feature_spec_json = json.dumps(feature_data["feature_spec"])
records.append(
{
"project": project_name,
"feature_key": feature_key_str,
FEATURE_VERSION_COL: feature_data[FEATURE_VERSION_COL],
FEATURE_SPEC_VERSION_COL: feature_data[
FEATURE_SPEC_VERSION_COL
],
FEATURE_TRACKING_VERSION_COL: feature_data[
FEATURE_TRACKING_VERSION_COL
],
"recorded_at": datetime.now(timezone.utc),
"feature_spec": feature_spec_json,
"feature_class_path": feature_data["feature_class_path"],
SNAPSHOT_VERSION_COL: snapshot_version,
}
)
# Bulk write updated records (append-only)
if records:
version_records = pl.DataFrame(
records,
schema=FEATURE_VERSIONS_SCHEMA,
)
self._write_metadata_impl(FEATURE_VERSIONS_KEY, version_records)
return SnapshotPushResult(
snapshot_version=snapshot_version,
already_recorded=True,
metadata_changed=True,
features_with_spec_changes=features_with_spec_changes,
)
# Scenario 3: No changes at all
return SnapshotPushResult(
snapshot_version=snapshot_version,
already_recorded=True,
metadata_changed=False,
features_with_spec_changes=[],
)
read_metadata_in_store
¶
read_metadata_in_store(feature: FeatureKey | type[BaseFeature], *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None) -> LazyFrame[Any] | None
Read metadata from this store only (no fallback).
Parameters:

- feature (FeatureKey | type[BaseFeature]) – Feature to read
- feature_version (str | None, default: None) – Filter by specific feature_version (applied as SQL WHERE clause)
- filters (Sequence[Expr] | None, default: None) – List of Narwhals filter expressions (converted to SQL WHERE clauses)
- columns (Sequence[str] | None, default: None) – Optional list of columns to select

Returns:

- LazyFrame[Any] | None – Narwhals LazyFrame with metadata, or None if not found
Source code in src/metaxy/metadata_store/ibis.py
def read_metadata_in_store(
self,
feature: FeatureKey | type[BaseFeature],
*,
feature_version: str | None = None,
filters: Sequence[nw.Expr] | None = None,
columns: Sequence[str] | None = None,
) -> nw.LazyFrame[Any] | None:
"""
Read metadata from this store only (no fallback).
Args:
feature: Feature to read
feature_version: Filter by specific feature_version (applied as SQL WHERE clause)
filters: List of Narwhals filter expressions (converted to SQL WHERE clauses)
columns: Optional list of columns to select
Returns:
Narwhals LazyFrame with metadata, or None if not found
"""
feature_key = self._resolve_feature_key(feature)
table_name = feature_key.table_name
# Check if table exists
existing_tables = self.conn.list_tables()
if table_name not in existing_tables:
return None
# Get Ibis table reference
table = self.conn.table(table_name)
# Wrap Ibis table with Narwhals (stays lazy in SQL)
nw_lazy: nw.LazyFrame[Any] = nw.from_native(table, eager_only=False)
# Apply feature_version filter (stays in SQL via Narwhals)
if feature_version is not None:
nw_lazy = nw_lazy.filter(
nw.col("metaxy_feature_version") == feature_version
)
# Apply generic Narwhals filters (stays in SQL)
if filters is not None:
for filter_expr in filters:
nw_lazy = nw_lazy.filter(filter_expr)
# Select columns (stays in SQL)
if columns is not None:
nw_lazy = nw_lazy.select(columns)
# Return Narwhals LazyFrame wrapping Ibis table (stays lazy in SQL)
return nw_lazy
read_metadata
¶
read_metadata(feature: FeatureKey | type[BaseFeature], *, feature_version: str | None = None, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, allow_fallback: bool = True, current_only: bool = True) -> LazyFrame[Any]
Read metadata with optional fallback to upstream stores.
Parameters:

- feature (FeatureKey | type[BaseFeature]) – Feature to read metadata for
- feature_version (str | None, default: None) – Explicit feature_version to filter by (mutually exclusive with current_only=True)
- filters (Sequence[Expr] | None, default: None) – Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]
- columns (Sequence[str] | None, default: None) – Subset of columns to return
- allow_fallback (bool, default: True) – If True, check fallback stores on local miss
- current_only (bool, default: True) – If True, only return rows with current feature_version (default: True for safety)

Returns:

- LazyFrame[Any] – Narwhals LazyFrame with metadata

Raises:

- FeatureNotFoundError – If feature not found in any store
- ValueError – If both feature_version and current_only=True are provided
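For example, reading the current version of a feature with a pushdown filter (sketch; the column "x" is illustrative):
import narwhals as nw

with store:
    lazy = store.read_metadata(MyFeature, filters=[nw.col("x") > 10])
    df = lazy.collect().to_polars()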
Source code in src/metaxy/metadata_store/base.py
def read_metadata(
self,
feature: FeatureKey | type[BaseFeature],
*,
feature_version: str | None = None,
filters: Sequence[nw.Expr] | None = None,
columns: Sequence[str] | None = None,
allow_fallback: bool = True,
current_only: bool = True,
) -> nw.LazyFrame[Any]:
"""
Read metadata with optional fallback to upstream stores.
Args:
feature: Feature to read metadata for
feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True)
filters: Sequence of Narwhals filter expressions to apply to this feature.
Example: [nw.col("x") > 10, nw.col("y") < 5]
columns: Subset of columns to return
allow_fallback: If True, check fallback stores on local miss
current_only: If True, only return rows with current feature_version
(default: True for safety)
Returns:
Narwhals LazyFrame with metadata
Raises:
FeatureNotFoundError: If feature not found in any store
ValueError: If both feature_version and current_only=True are provided
"""
feature_key = self._resolve_feature_key(feature)
is_system_table = self._is_system_table(feature_key)
# Validate mutually exclusive parameters
if feature_version is not None and current_only:
raise ValueError(
"Cannot specify both feature_version and current_only=True. "
"Use current_only=False with feature_version parameter."
)
# Determine which feature_version to use
feature_version_filter = feature_version
if current_only and not is_system_table:
# Get current feature_version
if isinstance(feature, type) and issubclass(feature, BaseFeature):
feature_version_filter = feature.feature_version() # type: ignore[attr-defined]
else:
from metaxy.models.feature import FeatureGraph
graph = FeatureGraph.get_active()
# Only try to get from graph if feature_key exists in graph
# This allows reading system tables or external features not in current graph
if feature_key in graph.features_by_key:
feature_cls = graph.features_by_key[feature_key]
feature_version_filter = feature_cls.feature_version() # type: ignore[attr-defined]
else:
# Feature not in graph - skip feature_version filtering
feature_version_filter = None
# Try local first with filters
lazy_frame = self.read_metadata_in_store(
feature,
feature_version=feature_version_filter,
filters=filters, # Pass filters directly
columns=columns,
)
if lazy_frame is not None:
return lazy_frame
# Try fallback stores
if allow_fallback:
for store in self.fallback_stores:
try:
# Use full read_metadata to handle nested fallback chains
return store.read_metadata(
feature,
feature_version=feature_version,
filters=filters, # Pass through filters directly
columns=columns,
allow_fallback=True,
current_only=current_only, # Pass through current_only
)
except FeatureNotFoundError:
# Try next fallback store
continue
# Not found anywhere
raise FeatureNotFoundError(
f"Feature {feature_key.to_string()} not found in store"
+ (" or fallback stores" if allow_fallback else "")
)
has_feature
¶
has_feature(feature: FeatureKey | type[BaseFeature], *, check_fallback: bool = False) -> bool
Check if feature exists in store.
Parameters:

- feature (FeatureKey | type[BaseFeature]) – Feature to check
- check_fallback (bool, default: False) – If True, also check fallback stores

Returns:

- bool – True if feature exists, False otherwise
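For example (sketch):
with store:
    if store.has_feature(MyFeature, check_fallback=True):
        metadata = store.read_metadata(MyFeature)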
Source code in src/metaxy/metadata_store/base.py
def has_feature(
self,
feature: FeatureKey | type[BaseFeature],
*,
check_fallback: bool = False,
) -> bool:
"""
Check if feature exists in store.
Args:
feature: Feature to check
check_fallback: If True, also check fallback stores
Returns:
True if feature exists, False otherwise
"""
# Check local
if self.read_metadata_in_store(feature) is not None:
return True
# Check fallback stores
if check_fallback:
for store in self.fallback_stores:
if store.has_feature(feature, check_fallback=True):
return True
return False
list_features
¶
list_features(*, include_fallback: bool = False) -> list[FeatureKey]
List all features in store.
Parameters:

- include_fallback (bool, default: False) – If True, include features from fallback stores

Returns:

- list[FeatureKey] – List of FeatureKey objects

Raises:

- StoreNotOpenError – If store is not open
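For example (sketch):
with store:
    for key in store.list_features(include_fallback=True):
        print(key.to_string())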
Source code in src/metaxy/metadata_store/base.py
def list_features(self, *, include_fallback: bool = False) -> list[FeatureKey]:
"""
List all features in store.
Args:
include_fallback: If True, include features from fallback stores
Returns:
List of FeatureKey objects
Raises:
StoreNotOpenError: If store is not open
"""
self._check_open()
features = self._list_features_local()
if include_fallback:
for store in self.fallback_stores:
features.extend(store.list_features(include_fallback=True))
# Deduplicate
seen = set()
unique_features = []
for feature in features:
key_str = feature.to_string()
if key_str not in seen:
seen.add(key_str)
unique_features.append(feature)
return unique_features
display
¶
display() -> str
Display string for this store.
Source code in src/metaxy/metadata_store/ibis.py
def display(self) -> str:
"""Display string for this store."""
backend_info = self.connection_string or f"{self.backend}"
if self._is_open:
num_features = len(self._list_features_local())
return f"IbisMetadataStore(backend={backend_info}, features={num_features})"
else:
return f"IbisMetadataStore(backend={backend_info})"
read_graph_snapshots
¶
read_graph_snapshots(project: str | None = None) -> DataFrame
Read recorded graph snapshots from the feature_versions system table.
Parameters:

- project (str | None, default: None) – Project name to filter by. If None, returns snapshots from all projects.

Returns a DataFrame with columns:
- snapshot_version: Unique identifier for each graph snapshot
- recorded_at: Timestamp when the snapshot was recorded
- feature_count: Number of features in this snapshot

Returns:

- DataFrame – Polars DataFrame with snapshot information, sorted by recorded_at descending

Raises:

- StoreNotOpenError – If store is not open
Example
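For instance, to inspect recorded snapshots:
with store:
    # Get snapshots for a specific project
    snapshots = store.read_graph_snapshots(project="my_project")
    latest_snapshot = snapshots[SNAPSHOT_VERSION_COL][0]
    print(f"Latest snapshot: {latest_snapshot}")
    # Get snapshots across all projects
    all_snapshots = store.read_graph_snapshots()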
Source code in src/metaxy/metadata_store/base.py
def read_graph_snapshots(self, project: str | None = None) -> pl.DataFrame:
"""Read recorded graph snapshots from the feature_versions system table.
Args:
project: Project name to filter by. If None, returns snapshots from all projects.
Returns a DataFrame with columns:
- snapshot_version: Unique identifier for each graph snapshot
- recorded_at: Timestamp when the snapshot was recorded
- feature_count: Number of features in this snapshot
Returns:
Polars DataFrame with snapshot information, sorted by recorded_at descending
Raises:
StoreNotOpenError: If store is not open
Example:
```py
with store:
# Get snapshots for a specific project
snapshots = store.read_graph_snapshots(project="my_project")
latest_snapshot = snapshots[SNAPSHOT_VERSION_COL][0]
print(f"Latest snapshot: {latest_snapshot}")
# Get snapshots across all projects
all_snapshots = store.read_graph_snapshots()
```
"""
self._check_open()
# Build filters based on project parameter
filters = None
if project is not None:
import narwhals as nw
filters = [nw.col("project") == project]
versions_lazy = self.read_metadata_in_store(
FEATURE_VERSIONS_KEY, filters=filters
)
if versions_lazy is None:
# No snapshots recorded yet
return pl.DataFrame(
schema={
SNAPSHOT_VERSION_COL: pl.String,
"recorded_at": pl.Datetime("us"),
"feature_count": pl.UInt32,
}
)
versions_df = versions_lazy.collect().to_polars()
# Group by snapshot_version and get earliest recorded_at and count
snapshots = (
versions_df.group_by(SNAPSHOT_VERSION_COL)
.agg(
[
pl.col("recorded_at").min().alias("recorded_at"),
pl.col("feature_key").count().alias("feature_count"),
]
)
.sort("recorded_at", descending=True)
)
return snapshots
read_features
¶
read_features(*, current: bool = True, snapshot_version: str | None = None, project: str | None = None) -> DataFrame
Read feature version information from the feature_versions system table.
Parameters:

- current (bool, default: True) – If True, only return features from the current code snapshot. If False, must provide snapshot_version.
- snapshot_version (str | None, default: None) – Specific snapshot version to filter by. Required if current=False.
- project (str | None, default: None) – Project name to filter by. Defaults to None.

Returns:

- DataFrame – Polars DataFrame with columns from FEATURE_VERSIONS_SCHEMA:
  - feature_key: Feature identifier
  - feature_version: Version hash of the feature
  - recorded_at: When this version was recorded
  - feature_spec: JSON serialized feature specification
  - feature_class_path: Python import path to the feature class
  - snapshot_version: Graph snapshot this feature belongs to

Raises:

- StoreNotOpenError – If store is not open
- ValueError – If current=False but no snapshot_version provided
Examples:
# Get features from current code
with store:
features = store.read_features(current=True)
print(f"Current graph has {len(features)} features")
# Get features from a specific snapshot
with store:
features = store.read_features(current=False, snapshot_version="abc123")
for row in features.iter_rows(named=True):
print(f"{row['feature_key']}: {row['metaxy_feature_version']}")
Source code in src/metaxy/metadata_store/base.py
def read_features(
self,
*,
current: bool = True,
snapshot_version: str | None = None,
project: str | None = None,
) -> pl.DataFrame:
"""Read feature version information from the feature_versions system table.
Args:
current: If True, only return features from the current code snapshot.
If False, must provide snapshot_version.
snapshot_version: Specific snapshot version to filter by. Required if current=False.
project: Project name to filter by. Defaults to None.
Returns:
Polars DataFrame with columns from FEATURE_VERSIONS_SCHEMA:
- feature_key: Feature identifier
- feature_version: Version hash of the feature
- recorded_at: When this version was recorded
- feature_spec: JSON serialized feature specification
- feature_class_path: Python import path to the feature class
- snapshot_version: Graph snapshot this feature belongs to
Raises:
StoreNotOpenError: If store is not open
ValueError: If current=False but no snapshot_version provided
Examples:
```py
# Get features from current code
with store:
features = store.read_features(current=True)
print(f"Current graph has {len(features)} features")
```
```py
# Get features from a specific snapshot
with store:
features = store.read_features(current=False, snapshot_version="abc123")
for row in features.iter_rows(named=True):
print(f"{row['feature_key']}: {row['metaxy_feature_version']}")
```
"""
self._check_open()
if not current and snapshot_version is None:
raise ValueError("Must provide snapshot_version when current=False")
if current:
# Get current snapshot from active graph
graph = FeatureGraph.get_active()
snapshot_version = graph.snapshot_version
filters = [nw.col(SNAPSHOT_VERSION_COL) == snapshot_version]
if project is not None:
filters.append(nw.col("project") == project)
versions_lazy = self.read_metadata_in_store(
FEATURE_VERSIONS_KEY, filters=filters
)
if versions_lazy is None:
# No features recorded yet
return pl.DataFrame(schema=FEATURE_VERSIONS_SCHEMA)
# Filter by snapshot_version
versions_df = versions_lazy.collect().to_polars()
return versions_df
copy_metadata
¶
copy_metadata(from_store: MetadataStore, features: list[FeatureKey | type[BaseFeature]] | None = None, *, from_snapshot: str | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, incremental: bool = True) -> dict[str, int]
Copy metadata from another store with fine-grained filtering.
This is a reusable method that can be called programmatically or from CLI/migrations. Copies metadata for specified features, preserving the original snapshot_version.
Parameters:

- from_store (MetadataStore) – Source metadata store to copy from (must be opened)
- features (list[FeatureKey | type[BaseFeature]] | None, default: None) – List of features to copy. Can be:
  - None: copies all features from source store
  - List of FeatureKey or Feature classes: copies specified features
- from_snapshot (str | None, default: None) – Snapshot version to filter source data by. If None, uses latest snapshot from source store. Only rows with this snapshot_version will be copied. The snapshot_version is preserved in the destination store.
- filters (Mapping[str, Sequence[Expr]] | None, default: None) – Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
- incremental (bool, default: True) – If True (default), filter out rows that already exist in the destination store by performing an anti-join on sample_uid for the same snapshot_version. The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid, filtered by snapshot_version. Disabling incremental (incremental=False) may improve performance when:
  - You know the destination is empty or has no overlap with source
  - The destination store uses deduplication
  When incremental=False, it's the user's responsibility to avoid duplicates or configure deduplication at the storage layer.

Returns:

- dict[str, int] – Dict with statistics: {"features_copied": int, "rows_copied": int}

Raises:

- ValueError – If from_store or self (destination) is not open
- FeatureNotFoundError – If a specified feature doesn't exist in source store
Examples:
# Simple: copy all features from latest snapshot
stats = dest_store.copy_metadata(from_store=source_store)
# Copy specific features from a specific snapshot
stats = dest_store.copy_metadata(
from_store=source_store,
features=[FeatureKey(["my_feature"])],
from_snapshot="abc123",
)
# Copy with filters
stats = dest_store.copy_metadata(
from_store=source_store,
filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
)
# Copy specific features with filters
stats = dest_store.copy_metadata(
from_store=source_store,
features=[
FeatureKey(["feature_a"]),
FeatureKey(["feature_b"]),
],
filters={
"feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
"feature_b": [nw.col("field_b") < 30],
},
)
Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
self,
from_store: MetadataStore,
features: list[FeatureKey | type[BaseFeature]] | None = None,
*,
from_snapshot: str | None = None,
filters: Mapping[str, Sequence[nw.Expr]] | None = None,
incremental: bool = True,
) -> dict[str, int]:
"""Copy metadata from another store with fine-grained filtering.
This is a reusable method that can be called programmatically or from CLI/migrations.
Copies metadata for specified features, preserving the original snapshot_version.
Args:
from_store: Source metadata store to copy from (must be opened)
features: List of features to copy. Can be:
- None: copies all features from source store
- List of FeatureKey or Feature classes: copies specified features
from_snapshot: Snapshot version to filter source data by. If None, uses latest snapshot
from source store. Only rows with this snapshot_version will be copied.
The snapshot_version is preserved in the destination store.
filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
These filters are applied when reading from the source store.
Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
incremental: If True (default), filter out rows that already exist in the destination
store by performing an anti-join on sample_uid for the same snapshot_version.
The implementation uses an anti-join: source LEFT ANTI JOIN destination ON sample_uid
filtered by snapshot_version.
Disabling incremental (incremental=False) may improve performance when:
- You know the destination is empty or has no overlap with source
- The destination store uses deduplication
When incremental=False, it's the user's responsibility to avoid duplicates or
configure deduplication at the storage layer.
Returns:
Dict with statistics: {"features_copied": int, "rows_copied": int}
Raises:
ValueError: If from_store or self (destination) is not open
FeatureNotFoundError: If a specified feature doesn't exist in source store
Examples:
```py
# Simple: copy all features from latest snapshot
stats = dest_store.copy_metadata(from_store=source_store)
```
```py
# Copy specific features from a specific snapshot
stats = dest_store.copy_metadata(
from_store=source_store,
features=[FeatureKey(["my_feature"])],
from_snapshot="abc123",
)
```
```py
# Copy with filters
stats = dest_store.copy_metadata(
from_store=source_store,
filters={"my/feature": [nw.col("sample_uid").is_in(["s1", "s2"])]},
)
```
```py
# Copy specific features with filters
stats = dest_store.copy_metadata(
from_store=source_store,
features=[
FeatureKey(["feature_a"]),
FeatureKey(["feature_b"]),
],
filters={
"feature_a": [nw.col("field_a") > 10, nw.col("sample_uid").is_in(["s1", "s2"])],
"feature_b": [nw.col("field_b") < 30],
},
)
```
"""
import logging
logger = logging.getLogger(__name__)
# Validate destination store is open
if not self._is_open:
raise ValueError("Destination store must be opened (use context manager)")
# Automatically handle source store context manager
should_close_source = not from_store._is_open
if should_close_source:
from_store.__enter__()
try:
return self._copy_metadata_impl(
from_store=from_store,
features=features,
from_snapshot=from_snapshot,
filters=filters,
incremental=incremental,
logger=logger,
)
finally:
if should_close_source:
from_store.__exit__(None, None, None)
read_upstream_metadata
¶
read_upstream_metadata(feature: FeatureKey | type[BaseFeature], field: FieldKey | None = None, *, filters: Mapping[str, Sequence[Expr]] | None = None, allow_fallback: bool = True, current_only: bool = True) -> dict[str, LazyFrame[Any]]
Read all upstream dependencies for a feature/field.
Parameters:

- feature (FeatureKey | type[BaseFeature]) – Feature whose dependencies to load
- field (FieldKey | None, default: None) – Specific field (if None, loads all deps for feature)
- filters (Mapping[str, Sequence[Expr]] | None, default: None) – Dict mapping feature keys (as strings) to lists of Narwhals filter expressions. Example: {"upstream/feature1": [nw.col("x") > 10], "upstream/feature2": [...]}
- allow_fallback (bool, default: True) – Whether to check fallback stores
- current_only (bool, default: True) – If True, only read current feature_version for upstream

Returns:

- dict[str, LazyFrame[Any]] – Dict mapping upstream feature keys (as strings) to Narwhals LazyFrames. Each LazyFrame has a 'metaxy_provenance_by_field' column (Struct).

Raises:

- DependencyError – If required upstream feature is missing
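For example (sketch; the upstream key "upstream/feature1" and the column "x" are illustrative):
import narwhals as nw

with store:
    upstream = store.read_upstream_metadata(
        MyFeature,
        filters={"upstream/feature1": [nw.col("x") > 10]},
    )
    for key, lazy_frame in upstream.items():
        print(key, lazy_frame.collect().to_polars().height)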
Source code in src/metaxy/metadata_store/base.py
def read_upstream_metadata(
self,
feature: FeatureKey | type[BaseFeature],
field: FieldKey | None = None,
*,
filters: Mapping[str, Sequence[nw.Expr]] | None = None,
allow_fallback: bool = True,
current_only: bool = True,
) -> dict[str, nw.LazyFrame[Any]]:
"""
Read all upstream dependencies for a feature/field.
Args:
feature: Feature whose dependencies to load
field: Specific field (if None, loads all deps for feature)
filters: Dict mapping feature keys (as strings) to lists of Narwhals filter expressions.
Example: {"upstream/feature1": [nw.col("x") > 10], "upstream/feature2": [...]}
allow_fallback: Whether to check fallback stores
current_only: If True, only read current feature_version for upstream
Returns:
Dict mapping upstream feature keys (as strings) to Narwhals LazyFrames.
Each LazyFrame has a 'metaxy_provenance_by_field' column (Struct).
Raises:
DependencyError: If required upstream feature is missing
"""
plan = self._resolve_feature_plan(feature)
# Get all upstream features we need
upstream_features = set()
if field is None:
# All fields' dependencies
for cont in plan.feature.fields:
upstream_features.update(self._get_field_dependencies(plan, cont.key))
else:
# Specific field's dependencies
upstream_features.update(self._get_field_dependencies(plan, field))
# Load metadata for each upstream feature
# Use the feature's graph to look up upstream feature classes
if isinstance(feature, FeatureKey):
from metaxy.models.feature import FeatureGraph
graph = FeatureGraph.get_active()
else:
graph = feature.graph
upstream_metadata = {}
for upstream_fq_key in upstream_features:
upstream_feature_key = upstream_fq_key.feature
# Extract filters for this specific upstream feature
upstream_filters = None
if filters:
upstream_key_str = upstream_feature_key.to_string()
if upstream_key_str in filters:
upstream_filters = filters[upstream_key_str]
try:
# Look up the Feature class from the graph and pass it to read_metadata
# This way we use the bound graph instead of relying on active context
upstream_feature_cls = graph.features_by_key[upstream_feature_key]
lazy_frame = self.read_metadata(
upstream_feature_cls,
filters=upstream_filters, # Pass extracted filters (Sequence or None)
allow_fallback=allow_fallback,
current_only=current_only, # Pass through current_only
)
# Use string key for dict
upstream_metadata[upstream_feature_key.to_string()] = lazy_frame
except FeatureNotFoundError as e:
raise DependencyError(
f"Missing upstream feature {upstream_feature_key.to_string()} "
f"required by {plan.feature.key.to_string()}"
) from e
return upstream_metadata
resolve_update
¶
resolve_update(feature: type[BaseFeature], *, samples: DataFrame[Any] | LazyFrame[Any] | None = None, filters: Mapping[str, Sequence[Expr]] | None = None, lazy: bool = False, **kwargs: Any) -> Increment | LazyIncrement
Calculate an incremental update for a feature.
Parameters:

- feature (type[BaseFeature]) – Feature class to resolve updates for
- samples (DataFrame[Any] | LazyFrame[Any] | None, default: None) – Pre-computed DataFrame with ID columns and a PROVENANCE_BY_FIELD_COL column. When provided, MetadataStore skips upstream loading, joining, and field provenance calculation.
  Required for root features (features with no upstream dependencies). Root features don't have upstream to calculate PROVENANCE_BY_FIELD_COL from, so users must provide samples with a manually computed PROVENANCE_BY_FIELD_COL column.
  For non-root features, use this when you want to bypass the automatic upstream loading and field provenance calculation. Examples:
  - Loading upstream from custom sources
  - Pre-computing field provenances with custom logic
  - Testing specific scenarios
  Setting this parameter during normal operations is not required.
- filters (Mapping[str, Sequence[Expr]] | None, default: None) – Dict mapping feature keys (as strings) to lists of Narwhals filter expressions. Applied when reading upstream metadata to filter samples at the source. Example: {"upstream/feature": [nw.col("x") > 10], ...}
- lazy (bool, default: False) – If True, return metaxy.data_versioning.diff.LazyIncrement with lazy Narwhals LazyFrames. If False, return metaxy.data_versioning.diff.Increment with eager Narwhals DataFrames.
- **kwargs (Any, default: {}) – Backend-specific parameters

Raises:

- ValueError – If no samples DataFrame has been provided when resolving an update for a root feature.
Examples:
# Root feature - samples required
samples = pl.DataFrame({
"sample_uid": [1, 2, 3],
PROVENANCE_BY_FIELD_COL: [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
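# Non-root feature - automatic (normal usage)
result = store.resolve_update(DownstreamFeature)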
# Non-root feature - with escape hatch (advanced)
custom_samples = compute_custom_field_provenance(...)
result = store.resolve_update(DownstreamFeature, samples=custom_samples)
Note
Users can then process only added/changed and call write_metadata().
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
self,
feature: type[BaseFeature],
*,
samples: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None,
filters: Mapping[str, Sequence[nw.Expr]] | None = None,
lazy: bool = False,
**kwargs: Any,
) -> Increment | LazyIncrement:
"""Calculate an incremental update for a feature.
Args:
feature: Feature class to resolve updates for
samples: Pre-computed DataFrame with ID columns
and `PROVENANCE_BY_FIELD_COL` column. When provided, `MetadataStore` skips upstream loading, joining,
and field provenance calculation.
**Required for root features** (features with no upstream dependencies).
Root features don't have upstream to calculate `PROVENANCE_BY_FIELD_COL` from, so users
must provide samples with manually computed `PROVENANCE_BY_FIELD_COL` column.
For non-root features, use this when you
want to bypass the automatic upstream loading and field provenance calculation.
Examples:
- Loading upstream from custom sources
- Pre-computing field provenances with custom logic
- Testing specific scenarios
Setting this parameter during normal operations is not required.
filters: Dict mapping feature keys (as strings) to lists of Narwhals filter expressions.
Applied when reading upstream metadata to filter samples at the source.
Example: {"upstream/feature": [nw.col("x") > 10], ...}
lazy: If `True`, return [metaxy.data_versioning.diff.LazyIncrement][] with lazy Narwhals LazyFrames.
If `False`, return [metaxy.data_versioning.diff.Increment][] with eager Narwhals DataFrames.
**kwargs: Backend-specific parameters
Raises:
ValueError: If no `samples` DataFrame has been provided when resolving an update for a root feature.
Examples:
```py
# Root feature - samples required
samples = pl.DataFrame({
"sample_uid": [1, 2, 3],
PROVENANCE_BY_FIELD_COL: [{"field": "h1"}, {"field": "h2"}, {"field": "h3"}],
})
result = store.resolve_update(RootFeature, samples=nw.from_native(samples))
```
```py
# Non-root feature - automatic (normal usage)
result = store.resolve_update(DownstreamFeature)
```
```py
# Non-root feature - with escape hatch (advanced)
custom_samples = compute_custom_field_provenance(...)
result = store.resolve_update(DownstreamFeature, samples=custom_samples)
```
Note:
Users can then process only added/changed and call write_metadata().
"""
import narwhals as nw
plan = feature.graph.get_feature_plan(feature.spec().key)
# Escape hatch: if samples provided, use them directly (skip join/calculation)
if samples is not None:
import logging
import polars as pl
logger = logging.getLogger(__name__)
# Convert samples to lazy if needed
if isinstance(samples, nw.LazyFrame):
samples_lazy = samples
elif isinstance(samples, nw.DataFrame):
samples_lazy = samples.lazy()
else:
samples_lazy = nw.from_native(samples).lazy()
# Check if samples are Polars-backed (common case for escape hatch)
samples_native = samples_lazy.to_native()
is_polars_samples = isinstance(samples_native, (pl.DataFrame, pl.LazyFrame))
if is_polars_samples and self._supports_native_components():
# User provided Polars samples but store uses native (SQL) backend
# Need to materialize current metadata to Polars for compatibility
logger.warning(
f"Feature {feature.spec().key}: samples parameter is Polars-backed but store uses native SQL backend. "
f"Materializing current metadata to Polars for diff comparison. "
f"For better performance, consider using samples with backend matching the store's backend."
)
# Get current metadata and materialize to Polars
current_lazy_native = self.read_metadata_in_store(
feature, feature_version=feature.feature_version()
)
if current_lazy_native is not None:
# Convert to Polars using Narwhals' built-in method
current_lazy = nw.from_native(
current_lazy_native.collect().to_polars().lazy()
)
else:
current_lazy = None
else:
# Same backend or no conversion needed - direct read
current_lazy = self.read_metadata_in_store(
feature, feature_version=feature.feature_version()
)
# Use diff resolver to compare samples with current
from metaxy.data_versioning.diff.narwhals import NarwhalsDiffResolver
diff_resolver = NarwhalsDiffResolver()
lazy_result = diff_resolver.find_changes(
target_provenance=samples_lazy,
current_metadata=current_lazy,
id_columns=feature.spec().id_columns, # Get ID columns from feature spec
)
return lazy_result if lazy else lazy_result.collect()
# Root features without samples: error (samples required)
if not plan.deps:
raise ValueError(
f"Feature {feature.spec().key} has no upstream dependencies (root feature). "
f"Must provide 'samples' parameter with sample_uid and {PROVENANCE_BY_FIELD_COL} columns. "
f"Root features require manual {PROVENANCE_BY_FIELD_COL} computation."
)
# Non-root features without samples: automatic upstream loading
# Check where upstream data lives
upstream_location = self._check_upstream_location(feature)
if upstream_location == "all_local":
# All upstream in this store - use native field provenance calculations
return self._resolve_update_native(feature, filters=filters, lazy=lazy)
else:
# Some upstream in fallback stores - use Polars components
return self._resolve_update_polars(feature, filters=filters, lazy=lazy)
ExtensionSpec
pydantic-model
¶
Bases: BaseModel
DuckDB extension specification accepted by DuckDBMetadataStore.
Supports additional keys for forward compatibility.
Show JSON schema:
{
"additionalProperties": true,
"description": "DuckDB extension specification accepted by DuckDBMetadataStore.\n\nSupports additional keys for forward compatibility.",
"properties": {
"name": {
"title": "Name",
"type": "string"
},
"repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Repository"
}
},
"required": [
"name"
],
"title": "ExtensionSpec",
"type": "object"
}
Config:
- extra: allow

Fields:
- name (str)
- repository (str | None)
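A sketch of the accepted extension forms (the extension and repository names here are illustrative):
from metaxy.metadata_store.duckdb import DuckDBMetadataStore, ExtensionSpec

store = DuckDBMetadataStore(
    "metadata.db",
    extensions=[
        "spatial",                                      # plain name (community repo)
        {"name": "my_ext", "repository": "community"},  # mapping with name/repository keys
        ExtensionSpec(name="another_ext"),              # explicit ExtensionSpec instance
    ],
)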
DuckLakeConfigInput
module-attribute
¶
DuckLakeConfigInput = DuckLakeAttachmentConfig | Mapping[str, Any]
DuckLakeAttachmentConfig
pydantic-model
¶
Bases: BaseModel
Configuration payload used to attach DuckLake to a DuckDB connection.
Show JSON schema:
{
"additionalProperties": true,
"description": "Configuration payload used to attach DuckLake to a DuckDB connection.",
"properties": {
"metadata_backend": {
"additionalProperties": true,
"title": "Metadata Backend",
"type": "object"
},
"storage_backend": {
"additionalProperties": true,
"title": "Storage Backend",
"type": "object"
},
"alias": {
"default": "ducklake",
"title": "Alias",
"type": "string"
},
"plugins": {
"items": {
"type": "string"
},
"title": "Plugins",
"type": "array"
},
"attach_options": {
"additionalProperties": true,
"title": "Attach Options",
"type": "object"
}
},
"required": [
"metadata_backend",
"storage_backend"
],
"title": "DuckLakeAttachmentConfig",
"type": "object"
}
Config:
- arbitrary_types_allowed: True
- extra: allow

Fields:
- metadata_backend (DuckLakeBackend)
- storage_backend (DuckLakeBackend)
- alias (str)
- plugins (tuple[str, ...])
- attach_options (dict[str, Any])

Validators:
- _coerce_backends → metadata_backend, storage_backend
- _coerce_alias → alias
- _coerce_plugins → plugins
- _coerce_attach_options → attach_options
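A sketch of attaching DuckLake via a mapping (the backend settings are placeholders; the actual keys depend on the DuckLake metadata and storage backends you configure):
from metaxy.metadata_store.duckdb import DuckDBMetadataStore

metadata_backend_settings: dict = {}   # placeholder; fill with your DuckLake metadata backend settings
storage_backend_settings: dict = {}    # placeholder; fill with your DuckLake storage backend settings

store = DuckDBMetadataStore(
    "metadata.db",
    ducklake={
        "metadata_backend": metadata_backend_settings,
        "storage_backend": storage_backend_settings,
        "alias": "ducklake",  # default alias per the schema above
    },
)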