
Delta Lake API Reference

metaxy.metadata_store.delta.DeltaMetadataStore

DeltaMetadataStore(root_path: str | Path, *, storage_options: dict[str, Any] | None = None, fallback_stores: list[MetadataStore] | None = None, layout: Literal['flat', 'nested'] = 'nested', delta_write_options: dict[str, Any] | None = None, **kwargs: Any)

Bases: MetadataStore

Delta Lake metadata store backed by delta-rs.

Stores feature metadata in Delta Lake tables located under root_path, using the Polars versioning engine for provenance calculations.

Example:

```py
from metaxy.metadata_store.delta import DeltaMetadataStore

store = DeltaMetadataStore(
    root_path="s3://my-bucket/metaxy",
    storage_options={"AWS_REGION": "us-west-2"},
)
```

Parameters:

  • root_path (str | Path) –

    Base directory or URI where feature tables are stored. Supports local paths (/path/to/dir), s3:// URLs, and other object store URIs.

  • storage_options (dict[str, Any] | None, default: None ) –

    Storage backend options passed to delta-rs. Example: {"AWS_REGION": "us-west-2", "AWS_ACCESS_KEY_ID": "...", ...} See https://delta-io.github.io/delta-rs/ for details on supported options.

  • fallback_stores (list[MetadataStore] | None, default: None ) –

    Ordered list of read-only fallback stores.

  • layout (Literal['flat', 'nested'], default: 'nested' ) –

    Directory layout for feature tables. Options:

    • "nested": Feature tables stored in nested directories {part1}/{part2}.delta

    • "flat": Feature tables stored as {part1}__{part2}.delta

  • delta_write_options (dict[str, Any] | None, default: None ) –

    Additional options passed to deltalake.write_deltalake(); see https://delta-io.github.io/delta-rs/upgrade-guides/guide-1.0.0/#write_deltalake-api. These override the default {"schema_mode": "merge"}. Example: {"max_workers": 4}

  • **kwargs (Any, default: {} ) –

    Forwarded to metaxy.metadata_store.base.MetadataStore.

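The two layout options above differ only in how a feature key's parts are joined into a table path. A minimal sketch of that mapping (illustrative only; `feature_table_uri` is a hypothetical helper, not the library's actual implementation):

```py
def feature_table_uri(root: str, parts: list[str], layout: str = "nested") -> str:
    """Illustrative only: join feature-key parts into a Delta table URI."""
    joined = "/".join(parts) if layout == "nested" else "__".join(parts)
    return f"{root.rstrip('/')}/{joined}.delta"

print(feature_table_uri("s3://my-bucket/metaxy", ["sales", "daily"], "nested"))
# s3://my-bucket/metaxy/sales/daily.delta
print(feature_table_uri("s3://my-bucket/metaxy", ["sales", "daily"], "flat"))
# s3://my-bucket/metaxy/sales__daily.delta
```

The nested layout keeps related tables grouped in directories; the flat layout keeps every table at the top level of root_path.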
Source code in src/metaxy/metadata_store/delta.py
def __init__(
    self,
    root_path: str | Path,
    *,
    storage_options: dict[str, Any] | None = None,
    fallback_stores: list[MetadataStore] | None = None,
    layout: Literal["flat", "nested"] = "nested",
    delta_write_options: dict[str, Any] | None = None,
    **kwargs: Any,
) -> None:
    """
    Initialize Delta Lake metadata store.

    Args:
        root_path: Base directory or URI where feature tables are stored.
            Supports local paths (`/path/to/dir`), `s3://` URLs, and other object store URIs.
        storage_options: Storage backend options passed to delta-rs.
            Example: `{"AWS_REGION": "us-west-2", "AWS_ACCESS_KEY_ID": "...", ...}`
            See https://delta-io.github.io/delta-rs/ for details on supported options.
        fallback_stores: Ordered list of read-only fallback stores.
        layout: Directory layout for feature tables. Options:

            - `"nested"`: Feature tables stored in nested directories `{part1}/{part2}.delta`

            - `"flat"`: Feature tables stored as `{part1}__{part2}.delta`

        delta_write_options: Additional options passed to deltalake.write_deltalake() - see https://delta-io.github.io/delta-rs/upgrade-guides/guide-1.0.0/#write_deltalake-api.
            Overrides default {"schema_mode": "merge"}. Example: {"max_workers": 4}
        **kwargs: Forwarded to [metaxy.metadata_store.base.MetadataStore][metaxy.metadata_store.base.MetadataStore].
    """
    self.storage_options = storage_options or {}
    if layout not in ("flat", "nested"):
        raise ValueError(f"Invalid layout: {layout}. Must be 'flat' or 'nested'.")
    self.layout = layout
    self.delta_write_options = delta_write_options or {}

    root_str = str(root_path)
    self._is_remote = not is_local_path(root_str)

    if self._is_remote:
        # Remote path (S3, Azure, GCS, etc.)
        self._root_uri = root_str.rstrip("/")
    else:
        # Local path (including file:// and local:// URLs)
        if root_str.startswith("file://"):
            # Strip file:// prefix
            root_str = root_str[7:]
        elif root_str.startswith("local://"):
            # Strip local:// prefix
            root_str = root_str[8:]
        local_path = Path(root_str).expanduser().resolve()
        self._root_uri = str(local_path)

    super().__init__(
        fallback_stores=fallback_stores,
        versioning_engine_cls=PolarsVersioningEngine,
        versioning_engine="polars",
        **kwargs,
    )

Attributes

metaxy.metadata_store.delta.DeltaMetadataStore.default_delta_write_options cached property

default_delta_write_options: dict[str, Any]

Default write options for Delta Lake operations.

Merges base defaults with user-provided delta_write_options. Base defaults: mode="append", schema_mode="merge", storage_options.
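The merge follows ordinary dict precedence: user-supplied delta_write_options win over the base defaults. A sketch of the semantics, assuming standard dict merging (the option values here are examples, not defaults of the library beyond those stated above):

```py
base_defaults = {"mode": "append", "schema_mode": "merge"}
user_options = {"schema_mode": "overwrite", "max_workers": 4}  # hypothetical overrides

# Later entries win, so user options override the base defaults
effective = {**base_defaults, **user_options}
print(effective)
# {'mode': 'append', 'schema_mode': 'overwrite', 'max_workers': 4}
```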

Functions

metaxy.metadata_store.delta.DeltaMetadataStore.open

open(mode: AccessMode = 'read') -> Iterator[Self]

Open the Delta Lake store.

Delta-rs opens connections lazily per operation, so no connection state management is needed.

Parameters:

  • mode (AccessMode, default: 'read' ) –

    Access mode for this connection session (accepted for consistency but not used).

Yields:

  • Self ( Self ) –

    The store instance with connection open

Source code in src/metaxy/metadata_store/delta.py
@contextmanager
def open(self, mode: AccessMode = "read") -> Iterator[Self]:  # noqa: ARG002
    """Open the Delta Lake store.

    Delta-rs opens connections lazily per operation, so no connection state management needed.

    Args:
        mode: Access mode for this connection session (accepted for consistency but not used).

    Yields:
        Self: The store instance with connection open
    """
    # Increment context depth to support nested contexts
    self._context_depth += 1

    try:
        # Only perform actual open on first entry
        if self._context_depth == 1:
            # Mark store as open and validate
            # Note: Delta auto-creates tables on first write, no need to pre-create them
            self._is_open = True
            self._validate_after_open()

        yield self
    finally:
        # Decrement context depth
        self._context_depth -= 1

        # Only perform actual close on last exit
        if self._context_depth == 0:
            self._is_open = False
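The context-depth counter above makes open() reentrant: nested `with store.open():` blocks share one logical session, and the store only closes when the outermost context exits. A self-contained sketch of the same pattern (class and attribute names here are illustrative, not the library's):

```py
from contextlib import contextmanager

class ReentrantStore:
    """Minimal sketch of the context-depth pattern used above."""

    def __init__(self) -> None:
        self._depth = 0
        self.is_open = False

    @contextmanager
    def open(self):
        self._depth += 1
        try:
            if self._depth == 1:  # only the first entry actually opens
                self.is_open = True
            yield self
        finally:
            self._depth -= 1
            if self._depth == 0:  # only the last exit actually closes
                self.is_open = False

s = ReentrantStore()
with s.open():
    with s.open():
        assert s.is_open  # still open inside the nested context
assert not s.is_open  # closed once the outermost context exits
```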

metaxy.metadata_store.delta.DeltaMetadataStore.write_metadata_to_store

write_metadata_to_store(feature_key: FeatureKey, df: Frame, **kwargs: Any) -> None

Append metadata to the Delta table for a feature.

Parameters:

  • feature_key (FeatureKey) –

    Feature key to write to

  • df (Frame) –

    DataFrame with metadata (already validated)

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters (currently unused)

Source code in src/metaxy/metadata_store/delta.py
def write_metadata_to_store(
    self,
    feature_key: FeatureKey,
    df: Frame,
    **kwargs: Any,
) -> None:
    """Append metadata to the Delta table for a feature.

    Args:
        feature_key: Feature key to write to
        df: DataFrame with metadata (already validated)
        **kwargs: Backend-specific parameters (currently unused)
    """
    table_uri = self._feature_uri(feature_key)

    # Delta Lake auto-creates tables on first write, no need to check existence
    # Convert to Polars and collect lazy frames
    df_polars = switch_implementation_to_polars(df)

    # Collect lazy frames, keep eager frames as-is
    if isinstance(df_polars, nw.LazyFrame):
        df_native = df_polars.collect().to_native()
    else:
        df_native = df_polars.to_native()

    assert isinstance(df_native, pl.DataFrame)

    # Cast Enum columns to String to avoid delta-rs Utf8View incompatibility
    # (delta-rs parquet writer cannot handle Utf8View dictionary values)
    df_native = df_native.with_columns(pl.selectors.by_dtype(pl.Enum).cast(pl.Utf8))

    # Prepare write parameters for Polars write_delta
    # Extract mode and storage_options as top-level parameters
    write_opts = self.default_delta_write_options.copy()
    mode = write_opts.pop("mode", "append")
    storage_options = write_opts.pop("storage_options", None)

    # Write using Polars DataFrame.write_delta
    df_native.write_delta(
        table_uri,
        mode=mode,
        storage_options=storage_options,
        delta_write_options=write_opts or None,
    )
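Note how the combined option dict is split before the write: `mode` and `storage_options` become top-level `write_delta()` arguments, while everything else is forwarded as `delta_write_options`. A standalone sketch of that split (the option values are examples):

```py
options = {
    "mode": "append",
    "schema_mode": "merge",
    "storage_options": {"AWS_REGION": "us-west-2"},  # example value
}

write_opts = options.copy()
mode = write_opts.pop("mode", "append")
storage_options = write_opts.pop("storage_options", None)
# The remaining keys ride along as delta_write_options
print(mode, storage_options, write_opts)
# append {'AWS_REGION': 'us-west-2'} {'schema_mode': 'merge'}
```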

metaxy.metadata_store.delta.DeltaMetadataStore.read_metadata_in_store

read_metadata_in_store(feature: CoercibleToFeatureKey, *, filters: Sequence[Expr] | None = None, columns: Sequence[str] | None = None, **kwargs: Any) -> LazyFrame[Any] | None

Read metadata stored in Delta for a single feature using lazy evaluation.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for

  • filters (Sequence[Expr] | None, default: None ) –

    List of Narwhals filter expressions

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to return

  • **kwargs (Any, default: {} ) –

    Backend-specific parameters (currently unused)

Source code in src/metaxy/metadata_store/delta.py
def read_metadata_in_store(
    self,
    feature: CoercibleToFeatureKey,
    *,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    **kwargs: Any,
) -> nw.LazyFrame[Any] | None:
    """Read metadata stored in Delta for a single feature using lazy evaluation.

    Args:
        feature: Feature to read metadata for
        filters: List of Narwhals filter expressions
        columns: Subset of columns to return
        **kwargs: Backend-specific parameters (currently unused)
    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    table_uri = self._feature_uri(feature_key)
    if not self._table_exists(table_uri):
        return None

    # Use scan_delta for lazy evaluation
    lf = pl.scan_delta(
        table_uri,
        storage_options=self.storage_options or None,
    )

    # Convert to Narwhals
    nw_lazy = nw.from_native(lf)

    # Apply filters (unpack list, skip if empty)
    if filters:
        nw_lazy = nw_lazy.filter(*filters)

    # Apply column selection
    if columns is not None:
        nw_lazy = nw_lazy.select(columns)

    return nw_lazy
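Filters and column selection compose lazily: rows are filtered first, then projected to the requested columns, and nothing is materialized until the returned LazyFrame is collected. A plain-Python analogue of those semantics (illustrative only; this is not the narwhals API):

```py
rows = [
    {"feature": "sales", "version": 1, "valid": True},
    {"feature": "sales", "version": 2, "valid": False},
]
filters = [lambda r: r["valid"]]          # stand-in for narwhals expressions
columns = ["feature", "version"]          # stand-in for the columns argument

# Filter rows first, then project to the selected columns
selected = [
    {c: r[c] for c in columns}
    for r in rows
    if all(f(r) for f in filters)
]
print(selected)
# [{'feature': 'sales', 'version': 1}]
```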

metaxy.metadata_store.delta.DeltaMetadataStore.display

display() -> str

Return human-readable representation of the store.

Source code in src/metaxy/metadata_store/delta.py
def display(self) -> str:
    """Return human-readable representation of the store."""
    details = [f"path={self._root_uri}"]
    details.append(f"layout={self.layout}")
    return f"DeltaMetadataStore({', '.join(details)})"

metaxy.metadata_store.delta.DeltaMetadataStore.get_store_metadata

get_store_metadata(feature_key: CoercibleToFeatureKey) -> dict[str, Any]

Return arbitrary key-value pairs of useful metadata, such as the path in storage.

Useful for logging. This method should not expose sensitive information.

Source code in src/metaxy/metadata_store/delta.py
def get_store_metadata(self, feature_key: CoercibleToFeatureKey) -> dict[str, Any]:
    return {"path": self._feature_uri(self._resolve_feature_key(feature_key))}

metaxy.metadata_store.delta.DeltaMetadataStore.config_model classmethod

config_model() -> type[DeltaMetadataStoreConfig]

Return the configuration model class for this store type.

Subclasses must override this to return their specific config class.

Returns:

  • type[DeltaMetadataStoreConfig] –

    The configuration model class for this store.

Note

Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.

Source code in src/metaxy/metadata_store/delta.py
@classmethod
def config_model(cls) -> type[DeltaMetadataStoreConfig]:  # pyright: ignore[reportIncompatibleMethodOverride]
    return DeltaMetadataStoreConfig