
Dagster API Reference

Decorators

metaxy.ext.dagster.metaxify.metaxify

metaxify(_asset: _T | None = None, *, feature: CoercibleToFeatureKey | None = None, key: CoercibleToAssetKey | None = None, key_prefix: CoercibleToAssetKeyPrefix | None = None, inherit_feature_key_as_asset_key: bool = True, inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, inject_column_schema: bool = True, inject_column_lineage: bool = True)

Inject Metaxy metadata into a Dagster AssetsDefinition or AssetSpec.

Modifies assets that have metaxy/feature metadata set, or any asset when the feature argument is provided. The decorator can be used with or without parentheses.

The decorated asset or spec is enriched with Metaxy:

  • Asset key is determined as follows:

    1. If key argument is provided, it overrides everything.

    2. Otherwise, the key is resolved using existing logic:

      • If "dagster/attributes": {"asset_key": ...} is set on the feature spec, that's used.
      • Else if inherit_feature_key_as_asset_key is True, the Metaxy feature key is used.
      • Else the original Dagster asset key is kept.
    3. If key_prefix is provided, it's prepended to the resolved key.

  • Dependencies for upstream Metaxy features are injected into deps. Their keys follow the same resolution logic, applied to each upstream feature's spec (including key_prefix).

  • Code version from the feature spec is appended to the asset's code version in the format metaxy:<version>.

  • Metadata from the feature spec is injected into the Dagster asset metadata.

  • Description from the feature class docstring is used if the asset spec doesn't have a description set.

  • Kind "metaxy" is injected into asset kinds if inject_metaxy_kind is True and the asset currently has fewer than 3 kinds.

  • Tags metaxy/feature and metaxy/project are injected into the asset tags.

  • Arbitrary asset attributes from "dagster/attributes" in the feature spec metadata (such as group_name, owners, tags) are applied to the asset spec, replacing any existing values.

  • Column schema from Pydantic fields is injected into the asset metadata under dagster/column_schema. Field types are converted to strings, and field descriptions are used as column descriptions. If the asset already has a column schema defined, Metaxy columns are appended (user-defined columns take precedence for columns with the same name).

Warning

Pydantic feature schema may not match the corresponding table schema in the metadata store.

  • Column lineage is injected into the asset metadata under dagster/column_lineage. Tracks which upstream columns each downstream column depends on by analyzing:

    • Direct pass-through: Columns with the same name in both upstream and downstream features.

    • FeatureDep.rename: Renamed columns trace back to their original upstream column names.

    • FeatureSpec.lineage: ID column relationships based on lineage type (identity, aggregation, expansion).

    Column lineage is derived from Pydantic model fields on the feature class. If the asset already has column lineage defined, Metaxy lineage is merged with user-defined lineage (user-defined dependencies are appended to Metaxy-detected dependencies for each column).
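To make the column rules above concrete, here is a minimal, stdlib-only sketch of the direct pass-through and FeatureDep.rename lineage rules, plus the two merge behaviors for user-defined schema and lineage. It is not Metaxy's implementation: merge_column_schema, detect_lineage and merge_lineage are hypothetical helpers, columns are plain dicts, lineage entries are (feature, column) tuples instead of Dagster TableSchema / TableColumnLineage objects, the feature key "video/raw" is made up, and ID-column lineage from FeatureSpec.lineage is not modeled.

```python
# Hypothetical, simplified model of metaxify's column metadata rules.
# Columns are dicts; lineage deps are (upstream feature key, upstream column) tuples.


def merge_column_schema(user_cols: list[dict], metaxy_cols: list[dict]) -> list[dict]:
    """Append Metaxy columns after user columns; user columns win on name clashes."""
    user_names = {col["name"] for col in user_cols}
    return list(user_cols) + [col for col in metaxy_cols if col["name"] not in user_names]


def detect_lineage(
    downstream_cols: set[str],
    upstreams: dict[str, tuple[list[str], dict[str, str]]],
) -> dict[str, list[tuple[str, str]]]:
    """Trace each downstream column back to the upstream columns it comes from.

    `upstreams` maps an upstream feature key to (its columns, rename mapping old -> new),
    mimicking FeatureDep.rename. Columns without a rename are treated as pass-through.
    """
    lineage: dict[str, list[tuple[str, str]]] = {}
    for feature, (cols, rename) in upstreams.items():
        for col in cols:
            target = rename.get(col, col)  # renamed name, or the same name for pass-through
            if target in downstream_cols:
                lineage.setdefault(target, []).append((feature, col))
    return lineage


def merge_lineage(
    metaxy_lineage: dict[str, list[tuple[str, str]]],
    user_lineage: dict[str, list[tuple[str, str]]],
) -> dict[str, list[tuple[str, str]]]:
    """For each column, append user-defined deps after the Metaxy-detected ones."""
    merged = {col: list(deps) for col, deps in metaxy_lineage.items()}
    for col, deps in user_lineage.items():
        merged.setdefault(col, []).extend(deps)
    return merged


# Usage: "text" is renamed to "caption"; "sample_uid" passes through unchanged.
lineage = detect_lineage(
    downstream_cols={"sample_uid", "caption", "embedding"},
    upstreams={"video/raw": (["sample_uid", "text"], {"text": "caption"})},
)
print(lineage)
# {'sample_uid': [('video/raw', 'sample_uid')], 'caption': [('video/raw', 'text')]}
```

Columns computed from scratch (here "embedding") get no detected lineage; in the described behavior, a user-defined lineage entry for such a column would simply be added by the merge step.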

Parameters:

  • feature (CoercibleToFeatureKey | None, default: None ) –

    The Metaxy feature to associate with the asset. If both feature and metaxy/feature metadata are set and don't match, an error is raised.

  • key (CoercibleToAssetKey | None, default: None ) –

    Explicit asset key that overrides all other key resolution logic. Cannot be used with key_prefix or with multi-asset definitions that produce multiple outputs.

  • key_prefix (CoercibleToAssetKeyPrefix | None, default: None ) –

    Prefix to prepend to the resolved asset key. Also applied to upstream dependency keys. Cannot be used with key.

  • inherit_feature_key_as_asset_key (bool, default: True ) –

    If True (default), use the Metaxy feature key as the Dagster asset key (unless key is provided or dagster/attributes.asset_key is set). This ensures consistent key resolution between assets and their upstream dependencies.

  • inject_metaxy_kind (bool, default: True ) –

    Whether to inject "metaxy" kind into asset kinds. Dagster currently limits an asset to 3 kinds, so metaxify skips kind injection if the asset already has 3 kinds.

  • inject_code_version (bool, default: True ) –

    Whether to inject the Metaxy feature code version into the asset's code version. The version is appended in the format metaxy:<version>.

  • set_description (bool, default: True ) –

    Whether to set the asset description from the feature class docstring if the asset doesn't already have a description.

  • inject_column_schema (bool, default: True ) –

    Whether to inject Pydantic field definitions as Dagster column schema. Field types are converted to strings, and field descriptions are used as column descriptions.

  • inject_column_lineage (bool, default: True ) –

    Whether to inject column-level lineage into the asset metadata under dagster/column_lineage. Uses Pydantic model fields to track column provenance via FeatureDep.rename, FeatureSpec.lineage, and direct pass-through.
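The key, key_prefix and inherit_feature_key_as_asset_key parameters combine in a fixed order. The following is a hypothetical, stdlib-only model of that order, with asset keys represented as tuples of path components instead of dagster.AssetKey; resolve_asset_key is not part of Metaxy, and raising ValueError for the key/key_prefix conflict is an assumed error type (the docs only say the combination is not allowed).

```python
from typing import Optional

# Stand-in for dagster.AssetKey: a tuple of path components.
Key = tuple[str, ...]


def resolve_asset_key(
    original_key: Key,
    feature_key: Key,
    *,
    attributes_asset_key: Optional[Key] = None,  # "dagster/attributes": {"asset_key": ...}
    key: Optional[Key] = None,
    key_prefix: Optional[Key] = None,
    inherit_feature_key_as_asset_key: bool = True,
) -> Key:
    """Hypothetical model of metaxify's asset key resolution order."""
    if key is not None and key_prefix is not None:
        # The docs say the two cannot be combined; ValueError is an assumption.
        raise ValueError("key and key_prefix cannot be used together")
    if key is not None:
        return key  # 1. an explicit key overrides everything
    if attributes_asset_key is not None:
        resolved = attributes_asset_key  # 2a. key from the feature spec's dagster/attributes
    elif inherit_feature_key_as_asset_key:
        resolved = feature_key  # 2b. reuse the Metaxy feature key
    else:
        resolved = original_key  # 2c. keep the original Dagster key
    if key_prefix is not None:
        resolved = key_prefix + resolved  # 3. prepend the prefix
    return resolved


print(resolve_asset_key(("my_asset",), ("my", "feature"), key_prefix=("prod",)))
# ('prod', 'my', 'feature')
```

Since key_prefix is also applied to upstream dependency keys, running the same steps 2 and 3 over each upstream feature keeps the injected deps pointing at the keys their own metaxified assets would receive.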

Note

Multiple Dagster assets can contribute to the same Metaxy feature by setting the same "metaxy/feature" metadata. This is a perfectly valid setup since Metaxy writes are append-only.

Using @metaxify with multi assets

The feature argument cannot be used with @dg.multi_asset that produces multiple outputs. Instead, set "metaxy/feature" metadata on each output's AssetSpec:

import dagster as dg
import metaxy.ext.dagster as mxd

@mxd.metaxify()
@dg.multi_asset(
    specs=[
        dg.AssetSpec("output_a", metadata={"metaxy/feature": "feature/a"}),
        dg.AssetSpec("output_b", metadata={"metaxy/feature": "feature/b"}),
    ]
)
def my_multi_asset():
    ...

Apply to dagster.AssetsDefinition

import dagster as dg
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature

@mxd.metaxify(feature=MyFeature)
@dg.asset
def my_asset():
    ...

Apply to dagster.AssetSpec

import dagster as dg
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature

asset_spec = dg.AssetSpec(key="my_asset")
asset_spec = mxd.metaxify(feature=MyFeature)(asset_spec)

Use "metaxy/feature" asset metadata key

import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd

@mxd.metaxify
@dg.asset(metadata={"metaxy/feature": "my/feature/key"})
def my_asset(store: mx.MetadataStore):
    with store:
        increment = store.resolve_update("my/feature/key")

    ...
Source code in src/metaxy/ext/dagster/metaxify.py
def __init__(
    self,
    _asset: "_T | None" = None,
    *,
    feature: mx.CoercibleToFeatureKey | None = None,
    key: CoercibleToAssetKey | None = None,
    key_prefix: CoercibleToAssetKeyPrefix | None = None,
    inherit_feature_key_as_asset_key: bool = True,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
) -> None:
    # Actual initialization happens in __new__, but we set defaults here for type checkers
    self.feature = (
        mx.coerce_to_feature_key(feature) if feature is not None else None
    )
    self.key = dg.AssetKey.from_coercible(key) if key is not None else None
    self.key_prefix = (
        dg.AssetKey.from_coercible(key_prefix) if key_prefix is not None else None
    )
    self.inherit_feature_key_as_asset_key = inherit_feature_key_as_asset_key
    self.inject_metaxy_kind = inject_metaxy_kind
    self.inject_code_version = inject_code_version
    self.set_description = set_description
    self.inject_column_schema = inject_column_schema
    self.inject_column_lineage = inject_column_lineage

metaxy.ext.dagster.observable.observable_metaxy_asset

observable_metaxy_asset(feature: CoercibleToFeatureKey, *, store_resource_key: str = 'store', inherit_feature_key_as_asset_key: bool = False, inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, **observable_kwargs: Any)

Decorator to create an observable source asset for a Metaxy feature.

The observation reads the feature's metadata from the store, counts rows, and uses mean(metaxy_created_at) as the data version to track changes. Using mean ensures that both additions and deletions are detected.

The decorated function receives (context, store, lazy_df) and can return a dict of additional metadata to include in the observation.
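The reasoning behind mean(metaxy_created_at) can be checked with a small stdlib sketch. compute_stats below is a hypothetical stand-in for compute_stats_from_lazy_frame operating on a plain list of timestamps; the string encoding of the data version is an assumption. It shows that adding a newer row or deleting the oldest row both shift the mean, while max(created_at) would not notice that deletion. Note that in this sketch the version depends only on the mean, so a deletion that leaves the mean unchanged (for example, removing the middle of three evenly spaced timestamps) would go unnoticed; whether Metaxy's data version also folds in the row count is not stated here.

```python
from datetime import datetime, timezone
from statistics import fmean
from typing import Optional


def compute_stats(created_at: list[datetime]) -> tuple[int, Optional[str]]:
    """Hypothetical stand-in: (row count, data version from mean(metaxy_created_at))."""
    if not created_at:
        return 0, None
    mean_ts = fmean(ts.timestamp() for ts in created_at)
    return len(created_at), str(mean_ts)


def day(d: int) -> datetime:
    return datetime(2024, 1, d, tzinfo=timezone.utc)


rows = [day(1), day(2), day(3)]
_, v_before = compute_stats(rows)
_, v_deleted = compute_stats(rows[1:])      # delete the oldest row
_, v_added = compute_stats(rows + [day(4)])  # append a newer row

# The mean moves in both cases...
print(v_before != v_deleted, v_before != v_added)  # True True
# ...whereas max(created_at) is unchanged by deleting the oldest row.
print(max(rows) == max(rows[1:]))  # True
```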

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature to observe.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • inherit_feature_key_as_asset_key (bool, default: False ) –

    If True, use the Metaxy feature key as the Dagster asset key.

  • inject_metaxy_kind (bool, default: True ) –

    Whether to inject "metaxy" kind into asset kinds.

  • inject_code_version (bool, default: True ) –

    Whether to inject the Metaxy feature code version.

  • set_description (bool, default: True ) –

    Whether to set description from feature class docstring.

  • **observable_kwargs (Any, default: {} ) –

    Passed to @observable_source_asset (key, group_name, tags, metadata, description, partitions_def, etc.)

Example
import narwhals as nw
import metaxy.ext.dagster as mxd
from myproject.features import ExternalFeature

@mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
def external_data(context, store, lazy_df):
    pass

# With custom metadata - return a dict
@mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data_with_metrics")
def external_data_with_metrics(context, store, lazy_df):
    # Run aggregations in the database
    total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
    return {"custom/total": total}

Note

observable_source_asset does not support deps. Upstream Metaxy feature dependencies from the feature spec are not propagated to the SourceAsset.

Source code in src/metaxy/ext/dagster/observable.py
def observable_metaxy_asset(
    feature: mx.CoercibleToFeatureKey,
    *,
    store_resource_key: str = "store",
    # metaxify kwargs
    inherit_feature_key_as_asset_key: bool = False,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    # observable_source_asset kwargs
    **observable_kwargs: Any,
):
    """Decorator to create an observable source asset for a Metaxy feature.

    The observation reads the feature's metadata from the store, counts rows,
    and uses `mean(metaxy_created_at)` as the data version to track changes.
    Using mean ensures that both additions and deletions are detected.

    The decorated function receives `(context, store, lazy_df)` and can return
    a dict of additional metadata to include in the observation.

    Args:
        feature: The Metaxy feature to observe.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        inherit_feature_key_as_asset_key: If True, use the Metaxy feature key as the
            Dagster asset key.
        inject_metaxy_kind: Whether to inject `"metaxy"` kind into asset kinds.
        inject_code_version: Whether to inject the Metaxy feature code version.
        set_description: Whether to set description from feature class docstring.
        **observable_kwargs: Passed to `@observable_source_asset`
            (key, group_name, tags, metadata, description, partitions_def, etc.)

    Example:
        ```python
        import metaxy.ext.dagster as mxd
        from myproject.features import ExternalFeature

        @mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
        def external_data(context, store, lazy_df):
            pass

        # With custom metadata - return a dict
        @mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
        def external_data_with_metrics(context, store, lazy_df):
            # Run aggregations in the database
            total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
            return {"custom/total": total}
        ```

    Note:
        `observable_source_asset` does not support `deps`. Upstream Metaxy feature
        dependencies from the feature spec are not propagated to the SourceAsset.
    """
    feature_key = mx.coerce_to_feature_key(feature)

    def decorator(fn: Callable[..., Any]) -> dg.SourceAsset:
        # Build an AssetSpec from kwargs and enrich with metaxify
        spec = dg.AssetSpec(
            key=observable_kwargs.pop("key", None) or fn.__name__,
            group_name=observable_kwargs.pop("group_name", None),
            tags=observable_kwargs.pop("tags", None),
            metadata=observable_kwargs.pop("metadata", None),
            description=observable_kwargs.pop("description", None),
        )
        enriched = metaxify(
            feature=feature_key,
            inherit_feature_key_as_asset_key=inherit_feature_key_as_asset_key,
            inject_metaxy_kind=inject_metaxy_kind,
            inject_code_version=inject_code_version,
            set_description=set_description,
        )(spec)

        def _observe(context: dg.AssetExecutionContext) -> dg.ObserveResult:
            store: mx.MetadataStore = getattr(context.resources, store_resource_key)

            with store:
                lazy_df = store.read_metadata(feature_key)
                stats = compute_stats_from_lazy_frame(lazy_df)

                # Call the user's function - it can return additional metadata
                extra_metadata = fn(context, store, lazy_df) or {}

            metadata: dict[str, Any] = {"dagster/row_count": stats.row_count}
            metadata.update(extra_metadata)

            return dg.ObserveResult(
                data_version=stats.data_version,
                metadata=metadata,
            )

        # Apply observable_source_asset decorator
        return dg.observable_source_asset(
            key=enriched.key,
            description=enriched.description,
            group_name=enriched.group_name,
            tags=dict(enriched.tags) if enriched.tags else None,
            metadata=dict(enriched.metadata) if enriched.metadata else None,
            required_resource_keys={store_resource_key},
            **observable_kwargs,
        )(_observe)

    return decorator

Dagster Types

metaxy.ext.dagster.dagster_type.feature_to_dagster_type

feature_to_dagster_type(feature: CoercibleToFeatureKey, *, name: str | None = None, description: str | None = None, inject_column_schema: bool = True, inject_column_lineage: bool = True, metadata: Mapping[str, Any] | None = None) -> DagsterType

Build a Dagster type from a Metaxy feature.

Creates a dagster.DagsterType that validates that outputs are MetaxyOutput (i.e., narwhals-compatible dataframes or None) and includes metadata derived from the feature's Pydantic model fields.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature to create a type for. Can be a feature class, feature key, or string that can be coerced to a feature key.

  • name (str | None, default: None ) –

    Optional custom name for the DagsterType. Defaults to the feature's table name (e.g., "project__feature_name").

  • description (str | None, default: None ) –

    Optional custom description. Defaults to the feature class docstring or a generated description.

  • inject_column_schema (bool, default: True ) –

    Whether to inject the column schema as metadata. The schema is derived from Pydantic model fields.

  • inject_column_lineage (bool, default: True ) –

    Whether to inject column lineage as metadata. The lineage is derived from feature dependencies.

  • metadata (Mapping[str, Any] | None, default: None ) –

    Optional custom metadata to inject into the DagsterType.
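The name and description defaults above follow simple fallback rules, which the source listing implements with feature_key.table_name and inspect.cleandoc. The sketch below reproduces them with the standard library only. default_type_name_and_description is hypothetical, and two details are assumptions: that the table name joins key parts with "__" (inferred from the "project__feature_name" example) and that the string form of a feature key joins parts with "/".

```python
import inspect
from typing import Optional


def default_type_name_and_description(
    key_parts: list[str],
    feature_doc: Optional[str],
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> tuple[str, str]:
    """Hypothetical sketch of feature_to_dagster_type's name/description defaults."""
    # Assumption: the table name joins key parts with "__".
    type_name = name or "__".join(key_parts)
    if description is None:
        if feature_doc:
            # Mirrors the inspect.cleandoc call in the source listing.
            description = inspect.cleandoc(feature_doc)
        else:
            # Assumption: the string form of a feature key joins parts with "/".
            description = f"Metaxy feature '{'/'.join(key_parts)}'."
    return type_name, description


print(default_type_name_and_description(["project", "feature_name"], None))
# ('project__feature_name', "Metaxy feature 'project/feature_name'.")
```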

Returns:

  • DagsterType

    A DagsterType configured for the Metaxy feature with appropriate type checking and metadata.

Tip

This is automatically injected by @metaxify

Example
import dagster as dg
import polars as pl
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature  # Your Metaxy feature class

@mxd.metaxify(feature=MyFeature)
@dg.asset(dagster_type=mxd.feature_to_dagster_type(MyFeature))
def my_asset():
    return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})

See also

  • metaxify: Decorator for injecting Metaxy metadata into Dagster assets.
  • MetaxyOutput: The type alias for valid Metaxy outputs.
Source code in src/metaxy/ext/dagster/dagster_type.py
def feature_to_dagster_type(
    feature: mx.CoercibleToFeatureKey,
    *,
    name: str | None = None,
    description: str | None = None,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
    metadata: Mapping[str, Any] | None = None,
) -> dg.DagsterType:
    """Build a Dagster type from a Metaxy feature.

    Creates a `dagster.DagsterType` that validates outputs are
    [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput] (i.e., narwhals-compatible
    dataframes or `None`) and includes metadata derived from the feature's Pydantic
    model fields.

    Args:
        feature: The Metaxy feature to create a type for. Can be a feature class,
            feature key, or string that can be coerced to a feature key.
        name: Optional custom name for the DagsterType. Defaults to the feature's
            table name (e.g., "project__feature_name").
        description: Optional custom description. Defaults to the feature class
            docstring or a generated description.
        inject_column_schema: Whether to inject the column schema as metadata.
            The schema is derived from Pydantic model fields.
        inject_column_lineage: Whether to inject column lineage as metadata.
            The lineage is derived from feature dependencies.
        metadata: Optional custom metadata to inject into the DagsterType.

    Returns:
        A DagsterType configured for the Metaxy feature with appropriate
        type checking and metadata.

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        import dagster as dg
        import polars as pl
        import metaxy.ext.dagster as mxd
        from myproject.features import MyFeature  # Your Metaxy feature class

        @mxd.metaxify(feature=MyFeature)
        @dg.asset(dagster_type=mxd.feature_to_dagster_type(MyFeature))
        def my_asset():
            return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
        ```

    !!! info "See also"
        - [`metaxify`][metaxy.ext.dagster.metaxify.metaxify]: Decorator for injecting
          Metaxy metadata into Dagster assets.
        - [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput]: The type alias for valid
          Metaxy outputs.
    """
    from metaxy.ext.dagster.io_manager import MetaxyOutput

    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)

    # Determine name
    type_name = name or feature_key.table_name

    # Determine description
    if description is None:
        if feature_cls.__doc__:
            import inspect

            description = inspect.cleandoc(feature_cls.__doc__)
        else:
            description = f"Metaxy feature '{feature_key.to_string()}'."

    # Build metadata - start with custom metadata if provided
    final_metadata: dict[str, Any] = dict(metadata) if metadata else {}
    final_metadata[DAGSTER_METAXY_INFO_METADATA_KEY] = build_feature_info_metadata(
        feature_key
    )
    if inject_column_schema:
        column_schema = build_column_schema(feature_cls)
        if column_schema is not None:
            final_metadata[DAGSTER_COLUMN_SCHEMA_METADATA_KEY] = column_schema

    if inject_column_lineage:
        column_lineage = build_column_lineage(feature_cls)
        if column_lineage is not None:
            final_metadata[DAGSTER_COLUMN_LINEAGE_METADATA_KEY] = column_lineage

    dagster_type = dg.DagsterType(
        type_check_fn=_create_type_check_fn(feature_key),
        name=type_name,
        description=description,
        typing_type=MetaxyOutput,
        metadata=final_metadata,
    )

    return dagster_type

Dagster Event Generators

metaxy.ext.dagster.utils.generate_materialize_results

generate_materialize_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Sequence[AssetSpec]) -> Iterator[MaterializeResult[None]]

Generate dagster.MaterializeResult events for assets in topological order.

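Yields a MaterializeResult for each asset spec, sorted by their associated Metaxy features in topological order (dependencies before dependents). Each result includes the row count (filtered to the current partition for partitioned assets) as "dagster/row_count" metadata and, when the store has a materialization_id set, the number of rows tagged with that materialization ID as "metaxy/materialized_in_run".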

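Parameters:

  • context (AssetExecutionContext) –

    The Dagster asset execution context.

  • store (MetadataStore | MetaxyStoreFromConfigResource) –

    The Metaxy metadata store to read from.

  • specs (Sequence[AssetSpec]) –

    Sequence of asset specs with "metaxy/feature" metadata set. A ValueError is raised for any spec missing it.

Yields: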

  • MaterializeResult[None]

    Materialization result for each asset in topological order.
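The ordering step can be sketched with the standard library's graphlib.TopologicalSorter. This is a hypothetical stand-in: order_specs_by_feature, plain-dict specs, and the feature_deps mapping replace AssetSpec objects and the active FeatureGraph (whose topological_sort_features method the source listing uses). The sketch sorts the full dependency graph and then keeps only features that have specs, so upstream-before-downstream order survives even when an intermediate feature has no spec; whether Metaxy's own sort handles that case the same way is an assumption.

```python
from graphlib import TopologicalSorter


def order_specs_by_feature(
    specs: list[dict], feature_deps: dict[str, set[str]]
) -> list[dict]:
    """Hypothetical: order specs so upstream features come before downstream ones.

    `specs` are dicts with "key" and "metadata"; `feature_deps` maps a feature key
    to the set of its upstream feature keys (a stand-in for the active FeatureGraph).
    """
    spec_by_feature: dict[str, dict] = {}
    for spec in specs:
        feature = spec["metadata"].get("metaxy/feature")
        if feature is None:
            raise ValueError(f"AssetSpec {spec['key']} missing 'metaxy/feature' metadata")
        spec_by_feature[feature] = spec

    # Sort the whole graph (predecessors first), then keep only features with specs.
    graph = {node: set(preds) for node, preds in feature_deps.items()}
    for feature in spec_by_feature:
        graph.setdefault(feature, set())
    order = TopologicalSorter(graph).static_order()
    return [spec_by_feature[f] for f in order if f in spec_by_feature]


specs = [
    {"key": "output_c", "metadata": {"metaxy/feature": "my/feature/c"}},
    {"key": "output_a", "metadata": {"metaxy/feature": "my/feature/a"}},
]
# c depends on b, which depends on a; b has no spec in this multi-asset.
deps = {"my/feature/c": {"my/feature/b"}, "my/feature/b": {"my/feature/a"}}
print([s["key"] for s in order_specs_by_feature(specs, deps)])
# ['output_a', 'output_c']
```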

Example
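import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd
from metaxy.ext.dagster.utils import generate_materialize_results

specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]

@mxd.metaxify
@dg.multi_asset(specs=specs)
def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    # ... compute and write data ...
    # Use the (metaxified) specs of the running AssetsDefinition, not the original list
    yield from generate_materialize_results(context, store, context.assets_def.specs)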
Source code in src/metaxy/ext/dagster/utils.py
def generate_materialize_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Sequence[dg.AssetSpec],
) -> Iterator[dg.MaterializeResult[None]]:
    """Generate `dagster.MaterializeResult` events for assets in topological order.

    Yields a `MaterializeResult` for each asset spec, sorted by their associated
    Metaxy features in topological order (dependencies before dependents).
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Sequence of asset specs with `"metaxy/feature"` metadata set.

    Yields:
        Materialization result for each asset in topological order.

    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]

        @metaxify
        @dg.multi_asset(specs=specs)
        def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            # ... compute and write data ...
            yield from generate_materialize_results(context, store, context.asset_defs.specs)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    for spec in specs:
        feature_key_raw = spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
        if feature_key_raw is None:
            raise ValueError(
                f"AssetSpec {spec.key} missing '{DAGSTER_METAXY_FEATURE_METADATA_KEY}' metadata"
            )
        feature_key = mx.coerce_to_feature_key(feature_key_raw)
        spec_by_feature_key[feature_key] = spec

    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))

    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_filters = get_partition_filter(context, asset_spec)

        with store:
            # Get total stats (partition-filtered)
            lazy_df = store.read_metadata(key, filters=partition_filters)
            stats = compute_stats_from_lazy_frame(lazy_df)

            # Get materialized-in-run count if materialization_id is set
            materialized_in_run: int | None = None
            if store.materialization_id is not None:
                mat_filters = partition_filters + [
                    nw.col(METAXY_MATERIALIZATION_ID) == store.materialization_id
                ]
                mat_df = store.read_metadata(key, filters=mat_filters)
                materialized_in_run = mat_df.select(nw.len()).collect().item(0, 0)

        metadata: dict[str, int] = {"dagster/row_count": stats.row_count}
        if materialized_in_run is not None:
            metadata["metaxy/materialized_in_run"] = materialized_in_run

        yield dg.MaterializeResult(
            value=None,
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
        )

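metaxy.ext.dagster.utils.generate_observe_results

generate_observe_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Sequence[AssetSpec]) -> Iterator[ObserveResult]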

Generate dagster.ObserveResult events for assets in topological order.

Yields an ObserveResult for each asset spec, sorted by their associated Metaxy features in topological order. Each result includes the row count as "dagster/row_count" metadata.

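Parameters:

  • context (AssetExecutionContext) –

    The Dagster asset execution context.

  • store (MetadataStore | MetaxyStoreFromConfigResource) –

    The Metaxy metadata store to read from.

  • specs (Sequence[AssetSpec]) –

    Sequence of asset specs with "metaxy/feature" metadata set. A ValueError is raised for any spec missing it.

Yields: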

  • ObserveResult

    Observation result for each asset in topological order.

Example
import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd
from metaxy.ext.dagster.utils import generate_observe_results

specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]

@mxd.metaxify
@dg.multi_observable_source_asset(specs=specs)
def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    yield from generate_observe_results(context, store, context.assets_def.specs)
Source code in src/metaxy/ext/dagster/utils.py
def generate_observe_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Sequence[dg.AssetSpec],
) -> Iterator[dg.ObserveResult]:
    """Generate `dagster.ObserveResult` events for assets in topological order.

    Yields an `ObserveResult` for each asset spec, sorted by their associated
    Metaxy features in topological order.
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Sequence of asset specs with `"metaxy/feature"` metadata set.

    Yields:
        Observation result for each asset in topological order.

    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]

        @metaxify
        @dg.multi_observable_source_asset(specs=specs)
        def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            yield from generate_observe_results(context, store, context.asset_defs.specs)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    for spec in specs:
        feature_key_raw = spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
        if feature_key_raw is None:
            raise ValueError(
                f"AssetSpec {spec.key} missing '{DAGSTER_METAXY_FEATURE_METADATA_KEY}' metadata"
            )
        feature_key = mx.coerce_to_feature_key(feature_key_raw)
        spec_by_feature_key[feature_key] = spec

    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))

    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        filters = get_partition_filter(context, asset_spec)

        with store:
            lazy_df = store.read_metadata(key, filters=filters)
            stats = compute_stats_from_lazy_frame(lazy_df)

        yield dg.ObserveResult(
            asset_key=asset_spec.key,
            metadata={"dagster/row_count": stats.row_count},
            data_version=stats.data_version,
        )

metaxy.ext.dagster.utils.build_feature_info_metadata

build_feature_info_metadata(feature: CoercibleToFeatureKey) -> dict[str, Any]

Build feature info metadata dict for Dagster assets.

Creates a dictionary with information about the Metaxy feature that can be used as Dagster asset metadata under the "metaxy/feature_info" key.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature (class, key, or string).

Returns:

  • dict[str, Any]

    A nested dictionary containing:

      • feature: Feature information
          • project: The project name
          • spec: The full feature spec as a dict (via model_dump(mode="json"))
          • version: The feature version string
          • type: The feature class module path
      • metaxy: Metaxy library information
          • version: The metaxy library version
          • plugins: The plugins from the active Metaxy configuration (MetaxyConfig.get().plugins)

Tip

This is automatically injected by @metaxify

Example
from metaxy.ext.dagster.utils import build_feature_info_metadata
from myproject.features import MyFeature

info = build_feature_info_metadata(MyFeature)
# {
#     "feature": {
#         "project": "my_project",
#         "spec": {...},  # Full FeatureSpec model_dump(mode="json")
#         "version": "my__feature@abc123",
#         "type": "myproject.features",
#     },
#     "metaxy": {
#         "version": "0.1.0",
#         "plugins": ...,  # MetaxyConfig.get().plugins
#     },
# }
Source code in src/metaxy/ext/dagster/utils.py
def build_feature_info_metadata(
    feature: mx.CoercibleToFeatureKey,
) -> dict[str, Any]:
    """Build feature info metadata dict for Dagster assets.

    Creates a dictionary with information about the Metaxy feature that can be
    used as Dagster asset metadata under the `"metaxy/feature_info"` key.

    Args:
        feature: The Metaxy feature (class, key, or string).

    Returns:
        A nested dictionary containing:

        - `feature`: Feature information
            - `project`: The project name
            - `spec`: The full feature spec as a dict (via `model_dump()`)
            - `version`: The feature version string
            - `type`: The feature class module path
        - `metaxy`: Metaxy library information
            - `version`: The metaxy library version

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        from metaxy.ext.dagster.utils import build_feature_info_metadata

        info = build_feature_info_metadata(MyFeature)
        # {
        #     "feature": {
        #         "project": "my_project",
        #         "spec": {...},  # Full FeatureSpec model_dump()
        #         "version": "my__feature@abc123",
        #         "type": "myproject.features",
        #     },
        #     "metaxy": {
        #         "version": "0.1.0",
        #     },
        # }
        ```
    """
    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)

    return {
        "feature": {
            "project": feature_cls.project,
            "spec": feature_cls.spec().model_dump(mode="json"),
            "version": feature_cls.feature_version(),
            "type": feature_cls.__module__,
        },
        "metaxy": {
            "version": mx.__version__,
            "plugins": mx.MetaxyConfig.get().plugins,
        },
    }

Resources

metaxy.ext.dagster.MetaxyIOManager

Bases: ConfigurableIOManager

MetaxyIOManager is a Dagster IOManager that reads and writes data to/from Metaxy's MetadataStore.

It automatically attaches Metaxy feature and store metadata to Dagster materialization events and handles partitioned assets.

Always set "metaxy/feature" Dagster metadata

This IOManager uses the "metaxy/feature" Dagster metadata key to map Dagster assets to Metaxy features, and expects it to be set on every asset it loads or materializes.

Example
import dagster as dg

@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
    }
)
def my_asset():
    ...

Defining Partitioned Assets

To tell Metaxy which column to use when filtering partitioned assets, set the "partition_by" Dagster metadata key to that column's name.

Example
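import dagster as dg

@dg.asset(
    # Without a partitions definition the asset is not partitioned and no filter is applied
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    metadata={
        "metaxy/feature": "my/feature/key",
        "partition_by": "date",
    },
)
def my_partitioned_asset():
    ...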

The same key is used by several other Dagster IO managers to configure partitioning behavior.
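When an asset is partitioned, the IO manager combines the "partition_by" column with the current partition key before reading. The stdlib-only sketch below models that step on a list of row dicts. It assumes the filter is a simple equality match between the column value and the partition key string; `rows_for_partition` and the sample rows are hypothetical, and the real code builds narwhals filters through `build_partition_filter` instead of filtering Python lists.

```python
from typing import Any, Optional

Row = dict[str, Any]


def rows_for_partition(
    rows: list[Row],
    partition_col: Optional[str],
    partition_key: Optional[str],
) -> list[Row]:
    """Hypothetical model of partition filtering (equality match assumed).

    If the asset is not partitioned, or no "partition_by" column is set,
    every row is returned unchanged.
    """
    if partition_col is None or partition_key is None:
        return list(rows)
    return [row for row in rows if row.get(partition_col) == partition_key]


rows = [
    {"sample_uid": 1, "date": "2024-01-01"},
    {"sample_uid": 2, "date": "2024-01-02"},
]
# Partition key "2024-01-02" on an asset with metadata {"partition_by": "date"}
print(rows_for_partition(rows, "date", "2024-01-02"))
```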

Functions

metaxy.ext.dagster.MetaxyIOManager.load_input
load_input(context: InputContext) -> LazyFrame[Any]

Load feature metadata from MetadataStore.

Reads metadata for the feature specified in the asset's "metaxy/feature" metadata. For partitioned assets, filters to the current partition using the column specified in "partition_by" metadata.

Parameters:

  • context (InputContext) –

    Dagster input context containing asset metadata.

Returns:

  • LazyFrame[Any]

    A narwhals LazyFrame with the feature metadata.

Source code in src/metaxy/ext/dagster/io_manager.py
def load_input(self, context: "dg.InputContext") -> nw.LazyFrame[Any]:
    """Load feature metadata from [`MetadataStore`][metaxy.MetadataStore].

    Reads metadata for the feature specified in the asset's `"metaxy/feature"` metadata.
    For partitioned assets, filters to the current partition using the column specified
    in `"partition_by"` metadata.

    Args:
        context: Dagster input context containing asset metadata.

    Returns:
        A narwhals LazyFrame with the feature metadata.
    """
    with self.metadata_store:
        context.log.debug(
            f"Reading metadata for Metaxy feature {self._feature_key_from_context(context).to_string()} from {self.metadata_store.display()}"
        )

        # Build partition filter if applicable
        partition_col = (
            context.definition_metadata.get(DAGSTER_METAXY_PARTITION_KEY)
            if context.has_asset_partitions
            else None
        )
        partition_key = (
            context.asset_partition_key if context.has_asset_partitions else None
        )
        filters = build_partition_filter(
            partition_col,  # pyright: ignore[reportArgumentType]
            partition_key,
        )

        return self.metadata_store.read_metadata(
            feature=self._feature_key_from_context(context),
            filters=filters,
        )

metaxy.ext.dagster.MetaxyIOManager.handle_output
handle_output(context: OutputContext, obj: MetaxyOutput) -> None

Write feature metadata to MetadataStore.

Writes the output dataframe to the metadata store for the feature specified in the asset's "metaxy/feature" metadata. Also logs metadata about the feature and store to Dagster's materialization events.

If obj is None, only metadata logging is performed (no data is written).

Parameters:

  • context (OutputContext) –

    Dagster output context containing asset metadata.

  • obj (MetaxyOutput) –

    A narwhals-compatible dataframe to write, or None to skip writing.
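The branching described above can be modeled with a toy store. In this sketch `ToyStore` and `handle_output` are hypothetical stand-ins, not Metaxy classes; rows are plain Python lists, and the materialization-metadata step is reduced to appending a string to a list.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ToyStore:
    """Hypothetical in-memory stand-in for a metadata store."""

    data: dict[str, list[Any]] = field(default_factory=dict)
    events: list[str] = field(default_factory=list)

    def write(self, feature: str, rows: list[Any]) -> None:
        self.data.setdefault(feature, []).extend(rows)


def handle_output(store: ToyStore, feature: str, obj: Optional[list[Any]]) -> None:
    """Write `obj` only when it is not None; always record metadata."""
    if obj is not None:
        store.write(feature, obj)
    # Stands in for the `_log_output_metadata` call, which runs in both branches
    store.events.append(f"materialization metadata for {feature}")


store = ToyStore()
handle_output(store, "my/feature/key", None)        # metadata only, nothing written
handle_output(store, "my/feature/key", [{"id": 1}])  # data written, then metadata
print(store.data, len(store.events))
```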

Source code in src/metaxy/ext/dagster/io_manager.py
def handle_output(self, context: "dg.OutputContext", obj: MetaxyOutput) -> None:
    """Write feature metadata to [`MetadataStore`][metaxy.MetadataStore].

    Writes the output dataframe to the metadata store for the feature specified
    in the asset's `"metaxy/feature"` metadata. Also logs metadata about the
    feature and store to Dagster's materialization events.

    If `obj` is `None`, only metadata logging is performed (no data is written).

    Args:
        context: Dagster output context containing asset metadata.
        obj: A narwhals-compatible dataframe to write, or None to skip writing.
    """
    assert DAGSTER_METAXY_FEATURE_METADATA_KEY in context.definition_metadata, (
        f'Missing `"{DAGSTER_METAXY_FEATURE_METADATA_KEY}"` key in asset metadata'
    )
    key = self._feature_key_from_context(context)
    feature = mx.get_feature_by_key(key)

    if obj is not None:
        context.log.debug(
            f'Writing metadata for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
        with self.metadata_store.open("write"):
            self.metadata_store.write_metadata(feature=feature, df=obj)
        context.log.debug(
            f'Metadata written for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
    else:
        context.log.debug(
            f'The output corresponds to Metaxy feature "{key.to_string()}" stored in {self.metadata_store.display()}'
        )

    self._log_output_metadata(context)

metaxy.ext.dagster.MetaxyStoreFromConfigResource

Bases: ConfigurableResource[MetadataStore]

This resource creates a metaxy.MetadataStore based on the current Metaxy configuration (metaxy.toml).

Functions

metaxy.ext.dagster.MetaxyStoreFromConfigResource.create_resource
create_resource(context: InitResourceContext) -> MetadataStore

Create a MetadataStore from the Metaxy configuration.

Parameters:

  • context (InitResourceContext) –

    Dagster resource initialization context.

Returns:

  • MetadataStore

    A MetadataStore configured with the Dagster run ID as the materialization ID.

Source code in src/metaxy/ext/dagster/resources.py
def create_resource(self, context: dg.InitResourceContext) -> mx.MetadataStore:
    """Create a MetadataStore from the Metaxy configuration.

    Args:
        context: Dagster resource initialization context.

    Returns:
        A MetadataStore configured with the Dagster run ID as the materialization ID.
    """
    assert context.run is not None
    return mx.MetaxyConfig.get().get_store(
        self.name, materialization_id=context.run.run_id
    )

Helpers

metaxy.ext.dagster.utils.FeatureStats

Bases: NamedTuple

Statistics about a feature's metadata for Dagster events.

metaxy.ext.dagster.selection.select_metaxy_assets

select_metaxy_assets(*, project: str | None = None, feature: CoercibleToFeatureKey | None = None) -> AssetSelection

Select Metaxy assets by project and/or feature.

This helper creates an AssetSelection that filters assets tagged by @metaxify.

Parameters:

  • project (str | None, default: None ) –

    Filter by project name. If None, uses MetaxyConfig.get().project.

  • feature (CoercibleToFeatureKey | None, default: None ) –

    Filter by specific feature key. If provided, further narrows the selection.

Returns:

  • AssetSelection

    An AssetSelection that can be used with dg.define_asset_job, dg.materialize, or AssetSelection operations like | and &.

Select all Metaxy assets in the current project

import metaxy.ext.dagster as mxd

all_metaxy = mxd.select_metaxy_assets()

Select assets for a specific project

prod_assets = mxd.select_metaxy_assets(project="production")

Select a specific feature's assets

feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")

Use with asset jobs

import dagster as dg

metaxy_job = dg.define_asset_job(
    name="materialize_metaxy",
    selection=mxd.select_metaxy_assets(),
)

Combine with other selections

# All Metaxy assets plus some other assets
combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

# Metaxy assets that are also in a specific group
filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")

Source code in src/metaxy/ext/dagster/selection.py
def select_metaxy_assets(
    *,
    project: str | None = None,
    feature: mx.CoercibleToFeatureKey | None = None,
) -> dg.AssetSelection:
    """Select Metaxy assets by project and/or feature.

    This helper creates an `AssetSelection` that filters assets tagged by `@metaxify`.

    Args:
        project: Filter by project name. If None, uses `MetaxyConfig.get().project`.
        feature: Filter by specific feature key. If provided, further narrows the selection.

    Returns:
        An `AssetSelection` that can be used with `dg.define_asset_job`,
        `dg.materialize`, or `AssetSelection` operations like `|` and `&`.

    Example: Select all Metaxy assets in current project
        ```python
        import metaxy.ext.dagster as mxd

        all_metaxy = mxd.select_metaxy_assets()
        ```

    Example: Select assets for a specific project
        ```python
        prod_assets = mxd.select_metaxy_assets(project="production")
        ```

    Example: Select a specific feature's assets
        ```python
        feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
        ```

    Example: Use with asset jobs
        ```python
        metaxy_job = dg.define_asset_job(
            name="materialize_metaxy",
            selection=mxd.select_metaxy_assets(),
        )
        ```

    Example: Combine with other selections
        ```python
        # All metaxy assets plus some other assets
        combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

        # Metaxy assets that are also in a specific group
        filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
        ```
    """
    resolved_project = project if project is not None else mx.MetaxyConfig.get().project

    selection = dg.AssetSelection.tag(DAGSTER_METAXY_PROJECT_TAG_KEY, resolved_project)

    if feature is not None:
        feature_key = mx.coerce_to_feature_key(feature)
        selection = selection & dg.AssetSelection.tag(
            DAGSTER_METAXY_FEATURE_METADATA_KEY, str(feature_key)
        )

    return selection
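The source above always matches on the project tag and, when `feature` is given, intersects that with a "metaxy/feature" tag match. The stdlib sketch below reproduces those semantics over plain dicts of tags so that `|` and `&` combinations can be reasoned about as set union and intersection. The asset keys, tag values, and the `select` helper are hypothetical; it assumes the project tag key is "metaxy/project" (the tag `@metaxify` injects), requires `project` explicitly where the real function falls back to `MetaxyConfig.get().project`, and returns a concrete set of keys where the real function returns a lazily resolved `dg.AssetSelection`.

```python
from typing import Optional

# Hypothetical assets and the tags @metaxify would have attached to them
ASSET_TAGS: dict[str, dict[str, str]] = {
    "raw_video": {"metaxy/project": "production", "metaxy/feature": "video/raw"},
    "video_embeddings": {
        "metaxy/project": "production",
        "metaxy/feature": "video/embeddings",
    },
    "dev_scratch": {"metaxy/project": "dev", "metaxy/feature": "scratch/table"},
    "other_asset": {},  # not a Metaxy asset: no metaxy/* tags
}


def select(project: str, feature: Optional[str] = None) -> set[str]:
    """Model of the selection: project tag match, optionally AND a feature tag match."""
    selected = {
        key
        for key, tags in ASSET_TAGS.items()
        if tags.get("metaxy/project") == project
    }
    if feature is not None:
        selected &= {
            key
            for key, tags in ASSET_TAGS.items()
            if tags.get("metaxy/feature") == feature
        }
    return selected


print(sorted(select("production")))
print(select("production", feature="video/embeddings"))
# `|` on AssetSelection behaves like set union in this model
print(sorted(select("production") | {"other_asset"}))
```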

Types

metaxy.ext.dagster.MetaxyOutput module-attribute

MetaxyOutput = IntoFrame | None

Constants

metaxy.ext.dagster.DAGSTER_METAXY_FEATURE_METADATA_KEY module-attribute

DAGSTER_METAXY_FEATURE_METADATA_KEY = 'metaxy/feature'

metaxy.ext.dagster.DAGSTER_METAXY_KIND module-attribute

DAGSTER_METAXY_KIND = 'metaxy'

metaxy.ext.dagster.DAGSTER_METAXY_INFO_METADATA_KEY module-attribute

DAGSTER_METAXY_INFO_METADATA_KEY = 'metaxy/info'

metaxy.ext.dagster.DAGSTER_METAXY_PARTITION_KEY module-attribute

DAGSTER_METAXY_PARTITION_KEY = 'partition_by'