Dagster API Reference
Decorators
metaxy.ext.dagster.metaxify.metaxify
metaxify(_asset: _T | None = None, *, feature: CoercibleToFeatureKey | None = None, key: CoercibleToAssetKey | None = None, key_prefix: CoercibleToAssetKeyPrefix | None = None, inherit_feature_key_as_asset_key: bool = True, inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, inject_column_schema: bool = True, inject_column_lineage: bool = True)
Inject Metaxy metadata into a Dagster AssetsDefinition or AssetSpec.
Modifies assets that have "metaxy/feature" metadata set, or any asset when the feature argument is provided.
Can be used with or without parentheses.
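Supporting both @metaxify and @metaxify(...) relies on a common Python pattern. The decorated object arrives as an optional first positional argument (here _fn, playing the role of _asset in the signature above). When that argument is absent, the call returns the real decorator. The sketch below is hypothetical and only illustrates this calling convention. tag_asset and the metaxy_feature attribute are made-up names. metaxify itself is implemented as a class, not a function.

```python
from typing import Any, Callable, Optional


def tag_asset(
    _fn: Optional[Callable[..., Any]] = None, *, feature: Optional[str] = None
) -> Any:
    """Hypothetical decorator usable both as @tag_asset and @tag_asset(feature=...)."""

    def apply(fn: Callable[..., Any]) -> Callable[..., Any]:
        # Stand-in for the enrichment step: record the feature on the function object.
        fn.metaxy_feature = feature  # type: ignore[attr-defined]
        return fn

    if _fn is not None:
        # Bare usage: Python passes the decorated function positionally.
        return apply(_fn)
    # Called with arguments: return the decorator that Python applies next.
    return apply


@tag_asset
def plain_asset() -> None:
    pass


@tag_asset(feature="my/feature")
def feature_asset() -> None:
    pass
```

Making feature keyword-only (the * in the signature) keeps the two forms unambiguous, because a single positional argument can only be the decorated object.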
The decorated asset or spec is enriched with Metaxy metadata:
- The asset key is determined as follows:
  - If the key argument is provided, it overrides everything.
  - Otherwise, the key is resolved using the existing logic:
    - If "dagster/attributes": {"asset_key": ...} is set on the feature spec, that value is used.
    - Else, if inherit_feature_key_as_asset_key is True, the Metaxy feature key is used.
    - Else, the original Dagster asset key is kept.
  - If key_prefix is provided, it is prepended to the resolved key.
- Dependencies on upstream Metaxy features are injected into deps. The dependency keys are resolved with the same logic.
- The code version from the feature spec is appended to the asset's code version in the format metaxy:<version>.
- Metadata from the feature spec is injected into the Dagster asset metadata.
- The description from the feature class docstring is used if the asset spec does not already have a description.
- The "metaxy" kind is injected into the asset kinds if inject_metaxy_kind is True and the asset currently has fewer than 3 kinds.
- The tags metaxy/feature and metaxy/project are injected into the asset tags.
- Arbitrary asset attributes from "dagster/attributes" in the feature spec metadata (such as group_name, owners, and tags) are applied to the asset spec, replacing existing values.
- The column schema derived from Pydantic fields is injected into the asset metadata under dagster/column_schema. Field types are converted to strings, and field descriptions are used as column descriptions. If the asset already has a column schema defined, Metaxy columns are appended. User-defined columns take precedence for columns with the same name.
  Warning
  The Pydantic feature schema may not match the corresponding table schema in the metadata store.
- Column lineage is injected into the asset metadata under dagster/column_lineage. It tracks which upstream columns each downstream column depends on by analyzing:
  - Direct pass-through: columns with the same name in both the upstream and downstream features.
  - FeatureDep.rename: renamed columns trace back to their original upstream column names.
  - FeatureSpec.lineage: ID column relationships based on the lineage type (identity, aggregation, expansion).
  Column lineage is derived from Pydantic model fields on the feature class. If the asset already has column lineage defined, Metaxy lineage is merged with the user-defined lineage. For each column, user-defined dependencies are appended to the Metaxy-detected dependencies.
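The key-resolution order described above can be modeled in plain Python. This is a hypothetical sketch, not Metaxy's implementation. Keys are represented as tuples of path components. resolve_asset_key and its arguments are illustrative stand-ins for the metaxify options and the "dagster/attributes" asset_key entry.

```python
from typing import Optional, Sequence, Tuple

AssetPath = Tuple[str, ...]


def resolve_asset_key(
    original_key: AssetPath,
    feature_key: AssetPath,
    *,
    key: Optional[AssetPath] = None,
    key_prefix: Optional[Sequence[str]] = None,
    attributes_asset_key: Optional[AssetPath] = None,
    inherit_feature_key_as_asset_key: bool = True,
) -> AssetPath:
    """Hypothetical model of the documented metaxify key-resolution order."""
    if key is not None:
        # An explicit key overrides everything; it cannot be combined with key_prefix.
        if key_prefix is not None:
            raise ValueError("key and key_prefix are mutually exclusive")
        return tuple(key)
    if attributes_asset_key is not None:
        # "dagster/attributes": {"asset_key": ...} on the feature spec.
        resolved = tuple(attributes_asset_key)
    elif inherit_feature_key_as_asset_key:
        # Reuse the Metaxy feature key as the asset key.
        resolved = tuple(feature_key)
    else:
        # Keep the asset key the Dagster definition already had.
        resolved = tuple(original_key)
    if key_prefix is not None:
        # The prefix is prepended to whatever key was resolved.
        resolved = tuple(key_prefix) + resolved
    return resolved


feature = ("my", "feature")
print(resolve_asset_key(("my_asset",), feature))  # ('my', 'feature')
print(resolve_asset_key(("my_asset",), feature, key_prefix=["prod"]))  # ('prod', 'my', 'feature')
```

Applying the same function to upstream feature keys keeps the injected deps consistent with the keys of the upstream assets. This is why dependency keys follow the same logic and also receive key_prefix.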
Parameters:
- feature (CoercibleToFeatureKey | None, default: None) – The Metaxy feature to associate with the asset. If both feature and "metaxy/feature" metadata are set and they don't match, an error is raised.
- key (CoercibleToAssetKey | None, default: None) – Explicit asset key that overrides all other key resolution logic. Cannot be used with key_prefix or with multi-asset definitions that produce multiple outputs.
- key_prefix (CoercibleToAssetKeyPrefix | None, default: None) – Prefix to prepend to the resolved asset key. Also applied to upstream dependency keys. Cannot be used with key.
- inherit_feature_key_as_asset_key (bool, default: True) – If True (default), use the Metaxy feature key as the Dagster asset key (unless key is provided or dagster/attributes.asset_key is set). This ensures consistent key resolution between assets and their upstream dependencies.
- inject_metaxy_kind (bool, default: True) – Whether to inject the "metaxy" kind into asset kinds. The number of kinds is currently limited to 3, so metaxify skips kind injection if the asset already has 3 kinds.
- inject_code_version (bool, default: True) – Whether to inject the Metaxy feature code version into the asset's code version. The version is appended in the format metaxy:<version>.
- set_description (bool, default: True) – Whether to set the asset description from the feature class docstring if the asset doesn't already have a description.
- inject_column_schema (bool, default: True) – Whether to inject Pydantic field definitions as the Dagster column schema. Field types are converted to strings, and field descriptions are used as column descriptions.
- inject_column_lineage (bool, default: True) – Whether to inject column-level lineage into the asset metadata under dagster/column_lineage. Uses Pydantic model fields to track column provenance via FeatureDep.rename, FeatureSpec.lineage, and direct pass-through.
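The merge rules that apply when the asset already defines a column schema or column lineage can be sketched with plain data structures. This model rests on assumptions. Column, Dep, merge_column_schema, and merge_column_lineage are hypothetical names standing in for Dagster's column schema and column lineage metadata. The sketch does not deduplicate lineage entries, because the documentation does not say whether Metaxy does.

```python
from typing import Dict, List, NamedTuple, Tuple


class Column(NamedTuple):
    name: str
    dtype: str
    description: str = ""


# One lineage entry: (upstream asset key, upstream column name).
Dep = Tuple[str, str]


def merge_column_schema(user: List[Column], metaxy: List[Column]) -> List[Column]:
    """Keep user-defined columns; append Metaxy columns whose names are not taken."""
    taken = {column.name for column in user}
    return list(user) + [column for column in metaxy if column.name not in taken]


def merge_column_lineage(
    user: Dict[str, List[Dep]], metaxy: Dict[str, List[Dep]]
) -> Dict[str, List[Dep]]:
    """Per column, append user-defined dependencies after the Metaxy-detected ones."""
    merged = {column: list(deps) for column, deps in metaxy.items()}
    for column, deps in user.items():
        merged.setdefault(column, []).extend(deps)
    return merged


schema = merge_column_schema(
    [Column("id", "int", "user description")],
    [Column("id", "int64", "from Pydantic"), Column("value", "str")],
)
print([column.name for column in schema])  # ['id', 'value']
```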
Note
Multiple Dagster assets can contribute to the same Metaxy feature by setting the same
"metaxy/feature" metadata. This is a perfectly valid setup since Metaxy writes are append-only.
Using @metaxify with multi assets
The feature argument cannot be used with a @dg.multi_asset that produces multiple outputs.
Instead, set "metaxy/feature" metadata on the AssetSpec of each output that corresponds to a Metaxy feature.
Source code in src/metaxy/ext/dagster/metaxify.py
def __init__(
    self,
    _asset: "_T | None" = None,
    *,
    feature: mx.CoercibleToFeatureKey | None = None,
    key: CoercibleToAssetKey | None = None,
    key_prefix: CoercibleToAssetKeyPrefix | None = None,
    inherit_feature_key_as_asset_key: bool = True,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
) -> None:
    # Actual initialization happens in __new__, but we set defaults here for type checkers
    self.feature = (
        mx.coerce_to_feature_key(feature) if feature is not None else None
    )
    self.key = dg.AssetKey.from_coercible(key) if key is not None else None
    self.key_prefix = (
        dg.AssetKey.from_coercible(key_prefix) if key_prefix is not None else None
    )
    self.inherit_feature_key_as_asset_key = inherit_feature_key_as_asset_key
    self.inject_metaxy_kind = inject_metaxy_kind
    self.inject_code_version = inject_code_version
    self.set_description = set_description
    self.inject_column_schema = inject_column_schema
    self.inject_column_lineage = inject_column_lineage
metaxy.ext.dagster.observable.observable_metaxy_asset
observable_metaxy_asset(feature: CoercibleToFeatureKey, *, store_resource_key: str = 'store', inherit_feature_key_as_asset_key: bool = False, inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, **observable_kwargs: Any)
Decorator to create an observable source asset for a Metaxy feature.
The observation reads the feature's metadata from the store, counts rows,
and uses mean(metaxy_created_at) as the data version to track changes.
Using mean ensures that both additions and deletions are detected.
The decorated function receives (context, store, lazy_df) and can return
a dict of additional metadata to include in the observation.
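A minimal pure-Python model shows why the mean of metaxy_created_at is used instead of, for example, the maximum. The data_version helper below is hypothetical. It operates on Python datetime values rather than on a narwhals LazyFrame and only illustrates the arithmetic.

```python
from datetime import datetime, timezone
from statistics import fmean
from typing import Sequence


def data_version(created_at: Sequence[datetime]) -> str:
    """Hypothetical data version: mean creation time, in epoch seconds, as a string."""
    if not created_at:
        return "empty"  # assumption: some sentinel for an empty feature
    return str(fmean(ts.timestamp() for ts in created_at))


rows = [datetime(2024, 1, day, tzinfo=timezone.utc) for day in (1, 2, 3)]
added = rows + [datetime(2024, 1, 4, tzinfo=timezone.utc)]
oldest_deleted = rows[1:]

print(data_version(rows) != data_version(added))  # True: addition detected
print(data_version(rows) != data_version(oldest_deleted))  # True: deletion detected
print(max(rows) == max(oldest_deleted))  # True: a max-based version would miss it
```

Deleting the oldest row leaves the maximum unchanged but shifts the mean. The mean is not a perfect fingerprint, though. If the removed rows' timestamps average to the current mean, the mean stays the same. In this model, removing the middle row of three evenly spaced rows is such a case, although the observed row count still changes.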
Parameters:
- feature (CoercibleToFeatureKey) – The Metaxy feature to observe.
- store_resource_key (str, default: 'store') – Resource key for the MetadataStore.
- inherit_feature_key_as_asset_key (bool, default: False) – If True, use the Metaxy feature key as the Dagster asset key.
- inject_metaxy_kind (bool, default: True) – Whether to inject the "metaxy" kind into asset kinds.
- inject_code_version (bool, default: True) – Whether to inject the Metaxy feature code version.
- set_description (bool, default: True) – Whether to set the description from the feature class docstring.
- **observable_kwargs (Any, default: {}) – Passed to @observable_source_asset (key, group_name, tags, metadata, description, partitions_def, etc.).
Example
import narwhals as nw
import metaxy.ext.dagster as mxd
from myproject.features import ExternalFeature
@mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
def external_data(context, store, lazy_df):
    pass
# With custom metadata - return a dict
@mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
def external_data_with_metrics(context, store, lazy_df):
    # Run aggregations in the database
    total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
    return {"custom/total": total}
Note
observable_source_asset does not support deps. Upstream Metaxy feature
dependencies from the feature spec are not propagated to the SourceAsset.
Source code in src/metaxy/ext/dagster/observable.py
def observable_metaxy_asset(
    feature: mx.CoercibleToFeatureKey,
    *,
    store_resource_key: str = "store",
    # metaxify kwargs
    inherit_feature_key_as_asset_key: bool = False,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    # observable_source_asset kwargs
    **observable_kwargs: Any,
):
    """Decorator to create an observable source asset for a Metaxy feature.
    The observation reads the feature's metadata from the store, counts rows,
    and uses `mean(metaxy_created_at)` as the data version to track changes.
    Using mean ensures that both additions and deletions are detected.
    The decorated function receives `(context, store, lazy_df)` and can return
    a dict of additional metadata to include in the observation.
    Args:
        feature: The Metaxy feature to observe.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        inherit_feature_key_as_asset_key: If True, use the Metaxy feature key as the
            Dagster asset key.
        inject_metaxy_kind: Whether to inject `"metaxy"` kind into asset kinds.
        inject_code_version: Whether to inject the Metaxy feature code version.
        set_description: Whether to set description from feature class docstring.
        **observable_kwargs: Passed to `@observable_source_asset`
            (key, group_name, tags, metadata, description, partitions_def, etc.)
    Example:
        ```python
        import metaxy.ext.dagster as mxd
        from myproject.features import ExternalFeature
        @mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
        def external_data(context, store, lazy_df):
            pass
        # With custom metadata - return a dict
        @mxd.observable_metaxy_asset(feature=ExternalFeature, key="external_data")
        def external_data_with_metrics(context, store, lazy_df):
            # Run aggregations in the database
            total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
            return {"custom/total": total}
        ```
    Note:
        `observable_source_asset` does not support `deps`. Upstream Metaxy feature
        dependencies from the feature spec are not propagated to the SourceAsset.
    """
    feature_key = mx.coerce_to_feature_key(feature)
    def decorator(fn: Callable[..., Any]) -> dg.SourceAsset:
        # Build an AssetSpec from kwargs and enrich with metaxify
        spec = dg.AssetSpec(
            key=observable_kwargs.pop("key", None) or fn.__name__,
            group_name=observable_kwargs.pop("group_name", None),
            tags=observable_kwargs.pop("tags", None),
            metadata=observable_kwargs.pop("metadata", None),
            description=observable_kwargs.pop("description", None),
        )
        enriched = metaxify(
            feature=feature_key,
            inherit_feature_key_as_asset_key=inherit_feature_key_as_asset_key,
            inject_metaxy_kind=inject_metaxy_kind,
            inject_code_version=inject_code_version,
            set_description=set_description,
        )(spec)
        def _observe(context: dg.AssetExecutionContext) -> dg.ObserveResult:
            store: mx.MetadataStore = getattr(context.resources, store_resource_key)
            with store:
                lazy_df = store.read_metadata(feature_key)
                stats = compute_stats_from_lazy_frame(lazy_df)
                # Call the user's function - it can return additional metadata
                extra_metadata = fn(context, store, lazy_df) or {}
            metadata: dict[str, Any] = {"dagster/row_count": stats.row_count}
            metadata.update(extra_metadata)
            return dg.ObserveResult(
                data_version=stats.data_version,
                metadata=metadata,
            )
        # Apply observable_source_asset decorator
        return dg.observable_source_asset(
            key=enriched.key,
            description=enriched.description,
            group_name=enriched.group_name,
            tags=dict(enriched.tags) if enriched.tags else None,
            metadata=dict(enriched.metadata) if enriched.metadata else None,
            required_resource_keys={store_resource_key},
            **observable_kwargs,
        )(_observe)
    return decorator
Dagster Types
metaxy.ext.dagster.dagster_type.feature_to_dagster_type
feature_to_dagster_type(feature: CoercibleToFeatureKey, *, name: str | None = None, description: str | None = None, inject_column_schema: bool = True, inject_column_lineage: bool = True, metadata: Mapping[str, Any] | None = None) -> DagsterType
Build a Dagster type from a Metaxy feature.
Creates a dagster.DagsterType that validates outputs are
MetaxyOutput (i.e., narwhals-compatible
dataframes or None) and includes metadata derived from the feature's Pydantic
model fields.
Parameters:
- feature (CoercibleToFeatureKey) – The Metaxy feature to create a type for. Can be a feature class, feature key, or string that can be coerced to a feature key.
- name (str | None, default: None) – Optional custom name for the DagsterType. Defaults to the feature's table name (e.g., "project__feature_name").
- description (str | None, default: None) – Optional custom description. Defaults to the feature class docstring or a generated description.
- inject_column_schema (bool, default: True) – Whether to inject the column schema as metadata. The schema is derived from Pydantic model fields.
- inject_column_lineage (bool, default: True) – Whether to inject column lineage as metadata. The lineage is derived from feature dependencies.
- metadata (Mapping[str, Any] | None, default: None) – Optional custom metadata to inject into the DagsterType.
Returns:
- DagsterType – A DagsterType configured for the Metaxy feature with appropriate type checking and metadata.
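The name and description defaults follow simple fallback rules, visible in the source of feature_to_dagster_type. An explicit value wins. Otherwise the name comes from the feature's table name, and the description comes from the cleaned-up class docstring or a generated sentence. The sketch below is hypothetical. default_type_name and default_description are illustrative helpers, and it assumes that the table name joins key parts with "__" and that the key's string form joins them with "/".

```python
import inspect
from typing import Optional, Sequence


def default_type_name(key_parts: Sequence[str], name: Optional[str] = None) -> str:
    # Assumption: a feature key's table name joins its parts with "__".
    return name or "__".join(key_parts)


def default_description(
    key_parts: Sequence[str], doc: Optional[str], description: Optional[str] = None
) -> str:
    if description is not None:
        return description  # an explicit description always wins
    if doc:
        return inspect.cleandoc(doc)  # strip the docstring's common indentation
    # Assumption: the key's string form joins its parts with "/".
    return f"Metaxy feature '{'/'.join(key_parts)}'."


class MyFeature:
    """
    Daily user embeddings.

    One row per user.
    """


print(default_type_name(["project", "feature_name"]))  # project__feature_name
print(default_description(["my", "feature"], None))  # Metaxy feature 'my/feature'.
```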
Tip
This is automatically injected by @metaxify.
Example
import dagster as dg
import polars as pl
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature # Your Metaxy feature class
@mxd.metaxify(feature=MyFeature)
@dg.asset(dagster_type=mxd.feature_to_dagster_type(MyFeature))
def my_asset():
    return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
See also
- metaxify: Decorator for injecting Metaxy metadata into Dagster assets.
- MetaxyOutput: The type alias for valid Metaxy outputs.
Source code in src/metaxy/ext/dagster/dagster_type.py
def feature_to_dagster_type(
    feature: mx.CoercibleToFeatureKey,
    *,
    name: str | None = None,
    description: str | None = None,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
    metadata: Mapping[str, Any] | None = None,
) -> dg.DagsterType:
    """Build a Dagster type from a Metaxy feature.
    Creates a `dagster.DagsterType` that validates outputs are
    [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput] (i.e., narwhals-compatible
    dataframes or `None`) and includes metadata derived from the feature's Pydantic
    model fields.
    Args:
        feature: The Metaxy feature to create a type for. Can be a feature class,
            feature key, or string that can be coerced to a feature key.
        name: Optional custom name for the DagsterType. Defaults to the feature's
            table name (e.g., "project__feature_name").
        description: Optional custom description. Defaults to the feature class
            docstring or a generated description.
        inject_column_schema: Whether to inject the column schema as metadata.
            The schema is derived from Pydantic model fields.
        inject_column_lineage: Whether to inject column lineage as metadata.
            The lineage is derived from feature dependencies.
        metadata: Optional custom metadata to inject into the DagsterType.
    Returns:
        A DagsterType configured for the Metaxy feature with appropriate
        type checking and metadata.
    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]
    Example:
        ```python
        import dagster as dg
        import polars as pl
        import metaxy.ext.dagster as mxd
        from myproject.features import MyFeature # Your Metaxy feature class
        @mxd.metaxify(feature=MyFeature)
        @dg.asset(dagster_type=mxd.feature_to_dagster_type(MyFeature))
        def my_asset():
            return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
        ```
    !!! info "See also"
        - [`metaxify`][metaxy.ext.dagster.metaxify.metaxify]: Decorator for injecting
          Metaxy metadata into Dagster assets.
        - [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput]: The type alias for valid
          Metaxy outputs.
    """
    from metaxy.ext.dagster.io_manager import MetaxyOutput
    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)
    # Determine name
    type_name = name or feature_key.table_name
    # Determine description
    if description is None:
        if feature_cls.__doc__:
            import inspect
            description = inspect.cleandoc(feature_cls.__doc__)
        else:
            description = f"Metaxy feature '{feature_key.to_string()}'."
    # Build metadata - start with custom metadata if provided
    final_metadata: dict[str, Any] = dict(metadata) if metadata else {}
    final_metadata[DAGSTER_METAXY_INFO_METADATA_KEY] = build_feature_info_metadata(
        feature_key
    )
    if inject_column_schema:
        column_schema = build_column_schema(feature_cls)
        if column_schema is not None:
            final_metadata[DAGSTER_COLUMN_SCHEMA_METADATA_KEY] = column_schema
    if inject_column_lineage:
        column_lineage = build_column_lineage(feature_cls)
        if column_lineage is not None:
            final_metadata[DAGSTER_COLUMN_LINEAGE_METADATA_KEY] = column_lineage
    dagster_type = dg.DagsterType(
        type_check_fn=_create_type_check_fn(feature_key),
        name=type_name,
        description=description,
        typing_type=MetaxyOutput,
        metadata=final_metadata,
    )
    return dagster_type
Dagster Event Generators
metaxy.ext.dagster.utils.generate_materialize_results
generate_materialize_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Sequence[AssetSpec]) -> Iterator[MaterializeResult[None]]
Generate dagster.MaterializeResult events for assets in topological order.
Yields a MaterializeResult for each asset spec, sorted by their associated
Metaxy features in topological order (dependencies before dependents).
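Each result includes the row count as "dagster/row_count" metadata. When the store has a materialization ID set, each result also includes the number of rows written under that ID as "metaxy/materialized_in_run".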
Parameters:
- context (AssetExecutionContext) – The Dagster asset execution context.
- store (MetadataStore | MetaxyStoreFromConfigResource) – The Metaxy metadata store to read from.
- specs (Sequence[AssetSpec]) – Sequence of asset specs with "metaxy/feature" metadata set.
Yields:
- MaterializeResult[None] – Materialization result for each asset in topological order.
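The topological ordering (dependencies before dependents) can be illustrated with Python's standard graphlib module. Here graphlib stands in for FeatureGraph.topological_sort_features, which this sketch does not call. order_assets and its input mappings are hypothetical, and feature keys are plain strings.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Mapping, Sequence


def order_assets(
    spec_features: Mapping[str, str],
    feature_deps: Mapping[str, Sequence[str]],
) -> List[str]:
    """Order asset keys so that upstream features come before their dependents.

    spec_features maps asset key -> "metaxy/feature" value; feature_deps maps
    feature key -> upstream feature keys (both hypothetical inputs).
    """
    asset_by_feature: Dict[str, str] = {f: a for a, f in spec_features.items()}
    # Sort the whole dependency graph so that transitive edges through features
    # without a spec still constrain the order, then keep only selected features.
    graph: Dict[str, List[str]] = {f: list(deps) for f, deps in feature_deps.items()}
    for feature in asset_by_feature:
        graph.setdefault(feature, [])
    ordered = TopologicalSorter(graph).static_order()
    return [asset_by_feature[f] for f in ordered if f in asset_by_feature]


print(order_assets(
    {"output_b": "my/feature/b", "output_a": "my/feature/a"},
    {"my/feature/b": ["my/feature/a"]},
))  # ['output_a', 'output_b']
```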
Example
import dagster as dg
import metaxy as mx
from metaxy.ext.dagster import metaxify
from metaxy.ext.dagster.utils import generate_materialize_results
specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]
@metaxify
@dg.multi_asset(specs=specs)
def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    # ... compute and write data ...
    yield from generate_materialize_results(context, store, context.assets_def.specs)
Source code in src/metaxy/ext/dagster/utils.py
def generate_materialize_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Sequence[dg.AssetSpec],
) -> Iterator[dg.MaterializeResult[None]]:
    """Generate `dagster.MaterializeResult` events for assets in topological order.
    Yields a `MaterializeResult` for each asset spec, sorted by their associated
    Metaxy features in topological order (dependencies before dependents).
    Each result includes the row count as `"dagster/row_count"` metadata.
    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Sequence of asset specs with `"metaxy/feature"` metadata set.
    Yields:
        Materialization result for each asset in topological order.
    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]
        @metaxify
        @dg.multi_asset(specs=specs)
        def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            # ... compute and write data ...
            yield from generate_materialize_results(context, store, context.asset_defs.specs)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    for spec in specs:
        feature_key_raw = spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
        if feature_key_raw is None:
            raise ValueError(
                f"AssetSpec {spec.key} missing '{DAGSTER_METAXY_FEATURE_METADATA_KEY}' metadata"
            )
        feature_key = mx.coerce_to_feature_key(feature_key_raw)
        spec_by_feature_key[feature_key] = spec
    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))
    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_filters = get_partition_filter(context, asset_spec)
        with store:
            # Get total stats (partition-filtered)
            lazy_df = store.read_metadata(key, filters=partition_filters)
            stats = compute_stats_from_lazy_frame(lazy_df)
            # Get materialized-in-run count if materialization_id is set
            materialized_in_run: int | None = None
            if store.materialization_id is not None:
                mat_filters = partition_filters + [
                    nw.col(METAXY_MATERIALIZATION_ID) == store.materialization_id
                ]
                mat_df = store.read_metadata(key, filters=mat_filters)
                materialized_in_run = mat_df.select(nw.len()).collect().item(0, 0)
        metadata: dict[str, int] = {"dagster/row_count": stats.row_count}
        if materialized_in_run is not None:
            metadata["metaxy/materialized_in_run"] = materialized_in_run
        yield dg.MaterializeResult(
            value=None,
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
        )
metaxy.ext.dagster.utils.generate_observe_results
generate_observe_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Sequence[AssetSpec]) -> Iterator[ObserveResult]
Generate dagster.ObserveResult events for assets in topological order.
Yields an ObserveResult for each asset spec, sorted by their associated
Metaxy features in topological order.
Each result includes the row count as "dagster/row_count" metadata.
Parameters:
- context (AssetExecutionContext) – The Dagster asset execution context.
- store (MetadataStore | MetaxyStoreFromConfigResource) – The Metaxy metadata store to read from.
- specs (Sequence[AssetSpec]) – Sequence of asset specs with "metaxy/feature" metadata set.
Yields:
- ObserveResult – Observation result for each asset in topological order.
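generate_observe_results requires every spec to carry "metaxy/feature" metadata (its source raises a ValueError otherwise) and reports a row count for each asset. The sketch below models only those two behaviors, with an in-memory dictionary standing in for the metadata store. observation_metadata and its inputs are hypothetical. Topological ordering, partition filters, and data versions are omitted.

```python
from typing import Any, Dict, List, Mapping

FEATURE_METADATA_KEY = "metaxy/feature"


def observation_metadata(
    specs: Mapping[str, Mapping[str, Any]],
    rows_by_feature: Mapping[str, List[Dict[str, Any]]],
) -> Dict[str, Dict[str, int]]:
    """Hypothetical: the metadata each spec's observation would carry."""
    results: Dict[str, Dict[str, int]] = {}
    for asset_key, metadata in specs.items():
        feature_key = metadata.get(FEATURE_METADATA_KEY)
        if feature_key is None:
            # Every spec must name its Metaxy feature.
            raise ValueError(
                f"AssetSpec {asset_key} missing '{FEATURE_METADATA_KEY}' metadata"
            )
        rows = rows_by_feature.get(feature_key, [])
        results[asset_key] = {"dagster/row_count": len(rows)}
    return results


print(observation_metadata(
    {"output_a": {FEATURE_METADATA_KEY: "my/feature/a"}},
    {"my/feature/a": [{"id": 1}, {"id": 2}]},
))  # {'output_a': {'dagster/row_count': 2}}
```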
Example
import dagster as dg
import metaxy as mx
from metaxy.ext.dagster import metaxify
from metaxy.ext.dagster.utils import generate_observe_results
specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]
@metaxify
@dg.multi_observable_source_asset(specs=specs)
def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    yield from generate_observe_results(context, store, context.assets_def.specs)
Source code in src/metaxy/ext/dagster/utils.py
def generate_observe_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Sequence[dg.AssetSpec],
) -> Iterator[dg.ObserveResult]:
    """Generate `dagster.ObserveResult` events for assets in topological order.
    Yields an `ObserveResult` for each asset spec, sorted by their associated
    Metaxy features in topological order.
    Each result includes the row count as `"dagster/row_count"` metadata.
    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Sequence of asset specs with `"metaxy/feature"` metadata set.
    Yields:
        Observation result for each asset in topological order.
    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]
        @metaxify
        @dg.multi_observable_source_asset(specs=specs)
        def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            yield from generate_observe_results(context, store, context.asset_defs.specs)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    for spec in specs:
        feature_key_raw = spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
        if feature_key_raw is None:
            raise ValueError(
                f"AssetSpec {spec.key} missing '{DAGSTER_METAXY_FEATURE_METADATA_KEY}' metadata"
            )
        feature_key = mx.coerce_to_feature_key(feature_key_raw)
        spec_by_feature_key[feature_key] = spec
    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))
    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        filters = get_partition_filter(context, asset_spec)
        with store:
            lazy_df = store.read_metadata(key, filters=filters)
            stats = compute_stats_from_lazy_frame(lazy_df)
        yield dg.ObserveResult(
            asset_key=asset_spec.key,
            metadata={"dagster/row_count": stats.row_count},
            data_version=stats.data_version,
        )
metaxy.ext.dagster.utils.build_feature_info_metadata
build_feature_info_metadata(feature: CoercibleToFeatureKey) -> dict[str, Any]
Build feature info metadata dict for Dagster assets.
Creates a dictionary with information about the Metaxy feature that can be
used as Dagster asset metadata under the "metaxy/feature_info" key.
Parameters:
- feature (CoercibleToFeatureKey) – The Metaxy feature (class, key, or string).
Returns:
- dict[str, Any] – A nested dictionary containing:
  - feature: Feature information
    - project: The project name
    - spec: The full feature spec as a dict (via model_dump())
    - version: The feature version string
    - type: The module path of the feature class
  - metaxy: Metaxy library information
    - version: The metaxy library version
    - plugins: The plugins from the current Metaxy configuration (MetaxyConfig.get().plugins)
Tip
This is automatically injected by @metaxify.
Example
from metaxy.ext.dagster.utils import build_feature_info_metadata
from myproject.features import MyFeature  # Your Metaxy feature class
info = build_feature_info_metadata(MyFeature)
# {
#     "feature": {
#         "project": "my_project",
#         "spec": {...},  # Full FeatureSpec model_dump()
#         "version": "my__feature@abc123",
#         "type": "myproject.features",
#     },
#     "metaxy": {
#         "version": "0.1.0",
#         "plugins": ...,  # From MetaxyConfig.get().plugins
#     },
# }
Source code in src/metaxy/ext/dagster/utils.py
def build_feature_info_metadata(
    feature: mx.CoercibleToFeatureKey,
) -> dict[str, Any]:
    """Build feature info metadata dict for Dagster assets.
    Creates a dictionary with information about the Metaxy feature that can be
    used as Dagster asset metadata under the `"metaxy/feature_info"` key.
    Args:
        feature: The Metaxy feature (class, key, or string).
    Returns:
        A nested dictionary containing:
        - `feature`: Feature information
            - `project`: The project name
            - `spec`: The full feature spec as a dict (via `model_dump()`)
            - `version`: The feature version string
            - `type`: The feature class module path
        - `metaxy`: Metaxy library information
            - `version`: The metaxy library version
    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]
    Example:
        ```python
        from metaxy.ext.dagster.utils import build_feature_info_metadata
        info = build_feature_info_metadata(MyFeature)
        # {
        #     "feature": {
        #         "project": "my_project",
        #         "spec": {...},  # Full FeatureSpec model_dump()
        #         "version": "my__feature@abc123",
        #         "type": "myproject.features",
        #     },
        #     "metaxy": {
        #         "version": "0.1.0",
        #     },
        # }
        ```
    """
    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)
    return {
        "feature": {
            "project": feature_cls.project,
            "spec": feature_cls.spec().model_dump(mode="json"),
            "version": feature_cls.feature_version(),
            "type": feature_cls.__module__,
        },
        "metaxy": {
            "version": mx.__version__,
            "plugins": mx.MetaxyConfig.get().plugins,
        },
    }
Resources
metaxy.ext.dagster.MetaxyIOManager
Bases: ConfigurableIOManager
MetaxyIOManager is a Dagster IOManager that reads data from and writes data to Metaxy's MetadataStore.
It automatically attaches Metaxy feature and store metadata to Dagster materialization events and handles partitioned assets.
Always set "metaxy/feature" Dagster metadata
This IOManager uses the "metaxy/feature" Dagster metadata key to map Dagster assets to Metaxy features.
It expects this key to be set on the assets being loaded or materialized.
Defining Partitioned Assets
To tell Metaxy which column to use when filtering partitioned assets, set the "partition_by" Dagster metadata key.
This key is commonly used by various Dagster IO managers to configure partitioning behavior.
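For a partitioned asset, the IO manager restricts the read to rows whose "partition_by" column equals the current partition key. The real code builds dataframe filter expressions through build_partition_filter. The sketch below models the same idea with Python predicates over dictionaries. build_row_filters and read_rows are hypothetical. Returning no filters when either the column or the key is missing is an assumption.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]
Predicate = Callable[[Row], bool]


def build_row_filters(
    partition_col: Optional[str], partition_key: Optional[str]
) -> List[Predicate]:
    """Hypothetical analogue of a partition filter: keep rows in one partition."""
    if partition_col is None or partition_key is None:
        # Not partitioned, or no "partition_by" metadata: read everything (assumption).
        return []
    return [lambda row: row.get(partition_col) == partition_key]


def read_rows(rows: List[Row], filters: List[Predicate]) -> List[Row]:
    # A row is kept only if it satisfies every filter.
    return [row for row in rows if all(f(row) for f in filters)]


rows = [
    {"date": "2024-01-01", "id": 1},
    {"date": "2024-01-02", "id": 2},
]
print(read_rows(rows, build_row_filters("date", "2024-01-02")))  # [{'date': '2024-01-02', 'id': 2}]
```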
Functions
metaxy.ext.dagster.MetaxyIOManager.load_input
load_input(context: InputContext) -> LazyFrame[Any]
Load feature metadata from MetadataStore.
Reads metadata for the feature specified in the asset's "metaxy/feature" metadata.
For partitioned assets, filters to the current partition using the column specified
in "partition_by" metadata.
Parameters:
- context (InputContext) – Dagster input context containing asset metadata.
Returns:
- LazyFrame[Any] – A narwhals LazyFrame with the feature metadata.
Source code in src/metaxy/ext/dagster/io_manager.py
def load_input(self, context: "dg.InputContext") -> nw.LazyFrame[Any]:
    """Load feature metadata from [`MetadataStore`][metaxy.MetadataStore].
    Reads metadata for the feature specified in the asset's `"metaxy/feature"` metadata.
    For partitioned assets, filters to the current partition using the column specified
    in `"partition_by"` metadata.
    Args:
        context: Dagster input context containing asset metadata.
    Returns:
        A narwhals LazyFrame with the feature metadata.
    """
    with self.metadata_store:
        context.log.debug(
            f"Reading metadata for Metaxy feature {self._feature_key_from_context(context).to_string()} from {self.metadata_store.display()}"
        )
        # Build partition filter if applicable
        partition_col = (
            context.definition_metadata.get(DAGSTER_METAXY_PARTITION_KEY)
            if context.has_asset_partitions
            else None
        )
        partition_key = (
            context.asset_partition_key if context.has_asset_partitions else None
        )
        filters = build_partition_filter(
            partition_col,  # pyright: ignore[reportArgumentType]
            partition_key,
        )
        return self.metadata_store.read_metadata(
            feature=self._feature_key_from_context(context),
            filters=filters,
        )
metaxy.ext.dagster.MetaxyIOManager.handle_output
handle_output(context: OutputContext, obj: MetaxyOutput) -> None
Write feature metadata to MetadataStore.
Writes the output dataframe to the metadata store for the feature specified
in the asset's "metaxy/feature" metadata. Also logs metadata about the
feature and store to Dagster's materialization events.
If obj is None, only metadata logging is performed (no data is written).
Parameters:
- context (OutputContext) – Dagster output context containing asset metadata.
- obj (MetaxyOutput) – A narwhals-compatible dataframe to write, or None to skip writing.
Source code in src/metaxy/ext/dagster/io_manager.py
def handle_output(self, context: "dg.OutputContext", obj: MetaxyOutput) -> None:
    """Write feature metadata to [`MetadataStore`][metaxy.MetadataStore].
    Writes the output dataframe to the metadata store for the feature specified
    in the asset's `"metaxy/feature"` metadata. Also logs metadata about the
    feature and store to Dagster's materialization events.
    If `obj` is `None`, only metadata logging is performed (no data is written).
    Args:
        context: Dagster output context containing asset metadata.
        obj: A narwhals-compatible dataframe to write, or None to skip writing.
    """
    assert DAGSTER_METAXY_FEATURE_METADATA_KEY in context.definition_metadata, (
        f'Missing `"{DAGSTER_METAXY_FEATURE_METADATA_KEY}"` key in asset metadata'
    )
    key = self._feature_key_from_context(context)
    feature = mx.get_feature_by_key(key)
    if obj is not None:
        context.log.debug(
            f'Writing metadata for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
        with self.metadata_store.open("write"):
            self.metadata_store.write_metadata(feature=feature, df=obj)
        context.log.debug(
            f'Metadata written for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
    else:
        context.log.debug(
            f'The output corresponds to Metaxy feature "{key.to_string()}" stored in {self.metadata_store.display()}'
        )
    self._log_output_metadata(context)
metaxy.ext.dagster.MetaxyStoreFromConfigResource
Bases: ConfigurableResource[MetadataStore]
This resource creates a metaxy.MetadataStore based on the current Metaxy configuration (metaxy.toml).
Functions
metaxy.ext.dagster.MetaxyStoreFromConfigResource.create_resource
create_resource(context: InitResourceContext) -> MetadataStore
Create a MetadataStore from the Metaxy configuration.
Parameters:
- context (InitResourceContext) – Dagster resource initialization context.
Returns:
- MetadataStore – A MetadataStore configured with the Dagster run ID as the materialization ID.
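The store created by this resource uses the Dagster run ID as its materialization ID. As a result, generate_materialize_results can report how many rows the current run wrote, in addition to the total row count. Its source filters on a METAXY_MATERIALIZATION_ID column for this. The in-memory sketch below mirrors that calculation. materialization_metadata is a hypothetical helper, and the column name "materialization_id" is assumed rather than taken from Metaxy.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]
# Hypothetical column name; the real column is identified by a Metaxy constant.
MATERIALIZATION_ID_COLUMN = "materialization_id"


def materialization_metadata(
    rows: List[Row], materialization_id: Optional[str]
) -> Dict[str, int]:
    """Row count for the asset, plus rows written under the given materialization ID."""
    metadata = {"dagster/row_count": len(rows)}
    if materialization_id is not None:
        metadata["metaxy/materialized_in_run"] = sum(
            1 for row in rows if row.get(MATERIALIZATION_ID_COLUMN) == materialization_id
        )
    return metadata


rows = [
    {"id": 1, MATERIALIZATION_ID_COLUMN: "run-1"},
    {"id": 2, MATERIALIZATION_ID_COLUMN: "run-2"},
    {"id": 3, MATERIALIZATION_ID_COLUMN: "run-2"},
]
print(materialization_metadata(rows, "run-2"))
# {'dagster/row_count': 3, 'metaxy/materialized_in_run': 2}
```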
Source code in src/metaxy/ext/dagster/resources.py
def create_resource(self, context: dg.InitResourceContext) -> mx.MetadataStore:
    """Create a MetadataStore from the Metaxy configuration.
    Args:
        context: Dagster resource initialization context.
    Returns:
        A MetadataStore configured with the Dagster run ID as the materialization ID.
    """
    assert context.run is not None
    return mx.MetaxyConfig.get().get_store(
        self.name, materialization_id=context.run.run_id
    )
Helpers
metaxy.ext.dagster.utils.FeatureStats
metaxy.ext.dagster.selection.select_metaxy_assets
select_metaxy_assets(*, project: str | None = None, feature: CoercibleToFeatureKey | None = None) -> AssetSelection
Select Metaxy assets by project and/or feature.
This helper creates an AssetSelection that filters assets tagged by @metaxify.
Parameters:
- project (str | None, default: None) – Filter by project name. If None, uses MetaxyConfig.get().project.
- feature (CoercibleToFeatureKey | None, default: None) – Filter by a specific feature key. If provided, further narrows the selection.
Returns:
- AssetSelection – An AssetSelection that can be used with dg.define_asset_job, dg.materialize, or AssetSelection operations like | and &.
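Example: Select all Metaxy assets in current project
import metaxy.ext.dagster as mxd
all_metaxy = mxd.select_metaxy_assets()
Example: Select assets for a specific project
prod_assets = mxd.select_metaxy_assets(project="production")
Example: Select a specific feature's assets
feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
Example: Use with asset jobs
metaxy_job = dg.define_asset_job(
    name="materialize_metaxy",
    selection=mxd.select_metaxy_assets(),
)
Example: Combine with other selections
# All metaxy assets plus some other assets
combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")
# Metaxy assets that are also in a specific group
filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")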
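Conceptually, the selection is a tag filter on the metaxy/project tag, narrowed by the metaxy/feature tag when a feature is given. The set-based model below is a hypothetical sketch, and select_by_tags is not part of Metaxy or Dagster. Its project argument is required, whereas the real helper falls back to MetaxyConfig.get().project. The feature tag value format ("my/feature/a") is an assumption.

```python
from typing import Mapping, Optional, Set

PROJECT_TAG = "metaxy/project"
FEATURE_TAG = "metaxy/feature"


def select_by_tags(
    assets: Mapping[str, Mapping[str, str]],
    project: str,
    feature: Optional[str] = None,
) -> Set[str]:
    """Hypothetical model: asset keys whose tags match the project (and feature)."""
    selected = {key for key, tags in assets.items() if tags.get(PROJECT_TAG) == project}
    if feature is not None:
        # Intersection, analogous to `selection & AssetSelection.tag(...)`.
        selected &= {key for key, tags in assets.items() if tags.get(FEATURE_TAG) == feature}
    return selected


assets = {
    "a": {PROJECT_TAG: "my_project", FEATURE_TAG: "my/feature/a"},
    "b": {PROJECT_TAG: "my_project", FEATURE_TAG: "my/feature/b"},
    "other_asset": {},
}
print(sorted(select_by_tags(assets, "my_project")))  # ['a', 'b']
```

Set union and intersection play the role of the | and & operators on AssetSelection.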
Source code in src/metaxy/ext/dagster/selection.py
def select_metaxy_assets(
    *,
    project: str | None = None,
    feature: mx.CoercibleToFeatureKey | None = None,
) -> dg.AssetSelection:
    """Select Metaxy assets by project and/or feature.
    This helper creates an `AssetSelection` that filters assets tagged by `@metaxify`.
    Args:
        project: Filter by project name. If None, uses `MetaxyConfig.get().project`.
        feature: Filter by specific feature key. If provided, further narrows the selection.
    Returns:
        An `AssetSelection` that can be used with `dg.define_asset_job`,
        `dg.materialize`, or `AssetSelection` operations like `|` and `&`.
    Example: Select all Metaxy assets in current project
        ```python
        import metaxy.ext.dagster as mxd
        all_metaxy = mxd.select_metaxy_assets()
        ```
    Example: Select assets for a specific project
        ```python
        prod_assets = mxd.select_metaxy_assets(project="production")
        ```
    Example: Select a specific feature's assets
        ```python
        feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
        ```
    Example: Use with asset jobs
        ```python
        metaxy_job = dg.define_asset_job(
            name="materialize_metaxy",
            selection=mxd.select_metaxy_assets(),
        )
        ```
    Example: Combine with other selections
        ```python
        # All metaxy assets plus some other assets
        combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")
        # Metaxy assets that are also in a specific group
        filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
        ```
    """
    resolved_project = project if project is not None else mx.MetaxyConfig.get().project
    selection = dg.AssetSelection.tag(DAGSTER_METAXY_PROJECT_TAG_KEY, resolved_project)
    if feature is not None:
        feature_key = mx.coerce_to_feature_key(feature)
        selection = selection & dg.AssetSelection.tag(
            DAGSTER_METAXY_FEATURE_METADATA_KEY, str(feature_key)
        )
    return selection