
Dagster Integration API

Integration docs


metaxy.ext.dagster

Decorators

metaxy.ext.dagster.metaxify.metaxify

metaxify(_asset: _T | None = None, *, key: CoercibleToAssetKey | None = None, key_prefix: CoercibleToAssetKeyPrefix | None = None, inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, inject_column_schema: bool = True, inject_column_lineage: bool = True)

Inject Metaxy metadata into a Dagster AssetsDefinition or AssetSpec.

Only assets that have the "metaxy/feature" metadata key set are affected.

Learn more about @metaxify and see example screenshots here.

Parameters:

  • key (CoercibleToAssetKey | None, default: None ) –

    Explicit asset key that overrides all other key resolution logic. Cannot be used with key_prefix or with multi-asset definitions that produce multiple outputs.

  • key_prefix (CoercibleToAssetKeyPrefix | None, default: None ) –

    Prefix to prepend to the resolved asset key. Also applied to upstream dependency keys. Cannot be used with key.

  • inject_metaxy_kind (bool, default: True ) –

    Whether to inject "metaxy" kind into asset kinds.

  • inject_code_version (bool, default: True ) –

    Whether to inject the Metaxy feature code version into the asset's code version. The version is appended in the format metaxy:<version>.

  • set_description (bool, default: True ) –

    Whether to set the asset description from the feature class docstring if the asset doesn't already have a description.

  • inject_column_schema (bool, default: True ) –

    Whether to inject Pydantic field definitions as Dagster column schema. Field types are converted to strings, and field descriptions are used as column descriptions.

  • inject_column_lineage (bool, default: True ) –

    Whether to inject column-level lineage into the asset metadata under dagster/column_lineage. Uses Pydantic model fields to track column provenance via FeatureDep.rename, FeatureDep.lineage, and direct pass-through.
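The documented precedence between key and key_prefix can be summarized in a small stand-alone sketch. resolve_asset_key is a hypothetical helper that works on plain lists of path components and is not part of Metaxy's API. How Metaxy derives the default key is not described here, so the sketch takes it as an input (default_path), and it omits the prefixing of upstream dependency keys.

```python
from __future__ import annotations


def resolve_asset_key(
    default_path: list[str],
    key: list[str] | None = None,
    key_prefix: list[str] | None = None,
) -> list[str]:
    """Hypothetical sketch of the key / key_prefix rules documented for metaxify."""
    if key is not None and key_prefix is not None:
        # The two options are mutually exclusive.
        raise ValueError("`key` cannot be combined with `key_prefix`")
    if key is not None:
        # An explicit key overrides every other resolution rule.
        return list(key)
    if key_prefix is not None:
        # The prefix is prepended to the otherwise-resolved key.
        return [*key_prefix, *default_path]
    return list(default_path)


print(resolve_asset_key(["my", "feature", "key"], key_prefix=["raw"]))
# ['raw', 'my', 'feature', 'key']
```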

Tip

Multiple Dagster assets can contribute to the same Metaxy feature. This is a perfectly valid setup since Metaxy writes are append-only. To do this, set the following metadata keys:

- `"metaxy/feature"` pointing to the same Metaxy feature key
- `"metaxy/partition"` should be set to a dictionary mapping column names to values produced by the specific Dagster asset
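To see how "metaxy/partition" separates the contributions of several assets, picture the rows of one feature as plain dictionaries. The sketch below is a hypothetical pure-Python illustration. rows_for_partition is not a Metaxy function, the id column is made up, and the sketch assumes the mapping selects rows whose columns equal every given value.

```python
from __future__ import annotations

from typing import Any


def rows_for_partition(
    rows: list[dict[str, Any]], metaxy_partition: dict[str, Any] | None
) -> list[dict[str, Any]]:
    """Hypothetical sketch: keep rows matching every column/value pair."""
    if not metaxy_partition:
        # No partition mapping: the asset covers the whole feature.
        return list(rows)
    return [
        row
        for row in rows
        if all(row.get(col) == value for col, value in metaxy_partition.items())
    ]


# Both assets append to the same feature; each owns one "dataset" value.
feature_rows = [
    {"id": 1, "dataset": "a"},
    {"id": 2, "dataset": "b"},
    {"id": 3, "dataset": "a"},
]
print(rows_for_partition(feature_rows, {"dataset": "a"}))
# [{'id': 1, 'dataset': 'a'}, {'id': 3, 'dataset': 'a'}]
```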

Example

import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd

@mxd.metaxify()
@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key"
    },
)
def my_asset(store: mx.MetadataStore):
    with store:
        increment = store.resolve_update("my/feature/key")
    ...
With @multi_asset

Multiple Metaxy features can be produced by the same @multi_asset (typically, they are produced independently of each other).
@mxd.metaxify()
@dg.multi_asset(
    specs=[
        dg.AssetSpec("output_a", metadata={"metaxy/feature": "feature/a"}),
        dg.AssetSpec("output_b", metadata={"metaxy/feature": "feature/b"}),
    ]
)
def my_multi_asset():
    ...
With dagster.AssetSpec
asset_spec = dg.AssetSpec(
    key="my_asset",
    metadata={"metaxy/feature": "my/feature/key"},
)
asset_spec = mxd.metaxify()(asset_spec)
Multiple Dagster assets contributing to the same Metaxy feature
@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
        "metaxy/partition": {"dataset": "a"},
    },
)
def my_feature_dataset_a():
    ...

@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
        "metaxy/partition": {"dataset": "b"},
    },
)
def my_feature_dataset_b():
    ...
Source code in src/metaxy/ext/dagster/metaxify.py
def __init__(
    self,
    _asset: "_T | None" = None,
    *,
    key: CoercibleToAssetKey | None = None,
    key_prefix: CoercibleToAssetKeyPrefix | None = None,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
) -> None:
    # Actual initialization happens in __new__, but we set defaults here for type checkers
    self.key = dg.AssetKey.from_coercible(key) if key is not None else None
    self.key_prefix = (
        dg.AssetKey.from_coercible(key_prefix) if key_prefix is not None else None
    )
    self.inject_metaxy_kind = inject_metaxy_kind
    self.inject_code_version = inject_code_version
    self.set_description = set_description
    self.inject_column_schema = inject_column_schema
    self.inject_column_lineage = inject_column_lineage

metaxy.ext.dagster.observable.observable_metaxy_asset

observable_metaxy_asset(feature: CoercibleToFeatureKey, *, store_resource_key: str = 'store', inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, **observable_kwargs: Any)

Decorator to create an observable source asset for a Metaxy feature.

The observation reads the feature's metadata from the store, counts rows, and uses mean(metaxy_created_at) as the data version to track changes. Unlike a maximum timestamp, the mean also changes when older rows are deleted, so both additions and deletions are detected.

The decorated function receives (context, store, lazy_df) and can return a dict of additional metadata to include in the observation.
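Here is a self-contained sketch of why a mean timestamp works as a data version. Deleting a row that is not the newest leaves the maximum unchanged but generally shifts the mean. It operates on plain datetime values rather than a store, and the string encoding of the version is an arbitrary choice. It is not Metaxy's implementation.

```python
from datetime import datetime, timezone
from statistics import fmean


def mean_timestamp_version(created_at: list) -> str:
    """Hypothetical sketch: data version derived from mean(created_at)."""
    if not created_at:
        return "empty"
    mean_seconds = fmean(ts.timestamp() for ts in created_at)
    return f"{mean_seconds:.6f}"


def max_timestamp_version(created_at: list) -> str:
    # For comparison: a max-based version only reacts to newer rows.
    return f"{max(ts.timestamp() for ts in created_at):.6f}"


rows = [datetime(2024, 1, day, tzinfo=timezone.utc) for day in (1, 2, 3)]
after_delete = rows[1:]  # drop the oldest row

print(mean_timestamp_version(rows) != mean_timestamp_version(after_delete))  # True
print(max_timestamp_version(rows) == max_timestamp_version(after_delete))  # True
```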

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature to observe.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • inject_metaxy_kind (bool, default: True ) –

    Whether to inject "metaxy" kind into asset kinds.

  • inject_code_version (bool, default: True ) –

    Whether to inject the Metaxy feature code version.

  • set_description (bool, default: True ) –

    Whether to set description from feature class docstring.

  • **observable_kwargs (Any, default: {} ) –

    Passed to @observable_source_asset (key, group_name, tags, metadata, description, partitions_def, etc.)

Example
import narwhals as nw
import metaxy.ext.dagster as mxd
from myproject.features import ExternalFeature

@mxd.observable_metaxy_asset(feature=ExternalFeature)
def external_data(context, store, lazy_df):
    pass

# With custom metadata - return a dict
@mxd.observable_metaxy_asset(feature=ExternalFeature)
def external_data_with_metrics(context, store, lazy_df):
    # Run aggregations in the database
    total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
    return {"custom/total": total}
Note

observable_source_asset does not support deps. Upstream Metaxy feature dependencies from the feature spec are not propagated to the SourceAsset.

Source code in src/metaxy/ext/dagster/observable.py
def observable_metaxy_asset(
    feature: mx.CoercibleToFeatureKey,
    *,
    store_resource_key: str = "store",
    # metaxify kwargs
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    # observable_source_asset kwargs
    **observable_kwargs: Any,
):
    """Decorator to create an observable source asset for a Metaxy feature.

    The observation reads the feature's metadata from the store, counts rows,
    and uses `mean(metaxy_created_at)` as the data version to track changes.
    Using mean ensures that both additions and deletions are detected.

    The decorated function receives `(context, store, lazy_df)` and can return
    a dict of additional metadata to include in the observation.

    Args:
        feature: The Metaxy feature to observe.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        inject_metaxy_kind: Whether to inject `"metaxy"` kind into asset kinds.
        inject_code_version: Whether to inject the Metaxy feature code version.
        set_description: Whether to set description from feature class docstring.
        **observable_kwargs: Passed to `@observable_source_asset`
            (key, group_name, tags, metadata, description, partitions_def, etc.)

    Example:
        ```python
        import narwhals as nw
        import metaxy.ext.dagster as mxd
        from myproject.features import ExternalFeature

        @mxd.observable_metaxy_asset(feature=ExternalFeature)
        def external_data(context, store, lazy_df):
            pass

        # With custom metadata - return a dict
        @mxd.observable_metaxy_asset(feature=ExternalFeature)
        def external_data_with_metrics(context, store, lazy_df):
            # Run aggregations in the database
            total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
            return {"custom/total": total}
        ```

    Note:
        `observable_source_asset` does not support `deps`. Upstream Metaxy feature
        dependencies from the feature spec are not propagated to the SourceAsset.
    """
    feature_key = mx.coerce_to_feature_key(feature)

    def decorator(fn: Callable[..., Any]) -> dg.SourceAsset:
        # Build an AssetSpec from kwargs and enrich with metaxify
        # Merge user metadata with metaxy/feature
        user_metadata = observable_kwargs.pop("metadata", None) or {}
        spec = dg.AssetSpec(
            key=observable_kwargs.pop("key", None) or fn.__name__,  # ty: ignore[unresolved-attribute]
            group_name=observable_kwargs.pop("group_name", None),
            tags=observable_kwargs.pop("tags", None),
            metadata={
                **user_metadata,
                DAGSTER_METAXY_FEATURE_METADATA_KEY: feature_key.to_string(),
            },
            description=observable_kwargs.pop("description", None),
        )
        enriched = metaxify(
            inject_metaxy_kind=inject_metaxy_kind,
            inject_code_version=inject_code_version,
            set_description=set_description,
        )(spec)

        def _observe(context: dg.AssetExecutionContext) -> dg.ObserveResult:
            store: mx.MetadataStore = getattr(context.resources, store_resource_key)

            # Check for metaxy/partition metadata to apply filtering
            metaxy_partition = enriched.metadata.get(
                DAGSTER_METAXY_PARTITION_METADATA_KEY
            )
            filters = build_metaxy_partition_filter(metaxy_partition)

            with store:
                lazy_df = store.read_metadata(feature_key, filters=filters)
                stats = compute_stats_from_lazy_frame(lazy_df)

                # Call the user's function - it can return additional metadata
                extra_metadata = fn(context, store, lazy_df) or {}

            metadata: dict[str, Any] = {"dagster/row_count": stats.row_count}
            metadata.update(extra_metadata)

            return dg.ObserveResult(
                data_version=stats.data_version,
                metadata=metadata,
                tags=build_feature_event_tags(feature_key),
            )

        # Apply observable_source_asset decorator
        return dg.observable_source_asset(
            key=enriched.key,
            description=enriched.description,
            group_name=enriched.group_name,
            tags=dict(enriched.tags) if enriched.tags else None,
            metadata=dict(enriched.metadata) if enriched.metadata else None,
            required_resource_keys={store_resource_key},
            **observable_kwargs,
        )(_observe)

    return decorator

IO Manager

metaxy.ext.dagster.MetaxyIOManager

Bases: ConfigurableIOManager

MetaxyIOManager is a Dagster IOManager that reads and writes data to/from Metaxy's MetadataStore.

It automatically attaches Metaxy feature and store metadata to Dagster materialization events and handles partitioned assets.

Always set "metaxy/feature" Dagster metadata

This IOManager uses the "metaxy/feature" Dagster metadata key to map Dagster assets to Metaxy features. The key must be set on every asset being loaded or materialized.

Example
import dagster as dg

@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
    }
)
def my_asset():
    ...

Defining Partitioned Assets

To tell Metaxy which column to use when filtering partitioned assets, set the "partition_by" Dagster metadata key.

Example
import dagster as dg

@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
        "partition_by": "date",
    }
)
def my_partitioned_asset():
    ...

The same key is commonly used by other Dagster IO managers to configure partitioning behavior.
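Conceptually, "partition_by" names the column that is compared against the partition key(s) of the current Dagster run. The sketch below illustrates this with plain dictionaries. filter_by_partition_keys is a hypothetical helper, and because Dagster partition keys are strings, it assumes column values are compared as strings. That may not match how Metaxy actually builds its filters.

```python
from __future__ import annotations

from collections.abc import Iterable
from typing import Any


def filter_by_partition_keys(
    rows: Iterable[dict[str, Any]],
    partition_by: str | None,
    partition_keys: Iterable[str],
) -> list[dict[str, Any]]:
    """Hypothetical sketch: keep rows whose partition column matches a key."""
    keys = set(partition_keys)
    if partition_by is None or not keys:
        # Unpartitioned asset (or no partition in the run): read everything.
        return list(rows)
    return [row for row in rows if str(row.get(partition_by)) in keys]


rows = [
    {"id": 1, "date": "2024-01-01"},
    {"id": 2, "date": "2024-01-02"},
    {"id": 3, "date": "2024-01-03"},
]
# A run over a partition range covers several keys at once.
print(filter_by_partition_keys(rows, "date", ["2024-01-01", "2024-01-02"]))
# [{'id': 1, 'date': '2024-01-01'}, {'id': 2, 'date': '2024-01-02'}]
```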

Functions

metaxy.ext.dagster.MetaxyIOManager.load_input
load_input(context: InputContext) -> LazyFrame[Any]

Load feature metadata from MetadataStore.

Reads metadata for the feature specified in the asset's "metaxy/feature" metadata. For partitioned assets, filters to the current partition using the column specified in "partition_by" metadata.

Parameters:

  • context (InputContext) –

    Dagster input context containing asset metadata.

Returns:

  • LazyFrame[Any]

    A narwhals LazyFrame with the feature metadata.

Source code in src/metaxy/ext/dagster/io_manager.py
def load_input(self, context: "dg.InputContext") -> nw.LazyFrame[Any]:
    """Load feature metadata from [`MetadataStore`][metaxy.MetadataStore].

    Reads metadata for the feature specified in the asset's `"metaxy/feature"` metadata.
    For partitioned assets, filters to the current partition using the column specified
    in `"partition_by"` metadata.

    Args:
        context: Dagster input context containing asset metadata.

    Returns:
        A narwhals LazyFrame with the feature metadata.
    """
    with self.metadata_store:
        feature_key = self._feature_key_from_context(context)
        store_metadata = self.metadata_store.get_store_metadata(feature_key)

        # Build input metadata, transforming special keys to dagster standard format
        input_metadata: dict[str, Any] = {}
        for key, value in store_metadata.items():
            if key == "display":
                input_metadata["metaxy/store"] = value
            elif key == "table_name":
                input_metadata["dagster/table_name"] = value
            elif key == "uri":
                input_metadata["dagster/uri"] = dg.MetadataValue.path(value)
            else:
                input_metadata[key] = value

        # Only add input metadata if we have exactly one partition key
        # (add_input_metadata internally uses asset_partition_key which fails with multiple)
    # TODO: raise an issue in Dagster
        # or implement our own observation logging for multiple partition keys
        has_single_partition = (
            context.has_asset_partitions
            and len(list(context.asset_partition_keys)) == 1
        )
        if input_metadata and (
            not context.has_asset_partitions or has_single_partition
        ):
            context.add_input_metadata(
                input_metadata, description="Metadata Store Info"
            )

        # Build partition filters from context (handles partition_by and metaxy/partition)
        filters = build_partition_filter_from_input_context(context)

        return self.metadata_store.read_metadata(
            feature=feature_key,
            filters=filters,
        )
metaxy.ext.dagster.MetaxyIOManager.handle_output
handle_output(context: OutputContext, obj: MetaxyOutput) -> None

Write feature metadata to MetadataStore.

Writes the output dataframe to the metadata store for the feature specified in the asset's "metaxy/feature" metadata. Also logs metadata about the feature and store to Dagster's materialization events.

If obj is None, only metadata logging is performed (no data is written).

Parameters:

  • context (OutputContext) –

    Dagster output context containing asset metadata.

  • obj (MetaxyOutput) –

    A narwhals-compatible dataframe to write, or None to skip writing.

Source code in src/metaxy/ext/dagster/io_manager.py
def handle_output(self, context: "dg.OutputContext", obj: MetaxyOutput) -> None:
    """Write feature metadata to [`MetadataStore`][metaxy.MetadataStore].

    Writes the output dataframe to the metadata store for the feature specified
    in the asset's `"metaxy/feature"` metadata. Also logs metadata about the
    feature and store to Dagster's materialization events.

    If `obj` is `None`, only metadata logging is performed (no data is written).

    Args:
        context: Dagster output context containing asset metadata.
        obj: A narwhals-compatible dataframe to write, or None to skip writing.
    """
    assert DAGSTER_METAXY_FEATURE_METADATA_KEY in context.definition_metadata, (
        f'Missing `"{DAGSTER_METAXY_FEATURE_METADATA_KEY}"` key in asset metadata'
    )
    key = self._feature_key_from_context(context)
    feature = mx.get_feature_by_key(key)

    if obj is not None:
        context.log.debug(
            f'Writing metadata for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
        with self.metadata_store.open("write"):
            self.metadata_store.write_metadata(feature=feature, df=obj)
        context.log.debug(
            f'Metadata written for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
    else:
        context.log.debug(
            f'The output corresponds to Metaxy feature "{key.to_string()}" stored in {self.metadata_store.display()}'
        )

    self._log_output_metadata(context)

Dagster Types

metaxy.ext.dagster.dagster_type.feature_to_dagster_type

feature_to_dagster_type(feature: CoercibleToFeatureKey, *, name: str | None = None, description: str | None = None, inject_column_schema: bool = True, inject_column_lineage: bool = True, metadata: Mapping[str, Any] | None = None) -> DagsterType

Build a Dagster type from a Metaxy feature.

Creates a dagster.DagsterType that validates that outputs are MetaxyOutput (i.e., narwhals-compatible dataframes or None) and includes metadata derived from the feature's Pydantic model fields.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature to create a type for. Can be a feature class, feature key, or string that can be coerced to a feature key.

  • name (str | None, default: None ) –

    Optional custom name for the DagsterType. Defaults to the feature's table name (e.g., "project__feature_name").

  • description (str | None, default: None ) –

    Optional custom description. Defaults to the feature class docstring or a generated description.

  • inject_column_schema (bool, default: True ) –

    Whether to inject the column schema as metadata. The schema is derived from Pydantic model fields.

  • inject_column_lineage (bool, default: True ) –

    Whether to inject column lineage as metadata. The lineage is derived from feature dependencies.

  • metadata (Mapping[str, Any] | None, default: None ) –

    Optional custom metadata to inject into the DagsterType.
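The fallbacks for name and description can be mirrored in a few lines. In this sketch the feature key is a plain "/"-separated string. The table name is assumed to be its parts joined by "__", which is consistent with the "project__feature_name" example but is still an assumption. The description fallback follows the function's source. default_type_name_and_description is hypothetical.

```python
from __future__ import annotations

import inspect


def default_type_name_and_description(
    feature_key: str,
    docstring: str | None,
    name: str | None = None,
    description: str | None = None,
) -> tuple[str, str]:
    """Hypothetical sketch of the name / description fallbacks."""
    # Assumption: the table name joins key parts with "__".
    type_name = name or "__".join(feature_key.split("/"))
    if description is None:
        if docstring:
            # Strip the indentation Python keeps in class docstrings.
            description = inspect.cleandoc(docstring)
        else:
            description = f"Metaxy feature '{feature_key}'."
    return type_name, description


print(default_type_name_and_description("project/feature_name", None))
# ('project__feature_name', "Metaxy feature 'project/feature_name'.")
```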

Returns:

  • DagsterType –

    A DagsterType configured for the Metaxy feature with appropriate type checking and metadata.

Tip

This is automatically injected by @metaxify

Example
import dagster as dg
import metaxy as mx
import polars as pl
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature  # Your Metaxy feature class

@mxd.metaxify()
@dg.asset(
    metadata={"metaxy/feature": mx.coerce_to_feature_key(MyFeature).to_string()},
    dagster_type=mxd.feature_to_dagster_type(MyFeature),
)
def my_asset():
    return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})

See also

  • metaxify: Decorator for injecting Metaxy metadata into Dagster assets.
  • MetaxyOutput: The type alias for valid Metaxy outputs.
Source code in src/metaxy/ext/dagster/dagster_type.py
def feature_to_dagster_type(
    feature: mx.CoercibleToFeatureKey,
    *,
    name: str | None = None,
    description: str | None = None,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
    metadata: Mapping[str, Any] | None = None,
) -> dg.DagsterType:
    """Build a Dagster type from a Metaxy feature.

    Creates a `dagster.DagsterType` that validates outputs are
    [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput] (i.e., narwhals-compatible
    dataframes or `None`) and includes metadata derived from the feature's Pydantic
    model fields.

    Args:
        feature: The Metaxy feature to create a type for. Can be a feature class,
            feature key, or string that can be coerced to a feature key.
        name: Optional custom name for the DagsterType. Defaults to the feature's
            table name (e.g., "project__feature_name").
        description: Optional custom description. Defaults to the feature class
            docstring or a generated description.
        inject_column_schema: Whether to inject the column schema as metadata.
            The schema is derived from Pydantic model fields.
        inject_column_lineage: Whether to inject column lineage as metadata.
            The lineage is derived from feature dependencies.
        metadata: Optional custom metadata to inject into the DagsterType.

    Returns:
        A DagsterType configured for the Metaxy feature with appropriate
        type checking and metadata.

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        import dagster as dg
        import metaxy as mx
        import polars as pl
        import metaxy.ext.dagster as mxd
        from myproject.features import MyFeature  # Your Metaxy feature class

        @mxd.metaxify()
        @dg.asset(
            metadata={"metaxy/feature": mx.coerce_to_feature_key(MyFeature).to_string()},
            dagster_type=mxd.feature_to_dagster_type(MyFeature),
        )
        def my_asset():
            return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
        ```

    !!! info "See also"
        - [`metaxify`][metaxy.ext.dagster.metaxify.metaxify]: Decorator for injecting
          Metaxy metadata into Dagster assets.
        - [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput]: The type alias for valid
          Metaxy outputs.
    """
    from metaxy.ext.dagster.io_manager import MetaxyOutput

    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)

    # Determine name
    type_name = name or feature_key.table_name

    # Determine description
    if description is None:
        if feature_cls.__doc__:
            import inspect

            description = inspect.cleandoc(feature_cls.__doc__)
        else:
            description = f"Metaxy feature '{feature_key.to_string()}'."

    # Build metadata - start with custom metadata if provided
    final_metadata: dict[str, Any] = dict(metadata) if metadata else {}
    final_metadata[DAGSTER_METAXY_INFO_METADATA_KEY] = build_feature_info_metadata(
        feature_key
    )
    if inject_column_schema:
        column_schema = build_column_schema(feature_cls)
        if column_schema is not None:
            final_metadata[DAGSTER_COLUMN_SCHEMA_METADATA_KEY] = column_schema

    if inject_column_lineage:
        column_lineage = build_column_lineage(feature_cls)
        if column_lineage is not None:
            final_metadata[DAGSTER_COLUMN_LINEAGE_METADATA_KEY] = column_lineage

    dagster_type = dg.DagsterType(
        type_check_fn=_create_type_check_fn(feature_key),
        name=type_name,
        description=description,
        typing_type=MetaxyOutput,
        metadata=final_metadata,
    )

    return dagster_type

Dagster Event Generators

metaxy.ext.dagster.utils.generate_materialize_results

generate_materialize_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Iterable[AssetSpec] | None = None) -> Iterator[MaterializeResult[None]]

Generate dagster.MaterializeResult events for assets in topological order.

Yields a MaterializeResult for each asset spec, sorted by their associated Metaxy features in topological order (dependencies before dependents). Each result includes the row count as "dagster/row_count" metadata.
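The ordering guarantee (dependencies before dependents) is an ordinary topological sort. The sketch below uses the standard library's graphlib.TopologicalSorter on a hypothetical dependency mapping of feature keys. Metaxy itself delegates to FeatureGraph.topological_sort_features, which is not reproduced here. The sketch sorts the full graph before filtering so that ordering through unrequested intermediate features is preserved.

```python
from graphlib import TopologicalSorter


def topological_feature_order(
    requested: list[str], deps: dict[str, set[str]]
) -> list[str]:
    """Hypothetical sketch: order requested feature keys, upstreams first."""
    # deps maps each feature key to the keys it depends on.
    sorter = TopologicalSorter(deps)
    for key in requested:
        # Make sure features without recorded dependencies are included.
        sorter.add(key)
    requested_set = set(requested)
    # Sort the whole graph, then keep only the requested keys.
    return [key for key in sorter.static_order() if key in requested_set]


deps = {"my/feature/b": {"my/feature/a"}, "my/feature/a": set()}
print(topological_feature_order(["my/feature/b", "my/feature/a"], deps))
# ['my/feature/a', 'my/feature/b']
```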

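Parameters:

  • context (AssetExecutionContext) –

    The Dagster asset execution context.

  • store (MetadataStore | MetaxyStoreFromConfigResource) –

    The Metaxy metadata store to read from.

  • specs (Iterable[AssetSpec] | None, default: None ) –

    Optional, concrete Dagster asset specs. If missing, specs will be taken from the context.
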
Yields:

  • MaterializeResult[None]

    Materialization result for each asset in topological order.

Example
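import dagster as dg
import metaxy as mx
from metaxy.ext.dagster import metaxify
from metaxy.ext.dagster.utils import generate_materialize_results

specs = [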
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]

@metaxify
@dg.multi_asset(specs=specs)
def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    # ... compute and write data ...
    yield from generate_materialize_results(context, store)
Source code in src/metaxy/ext/dagster/utils.py
def generate_materialize_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[dg.AssetSpec] | None = None,
) -> Iterator[dg.MaterializeResult[None]]:
    """Generate `dagster.MaterializeResult` events for assets in topological order.

    Yields a `MaterializeResult` for each asset spec, sorted by their associated
    Metaxy features in topological order (dependencies before dependents).
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Optional, concrete Dagster asset specs.
            If missing, specs will be taken from the context.

    Yields:
        Materialization result for each asset in topological order.

    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]

        @metaxify
        @dg.multi_asset(specs=specs)
        def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            # ... compute and write data ...
            yield from generate_materialize_results(context, store)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    specs = specs or context.assets_def.specs
    for spec in specs:
        if feature_key_raw := spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY):
            feature_key = mx.coerce_to_feature_key(feature_key_raw)
            spec_by_feature_key[feature_key] = spec

    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))

    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_col = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_KEY)
        metaxy_partition = asset_spec.metadata.get(
            DAGSTER_METAXY_PARTITION_METADATA_KEY
        )

        with store:  # ty: ignore[invalid-context-manager]
            try:
                # Build runtime metadata (handles reading, filtering, and stats internally)
                metadata, stats = build_runtime_feature_metadata(
                    key,
                    store,
                    context,
                    partition_col=partition_col,
                    metaxy_partition=metaxy_partition,
                )
            except FeatureNotFoundError:
                context.log.exception(
                    f"Feature {key.to_string()} not found in store, skipping materialization result"
                )
                continue

            # Get materialized-in-run count if materialization_id is set
            if store.materialization_id is not None:  # ty: ignore[possibly-missing-attribute]
                mat_df = store.read_metadata(  # ty: ignore[possibly-missing-attribute]
                    key,
                    filters=[
                        nw.col(METAXY_MATERIALIZATION_ID) == store.materialization_id  # ty: ignore[possibly-missing-attribute]
                    ],
                )
                metadata["metaxy/materialized_in_run"] = (
                    mat_df.select(nw.len()).collect().item(0, 0)
                )

        yield dg.MaterializeResult(
            value=None,
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
            tags=build_feature_event_tags(key),
        )

metaxy.ext.dagster.utils.generate_observe_results

generate_observe_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Iterable[AssetSpec] | None = None) -> Iterator[ObserveResult]

Generate dagster.ObserveResult events for assets in topological order.

Yields an ObserveResult for each asset spec that has the "metaxy/feature" metadata key set, sorted by their associated Metaxy features in topological order. Each result includes the row count as "dagster/row_count" metadata.
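Only specs that carry the "metaxy/feature" key contribute a result. The selection step mirrors the mapping loop in the function's source. The sketch below uses a minimal stand-in Spec dataclass instead of dagster.AssetSpec so it runs without Dagster, and it keeps feature keys as plain strings rather than coerced FeatureKey objects. Both simplifications are assumptions for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

METAXY_FEATURE_KEY = "metaxy/feature"


@dataclass
class Spec:
    """Hypothetical stand-in for dagster.AssetSpec (only key and metadata)."""

    key: str
    metadata: dict[str, Any] = field(default_factory=dict)


def specs_by_feature_key(specs: list[Spec]) -> dict[str, Spec]:
    """Map feature keys to specs, skipping specs without "metaxy/feature"."""
    mapping: dict[str, Spec] = {}
    for spec in specs:
        if feature_key := spec.metadata.get(METAXY_FEATURE_KEY):
            mapping[feature_key] = spec
    return mapping


specs = [
    Spec("output_a", {"metaxy/feature": "my/feature/a"}),
    Spec("plain_asset"),  # no Metaxy feature: no ObserveResult is produced
]
print(sorted(specs_by_feature_key(specs)))  # ['my/feature/a']
```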

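Parameters:

  • context (AssetExecutionContext) –

    The Dagster asset execution context.

  • store (MetadataStore | MetaxyStoreFromConfigResource) –

    The Metaxy metadata store to read from.

  • specs (Iterable[AssetSpec] | None, default: None ) –

    Optional, concrete Dagster asset specs. If missing, this function will take the current specs from the context.
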
Yields:

  • ObserveResult

    Observation result for each asset in topological order.

Example
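import dagster as dg
import metaxy as mx
from metaxy.ext.dagster import metaxify
from metaxy.ext.dagster.utils import generate_observe_results

specs = [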
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]

@metaxify
@dg.multi_observable_source_asset(specs=specs)
def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    yield from generate_observe_results(context, store)
Source code in src/metaxy/ext/dagster/utils.py
def generate_observe_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[dg.AssetSpec] | None = None,
) -> Iterator[dg.ObserveResult]:
    """Generate `dagster.ObserveResult` events for assets in topological order.

    Yields an `ObserveResult` for each asset spec that has `"metaxy/feature"` metadata key set, sorted by their associated
    Metaxy features in topological order.
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Optional, concrete Dagster asset specs.
            If missing, this function will take the current specs from the context.

    Yields:
        Observation result for each asset in topological order.

    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]

        @metaxify
        @dg.multi_observable_source_asset(specs=specs)
        def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            yield from generate_observe_results(context, store)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    specs = specs or context.assets_def.specs

    for spec in specs:
        if feature_key_raw := spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY):
            feature_key = mx.coerce_to_feature_key(feature_key_raw)
            spec_by_feature_key[feature_key] = spec

    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))

    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_col = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_KEY)
        metaxy_partition = asset_spec.metadata.get(
            DAGSTER_METAXY_PARTITION_METADATA_KEY
        )

        with store:  # ty: ignore[invalid-context-manager]
            try:
                # Build runtime metadata (handles reading, filtering, and stats internally)
                # For observers with no metaxy_partition, this reads all data
                metadata, stats = build_runtime_feature_metadata(
                    key,
                    store,
                    context,
                    partition_col=partition_col,
                    metaxy_partition=metaxy_partition,
                )
            except FeatureNotFoundError:
                context.log.exception(
                    f"Feature {key.to_string()} not found in store, skipping observation result"
                )
                continue

        yield dg.ObserveResult(
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
            tags=build_feature_event_tags(key),
        )
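The loop above emits observations in topological order of feature keys. An upstream feature is therefore observed before the features that depend on it, and a feature missing from the store is logged and skipped instead of failing the run. The sketch below reproduces only that ordering-and-skip behavior, using the standard-library `graphlib` module. The dependency mapping, the `stored` set, and the `observe_in_order` helper are hypothetical stand-ins for `FeatureGraph.topological_sort_features` and the store lookup. This is not Metaxy's implementation.

```python
from graphlib import TopologicalSorter


def observe_in_order(
    deps: dict[str, set[str]],
    selected: list[str],
    stored: set[str],
) -> list[str]:
    """Return the selected feature keys that would be observed, upstream first.

    deps maps each (hypothetical) feature key to the keys it depends on.
    stored is the set of keys present in the store; missing keys are skipped.
    """
    sorter = TopologicalSorter(deps)
    for key in selected:
        sorter.add(key)  # make sure features without dependencies are included
    wanted = set(selected)
    observed: list[str] = []
    # static_order() yields each node only after all of its dependencies
    for key in sorter.static_order():
        if key not in wanted:
            continue  # an upstream feature that is not part of this asset
        if key not in stored:
            # mirrors the FeatureNotFoundError branch: log and move on
            print(f"Feature {key} not found in store, skipping observation result")
            continue
        observed.append(key)
    return observed


print(observe_in_order({"b": {"a"}, "c": {"b"}}, ["c", "a", "b"], stored={"a", "c"}))
```

The order of `selected` does not matter here. Only the dependency graph determines the emission order.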

metaxy.ext.dagster.utils.build_feature_info_metadata

build_feature_info_metadata(feature: CoercibleToFeatureKey) -> dict[str, Any]

Build feature info metadata dict for Dagster assets.

Creates a dictionary with information about the Metaxy feature that can be used as Dagster asset metadata under the "metaxy/feature_info" key.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature (class, key, or string).

Returns:

  • dict[str, Any]

    A nested dictionary containing:

      • feature: Feature information
          • project: The project name
          • spec: The full feature spec as a dict (via model_dump(mode="json"))
          • version: The feature version string
          • type: The feature class module path
      • metaxy: Metaxy library information
          • version: The metaxy library version
          • plugins: The configured Metaxy plugins (from MetaxyConfig.get().plugins)

Tip

This is automatically injected by @metaxify.

Example
from metaxy.ext.dagster.utils import build_feature_info_metadata

info = build_feature_info_metadata(MyFeature)
# {
#     "feature": {
#         "project": "my_project",
#         "spec": {...},  # Full FeatureSpec model_dump()
#         "version": "my__feature@abc123",
#         "type": "myproject.features",
#     },
#     "metaxy": {
#         "version": "0.1.0",
#         "plugins": ...,  # from MetaxyConfig.get().plugins
#     },
# }
Source code in src/metaxy/ext/dagster/utils.py
def build_feature_info_metadata(
    feature: mx.CoercibleToFeatureKey,
) -> dict[str, Any]:
    """Build feature info metadata dict for Dagster assets.

    Creates a dictionary with information about the Metaxy feature that can be
    used as Dagster asset metadata under the `"metaxy/feature_info"` key.

    Args:
        feature: The Metaxy feature (class, key, or string).

    Returns:
        A nested dictionary containing:

        - `feature`: Feature information
            - `project`: The project name
            - `spec`: The full feature spec as a dict (via `model_dump()`)
            - `version`: The feature version string
            - `type`: The feature class module path
        - `metaxy`: Metaxy library information
            - `version`: The metaxy library version
            - `plugins`: The configured Metaxy plugins (from `MetaxyConfig.get().plugins`)

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        from metaxy.ext.dagster.utils import build_feature_info_metadata

        info = build_feature_info_metadata(MyFeature)
        # {
        #     "feature": {
        #         "project": "my_project",
        #         "spec": {...},  # Full FeatureSpec model_dump()
        #         "version": "my__feature@abc123",
        #         "type": "myproject.features",
        #     },
        #     "metaxy": {
        #         "version": "0.1.0",
        #         "plugins": ...,  # from MetaxyConfig.get().plugins
        #     },
        # }
        ```
    """
    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)

    return {
        "feature": {
            "project": feature_cls.project,
            "spec": feature_cls.spec().model_dump(mode="json"),
            "version": feature_cls.feature_version(),
            "type": feature_cls.__module__,
        },
        "metaxy": {
            "version": mx.__version__,
            "plugins": mx.MetaxyConfig.get().plugins,
        },
    }
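Because the result is plain nested data, code that only has the asset metadata (for example a sensor or a reporting script) can read it without importing the feature class. The following is a minimal sketch that assumes the metadata has already been retrieved as a Python dict. `summarize_feature_info` and the sample values are hypothetical.

```python
from typing import Any


def summarize_feature_info(info: dict[str, Any]) -> str:
    """Build a one-line summary from a build_feature_info_metadata()-shaped dict."""
    feature = info["feature"]
    library = info["metaxy"]
    return (
        f"{feature['type']} (project={feature['project']}, "
        f"version={feature['version']}, metaxy={library['version']})"
    )


# Hypothetical sample with the documented keys (spec contents and plugins omitted)
sample = {
    "feature": {
        "project": "my_project",
        "spec": {},
        "version": "abc123",
        "type": "myproject.features",
    },
    "metaxy": {"version": "0.1.0"},
}
print(summarize_feature_info(sample))
```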

Observation Jobs

metaxy.ext.dagster.observation_job.build_metaxy_multi_observation_job

build_metaxy_multi_observation_job(name: str, *, asset_selection: AssetSelection | None = None, defs: Definitions | None = None, assets: Sequence[AssetSpec | AssetsDefinition | SourceAsset] | None = None, store_resource_key: str = 'store', tags: Mapping[str, str] | None = None, **kwargs: Any) -> JobDefinition

Build a dynamic Dagster job that observes multiple Metaxy feature assets.

Creates a job that dynamically spawns one op per asset, yielding AssetObservation events. Uses Dagster's dynamic orchestration to process multiple assets in parallel.

Tip

This is a convenient way to observe all your Metaxy features in a single job. Combine it with a Dagster schedule to run it periodically.

Provide either:

  • asset_selection and defs: Select assets from a Definitions object
  • assets: Direct list of assets to observe
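The two input modes are mutually exclusive, and asset_selection and defs only work as a pair. The following is a simplified, hypothetical restatement of those rules. The `check_observation_args` helper is illustrative only and accepts any objects; the real checks in this function's source operate on Dagster types.

```python
from typing import Any, Optional, Sequence


def check_observation_args(
    asset_selection: Optional[Any] = None,
    defs: Optional[Any] = None,
    assets: Optional[Sequence[Any]] = None,
) -> str:
    """Return which input mode is used, or raise ValueError for invalid combinations."""
    has_selection = asset_selection is not None or defs is not None
    has_assets = assets is not None
    if has_selection and has_assets:
        raise ValueError("Cannot provide both 'assets' and 'asset_selection'/'defs'.")
    if not has_selection and not has_assets:
        raise ValueError("Must provide either 'asset_selection' + 'defs', or 'assets'.")
    if has_selection and (asset_selection is None or defs is None):
        # one half of the pair is missing
        raise ValueError("'asset_selection' and 'defs' must be provided together.")
    return "selection" if has_selection else "assets"


print(check_observation_args(assets=[]))
```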

Note

All selected assets must share the same partitioning (if any).
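This requirement follows from the fact that a Dagster job has a single partitions_def. The sketch below models the check with plain strings standing in for partitions definitions, where `None` means unpartitioned. Mixing partitioned and unpartitioned assets is also rejected. `shared_partitions_def` is a hypothetical helper, not part of the Metaxy API.

```python
from typing import Optional, Sequence


def shared_partitions_def(defs: Sequence[Optional[str]]) -> Optional[str]:
    """Return the partitioning shared by all assets, or raise ValueError.

    Each entry is a stand-in for one asset's partitions_def (None = unpartitioned).
    """
    if not defs:
        raise ValueError("No assets to observe.")
    first = defs[0]
    for i, pdef in enumerate(defs[1:], start=1):
        if pdef != first:
            raise ValueError(f"Asset 0 has {first!r}, but asset {i} has {pdef!r}.")
    return first


print(shared_partitions_def(["daily", "daily"]))
```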

Parameters:

  • name (str) –

    Name for the job.

  • asset_selection (AssetSelection | None, default: None ) –

    An AssetSelection specifying which assets to observe. Must be used together with defs.

  • defs (Definitions | None, default: None ) –

    The Definitions object to resolve the selection against. Must be used together with asset_selection.

  • assets (Sequence[AssetSpec | AssetsDefinition | SourceAsset] | None, default: None ) –

    Direct sequence of assets to observe. Each item can be an AssetSpec, AssetsDefinition, or SourceAsset. Cannot be used together with asset_selection/defs.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • tags (Mapping[str, str] | None, default: None ) –

    Optional tags to apply to the job.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the @job decorator.

Returns:

  • JobDefinition

    A Dagster job definition that observes all matching Metaxy assets.

Raises:

  • ValueError

    If no specs have metaxy/feature metadata, if assets have inconsistent partitions_def, or if invalid argument combinations are provided.

  • TypeError

    If an item in assets is not an AssetSpec, AssetsDefinition, or SourceAsset.

Example
import dagster as dg
import metaxy.ext.dagster as mxd

@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature_a"})
def feature_a():
    ...

@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature_b"})
def feature_b():
    ...

# Option 1: Using asset_selection + defs
my_defs = dg.Definitions(assets=[feature_a, feature_b])
observation_job = mxd.build_metaxy_multi_observation_job(
    name="observe_my_features",
    asset_selection=dg.AssetSelection.kind("metaxy"),
    defs=my_defs,
)

# Option 2: Using direct assets list
observation_job = mxd.build_metaxy_multi_observation_job(
    name="observe_my_features",
    assets=[feature_a, feature_b],
)
Source code in src/metaxy/ext/dagster/observation_job.py
def build_metaxy_multi_observation_job(
    name: str,
    *,
    asset_selection: dg.AssetSelection | None = None,
    defs: dg.Definitions | None = None,
    assets: Sequence[dg.AssetSpec | dg.AssetsDefinition | dg.SourceAsset] | None = None,
    store_resource_key: str = "store",
    tags: Mapping[str, str] | None = None,
    **kwargs: Any,
) -> dg.JobDefinition:
    """Build a dynamic Dagster job that observes multiple Metaxy feature assets.

    Creates a job that dynamically spawns one op per asset, yielding
    [`AssetObservation`](https://docs.dagster.io/api/python-api/ops#dagster.AssetObservation) events.
    Uses Dagster's dynamic orchestration to process multiple assets in parallel.

    !!! tip
        This is a very powerful way to observe all your Metaxy features at once.
        Use it in combination with a [Dagster schedule](https://docs.dagster.io/concepts/schedules)
        to run it periodically.

    Provide either:

    - `asset_selection` and `defs`: Select assets from a
      [`Definitions`](https://docs.dagster.io/api/python-api/definitions#dagster.Definitions) object
    - `assets`: Direct list of assets to observe

    !!! note
        All selected assets must share the same partitioning (if any).

    Args:
        name: Name for the job.
        asset_selection: An `AssetSelection` specifying which assets to observe.
            Must be used together with `defs`.
        defs: The `Definitions` object to resolve the selection against.
            Must be used together with `asset_selection`.
        assets: Direct sequence of assets to observe. Each item can be an
            `AssetSpec`, `AssetsDefinition`, or `SourceAsset`.
            Cannot be used together with `asset_selection`/`defs`.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        tags: Optional tags to apply to the job.
        **kwargs: Additional keyword arguments passed to the
            [`@job`](https://docs.dagster.io/api/python-api/jobs#dagster.job) decorator.

    Returns:
        A Dagster job definition that observes all matching Metaxy assets.

    Raises:
        ValueError: If no specs have `metaxy/feature` metadata, if assets have
            inconsistent `partitions_def`, or if invalid argument combinations
            are provided.
        TypeError: If an item in `assets` is not an `AssetSpec`,
            `AssetsDefinition`, or `SourceAsset`.

    Example:
        ```python
        import dagster as dg
        import metaxy.ext.dagster as mxd

        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature_a"})
        def feature_a():
            ...

        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature_b"})
        def feature_b():
            ...

        # Option 1: Using asset_selection + defs
        my_defs = dg.Definitions(assets=[feature_a, feature_b])
        observation_job = mxd.build_metaxy_multi_observation_job(
            name="observe_my_features",
            asset_selection=dg.AssetSelection.kind("metaxy"),
            defs=my_defs,
        )

        # Option 2: Using direct assets list
        observation_job = mxd.build_metaxy_multi_observation_job(
            name="observe_my_features",
            assets=[feature_a, feature_b],
        )
        ```
    """
    tags = tags or {}

    # Validate argument combinations
    has_selection = asset_selection is not None or defs is not None
    has_assets = assets is not None

    if has_selection and has_assets:
        raise ValueError(
            "Cannot provide both 'assets' and 'asset_selection'/'defs'. "
            "Use either asset_selection + defs, or assets alone."
        )

    if not has_selection and not has_assets:
        raise ValueError("Must provide either 'asset_selection' + 'defs', or 'assets'.")

    if has_selection:
        if asset_selection is None:
            raise ValueError("'defs' requires 'asset_selection' to be provided.")
        if defs is None:
            raise ValueError("'asset_selection' requires 'defs' to be provided.")

        # Resolve selection using defs
        all_assets_defs = list(defs.resolve_asset_graph().assets_defs)
        selected_keys = asset_selection.resolve(all_assets_defs)

        # Get specs for selected keys, with partitions_def
        metaxy_specs: list[dg.AssetSpec] = []
        partitions_defs: list[dg.PartitionsDefinition | None] = []

        for asset_def in all_assets_defs:
            for spec in asset_def.specs:
                if spec.key in selected_keys:
                    if (
                        spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
                        is not None
                    ):
                        metaxy_specs.append(spec)
                        partitions_defs.append(asset_def.partitions_def)
    else:
        # Direct assets list
        assert assets is not None
        metaxy_specs = []
        partitions_defs = []

        for asset in assets:
            if isinstance(asset, dg.AssetSpec):
                if asset.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None:
                    metaxy_specs.append(asset)
                    partitions_defs.append(asset.partitions_def)
            elif isinstance(asset, dg.AssetsDefinition):
                for spec in asset.specs:
                    if (
                        spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
                        is not None
                    ):
                        metaxy_specs.append(spec)
                        partitions_defs.append(asset.partitions_def)
            elif isinstance(asset, dg.SourceAsset):
                # SourceAsset doesn't have metaxy/feature metadata typically
                pass
            else:
                raise TypeError(
                    f"Expected AssetSpec, AssetsDefinition, or SourceAsset, "
                    f"got {type(asset).__name__}"
                )

    if not metaxy_specs:
        raise ValueError(
            "No assets have specs with 'metaxy/feature' metadata. "
            "Ensure your assets have metadata={'metaxy/feature': 'feature/key'}."
        )

    # Validate all specs have the same partitions_def
    first_partitions_def = partitions_defs[0]
    for i, pdef in enumerate(partitions_defs[1:], start=1):
        if pdef != first_partitions_def:
            raise ValueError(
                f"All assets must have the same partitions_def. "
                f"Asset 0 has {first_partitions_def}, but asset {i} has {pdef}."
            )
    partitions_def = first_partitions_def

    # Build feature keys for description (may have duplicates when multiple assets share a feature)
    feature_keys = [
        mx.coerce_to_feature_key(spec.metadata[DAGSTER_METAXY_FEATURE_METADATA_KEY])
        for spec in metaxy_specs
    ]

    # Build a mapping of asset key -> spec for the dynamic op
    # This ensures each asset gets its own op, even if multiple assets share the same feature
    spec_by_asset_key = {spec.key.to_user_string(): spec for spec in metaxy_specs}

    # Op that emits dynamic outputs for each asset
    @dg.op(name=f"{name}_fanout", out=dg.DynamicOut(str))
    def fanout_assets() -> Any:
        for asset_key_str in spec_by_asset_key:
            # Use asset key (with / replaced by __) as mapping key for Dagster identifiers
            safe_mapping_key = asset_key_str.replace("/", "__")
            yield dg.DynamicOutput(asset_key_str, mapping_key=safe_mapping_key)

    # Build the shared observation op
    observe_op = _build_observation_op_for_specs(
        name=f"{name}_observe",
        spec_by_asset_key=spec_by_asset_key,
        store_resource_key=store_resource_key,
    )

    # Build job metadata with asset references
    job_metadata: dict[str, Any] = {
        "metaxy/features": [fk.to_string() for fk in feature_keys],
    }
    for spec in metaxy_specs:
        job_metadata[f"metaxy/asset/{spec.key.to_user_string()}"] = (
            dg.MetadataValue.asset(spec.key)
        )

    # Build description as markdown list showing both assets and features
    asset_list = "\n".join(
        f"- `{spec.key.to_user_string()}` → `{spec.metadata[DAGSTER_METAXY_FEATURE_METADATA_KEY]}`"
        for spec in metaxy_specs
    )
    description = f"Observe {len(metaxy_specs)} Metaxy assets:\n\n{asset_list}"

    @dg.job(
        name=name,
        partitions_def=partitions_def,
        tags=tags,
        description=description,
        metadata=job_metadata,
        **kwargs,
    )
    def observation_job() -> None:
        asset_keys_dynamic = fanout_assets()
        asset_keys_dynamic.map(observe_op)

    return observation_job
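The fan-out op converts user-facing asset keys such as `my/feature/a` into dynamic-output mapping keys by replacing `/` with `__`, because Dagster only accepts identifier-like mapping keys. The sketch below isolates that conversion. The validation regex reflects an assumption about Dagster's naming rule, and `to_mapping_key` is a hypothetical helper. Key components containing other characters, such as `-`, are not fixed by the replacement and are rejected here.

```python
import re

# Assumed Dagster naming rule for mapping keys: letters, digits, and underscores only
VALID_MAPPING_KEY = re.compile(r"^[A-Za-z0-9_]+$")


def to_mapping_key(asset_key_str: str) -> str:
    """Convert a user-facing asset key string like 'a/b' into 'a__b'."""
    key = asset_key_str.replace("/", "__")
    if not VALID_MAPPING_KEY.match(key):
        raise ValueError(f"{asset_key_str!r} cannot be used as a mapping key")
    return key


print(to_mapping_key("my/feature/a"))
```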

metaxy.ext.dagster.observation_job.build_metaxy_observation_job

build_metaxy_observation_job(asset: AssetSpec | AssetsDefinition, *, store_resource_key: str = 'store', tags: dict[str, str] | None = None) -> list[JobDefinition]

Build Dagster job(s) that observe Metaxy feature asset(s).

Creates job(s) that yield AssetObservation events for the given asset. The jobs can be run independently of asset materialization, e.g., on a schedule.

Returns one job per metaxy/feature spec found in the asset.

Jobs are constructed with matching partitions definitions. Job names are always derived as observe_<FeatureKey.table_name()>.
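The naming rule can be sketched as follows. `FeatureKey.table_name()` is replaced here by a hypothetical stand-in that joins the key parts with `__`, and the real method may format names differently. Specs without metaxy/feature metadata produce no job, and an asset with no such spec is an error. `observation_job_names` is likewise hypothetical.

```python
from typing import Mapping, Sequence


def table_name(feature_key: str) -> str:
    """Hypothetical stand-in for FeatureKey.table_name(): join key parts with '__'."""
    return "__".join(feature_key.split("/"))


def observation_job_names(specs: Sequence[Mapping[str, object]]) -> list[str]:
    """Return one job name per spec metadata dict that carries 'metaxy/feature'."""
    names = []
    for metadata in specs:
        feature = metadata.get("metaxy/feature")
        if feature is None:
            continue  # specs without a Metaxy feature get no job
        names.append(f"observe_{table_name(str(feature))}")
    if not names:
        raise ValueError("Asset has no specs with 'metaxy/feature' metadata.")
    return names


print(observation_job_names([{"metaxy/feature": "my/feature/a"}, {}]))
```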

Parameters:

  • asset (AssetSpec | AssetsDefinition) –

    Asset spec or asset definition to observe. Must have metaxy/feature metadata on at least one spec.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • tags (dict[str, str] | None, default: None ) –

    Optional tags to apply to the job(s).

Returns:

  • list[JobDefinition]

    List of Dagster job definitions, one per metaxy/feature spec.

Raises:

  • ValueError

    If no specs have metaxy/feature metadata.

  • TypeError

    If asset is neither an AssetSpec nor an AssetsDefinition.

Example
import dagster as dg
import metaxy.ext.dagster as mxd

@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature"})
def my_asset():
    ...

# Build the observation jobs (one per metaxy/feature spec); partitions_def is extracted automatically
observation_jobs = mxd.build_metaxy_observation_job(my_asset)

# Include in your Definitions (the function returns a list of jobs)
defs = dg.Definitions(
    jobs=observation_jobs,
    resources={"store": my_store_resource},
)
Source code in src/metaxy/ext/dagster/observation_job.py
def build_metaxy_observation_job(
    asset: dg.AssetSpec | dg.AssetsDefinition,
    *,
    store_resource_key: str = "store",
    tags: dict[str, str] | None = None,
) -> list[dg.JobDefinition]:
    """Build Dagster job(s) that observe Metaxy feature asset(s).

    Creates job(s) that yield `AssetObservation` events for the given asset.
    The job can be run independently from asset materialization, e.g., on a schedule.

    Returns one job per `metaxy/feature` spec found in the asset.

    Jobs are constructed with matching partitions definitions.
    Job names are always derived as `observe_<FeatureKey.table_name()>`.

    Args:
        asset: Asset spec or asset definition to observe. Must have `metaxy/feature`
            metadata on at least one spec.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        tags: Optional tags to apply to the job(s).

    Returns:
        List of Dagster job definitions, one per `metaxy/feature` spec.

    Raises:
        ValueError: If no specs have `metaxy/feature` metadata.
        TypeError: If `asset` is neither an `AssetSpec` nor an `AssetsDefinition`.

    Example:
        ```python
        import dagster as dg
        import metaxy.ext.dagster as mxd

        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature"})
        def my_asset():
            ...

        # Build the observation jobs (one per metaxy/feature spec); partitions_def is extracted automatically
        observation_jobs = mxd.build_metaxy_observation_job(my_asset)

        # Include in your Definitions (the function returns a list of jobs)
        defs = dg.Definitions(
            jobs=observation_jobs,
            resources={"store": my_store_resource},
        )
        ```
    """
    # Extract specs and partitions_def from asset
    if isinstance(asset, dg.AssetSpec):
        specs = [asset]
        partitions_def = None
    elif isinstance(asset, dg.AssetsDefinition):
        specs = list(asset.specs)
        partitions_def = asset.partitions_def
    else:
        raise TypeError(
            f"Expected AssetSpec or AssetsDefinition, got {type(asset).__name__}"
        )

    # Filter to specs with metaxy/feature metadata
    metaxy_specs = [
        spec
        for spec in specs
        if spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None
    ]

    if not metaxy_specs:
        raise ValueError(
            "Asset has no specs with 'metaxy/feature' metadata. "
            "Ensure your asset has metadata={'metaxy/feature': 'feature/key'}."
        )

    # Build jobs for each metaxy spec
    jobs = [
        _build_observation_job_for_spec(
            spec,
            partitions_def=partitions_def,
            store_resource_key=store_resource_key,
            tags=tags,
        )
        for spec in metaxy_specs
    ]

    return jobs

Resources

metaxy.ext.dagster.MetaxyStoreFromConfigResource

Bases: ConfigurableResource[MetadataStore]

This resource creates a metaxy.MetadataStore based on the current Metaxy configuration (metaxy.toml).

If name is not provided, the default store is used. The default can be set with store = "my_name" in metaxy.toml or with the $METAXY_STORE environment variable.
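The store lookup can be pictured as a fallback chain. In this sketch an explicit name wins, then $METAXY_STORE, then the store entry from an already-parsed metaxy.toml. The relative order of the environment variable and the TOML setting is an assumption, and `resolve_store_name` is a hypothetical helper. Consult the Metaxy configuration docs for the actual precedence.

```python
import os
from typing import Mapping, Optional


def resolve_store_name(
    name: Optional[str],
    toml_config: Mapping[str, object],
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Pick a store name: explicit name, then $METAXY_STORE, then metaxy.toml (assumed order)."""
    if name is not None:
        return name
    env_value = environ.get("METAXY_STORE")
    if env_value:
        return env_value
    store = toml_config.get("store")
    return str(store) if store is not None else None  # None: nothing configured


print(resolve_store_name(None, {"store": "dev"}, {"METAXY_STORE": "ci"}))
```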

Functions

metaxy.ext.dagster.MetaxyStoreFromConfigResource.create_resource
create_resource(context: InitResourceContext) -> MetadataStore

Create a MetadataStore from the Metaxy configuration.

Parameters:

  • context (InitResourceContext) –

    Dagster resource initialization context.

Returns:

  • MetadataStore

    A MetadataStore configured with the Dagster run ID as the materialization ID.

Source code in src/metaxy/ext/dagster/resources.py
def create_resource(self, context: dg.InitResourceContext) -> mx.MetadataStore:
    """Create a MetadataStore from the Metaxy configuration.

    Args:
        context: Dagster resource initialization context.

    Returns:
        A MetadataStore configured with the Dagster run ID as the materialization ID.
    """
    assert context.run is not None
    return mx.MetaxyConfig.get().get_store(
        self.name, materialization_id=context.run.run_id
    )

Helpers

metaxy.ext.dagster.utils.FeatureStats

Bases: NamedTuple

Statistics about a feature's metadata for Dagster events.

metaxy.ext.dagster.selection.select_metaxy_assets

select_metaxy_assets(*, project: str | None = None, feature: CoercibleToFeatureKey | None = None) -> AssetSelection

Select Metaxy assets by project and/or feature.

This helper creates an AssetSelection that filters assets tagged by @metaxify.

Parameters:

  • project (str | None, default: None ) –

    Filter by project name. If None, uses MetaxyConfig.get().project.

  • feature (CoercibleToFeatureKey | None, default: None ) –

    Filter by specific feature key. If provided, further narrows the selection.

Returns:

  • AssetSelection

    An AssetSelection that can be used with dg.define_asset_job, dg.materialize, or AssetSelection operations like | and &.

Example: Select all Metaxy assets in the current project
import metaxy.ext.dagster as mxd

all_metaxy = mxd.select_metaxy_assets()

Example: Select assets for a specific project
prod_assets = mxd.select_metaxy_assets(project="production")

Example: Select a specific feature's assets
feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")

Example: Use with asset jobs
import dagster as dg

metaxy_job = dg.define_asset_job(
    name="materialize_metaxy",
    selection=mxd.select_metaxy_assets(),
)

Example: Combine with other selections
# All metaxy assets plus some other assets
combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

# Metaxy assets that are also in a specific group
filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
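Under the hood, the selection is a tag filter. Assets carrying the project tag are selected, and passing feature intersects that set with a feature tag match. The pure-Python model below mimics these semantics over a dict of asset tags. This reference does not show the value of the project tag key (DAGSTER_METAXY_PROJECT_TAG_KEY), so `metaxy/project` is a hypothetical placeholder. The string form of the feature tag value is also assumed, and `select_keys` is a hypothetical helper.

```python
from typing import Mapping, Optional

PROJECT_TAG = "metaxy/project"  # hypothetical value of DAGSTER_METAXY_PROJECT_TAG_KEY
FEATURE_TAG = "metaxy/feature"  # DAGSTER_METAXY_FEATURE_METADATA_KEY


def select_keys(
    assets: Mapping[str, Mapping[str, str]],
    project: str,
    feature: Optional[str] = None,
) -> set[str]:
    """Model the selection: match the project tag, optionally AND a feature tag."""
    selected = {key for key, tags in assets.items() if tags.get(PROJECT_TAG) == project}
    if feature is not None:
        # narrowing step, like `selection & AssetSelection.tag(...)`
        selected &= {key for key, tags in assets.items() if tags.get(FEATURE_TAG) == feature}
    return selected


assets = {
    "a": {PROJECT_TAG: "p", FEATURE_TAG: "x/y"},
    "b": {PROJECT_TAG: "p", FEATURE_TAG: "x/z"},
    "c": {PROJECT_TAG: "q"},
}
print(sorted(select_keys(assets, "p")))
```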
Source code in src/metaxy/ext/dagster/selection.py
def select_metaxy_assets(
    *,
    project: str | None = None,
    feature: mx.CoercibleToFeatureKey | None = None,
) -> dg.AssetSelection:
    """Select Metaxy assets by project and/or feature.

    This helper creates an `AssetSelection` that filters assets tagged by `@metaxify`.

    Args:
        project: Filter by project name. If None, uses `MetaxyConfig.get().project`.
        feature: Filter by specific feature key. If provided, further narrows the selection.

    Returns:
        An `AssetSelection` that can be used with `dg.define_asset_job`,
        `dg.materialize`, or `AssetSelection` operations like `|` and `&`.

    Example: Select all Metaxy assets in current project
        ```python
        import metaxy.ext.dagster as mxd

        all_metaxy = mxd.select_metaxy_assets()
        ```

    Example: Select assets for a specific project
        ```python
        prod_assets = mxd.select_metaxy_assets(project="production")
        ```

    Example: Select a specific feature's assets
        ```python
        feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
        ```

    Example: Use with asset jobs
        ```python
        metaxy_job = dg.define_asset_job(
            name="materialize_metaxy",
            selection=mxd.select_metaxy_assets(),
        )
        ```

    Example: Combine with other selections
        ```python
        # All metaxy assets plus some other assets
        combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

        # Metaxy assets that are also in a specific group
        filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
        ```
    """
    resolved_project = project if project is not None else mx.MetaxyConfig.get().project

    selection = dg.AssetSelection.tag(DAGSTER_METAXY_PROJECT_TAG_KEY, resolved_project)

    if feature is not None:
        feature_key = mx.coerce_to_feature_key(feature)
        selection = selection & dg.AssetSelection.tag(
            DAGSTER_METAXY_FEATURE_METADATA_KEY, str(feature_key)
        )

    return selection

Types

metaxy.ext.dagster.MetaxyOutput module-attribute

MetaxyOutput = IntoFrame | None

Constants

metaxy.ext.dagster.DAGSTER_METAXY_FEATURE_METADATA_KEY module-attribute

DAGSTER_METAXY_FEATURE_METADATA_KEY = 'metaxy/feature'

metaxy.ext.dagster.DAGSTER_METAXY_KIND module-attribute

DAGSTER_METAXY_KIND = 'metaxy'

metaxy.ext.dagster.DAGSTER_METAXY_INFO_METADATA_KEY module-attribute

DAGSTER_METAXY_INFO_METADATA_KEY = 'metaxy/info'

metaxy.ext.dagster.DAGSTER_METAXY_PARTITION_KEY module-attribute

DAGSTER_METAXY_PARTITION_KEY = 'partition_by'
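DAGSTER_METAXY_PARTITION_KEY names the asset-metadata entry that points to the column holding partition values, and generate_observe_results reads it as `partition_col`. The following is a minimal sketch of how such a column could restrict observed rows to a single Dagster partition, using lists of dicts instead of dataframes. The `date` column, the rows, and `rows_for_partition` are hypothetical. Metaxy's actual filtering happens inside build_runtime_feature_metadata.

```python
from typing import Any, Mapping, Sequence

DAGSTER_METAXY_PARTITION_KEY = "partition_by"


def rows_for_partition(
    asset_metadata: Mapping[str, Any],
    rows: Sequence[Mapping[str, Any]],
    partition_key: str,
) -> list[Mapping[str, Any]]:
    """Keep rows whose partition column equals partition_key; all rows if unset."""
    partition_col = asset_metadata.get(DAGSTER_METAXY_PARTITION_KEY)
    if partition_col is None:
        return list(rows)  # no partition column configured: observe everything
    return [row for row in rows if row.get(partition_col) == partition_key]


meta = {"metaxy/feature": "my/feature", DAGSTER_METAXY_PARTITION_KEY: "date"}
rows = [{"date": "2024-01-01", "id": 1}, {"date": "2024-01-02", "id": 2}]
print(rows_for_partition(meta, rows, "2024-01-01"))
```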