Delta Lake API Reference¶
metaxy.metadata_store.delta.DeltaMetadataStore ¶
DeltaMetadataStore(root_path: str | Path, *, storage_options: dict[str, Any] | None = None, fallback_stores: list[MetadataStore] | None = None, layout: Literal['flat', 'nested'] = 'nested', delta_write_options: dict[str, Any] | None = None, **kwargs: Any)
Bases: MetadataStore
Delta Lake metadata store backed by delta-rs.
It stores feature metadata in Delta Lake tables located under root_path.
It uses the Polars versioning engine for provenance calculations.
Example:
```py
from metaxy.metadata_store.delta import DeltaMetadataStore

store = DeltaMetadataStore(
    root_path="s3://my-bucket/metaxy",
    storage_options={"AWS_REGION": "us-west-2"},
)
```
Parameters:

- `root_path` (`str | Path`) – Base directory or URI where feature tables are stored. Supports local paths (`/path/to/dir`), `s3://` URLs, and other object store URIs.
- `storage_options` (`dict[str, Any] | None`, default: `None`) – Storage backend options passed to delta-rs. Example: `{"AWS_REGION": "us-west-2", "AWS_ACCESS_KEY_ID": "...", ...}`. See https://delta-io.github.io/delta-rs/ for details on supported options.
- `fallback_stores` (`list[MetadataStore] | None`, default: `None`) – Ordered list of read-only fallback stores.
- `layout` (`Literal['flat', 'nested']`, default: `'nested'`) – Directory layout for feature tables. Options:
    - `"nested"`: Feature tables stored in nested directories `{part1}/{part2}.delta`
    - `"flat"`: Feature tables stored as `{part1}__{part2}.delta`
- `delta_write_options` (`dict[str, Any] | None`, default: `None`) – Additional options passed to `deltalake.write_deltalake()`; see https://delta-io.github.io/delta-rs/upgrade-guides/guide-1.0.0/#write_deltalake-api. Overrides the default `{"schema_mode": "merge"}`. Example: `{"max_workers": 4}`.
- `**kwargs` (`Any`, default: `{}`) – Forwarded to `metaxy.metadata_store.base.MetadataStore`.
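The two layout options determine where each feature's Delta table lives under `root_path`. A minimal sketch of that path mapping, assuming a multi-part feature key (illustrative only, not the store's actual internals):

```python
# Hypothetical sketch of how "nested" vs. "flat" layouts map a feature
# key to a table URI; the real DeltaMetadataStore may differ in details.
def table_uri(root: str, parts: list[str], layout: str = "nested") -> str:
    if layout == "nested":
        # "nested": one directory per key part, last part gets ".delta"
        return "/".join([root, *parts[:-1], parts[-1] + ".delta"])
    if layout == "flat":
        # "flat": all parts joined with "__" in a single directory
        return f"{root}/" + "__".join(parts) + ".delta"
    raise ValueError(f"Invalid layout: {layout}. Must be 'flat' or 'nested'.")

print(table_uri("s3://my-bucket/metaxy", ["sales", "daily"], "nested"))
# s3://my-bucket/metaxy/sales/daily.delta
print(table_uri("s3://my-bucket/metaxy", ["sales", "daily"], "flat"))
# s3://my-bucket/metaxy/sales__daily.delta
```

The nested layout keeps object-store listings shallow per prefix, while the flat layout keeps all tables in one directory.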
Source code in src/metaxy/metadata_store/delta.py
```py
def __init__(
    self,
    root_path: str | Path,
    *,
    storage_options: dict[str, Any] | None = None,
    fallback_stores: list[MetadataStore] | None = None,
    layout: Literal["flat", "nested"] = "nested",
    delta_write_options: dict[str, Any] | None = None,
    **kwargs: Any,
) -> None:
    """Initialize Delta Lake metadata store.

    Args:
        root_path: Base directory or URI where feature tables are stored.
            Supports local paths (`/path/to/dir`), `s3://` URLs, and other object store URIs.
        storage_options: Storage backend options passed to delta-rs.
            Example: `{"AWS_REGION": "us-west-2", "AWS_ACCESS_KEY_ID": "...", ...}`
            See https://delta-io.github.io/delta-rs/ for details on supported options.
        fallback_stores: Ordered list of read-only fallback stores.
        layout: Directory layout for feature tables. Options:
            - `"nested"`: Feature tables stored in nested directories `{part1}/{part2}.delta`
            - `"flat"`: Feature tables stored as `{part1}__{part2}.delta`
        delta_write_options: Additional options passed to deltalake.write_deltalake();
            see https://delta-io.github.io/delta-rs/upgrade-guides/guide-1.0.0/#write_deltalake-api.
            Overrides default {"schema_mode": "merge"}. Example: {"max_workers": 4}
        **kwargs: Forwarded to [metaxy.metadata_store.base.MetadataStore][metaxy.metadata_store.base.MetadataStore].
    """
    self.storage_options = storage_options or {}
    if layout not in ("flat", "nested"):
        raise ValueError(f"Invalid layout: {layout}. Must be 'flat' or 'nested'.")
    self.layout = layout
    self.delta_write_options = delta_write_options or {}

    root_str = str(root_path)
    self._is_remote = not is_local_path(root_str)
    if self._is_remote:
        # Remote path (S3, Azure, GCS, etc.)
        self._root_uri = root_str.rstrip("/")
    else:
        # Local path (including file:// and local:// URLs)
        if root_str.startswith("file://"):
            root_str = root_str[7:]  # strip file:// prefix
        elif root_str.startswith("local://"):
            root_str = root_str[8:]  # strip local:// prefix
        local_path = Path(root_str).expanduser().resolve()
        self._root_uri = str(local_path)

    super().__init__(
        fallback_stores=fallback_stores,
        versioning_engine_cls=PolarsVersioningEngine,
        versioning_engine="polars",
        **kwargs,
    )
```
Attributes¶
metaxy.metadata_store.delta.DeltaMetadataStore.default_delta_write_options cached property ¶

Default write options for Delta Lake operations.
Merges base defaults with user-provided `delta_write_options`. Base defaults: `mode="append"`, `schema_mode="merge"`, `storage_options`.
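Since user-provided `delta_write_options` override the base defaults, the merge behaves like a plain dictionary update. A sketch under that assumption (option values are illustrative):

```python
# Assumed merge semantics: user-supplied options win over base defaults,
# mirroring the "overrides default" note in the parameter docs above.
base_defaults = {"mode": "append", "schema_mode": "merge"}
user_options = {"schema_mode": "overwrite", "max_workers": 4}

# Later keys win, so user_options take precedence over base_defaults
merged = {**base_defaults, **user_options}
print(merged)
# {'mode': 'append', 'schema_mode': 'overwrite', 'max_workers': 4}
```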
Functions¶
metaxy.metadata_store.delta.DeltaMetadataStore.open ¶

open(mode: AccessMode = 'read') -> Iterator[Self]

Open the Delta Lake store.
Delta-rs opens connections lazily per operation, so no connection state management is needed.

Parameters:

- `mode` (`AccessMode`, default: `'read'`) – Access mode for this connection session (accepted for consistency but not used).

Yields:

- `Self` (`Self`) – The store instance with the connection open.
Source code in src/metaxy/metadata_store/delta.py
```py
@contextmanager
def open(self, mode: AccessMode = "read") -> Iterator[Self]:  # noqa: ARG002
    """Open the Delta Lake store.

    Delta-rs opens connections lazily per operation, so no connection
    state management is needed.

    Args:
        mode: Access mode for this connection session (accepted for consistency but not used).

    Yields:
        Self: The store instance with the connection open.
    """
    # Increment context depth to support nested contexts
    self._context_depth += 1
    try:
        # Only perform actual open on first entry
        if self._context_depth == 1:
            # Mark store as open and validate.
            # Note: Delta auto-creates tables on first write, no need to pre-create them
            self._is_open = True
            self._validate_after_open()
        yield self
    finally:
        # Decrement context depth; only perform actual close on last exit
        self._context_depth -= 1
        if self._context_depth == 0:
            self._is_open = False
```
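The context-depth bookkeeping above makes `open()` safely reentrant: only the outermost `with` block toggles the open flag. A stripped-down sketch of the same pattern (class and attribute names simplified for illustration):

```python
from contextlib import contextmanager


class ReentrantStore:
    """Minimal sketch of the context-depth pattern used by open()."""

    def __init__(self) -> None:
        self._context_depth = 0
        self._is_open = False

    @contextmanager
    def open(self):
        self._context_depth += 1
        try:
            if self._context_depth == 1:  # real open happens on first entry only
                self._is_open = True
            yield self
        finally:
            self._context_depth -= 1
            if self._context_depth == 0:  # real close happens on last exit only
                self._is_open = False


store = ReentrantStore()
with store.open():
    with store.open():  # nested entry: no re-open
        print(store._is_open)  # True
print(store._is_open)  # False
```

This lets helper methods wrap themselves in `with self.open():` without worrying about whether the caller already opened the store.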
metaxy.metadata_store.delta.DeltaMetadataStore.write_metadata_to_store ¶

write_metadata_to_store(feature_key: FeatureKey, df: Frame, **kwargs: Any) -> None

Append metadata to the Delta table for a feature.

Parameters:

- `feature_key` (`FeatureKey`) – Feature key to write to
- `df` (`Frame`) – DataFrame with metadata (already validated)
- `**kwargs` (`Any`, default: `{}`) – Backend-specific parameters (currently unused)
Source code in src/metaxy/metadata_store/delta.py
```py
def write_metadata_to_store(
    self,
    feature_key: FeatureKey,
    df: Frame,
    **kwargs: Any,
) -> None:
    """Append metadata to the Delta table for a feature.

    Args:
        feature_key: Feature key to write to
        df: DataFrame with metadata (already validated)
        **kwargs: Backend-specific parameters (currently unused)
    """
    table_uri = self._feature_uri(feature_key)
    # Delta Lake auto-creates tables on first write, no need to check existence

    # Convert to Polars; collect lazy frames, keep eager frames as-is
    df_polars = switch_implementation_to_polars(df)
    if isinstance(df_polars, nw.LazyFrame):
        df_native = df_polars.collect().to_native()
    else:
        df_native = df_polars.to_native()
    assert isinstance(df_native, pl.DataFrame)

    # Cast Enum columns to String to avoid delta-rs Utf8View incompatibility
    # (delta-rs parquet writer cannot handle Utf8View dictionary values)
    df_native = df_native.with_columns(pl.selectors.by_dtype(pl.Enum).cast(pl.Utf8))

    # Prepare write parameters for Polars write_delta:
    # extract mode and storage_options as top-level parameters
    write_opts = self.default_delta_write_options.copy()
    mode = write_opts.pop("mode", "append")
    storage_options = write_opts.pop("storage_options", None)

    # Write using Polars DataFrame.write_delta
    df_native.write_delta(
        table_uri,
        mode=mode,
        storage_options=storage_options,
        delta_write_options=write_opts or None,
    )
```
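Note how the write path splits `default_delta_write_options` into the top-level `mode` and `storage_options` arguments of `DataFrame.write_delta`, forwarding whatever remains through `delta_write_options`. The splitting step in isolation (option values here are illustrative):

```python
# Reproduces the option-splitting logic from write_metadata_to_store.
options = {
    "mode": "append",
    "schema_mode": "merge",
    "storage_options": {"AWS_REGION": "us-west-2"},
}

write_opts = options.copy()
mode = write_opts.pop("mode", "append")                    # top-level write_delta arg
storage_options = write_opts.pop("storage_options", None)  # top-level write_delta arg
# Whatever remains is forwarded via delta_write_options
print(mode, storage_options, write_opts)
# append {'AWS_REGION': 'us-west-2'} {'schema_mode': 'merge'}
```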
metaxy.metadata_store.delta.DeltaMetadataStore.read_metadata_in_store ¶

read_metadata_in_store(feature: CoercibleToFeatureKey, *, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, **kwargs: Any) -> LazyFrame[Any] | None

Read metadata stored in Delta for a single feature using lazy evaluation.

Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to read metadata for
- `filters` (`Sequence[Expr] | None`, default: `None`) – List of Narwhals filter expressions
- `columns` (`Sequence[str] | None`, default: `None`) – Subset of columns to return
- `**kwargs` (`Any`, default: `{}`) – Backend-specific parameters (currently unused)
Source code in src/metaxy/metadata_store/delta.py
```py
def read_metadata_in_store(
    self,
    feature: CoercibleToFeatureKey,
    *,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    **kwargs: Any,
) -> nw.LazyFrame[Any] | None:
    """Read metadata stored in Delta for a single feature using lazy evaluation.

    Args:
        feature: Feature to read metadata for
        filters: List of Narwhals filter expressions
        columns: Subset of columns to return
        **kwargs: Backend-specific parameters (currently unused)
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)
    table_uri = self._feature_uri(feature_key)
    if not self._table_exists(table_uri):
        return None

    # Use scan_delta for lazy evaluation
    lf = pl.scan_delta(
        table_uri,
        storage_options=self.storage_options or None,
    )

    # Convert to Narwhals
    nw_lazy = nw.from_native(lf)

    # Apply filters (unpack list, skip if empty)
    if filters:
        nw_lazy = nw_lazy.filter(*filters)

    # Apply column selection
    if columns is not None:
        nw_lazy = nw_lazy.select(columns)

    return nw_lazy
```
metaxy.metadata_store.delta.DeltaMetadataStore.display ¶

display() -> str

Return a human-readable representation of the store.

metaxy.metadata_store.delta.DeltaMetadataStore.get_store_metadata ¶

get_store_metadata(feature_key: CoercibleToFeatureKey) -> dict[str, Any]

Arbitrary key-value pairs with useful metadata, such as the path in storage.
Useful for logging purposes. This method should not expose sensitive information.

metaxy.metadata_store.delta.DeltaMetadataStore.config_model classmethod ¶

config_model() -> type[DeltaMetadataStoreConfig]

Return the configuration model class for this store type.
Subclasses must override this to return their specific config class.

Returns:

- `type[MetadataStoreConfig]` – The config class type (e.g., `DuckDBMetadataStoreConfig`)

Note
Subclasses override this with a more specific return type. Type checkers may warn about an incompatible override, but this is intentional: each store returns its own config type.
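The covariant override the note describes can be sketched as follows (class names here are illustrative stand-ins, not the real metaxy classes):

```python
class MetadataStoreConfig:
    """Base config type."""


class DeltaMetadataStoreConfig(MetadataStoreConfig):
    """Store-specific config type."""


class BaseStore:
    @classmethod
    def config_model(cls) -> type[MetadataStoreConfig]:
        raise NotImplementedError


class DeltaStore(BaseStore):
    @classmethod
    def config_model(cls) -> type[DeltaMetadataStoreConfig]:
        # Narrower return type than the base declares: a type checker may
        # flag this override as incompatible, but it is intentional.
        return DeltaMetadataStoreConfig


print(DeltaStore.config_model().__name__)
# DeltaMetadataStoreConfig
```

Narrowing a return type in a subclass is safe for callers of the base class, since every `DeltaMetadataStoreConfig` is also a `MetadataStoreConfig`.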